server: Use server side lazy user loading.

This commit is contained in:
Damir Jelić 2018-10-30 16:15:27 +01:00
parent 60282b4eec
commit 3be1b7bfc3

View file

@ -50,6 +50,8 @@ from nio import (
ErrorResponse, ErrorResponse,
SyncError, SyncError,
LoginError, LoginError,
JoinedMembersResponse,
JoinedMembersError,
) )
from . import globals as G from . import globals as G
@ -250,7 +252,7 @@ class MatrixServer(object):
# type: DefaultDict[str, Deque[EncrytpionQueueItem]] # type: DefaultDict[str, Deque[EncrytpionQueueItem]]
self.backlog_queue = dict() # type: Dict[str, str] self.backlog_queue = dict() # type: Dict[str, str]
self.unhandled_users = dict() # type: Dict[str, List[str]] self.rooms_with_missing_members = [] # type: List[str]
self.lazy_load_hook = None # type: Optional[str] self.lazy_load_hook = None # type: Optional[str]
self.partial_sync_hook = None # type: Optional[str] self.partial_sync_hook = None # type: Optional[str]
@ -580,7 +582,12 @@ class MatrixServer(object):
).format(prefix=W.prefix("network"), script_name=SCRIPT_NAME) ).format(prefix=W.prefix("network"), script_name=SCRIPT_NAME)
W.prnt(self.server_buffer, msg) W.prnt(self.server_buffer, msg)
timeout = 0 if self.transport_type == TransportType.HTTP else 30000 timeout = 0 if self.transport_type == TransportType.HTTP else 30000
sync_filter = {"room": {"timeline": {"limit": 5000}}} sync_filter = {
"room": {
"timeline": {"limit": 5000},
"state": {"lazy_load_members": True}
}
}
self.sync(timeout, sync_filter) self.sync(timeout, sync_filter)
return return
@ -725,6 +732,10 @@ class MatrixServer(object):
self.keys_queried = True self.keys_queried = True
self.send_or_queue(request) self.send_or_queue(request)
def get_joined_members(self, room_id):
_, request = self.client.joined_members(room_id)
self.send(request)
def _print_message_error(self, message): def _print_message_error(self, message):
server_buffer_prnt( server_buffer_prnt(
self, self,
@ -817,7 +828,10 @@ class MatrixServer(object):
sync_filter = { sync_filter = {
"room": { "room": {
"timeline": {"limit": G.CONFIG.network.max_initial_sync_events} "timeline": {
"limit": G.CONFIG.network.max_initial_sync_events
},
"state": {"lazy_load_members": True}
} }
} }
self.sync(timeout=0, sync_filter=sync_filter) self.sync(timeout=0, sync_filter=sync_filter)
@ -891,6 +905,12 @@ class MatrixServer(object):
return False return False
def _hook_lazy_user_adding(self):
if not self.lazy_load_hook:
hook = W.hook_timer(1 * 1000, 0, 0,
"matrix_load_users_cb", self.name)
self.lazy_load_hook = hook
def _handle_sync(self, response): def _handle_sync(self, response):
# we got the same batch again, nothing to do # we got the same batch again, nothing to do
if self.next_batch == response.next_batch: if self.next_batch == response.next_batch:
@ -904,23 +924,23 @@ class MatrixServer(object):
if self.client.should_upload_keys: if self.client.should_upload_keys:
self.keys_upload() self.keys_upload()
# Query the keys for all users in the encrypted rooms after our if self.client.should_query_keys and not self.keys_queried:
# initial sync
if not self.next_batch and not self.keys_queried:
self.keys_query(True)
elif self.client.should_query_keys and not self.keys_queried:
self.keys_query() self.keys_query()
for room_buffer in self.room_buffers.values(): for room_buffer in self.room_buffers.values():
if not self.next_batch:
self.rooms_with_missing_members.append(
room_buffer.room.room_id
)
if room_buffer.unhandled_users: if room_buffer.unhandled_users:
if not self.lazy_load_hook: self._hook_lazy_user_adding()
hook = W.hook_timer(1 * 1000, 0, 0,
"matrix_load_users_cb", self.name)
self.lazy_load_hook = hook
break break
self.next_batch = response.next_batch self.next_batch = response.next_batch
self.schedule_sync() self.schedule_sync()
if self.rooms_with_missing_members:
self.get_joined_members(self.rooms_with_missing_members.pop())
else: else:
if not self.partial_sync_hook: if not self.partial_sync_hook:
hook = W.hook_timer(1 * 100, 0, 0, "matrix_partial_sync_cb", hook = W.hook_timer(1 * 100, 0, 0, "matrix_partial_sync_cb",
@ -955,6 +975,9 @@ class MatrixServer(object):
if isinstance(response, (SyncError, LoginError)): if isinstance(response, (SyncError, LoginError)):
self.disconnect() self.disconnect()
elif isinstance(response, JoinedMembersError):
self.rooms_with_missing_members.append(response.room_id)
self.get_joined_members(self.rooms_with_missing_members.pop())
def handle_response(self, response): def handle_response(self, response):
# type: (Response) -> None # type: (Response) -> None
@ -999,6 +1022,22 @@ class MatrixServer(object):
elif isinstance(response, KeysQueryResponse): elif isinstance(response, KeysQueryResponse):
self.keys_queried = False self.keys_queried = False
elif isinstance(response, JoinedMembersResponse):
room_buffer = self.room_buffers[response.room_id]
users = [user.user_id for user in response.members]
# Don't add the users directly use the lazy load hook.
room_buffer.unhandled_users += users
self._hook_lazy_user_adding()
# Fetch the users for the next room.
if self.rooms_with_missing_members:
self.get_joined_members(self.rooms_with_missing_members.pop())
# We are done adding all the users, do a full key query now since
# the client knows all the encrypted room members.
else:
self.keys_query(True)
elif isinstance(response, KeysClaimResponse): elif isinstance(response, KeysClaimResponse):
self.keys_claimed[response.room_id] = False self.keys_claimed[response.room_id] = False
try: try:
@ -1208,7 +1247,12 @@ def matrix_timer_cb(server_name, remaining_calls):
if server.sync_time and current_time > (server.sync_time + 2): if server.sync_time and current_time > (server.sync_time + 2):
timeout = 0 if server.transport_type == TransportType.HTTP else 30000 timeout = 0 if server.transport_type == TransportType.HTTP else 30000
sync_filter = {"room": {"timeline": {"limit": 5000}}} sync_filter = {
"room": {
"timeline": {"limit": 5000},
"state": {"lazy_load_members": True}
}
}
server.sync(timeout, sync_filter) server.sync(timeout, sync_filter)
if not server.next_batch: if not server.next_batch: