From a8b62577d7e6df7c36fe90955ea722d7b8593434 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 12 Oct 2018 13:52:48 +0200 Subject: [PATCH] server: Utilize partial sync responses. --- main.py | 7 ++--- matrix/bar_items.py | 2 +- matrix/buffer.py | 1 + matrix/globals.py | 1 + matrix/server.py | 66 ++++++++++++++++++++++++++++++++++----------- 5 files changed, 58 insertions(+), 19 deletions(-) diff --git a/main.py b/main.py index 3c17504..9727c9e 100644 --- a/main.py +++ b/main.py @@ -57,12 +57,13 @@ from matrix.completion import (init_completion, matrix_command_completion_cb, from matrix.config import (MatrixConfig, config_log_category_cb, config_log_level_cb, config_server_buffer_cb, matrix_config_reload_cb, config_pgup_cb) -from matrix.globals import SCRIPT_NAME, SERVERS, W +from matrix.globals import SCRIPT_NAME, SERVERS, W, MAX_EVENTS from matrix.server import (MatrixServer, create_default_server, matrix_config_server_change_cb, matrix_config_server_read_cb, matrix_config_server_write_cb, matrix_timer_cb, - send_cb, matrix_load_users_cb) + send_cb, matrix_load_users_cb, + matrix_partial_sync_cb) from matrix.utf import utf8_decode from matrix.utils import server_buffer_prnt, server_buffer_set_title @@ -288,7 +289,7 @@ def receive_cb(server_name, file_descriptor): server.disconnect() break - response = server.client.next_response() + response = server.client.next_response(MAX_EVENTS) # Check if we need to send some data back data_to_send = server.client.data_to_send() diff --git a/matrix/bar_items.py b/matrix/bar_items.py index b154003..7d7f111 100644 --- a/matrix/bar_items.py +++ b/matrix/bar_items.py @@ -109,7 +109,7 @@ def matrix_bar_item_buffer_modes(data, item, window, buffer, extra_info): if not server.connected: modes.append("❌") - if room_buffer.backlog_pending: + if room_buffer.backlog_pending or server.busy: modes.append("⏳") return "".join(modes) diff --git a/matrix/buffer.py b/matrix/buffer.py index cdd5623..c14e3f6 100644 --- a/matrix/buffer.py +++ b/matrix/buffer.py @@ -816,6 +816,7 @@ class RoomBuffer(object): def backlog_pending(self, value): self._backlog_pending = value W.bar_item_update("buffer_modes") + W.bar_item_update("matrix_modes") @property def warning_prefix(self): diff --git a/matrix/globals.py b/matrix/globals.py index b8e60d4..8f2b6bb 100644 --- a/matrix/globals.py +++ b/matrix/globals.py @@ -39,3 +39,4 @@ SERVERS = dict() # type: Dict[str, MatrixServer] CONFIG = None # type: Optional[MatrixConfig] ENCRYPTION = True # type: bool SCRIPT_NAME = "matrix" # type: str +MAX_EVENTS = 10 diff --git a/matrix/server.py b/matrix/server.py index 1e9f8a6..8e6c7c2 100644 --- a/matrix/server.py +++ b/matrix/server.py @@ -32,6 +32,7 @@ from nio import ( Rooms, RoomSendResponse, SyncResponse, + PartialSyncResponse, ShareGroupSessionResponse, KeysClaimResponse, TransportResponse, @@ -46,7 +47,7 @@ from nio import ( from . import globals as G from .buffer import OwnAction, OwnMessage, RoomBuffer from .config import ConfigSection, Option, ServerBufferType -from .globals import SCRIPT_NAME, SERVERS, W +from .globals import SCRIPT_NAME, SERVERS, W, MAX_EVENTS from .utf import utf8_decode from .utils import create_server_buffer, key_from_value, server_buffer_prnt @@ -228,6 +229,7 @@ class MatrixServer(object): self.transaction_id = 0 # type: int self.lag = 0 # type: int self.lag_done = False # type: bool + self.busy = False # type: bool self.send_fd_hook = None # type: Optional[str] self.send_buffer = b"" # type: bytes @@ -240,6 +242,7 @@ class MatrixServer(object): self.unhandled_users = dict() # type: Dict[str, List[str]] self.lazy_load_hook = None # type: Optional[str] + self.partial_sync_hook = None # type: Optional[str] self.keys_claimed = defaultdict(bool) self.group_session_shared = defaultdict(bool) @@ -793,13 +796,6 @@ class MatrixServer(object): room_buffer = self.find_room_from_id(room_id) room_buffer.handle_joined_room(info) - if room_buffer.unhandled_users: - should_lazy_hook = True - - if should_lazy_hook: - hook = W.hook_timer(1 * 100, 0, 0, "matrix_load_users_cb", - self.name) - self.lazy_load_hook = hook def add_unhandled_users(self, rooms, n): # type: (List[RoomBuffer], int) -> bool @@ -837,15 +833,31 @@ class MatrixServer(object): self._handle_room_info(response) - self.next_batch = response.next_batch + # Full sync response handle everything. + if isinstance(response, SyncResponse): + if self.client.should_upload_keys: + self.keys_upload() - if self.client.should_upload_keys: - self.keys_upload() + if self.client.should_query_keys and not self.keys_queried: + self.keys_query() - if self.client.should_query_keys and not self.keys_queried: - self.keys_query() + for room_buffer in self.room_buffers.values(): + if room_buffer.unhandled_users: + hook = W.hook_timer(1 * 100, 0, 0, "matrix_load_users_cb", + self.name) + self.lazy_load_hook = hook + break - self.schedule_sync() + self.next_batch = response.next_batch + self.schedule_sync() + else: + if not self.partial_sync_hook: + hook = W.hook_timer(1 * 100, 0, 0, "matrix_partial_sync_cb", + self.name) + self.partial_sync_hook = hook + self.busy = True + W.bar_item_update("buffer_modes") + W.bar_item_update("matrix_modes") def handle_transport_response(self, response): self.error( @@ -877,7 +889,7 @@ class MatrixServer(object): elif isinstance(response, LoginResponse): self._handle_login(response) - elif isinstance(response, SyncResponse): + elif isinstance(response, (SyncResponse, PartialSyncResponse)): self._handle_sync(response) elif isinstance(response, RoomSendResponse): @@ -1022,6 +1034,30 @@ def matrix_config_server_change_cb(server_name, option): return 1 +@utf8_decode +def matrix_partial_sync_cb(server_name, remaining_calls): + start = time.time() + server = SERVERS[server_name] + W.unhook(server.partial_sync_hook) + server.partial_sync_hook = None + + response = server.client.next_response(MAX_EVENTS) + + while response: + server.handle_response(response) + current = time.time() + if current - start >= 0.1: + break + response = server.client.next_response(MAX_EVENTS) + + if not server.partial_sync_hook: + server.busy = False + W.bar_item_update("buffer_modes") + W.bar_item_update("matrix_modes") + + return W.WEECHAT_RC_OK + + @utf8_decode def matrix_load_users_cb(server_name, remaining_calls): server = SERVERS[server_name]