From 3be1b7bfc3107f09b42d5d236ead373327a90380 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 30 Oct 2018 16:15:27 +0100 Subject: [PATCH] server: Use server side lazy user loading. --- matrix/server.py | 70 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 57 insertions(+), 13 deletions(-) diff --git a/matrix/server.py b/matrix/server.py index 94922a5..a42afac 100644 --- a/matrix/server.py +++ b/matrix/server.py @@ -50,6 +50,8 @@ from nio import ( ErrorResponse, SyncError, LoginError, + JoinedMembersResponse, + JoinedMembersError, ) from . import globals as G @@ -250,7 +252,7 @@ class MatrixServer(object): # type: DefaultDict[str, Deque[EncrytpionQueueItem]] self.backlog_queue = dict() # type: Dict[str, str] - self.unhandled_users = dict() # type: Dict[str, List[str]] + self.rooms_with_missing_members = [] # type: List[str] self.lazy_load_hook = None # type: Optional[str] self.partial_sync_hook = None # type: Optional[str] @@ -580,7 +582,12 @@ class MatrixServer(object): ).format(prefix=W.prefix("network"), script_name=SCRIPT_NAME) W.prnt(self.server_buffer, msg) timeout = 0 if self.transport_type == TransportType.HTTP else 30000 - sync_filter = {"room": {"timeline": {"limit": 5000}}} + sync_filter = { + "room": { + "timeline": {"limit": 5000}, + "state": {"lazy_load_members": True} + } + } self.sync(timeout, sync_filter) return @@ -725,6 +732,10 @@ class MatrixServer(object): self.keys_queried = True self.send_or_queue(request) + def get_joined_members(self, room_id): + _, request = self.client.joined_members(room_id) + self.send(request) + def _print_message_error(self, message): server_buffer_prnt( self, @@ -817,7 +828,10 @@ class MatrixServer(object): sync_filter = { "room": { - "timeline": {"limit": G.CONFIG.network.max_initial_sync_events} + "timeline": { + "limit": G.CONFIG.network.max_initial_sync_events + }, + "state": {"lazy_load_members": True} } } self.sync(timeout=0, sync_filter=sync_filter) @@ -891,6 +905,12 @@ class MatrixServer(object): return False + def _hook_lazy_user_adding(self): + if not self.lazy_load_hook: + hook = W.hook_timer(1 * 1000, 0, 0, + "matrix_load_users_cb", self.name) + self.lazy_load_hook = hook + def _handle_sync(self, response): # we got the same batch again, nothing to do if self.next_batch == response.next_batch: @@ -904,23 +924,23 @@ class MatrixServer(object): if self.client.should_upload_keys: self.keys_upload() - # Query the keys for all users in the encrypted rooms after our - # initial sync - if not self.next_batch and not self.keys_queried: - self.keys_query(True) - elif self.client.should_query_keys and not self.keys_queried: + if self.client.should_query_keys and not self.keys_queried: self.keys_query() for room_buffer in self.room_buffers.values(): + if not self.next_batch: + self.rooms_with_missing_members.append( + room_buffer.room.room_id + ) if room_buffer.unhandled_users: - if not self.lazy_load_hook: - hook = W.hook_timer(1 * 1000, 0, 0, - "matrix_load_users_cb", self.name) - self.lazy_load_hook = hook + self._hook_lazy_user_adding() break self.next_batch = response.next_batch self.schedule_sync() + + if self.rooms_with_missing_members: + self.get_joined_members(self.rooms_with_missing_members.pop()) else: if not self.partial_sync_hook: hook = W.hook_timer(1 * 100, 0, 0, "matrix_partial_sync_cb", @@ -955,6 +975,9 @@ class MatrixServer(object): if isinstance(response, (SyncError, LoginError)): self.disconnect() + elif isinstance(response, JoinedMembersError): + self.rooms_with_missing_members.append(response.room_id) + self.get_joined_members(self.rooms_with_missing_members.pop()) def handle_response(self, response): # type: (Response) -> None @@ -999,6 +1022,22 @@ class MatrixServer(object): elif isinstance(response, KeysQueryResponse): self.keys_queried = False + elif isinstance(response, JoinedMembersResponse): + room_buffer = self.room_buffers[response.room_id] + users = [user.user_id for user in response.members] + + # Don't add the users directly use the lazy load hook. + room_buffer.unhandled_users += users + self._hook_lazy_user_adding() + + # Fetch the users for the next room. + if self.rooms_with_missing_members: + self.get_joined_members(self.rooms_with_missing_members.pop()) + # We are done adding all the users, do a full key query now since + # the client knows all the encrypted room members. + else: + self.keys_query(True) + elif isinstance(response, KeysClaimResponse): self.keys_claimed[response.room_id] = False try: @@ -1208,7 +1247,12 @@ def matrix_timer_cb(server_name, remaining_calls): if server.sync_time and current_time > (server.sync_time + 2): timeout = 0 if server.transport_type == TransportType.HTTP else 30000 - sync_filter = {"room": {"timeline": {"limit": 5000}}} + sync_filter = { + "room": { + "timeline": {"limit": 5000}, + "state": {"lazy_load_members": True} + } + } server.sync(timeout, sync_filter) if not server.next_batch: