From c967731c0fa9d790fdd28f552e7568300aa48161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?poljar=20=28Damir=20Jeli=C4=87=29?= Date: Mon, 19 Mar 2018 14:51:07 +0100 Subject: [PATCH] events: Make sync event execution batched. --- main.py | 7 +++++++ matrix/events.py | 21 ++++++-------------- matrix/rooms.py | 4 ++-- matrix/server.py | 50 ++++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 63 insertions(+), 19 deletions(-) diff --git a/main.py b/main.py index 64a0168..d14ed1b 100644 --- a/main.py +++ b/main.py @@ -143,6 +143,13 @@ def print_certificate_info(buff, sock, cert): W.prnt(buff, message) +@utf8_decode +def matrix_event_timer_cb(server_name, remaining_calls): + server = SERVERS[server_name] + server.handle_events() + return W.WEECHAT_RC_OK + + def wrap_socket(server, file_descriptor): # type: (MatrixServer, int) -> None sock = None # type: socket.socket diff --git a/matrix/events.py b/matrix/events.py index b1582d0..b70141d 100644 --- a/matrix/events.py +++ b/matrix/events.py @@ -17,6 +17,7 @@ from __future__ import unicode_literals from builtins import str +from collections import deque from functools import partial from operator import itemgetter @@ -422,7 +423,7 @@ class MatrixSyncEvent(MatrixEvent): return MatrixErrorEvent.from_dict(server, "Error syncing", False, parsed_dict) - def _execute_joined_info(self): + def _queue_joined_info(self): server = self.server while self.joined_room_infos: @@ -436,17 +437,7 @@ class MatrixSyncEvent(MatrixEvent): if not room.prev_batch: room.prev_batch = info.prev_batch - for event in info.events: - self._execute_room_event(event, info.room_id) - - def _execute_room_event(self, event, room_id): - server = self.server - - room = server.rooms[room_id] - buf = server.buffers[room_id] - - tags = tags_for_message("message") - event.execute(server, room, buf, list(tags)) + server.event_queue.append(info) def execute(self): server = self.server @@ -456,7 +447,7 @@ class MatrixSyncEvent(MatrixEvent): server.sync() return - self._execute_joined_info() - + self._queue_joined_info() server.next_batch = self.next_batch - server.sync() + + server.handle_events() diff --git a/matrix/rooms.py b/matrix/rooms.py index b075210..5b5d93d 100644 --- a/matrix/rooms.py +++ b/matrix/rooms.py @@ -19,7 +19,7 @@ from builtins import str from pprint import pformat -from collections import namedtuple +from collections import namedtuple, deque from datetime import datetime from matrix.globals import W, OPTIONS @@ -192,7 +192,7 @@ class RoomInfo(): # type: (str, str, List[Any], List[Any]) -> None self.room_id = room_id self.prev_batch = prev_batch - self.events = events + self.events = deque(events) @staticmethod def _message_from_event(event): diff --git a/matrix/server.py b/matrix/server.py index ea78287..ab67015 100644 --- a/matrix/server.py +++ b/matrix/server.py @@ -29,7 +29,7 @@ from http_parser.pyparser import HttpParser from matrix.plugin_options import Option, DebugType from matrix.utils import (key_from_value, prnt_debug, server_buffer_prnt, - create_server_buffer) + create_server_buffer, tags_for_message) from matrix.utf import utf8_decode from matrix.globals import W, SERVERS, OPTIONS import matrix.api as API @@ -89,7 +89,8 @@ class MatrixServer: # Queue of messages we send off and are waiting a response for self.receive_queue = deque() # type: Deque[MatrixMessage] - self.message_queue = deque() # type: Deque[MatrixMessage] + self.event_queue_timer = None + self.event_queue = deque() # type: Deque[RoomInfo] self._create_options(config_file) self._create_session_dir() @@ -434,6 +435,51 @@ class MatrixServer: server_buffer_prnt(self, pprint.pformat(message.request.payload)) server_buffer_prnt(self, pprint.pformat(message.response.body)) + def _loop_events(self, info, n): + + for i in range(n+1): + try: + event = info.events.popleft() + except IndexError: + return i + + room = self.rooms[info.room_id] + buf = self.buffers[info.room_id] + + tags = tags_for_message("message") + event.execute(self, room, buf, tags) + + self.event_queue.appendleft(info) + return i + + def handle_events(self): + n = 25 + + while True: + try: + info = self.event_queue.popleft() + except IndexError: + if self.event_queue_timer: + W.unhook(self.event_queue_timer) + self.event_queue_timer = None + + self.sync() + return + + ret = self._loop_events(info, n) + + if ret < n: + n = n - ret + else: + self.event_queue.appendleft(info) + + if not self.event_queue_timer: + hook = W.hook_timer(1 * 100, 0, 0, "matrix_event_timer_cb", + self.name) + self.event_queue_timer = hook + + return + def handle_response(self, message): # type: (MatrixMessage) -> None