From 91eec1ad85a000a4af34783e9884ad273337832c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 20 Jul 2018 15:53:47 +0200 Subject: [PATCH] server: Start the nio migration. --- matrix/events.py | 58 ----------------------- matrix/server.py | 120 +++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 105 insertions(+), 73 deletions(-) diff --git a/matrix/events.py b/matrix/events.py index 90b78a1..0d4ab31 100644 --- a/matrix/events.py +++ b/matrix/events.py @@ -56,15 +56,6 @@ class MatrixErrorEvent(MatrixEvent): self.fatal = fatal MatrixEvent.__init__(self, server) - def execute(self): - message = ("{prefix}matrix: {error}").format( - prefix=W.prefix("error"), error=self.error_message) - - W.prnt(self.server.server_buffer, message) - - if self.fatal: - self.server.disconnect(reconnect=False) - @classmethod def from_dict(cls, server, error_prefix, fatal, parsed_dict): try: @@ -113,25 +104,6 @@ class MatrixLoginEvent(MatrixEvent): self.device_id = device_id MatrixEvent.__init__(self, server) - def execute(self): - self.server.access_token = self.access_token - self.server.user_id = self.user_id - self.server.client.access_token = self.access_token - self.server.device_id = self.device_id - self.server.save_device_id() - - message = "{prefix}matrix: Logged in as {user}".format( - prefix=W.prefix("network"), user=self.user_id) - - W.prnt(self.server.server_buffer, message) - - if not self.server.olm: - self.server.create_olm() - self.server.store_olm() - self.server.upload_keys(device_keys=True, one_time_keys=False) - - self.server.sync() - @classmethod def from_dict(cls, server, parsed_dict): try: @@ -562,33 +534,3 @@ class MatrixSyncEvent(MatrixEvent): except (KeyError, ValueError, TypeError): return MatrixErrorEvent.from_dict(server, "Error syncing", False, parsed_dict) - - def _queue_joined_info(self): - server = self.server - - while self.joined_room_infos: - info = self.joined_room_infos.pop() - - if info.room_id not in server.buffers: - server.create_room_buffer(info.room_id) - - room = server.rooms[info.room_id] - - if not room.prev_batch: - room.prev_batch = info.prev_batch - - server.event_queue.append(info) - - def execute(self): - server = self.server - - # we got the same batch again, nothing to do - if self.next_batch == server.next_batch: - server.sync() - return - - self._queue_joined_info() - server.next_batch = self.next_batch - server.check_one_time_keys(self.one_time_key_count) - - server.handle_events() diff --git a/matrix/server.py b/matrix/server.py index e4e6270..be48fb9 100644 --- a/matrix/server.py +++ b/matrix/server.py @@ -23,10 +23,13 @@ import socket import time import datetime import pprint +import json from collections import deque, defaultdict from http_parser.pyparser import HttpParser +from nio import Client, LoginResponse + from matrix.plugin_options import Option, DebugType from matrix.utils import (key_from_value, prnt_debug, server_buffer_prnt, create_server_buffer, tags_for_message, @@ -53,8 +56,13 @@ from matrix.api import ( ) from .events import ( + MatrixLoginEvent, + MatrixSyncEvent, MatrixSendEvent, - MatrixBacklogEvent + MatrixBacklogEvent, + MatrixErrorEvent, + MatrixEmoteEvent, + MatrixJoinEvent ) from matrix.encryption import ( @@ -108,6 +116,7 @@ class MatrixServer: self.ssl_context = ssl.create_default_context() # type: ssl.SSLContext self.client = None + self.nio_client = Client() # type: Option[Client] self.access_token = None # type: str self.next_batch = None # type: str self.transaction_id = 0 # type: int @@ -250,6 +259,7 @@ class MatrixServer: host = ':'.join([self.address, str(self.port)]) user_agent = 'weechat-matrix/{version}'.format(version="0.1") self.client = MatrixClient(host, user_agent=user_agent) + # self.nio_client = Client() def update_option(self, option, option_name): if option_name == "address": @@ -278,11 +288,13 @@ class MatrixServer: value = W.config_string(option) self.user = value self.access_token = "" + self.nio_client.user = value + self.nio_client.access_token = "" self._load_device_id() - if self.device_id: - self._load_olm() + # if self.device_id: + # self._load_olm() elif option_name == "password": value = W.config_string(option) @@ -696,8 +708,67 @@ class MatrixServer: raise NotImplementedError("Unsupported message of type {}".format( type(message))) + def _handle_erorr_response(self, response): + message = ("{prefix}matrix: {error}").format( + prefix=W.prefix("error"), error=self.error_message) + + W.prnt(self.server.server_buffer, message) + + if self.fatal: + self.server.disconnect(reconnect=False) + + def _handle_login(self, response): + self.access_token = response.access_token + self.user_id = response.user_id + self.client.access_token = response.access_token + self.device_id = response.device_id + self.save_device_id() + + message = "{prefix}matrix: Logged in as {user}".format( + prefix=W.prefix("network"), user=self.user_id) + + W.prnt(self.server_buffer, message) + + # if not self.olm: + # self.create_olm() + # self.store_olm() + # self.upload_keys(device_keys=True, one_time_keys=False) + + self.sync() + + def _queue_joined_info(self, response): + while response.joined_room_infos: + info = response.joined_room_infos.pop() + + if info.room_id not in self.buffers: + self.create_room_buffer(info.room_id) + + room = self.rooms[info.room_id] + + if not room.prev_batch: + room.prev_batch = info.prev_batch + + self.event_queue.append(info) + + def _handle_sync(self, response): + # we got the same batch again, nothing to do + if self.next_batch == response.next_batch: + self.sync() + return + + self._queue_joined_info(response) + self.next_batch = response.next_batch + # self.check_one_time_keys(response.one_time_key_count) + self.handle_events() + def handle_matrix_response(self, response): - if isinstance(response, MatrixSendEvent): + if isinstance(response, MatrixLoginEvent): + self._handle_login(response) + + elif isinstance(response, MatrixSyncEvent): + self._handle_sync(response) + + elif isinstance(response, MatrixSendEvent): _, room_buffer = self.find_room_from_id(response.room_id) self.handle_own_messages(room_buffer, response.message) @@ -708,8 +779,22 @@ class MatrixServer: room.backlog_pending = False W.bar_item_update("buffer_modes") - else: - response.execute() + elif isinstance(response, MatrixErrorEvent): + self._handle_erorr_response(response) + + def nio_receive(self): + response = self.nio_client.next_response() + + if isinstance(response, LoginResponse): + self._handle_login(response) + + def nio_parse_response(self, response): + if isinstance(response, MatrixLoginMessage): + self.nio_client.receive("login", response.response.body) + + self.nio_receive() + + return def handle_response(self, message): # type: (MatrixMessage) -> None @@ -718,17 +803,22 @@ class MatrixServer: if ('content-type' in message.response.headers and message.response.headers['content-type'] == 'application/json'): - ret, error = message.decode_body(self) - if not ret: - message = ("{prefix}matrix: Error decoding json response from " - "server: {error}").format( - prefix=W.prefix("error"), error=error) - W.prnt(self.server_buffer, message) - return + if isinstance(message, MatrixLoginMessage): + self.nio_parse_response(message) - event = message.event - self.handle_matrix_response(event) + else: + ret, error = message.decode_body(self) + + if not ret: + message = ("{prefix}matrix: Error decoding json response" + " from server: {error}").format( + prefix=W.prefix("error"), error=error) + W.prnt(self.server_buffer, message) + return + + event = message.event + self.handle_matrix_response(event) else: status_code = message.response.status if status_code == 504: