From eee63decb174a10c11db6acdb5b2d62fe03a5cc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 10 Sep 2019 18:51:58 +0200 Subject: [PATCH] weechat-matrix: Store/restore sync tokens. --- main.py | 34 +++++++------- matrix/buffer.py | 1 + matrix/commands.py | 4 +- matrix/globals.py | 1 - matrix/server.py | 112 +++++++++++++++++++-------------------------- 5 files changed, 69 insertions(+), 83 deletions(-) diff --git a/main.py b/main.py index 609f287..1ff1cec 100644 --- a/main.py +++ b/main.py @@ -82,13 +82,12 @@ from matrix.completion import (init_completion, matrix_command_completion_cb, from matrix.config import (MatrixConfig, config_log_category_cb, config_log_level_cb, config_server_buffer_cb, matrix_config_reload_cb, config_pgup_cb) -from matrix.globals import SCRIPT_NAME, SERVERS, W, MAX_EVENTS +from matrix.globals import SCRIPT_NAME, SERVERS, W from matrix.server import (MatrixServer, create_default_server, matrix_config_server_change_cb, matrix_config_server_read_cb, matrix_config_server_write_cb, matrix_timer_cb, - send_cb, matrix_load_users_cb, - matrix_partial_sync_cb) + send_cb, matrix_load_users_cb) from matrix.utf import utf8_decode from matrix.utils import server_buffer_prnt, server_buffer_set_title @@ -325,7 +324,7 @@ def receive_cb(server_name, file_descriptor): server.disconnect() break - response = server.client.next_response(MAX_EVENTS) + response = server.client.next_response() # Check if we need to send some data back data_to_send = server.client.data_to_send() @@ -493,9 +492,6 @@ def matrix_unload_cb(): G.CONFIG.free() - # for server in SERVERS.values(): - # server.store_olm() - return W.WEECHAT_RC_OK @@ -565,21 +561,27 @@ def buffer_switch_cb(_, _signal, buffer_ptr): if not room_buffer: continue - if room_buffer.should_send_read_marker: - event_id = room_buffer.last_event_id + last_event_id = room_buffer.last_event_id + if room_buffer.should_send_read_marker: # A buffer may not have any events, in that case no event id is # here returned - if event_id: + if last_event_id: server.room_send_read_marker( - room_buffer.room.room_id, event_id) - room_buffer.last_read_event = event_id + room_buffer.room.room_id, last_event_id) + room_buffer.last_read_event = last_event_id - if room_buffer.members_fetched: - return W.WEECHAT_RC_OK + if not room_buffer.members_fetched: + room_id = room_buffer.room.room_id + print("HELLO FETCHING FOR {}".format(room_id)) + server.get_joined_members(room_id) - room_id = room_buffer.room.room_id - server.get_joined_members(room_id) + # The buffer is empty and we are seeing it for the first time. + # Let us fetch some messages from the room history so it doesn't feel so + # empty. + if room_buffer.first_view and not last_event_id: + if server.room_get_messages(room_buffer.room.room_id): + room_buffer.first_view = True break diff --git a/matrix/buffer.py b/matrix/buffer.py index 9060362..db4b36a 100644 --- a/matrix/buffer.py +++ b/matrix/buffer.py @@ -861,6 +861,7 @@ class RoomBuffer(object): self.joined = True self.leave_event_id = None # type: Optional[str] self.members_fetched = False + self.first_view = True self.unhandled_users = [] # type: List[str] self.inactive_users = [] diff --git a/matrix/commands.py b/matrix/commands.py index 4869539..a74f99c 100644 --- a/matrix/commands.py +++ b/matrix/commands.py @@ -938,12 +938,12 @@ def matrix_me_command_cb(data, buffer, args): W.prnt(server.server_buffer, message) return W.WEECHAT_RC_ERROR + room_buffer = server.find_room_from_ptr(buffer) + if not server.client.logged_in: room_buffer.error("You are not logged in.") return W.WEECHAT_RC_ERROR - room_buffer = server.find_room_from_ptr(buffer) - if not args: return W.WEECHAT_RC_OK diff --git a/matrix/globals.py b/matrix/globals.py index 7d34248..8bdfa26 100644 --- a/matrix/globals.py +++ b/matrix/globals.py @@ -42,7 +42,6 @@ SERVERS = dict() # type: Dict[str, MatrixServer] CONFIG = None # type: Optional[MatrixConfig] ENCRYPTION = True # type: bool SCRIPT_NAME = "matrix" # type: str -MAX_EVENTS = 100 TYPING_NOTICE_TIMEOUT = 4000 # 4 seconds typing notice lifetime LOGGER = Logger("weechat-matrix") UPLOADS = OrderedDict() # type: Dict[str, Upload] diff --git a/matrix/server.py b/matrix/server.py index 060605f..8a28170 100644 --- a/matrix/server.py +++ b/matrix/server.py @@ -40,6 +40,7 @@ from uuid import UUID from nio import ( Api, HttpClient, + ClientConfig, LocalProtocolError, LoginResponse, LoginInfoResponse, @@ -48,7 +49,6 @@ from nio import ( RoomSendResponse, RoomSendError, SyncResponse, - PartialSyncResponse, ShareGroupSessionResponse, ShareGroupSessionError, KeysQueryResponse, @@ -80,7 +80,7 @@ from nio import ( from . import globals as G from .buffer import OwnAction, OwnMessage, RoomBuffer from .config import ConfigSection, Option, ServerBufferType -from .globals import SCRIPT_NAME, SERVERS, W, MAX_EVENTS, TYPING_NOTICE_TIMEOUT +from .globals import SCRIPT_NAME, SERVERS, W, TYPING_NOTICE_TIMEOUT from .utf import utf8_decode from .utils import create_server_buffer, key_from_value, server_buffer_prnt from .uploads import Upload @@ -296,7 +296,6 @@ class MatrixServer(object): self.member_request_list = [] # type: List[str] self.rooms_with_missing_members = [] # type: List[str] self.lazy_load_hook = None # type: Optional[str] - self.partial_sync_hook = None # type: Optional[str] # These flags remember if we made some requests so that we don't # make them again while we wait on a response, the flags need to be @@ -379,11 +378,14 @@ class MatrixServer(object): self.address = homeserver.hostname self.homeserver = homeserver + config = ClientConfig(store_sync_tokens=True) + self.client = HttpClient( homeserver.geturl(), self.config.username, self.device_id, self.get_session_path(), + config=config ) self.client.add_to_device_callback( self.key_verification_cb, @@ -749,13 +751,15 @@ class MatrixServer(object): def schedule_sync(self): self.sync_time = time.time() - def sync(self, timeout=None, sync_filter=None): + def sync(self, timeout=None, sync_filter=None, full_state=False): # type: (Optional[int], Optional[Dict[Any, Any]]) -> None if not self.client: return self.sync_time = None - _, request = self.client.sync(timeout, sync_filter) + _, request = self.client.sync(timeout, sync_filter, + full_state=full_state) + self.send_or_queue(request) def login_info(self): @@ -880,6 +884,9 @@ class MatrixServer(object): self.send_or_queue(request) def room_get_messages(self, room_id): + if not self.connected or not self.client.logged_in: + return + room_buffer = self.find_room_from_id(room_id) # We're already fetching old messages @@ -1301,7 +1308,7 @@ class MatrixServer(object): "state": {"lazy_load_members": True} } } - self.sync(timeout=0, sync_filter=sync_filter) + self.sync(timeout=0, sync_filter=sync_filter, full_state=True) def _handle_room_info(self, response): for room_id, info in response.rooms.invite.items(): @@ -1458,41 +1465,40 @@ class MatrixServer(object): # new key # self.decrypt_printed_messages(event) - # Full sync response handle everything. - if isinstance(response, SyncResponse): - if self.client.should_upload_keys: - self.keys_upload() + if self.client.should_upload_keys: + self.keys_upload() - if self.client.should_query_keys and not self.keys_queried: - self.keys_query() + if self.client.should_query_keys and not self.keys_queried: + self.keys_query() - for room_buffer in self.room_buffers.values(): - # It's our initial sync, we need to fetch room members, so add - # the room to the missing members queue. - if not self.next_batch: - if (not G.CONFIG.network.lazy_load_room_users - or room_buffer.room.encrypted): - self.rooms_with_missing_members.append( - room_buffer.room.room_id - ) - if room_buffer.unhandled_users: - self._hook_lazy_user_adding() - break + for room_buffer in self.room_buffers.values(): + # It's our initial sync, we need to fetch room members, so add + # the room to the missing members queue. + # 3 reasons we fetch room members here: + # * If the lazy load room users setting is off, otherwise we will + # fetch them when we switch to the buffer + # * If the room is encrypted, encryption needs the full member + # list for it to work. + # * If we are the only member, it is unlikely really an empty + # room and since we don't want a bunch of "Empty room?" + # buffers in our buffer list we fetch members here. + if not self.next_batch: + if (not G.CONFIG.network.lazy_load_room_users + or room_buffer.room.encrypted + or room_buffer.room.member_count <= 1): + self.rooms_with_missing_members.append( + room_buffer.room.room_id + ) + if room_buffer.unhandled_users: + self._hook_lazy_user_adding() + break - self.next_batch = response.next_batch - self.schedule_sync() - W.bar_item_update("matrix_typing_notice") + self.next_batch = response.next_batch + self.schedule_sync() + W.bar_item_update("matrix_typing_notice") - if self.rooms_with_missing_members: - self.get_joined_members(self.rooms_with_missing_members.pop()) - else: - if not self.partial_sync_hook: - hook = W.hook_timer(1 * 100, 0, 0, "matrix_partial_sync_cb", - self.name) - self.partial_sync_hook = hook - self.busy = True - W.bar_item_update("buffer_modes") - W.bar_item_update("matrix_modes") + if self.rooms_with_missing_members: + self.get_joined_members(self.rooms_with_missing_members.pop()) def handle_delete_device_auth(self, response): device_id = self.device_deletion_queue.pop(response.uuid, None) @@ -1567,7 +1573,7 @@ class MatrixServer(object): elif isinstance(response, LoginInfoResponse): self._handle_login_info(response) - elif isinstance(response, (SyncResponse, PartialSyncResponse)): + elif isinstance(response, SyncResponse): self._handle_sync(response) elif isinstance(response, RoomSendResponse): @@ -1694,8 +1700,10 @@ class MatrixServer(object): room = self.client.rooms[room_id] buf = RoomBuffer(room, self.name, self.homeserver, prev_batch) - if room.members_synced: - buf.members_fetched = True + # We sadly don't get a correct summary on full_state from synapse so we + # can't trust it that the members are fully synced + # if room.members_synced: + # buf.members_fetched = True self.room_buffers[room_id] = buf self.buffers[room_id] = buf.weechat_buffer._ptr @@ -1842,30 +1850,6 @@ def matrix_config_server_change_cb(server_name, option): return 1 -@utf8_decode -def matrix_partial_sync_cb(server_name, remaining_calls): - start = time.time() - server = SERVERS[server_name] - W.unhook(server.partial_sync_hook) - server.partial_sync_hook = None - - response = server.client.next_response(MAX_EVENTS) - - while response: - server.handle_response(response) - current = time.time() - if current - start >= 0.1: - break - response = server.client.next_response(MAX_EVENTS) - - if not server.partial_sync_hook: - server.busy = False - W.bar_item_update("buffer_modes") - W.bar_item_update("matrix_modes") - - return W.WEECHAT_RC_OK - - @utf8_decode def matrix_load_users_cb(server_name, remaining_calls): server = SERVERS[server_name]