server: Start the nio migration.

This commit is contained in:
Damir Jelić 2018-07-20 15:53:47 +02:00
parent 3993ce830d
commit 91eec1ad85
2 changed files with 105 additions and 73 deletions

View file

@ -56,15 +56,6 @@ class MatrixErrorEvent(MatrixEvent):
self.fatal = fatal
MatrixEvent.__init__(self, server)
def execute(self):
message = ("{prefix}matrix: {error}").format(
prefix=W.prefix("error"), error=self.error_message)
W.prnt(self.server.server_buffer, message)
if self.fatal:
self.server.disconnect(reconnect=False)
@classmethod
def from_dict(cls, server, error_prefix, fatal, parsed_dict):
try:
@ -113,25 +104,6 @@ class MatrixLoginEvent(MatrixEvent):
self.device_id = device_id
MatrixEvent.__init__(self, server)
def execute(self):
self.server.access_token = self.access_token
self.server.user_id = self.user_id
self.server.client.access_token = self.access_token
self.server.device_id = self.device_id
self.server.save_device_id()
message = "{prefix}matrix: Logged in as {user}".format(
prefix=W.prefix("network"), user=self.user_id)
W.prnt(self.server.server_buffer, message)
if not self.server.olm:
self.server.create_olm()
self.server.store_olm()
self.server.upload_keys(device_keys=True, one_time_keys=False)
self.server.sync()
@classmethod
def from_dict(cls, server, parsed_dict):
try:
@ -562,33 +534,3 @@ class MatrixSyncEvent(MatrixEvent):
except (KeyError, ValueError, TypeError):
return MatrixErrorEvent.from_dict(server, "Error syncing", False,
parsed_dict)
def _queue_joined_info(self):
server = self.server
while self.joined_room_infos:
info = self.joined_room_infos.pop()
if info.room_id not in server.buffers:
server.create_room_buffer(info.room_id)
room = server.rooms[info.room_id]
if not room.prev_batch:
room.prev_batch = info.prev_batch
server.event_queue.append(info)
def execute(self):
server = self.server
# we got the same batch again, nothing to do
if self.next_batch == server.next_batch:
server.sync()
return
self._queue_joined_info()
server.next_batch = self.next_batch
server.check_one_time_keys(self.one_time_key_count)
server.handle_events()

View file

@ -23,10 +23,13 @@ import socket
import time
import datetime
import pprint
import json
from collections import deque, defaultdict
from http_parser.pyparser import HttpParser
from nio import Client, LoginResponse
from matrix.plugin_options import Option, DebugType
from matrix.utils import (key_from_value, prnt_debug, server_buffer_prnt,
create_server_buffer, tags_for_message,
@ -53,8 +56,13 @@ from matrix.api import (
)
from .events import (
MatrixLoginEvent,
MatrixSyncEvent,
MatrixSendEvent,
MatrixBacklogEvent
MatrixBacklogEvent,
MatrixErrorEvent,
MatrixEmoteEvent,
MatrixJoinEvent
)
from matrix.encryption import (
@ -108,6 +116,7 @@ class MatrixServer:
self.ssl_context = ssl.create_default_context() # type: ssl.SSLContext
self.client = None
self.nio_client = Client() # type: Option[Client]
self.access_token = None # type: str
self.next_batch = None # type: str
self.transaction_id = 0 # type: int
@ -250,6 +259,7 @@ class MatrixServer:
host = ':'.join([self.address, str(self.port)])
user_agent = 'weechat-matrix/{version}'.format(version="0.1")
self.client = MatrixClient(host, user_agent=user_agent)
# self.nio_client = Client()
def update_option(self, option, option_name):
if option_name == "address":
@ -278,11 +288,13 @@ class MatrixServer:
value = W.config_string(option)
self.user = value
self.access_token = ""
self.nio_client.user = value
self.nio_client.access_token = ""
self._load_device_id()
if self.device_id:
self._load_olm()
# if self.device_id:
# self._load_olm()
elif option_name == "password":
value = W.config_string(option)
@ -696,8 +708,67 @@ class MatrixServer:
raise NotImplementedError("Unsupported message of type {}".format(
type(message)))
def _handle_erorr_response(self, response):
message = ("{prefix}matrix: {error}").format(
prefix=W.prefix("error"), error=self.error_message)
W.prnt(self.server.server_buffer, message)
if self.fatal:
self.server.disconnect(reconnect=False)
def _handle_login(self, response):
self.access_token = response.access_token
self.user_id = response.user_id
self.client.access_token = response.access_token
self.device_id = response.device_id
self.save_device_id()
message = "{prefix}matrix: Logged in as {user}".format(
prefix=W.prefix("network"), user=self.user_id)
W.prnt(self.server_buffer, message)
# if not self.olm:
# self.create_olm()
# self.store_olm()
# self.upload_keys(device_keys=True, one_time_keys=False)
self.sync()
def _queue_joined_info(self, response):
while response.joined_room_infos:
info = response.joined_room_infos.pop()
if info.room_id not in self.buffers:
self.create_room_buffer(info.room_id)
room = self.rooms[info.room_id]
if not room.prev_batch:
room.prev_batch = info.prev_batch
self.event_queue.append(info)
def _handle_sync(self, response):
# we got the same batch again, nothing to do
if self.next_batch == response.next_batch:
self.sync()
return
self._queue_joined_info(response)
self.next_batch = response.next_batch
# self.check_one_time_keys(response.one_time_key_count)
self.handle_events()
def handle_matrix_response(self, response):
if isinstance(response, MatrixSendEvent):
if isinstance(response, MatrixLoginEvent):
self._handle_login(response)
elif isinstance(response, MatrixSyncEvent):
self._handle_sync(response)
elif isinstance(response, MatrixSendEvent):
_, room_buffer = self.find_room_from_id(response.room_id)
self.handle_own_messages(room_buffer, response.message)
@ -708,8 +779,22 @@ class MatrixServer:
room.backlog_pending = False
W.bar_item_update("buffer_modes")
else:
response.execute()
elif isinstance(response, MatrixErrorEvent):
self._handle_erorr_response(response)
def nio_receive(self):
response = self.nio_client.next_response()
if isinstance(response, LoginResponse):
self._handle_login(response)
def nio_parse_response(self, response):
if isinstance(response, MatrixLoginMessage):
self.nio_client.receive("login", response.response.body)
self.nio_receive()
return
def handle_response(self, message):
# type: (MatrixMessage) -> None
@ -718,17 +803,22 @@ class MatrixServer:
if ('content-type' in message.response.headers and
message.response.headers['content-type'] == 'application/json'):
ret, error = message.decode_body(self)
if not ret:
message = ("{prefix}matrix: Error decoding json response from "
"server: {error}").format(
prefix=W.prefix("error"), error=error)
W.prnt(self.server_buffer, message)
return
if isinstance(message, MatrixLoginMessage):
self.nio_parse_response(message)
event = message.event
self.handle_matrix_response(event)
else:
ret, error = message.decode_body(self)
if not ret:
message = ("{prefix}matrix: Error decoding json response"
" from server: {error}").format(
prefix=W.prefix("error"), error=error)
W.prnt(self.server_buffer, message)
return
event = message.event
self.handle_matrix_response(event)
else:
status_code = message.response.status
if status_code == 504: