events: Make sync event execution batched.

This commit is contained in:
poljar (Damir Jelić) 2018-03-19 14:51:07 +01:00
parent a724d7fe58
commit c967731c0f
4 changed files with 63 additions and 19 deletions

View file

@ -143,6 +143,13 @@ def print_certificate_info(buff, sock, cert):
W.prnt(buff, message)
@utf8_decode
def matrix_event_timer_cb(server_name, remaining_calls):
server = SERVERS[server_name]
server.handle_events()
return W.WEECHAT_RC_OK
def wrap_socket(server, file_descriptor):
# type: (MatrixServer, int) -> None
sock = None # type: socket.socket

View file

@ -17,6 +17,7 @@
from __future__ import unicode_literals
from builtins import str
from collections import deque
from functools import partial
from operator import itemgetter
@ -422,7 +423,7 @@ class MatrixSyncEvent(MatrixEvent):
return MatrixErrorEvent.from_dict(server, "Error syncing", False,
parsed_dict)
def _execute_joined_info(self):
def _queue_joined_info(self):
server = self.server
while self.joined_room_infos:
@ -436,17 +437,7 @@ class MatrixSyncEvent(MatrixEvent):
if not room.prev_batch:
room.prev_batch = info.prev_batch
for event in info.events:
self._execute_room_event(event, info.room_id)
def _execute_room_event(self, event, room_id):
server = self.server
room = server.rooms[room_id]
buf = server.buffers[room_id]
tags = tags_for_message("message")
event.execute(server, room, buf, list(tags))
server.event_queue.append(info)
def execute(self):
server = self.server
@ -456,7 +447,7 @@ class MatrixSyncEvent(MatrixEvent):
server.sync()
return
self._execute_joined_info()
self._queue_joined_info()
server.next_batch = self.next_batch
server.sync()
server.handle_events()

View file

@ -19,7 +19,7 @@ from builtins import str
from pprint import pformat
from collections import namedtuple
from collections import namedtuple, deque
from datetime import datetime
from matrix.globals import W, OPTIONS
@ -192,7 +192,7 @@ class RoomInfo():
# type: (str, str, List[Any], List[Any]) -> None
self.room_id = room_id
self.prev_batch = prev_batch
self.events = events
self.events = deque(events)
@staticmethod
def _message_from_event(event):

View file

@ -29,7 +29,7 @@ from http_parser.pyparser import HttpParser
from matrix.plugin_options import Option, DebugType
from matrix.utils import (key_from_value, prnt_debug, server_buffer_prnt,
create_server_buffer)
create_server_buffer, tags_for_message)
from matrix.utf import utf8_decode
from matrix.globals import W, SERVERS, OPTIONS
import matrix.api as API
@ -89,7 +89,8 @@ class MatrixServer:
# Queue of messages we send off and are waiting a response for
self.receive_queue = deque() # type: Deque[MatrixMessage]
self.message_queue = deque() # type: Deque[MatrixMessage]
self.event_queue_timer = None
self.event_queue = deque() # type: Deque[RoomInfo]
self._create_options(config_file)
self._create_session_dir()
@ -434,6 +435,51 @@ class MatrixServer:
server_buffer_prnt(self, pprint.pformat(message.request.payload))
server_buffer_prnt(self, pprint.pformat(message.response.body))
def _loop_events(self, info, n):
for i in range(n+1):
try:
event = info.events.popleft()
except IndexError:
return i
room = self.rooms[info.room_id]
buf = self.buffers[info.room_id]
tags = tags_for_message("message")
event.execute(self, room, buf, tags)
self.event_queue.appendleft(info)
return i
def handle_events(self):
n = 25
while True:
try:
info = self.event_queue.popleft()
except IndexError:
if self.event_queue_timer:
W.unhook(self.event_queue_timer)
self.event_queue_timer = None
self.sync()
return
ret = self._loop_events(info, n)
if ret < n:
n = n - ret
else:
self.event_queue.appendleft(info)
if not self.event_queue_timer:
hook = W.hook_timer(1 * 100, 0, 0, "matrix_event_timer_cb",
self.name)
self.event_queue_timer = hook
return
def handle_response(self, message):
# type: (MatrixMessage) -> None