server: Utilize partial sync responses.

This commit is contained in:
Damir Jelić 2018-10-12 13:52:48 +02:00
parent 7a5c0c9c0e
commit a8b62577d7
5 changed files with 58 additions and 19 deletions

View file

@ -57,12 +57,13 @@ from matrix.completion import (init_completion, matrix_command_completion_cb,
from matrix.config import (MatrixConfig, config_log_category_cb, from matrix.config import (MatrixConfig, config_log_category_cb,
config_log_level_cb, config_server_buffer_cb, config_log_level_cb, config_server_buffer_cb,
matrix_config_reload_cb, config_pgup_cb) matrix_config_reload_cb, config_pgup_cb)
from matrix.globals import SCRIPT_NAME, SERVERS, W from matrix.globals import SCRIPT_NAME, SERVERS, W, MAX_EVENTS
from matrix.server import (MatrixServer, create_default_server, from matrix.server import (MatrixServer, create_default_server,
matrix_config_server_change_cb, matrix_config_server_change_cb,
matrix_config_server_read_cb, matrix_config_server_read_cb,
matrix_config_server_write_cb, matrix_timer_cb, matrix_config_server_write_cb, matrix_timer_cb,
send_cb, matrix_load_users_cb) send_cb, matrix_load_users_cb,
matrix_partial_sync_cb)
from matrix.utf import utf8_decode from matrix.utf import utf8_decode
from matrix.utils import server_buffer_prnt, server_buffer_set_title from matrix.utils import server_buffer_prnt, server_buffer_set_title
@ -288,7 +289,7 @@ def receive_cb(server_name, file_descriptor):
server.disconnect() server.disconnect()
break break
response = server.client.next_response() response = server.client.next_response(MAX_EVENTS)
# Check if we need to send some data back # Check if we need to send some data back
data_to_send = server.client.data_to_send() data_to_send = server.client.data_to_send()

View file

@ -109,7 +109,7 @@ def matrix_bar_item_buffer_modes(data, item, window, buffer, extra_info):
if not server.connected: if not server.connected:
modes.append("") modes.append("")
if room_buffer.backlog_pending: if room_buffer.backlog_pending or server.busy:
modes.append("") modes.append("")
return "".join(modes) return "".join(modes)

View file

@ -816,6 +816,7 @@ class RoomBuffer(object):
def backlog_pending(self, value): def backlog_pending(self, value):
self._backlog_pending = value self._backlog_pending = value
W.bar_item_update("buffer_modes") W.bar_item_update("buffer_modes")
W.bar_item_update("matrix_modes")
@property @property
def warning_prefix(self): def warning_prefix(self):

View file

@ -39,3 +39,4 @@ SERVERS = dict() # type: Dict[str, MatrixServer]
CONFIG = None # type: Optional[MatrixConfig] CONFIG = None # type: Optional[MatrixConfig]
ENCRYPTION = True # type: bool ENCRYPTION = True # type: bool
SCRIPT_NAME = "matrix" # type: str SCRIPT_NAME = "matrix" # type: str
MAX_EVENTS = 10

View file

@ -32,6 +32,7 @@ from nio import (
Rooms, Rooms,
RoomSendResponse, RoomSendResponse,
SyncResponse, SyncResponse,
PartialSyncResponse,
ShareGroupSessionResponse, ShareGroupSessionResponse,
KeysClaimResponse, KeysClaimResponse,
TransportResponse, TransportResponse,
@ -46,7 +47,7 @@ from nio import (
from . import globals as G from . import globals as G
from .buffer import OwnAction, OwnMessage, RoomBuffer from .buffer import OwnAction, OwnMessage, RoomBuffer
from .config import ConfigSection, Option, ServerBufferType from .config import ConfigSection, Option, ServerBufferType
from .globals import SCRIPT_NAME, SERVERS, W from .globals import SCRIPT_NAME, SERVERS, W, MAX_EVENTS
from .utf import utf8_decode from .utf import utf8_decode
from .utils import create_server_buffer, key_from_value, server_buffer_prnt from .utils import create_server_buffer, key_from_value, server_buffer_prnt
@ -228,6 +229,7 @@ class MatrixServer(object):
self.transaction_id = 0 # type: int self.transaction_id = 0 # type: int
self.lag = 0 # type: int self.lag = 0 # type: int
self.lag_done = False # type: bool self.lag_done = False # type: bool
self.busy = False # type: bool
self.send_fd_hook = None # type: Optional[str] self.send_fd_hook = None # type: Optional[str]
self.send_buffer = b"" # type: bytes self.send_buffer = b"" # type: bytes
@ -240,6 +242,7 @@ class MatrixServer(object):
self.unhandled_users = dict() # type: Dict[str, List[str]] self.unhandled_users = dict() # type: Dict[str, List[str]]
self.lazy_load_hook = None # type: Optional[str] self.lazy_load_hook = None # type: Optional[str]
self.partial_sync_hook = None # type: Optional[str]
self.keys_claimed = defaultdict(bool) self.keys_claimed = defaultdict(bool)
self.group_session_shared = defaultdict(bool) self.group_session_shared = defaultdict(bool)
@ -793,13 +796,6 @@ class MatrixServer(object):
room_buffer = self.find_room_from_id(room_id) room_buffer = self.find_room_from_id(room_id)
room_buffer.handle_joined_room(info) room_buffer.handle_joined_room(info)
if room_buffer.unhandled_users:
should_lazy_hook = True
if should_lazy_hook:
hook = W.hook_timer(1 * 100, 0, 0, "matrix_load_users_cb",
self.name)
self.lazy_load_hook = hook
def add_unhandled_users(self, rooms, n): def add_unhandled_users(self, rooms, n):
# type: (List[RoomBuffer], int) -> bool # type: (List[RoomBuffer], int) -> bool
@ -837,15 +833,31 @@ class MatrixServer(object):
self._handle_room_info(response) self._handle_room_info(response)
self.next_batch = response.next_batch # Full sync response handle everything.
if isinstance(response, SyncResponse):
if self.client.should_upload_keys:
self.keys_upload()
if self.client.should_upload_keys: if self.client.should_query_keys and not self.keys_queried:
self.keys_upload() self.keys_query()
if self.client.should_query_keys and not self.keys_queried: for room_buffer in self.room_buffers.values():
self.keys_query() if room_buffer.unhandled_users:
hook = W.hook_timer(1 * 100, 0, 0, "matrix_load_users_cb",
self.name)
self.lazy_load_hook = hook
break
self.schedule_sync() self.next_batch = response.next_batch
self.schedule_sync()
else:
if not self.partial_sync_hook:
hook = W.hook_timer(1 * 100, 0, 0, "matrix_partial_sync_cb",
self.name)
self.partial_sync_hook = hook
self.busy = True
W.bar_item_update("buffer_modes")
W.bar_item_update("matrix_modes")
def handle_transport_response(self, response): def handle_transport_response(self, response):
self.error( self.error(
@ -877,7 +889,7 @@ class MatrixServer(object):
elif isinstance(response, LoginResponse): elif isinstance(response, LoginResponse):
self._handle_login(response) self._handle_login(response)
elif isinstance(response, SyncResponse): elif isinstance(response, (SyncResponse, PartialSyncResponse)):
self._handle_sync(response) self._handle_sync(response)
elif isinstance(response, RoomSendResponse): elif isinstance(response, RoomSendResponse):
@ -1022,6 +1034,30 @@ def matrix_config_server_change_cb(server_name, option):
return 1 return 1
@utf8_decode
def matrix_partial_sync_cb(server_name, remaining_calls):
start = time.time()
server = SERVERS[server_name]
W.unhook(server.partial_sync_hook)
server.partial_sync_hook = None
response = server.client.next_response(MAX_EVENTS)
while response:
server.handle_response(response)
current = time.time()
if current - start >= 0.1:
break
response = server.client.next_response(MAX_EVENTS)
if not server.partial_sync_hook:
server.busy = False
W.bar_item_update("buffer_modes")
W.bar_item_update("matrix_modes")
return W.WEECHAT_RC_OK
@utf8_decode @utf8_decode
def matrix_load_users_cb(server_name, remaining_calls): def matrix_load_users_cb(server_name, remaining_calls):
server = SERVERS[server_name] server = SERVERS[server_name]