weechat-matrix: Store/restore sync tokens.

This commit is contained in:
Damir Jelić 2019-09-10 18:51:58 +02:00
parent 58ef479f8e
commit eee63decb1
5 changed files with 69 additions and 83 deletions

34
main.py
View file

@ -82,13 +82,12 @@ from matrix.completion import (init_completion, matrix_command_completion_cb,
from matrix.config import (MatrixConfig, config_log_category_cb,
config_log_level_cb, config_server_buffer_cb,
matrix_config_reload_cb, config_pgup_cb)
from matrix.globals import SCRIPT_NAME, SERVERS, W, MAX_EVENTS
from matrix.globals import SCRIPT_NAME, SERVERS, W
from matrix.server import (MatrixServer, create_default_server,
matrix_config_server_change_cb,
matrix_config_server_read_cb,
matrix_config_server_write_cb, matrix_timer_cb,
send_cb, matrix_load_users_cb,
matrix_partial_sync_cb)
send_cb, matrix_load_users_cb)
from matrix.utf import utf8_decode
from matrix.utils import server_buffer_prnt, server_buffer_set_title
@ -325,7 +324,7 @@ def receive_cb(server_name, file_descriptor):
server.disconnect()
break
response = server.client.next_response(MAX_EVENTS)
response = server.client.next_response()
# Check if we need to send some data back
data_to_send = server.client.data_to_send()
@ -493,9 +492,6 @@ def matrix_unload_cb():
G.CONFIG.free()
# for server in SERVERS.values():
# server.store_olm()
return W.WEECHAT_RC_OK
@ -565,21 +561,27 @@ def buffer_switch_cb(_, _signal, buffer_ptr):
if not room_buffer:
continue
if room_buffer.should_send_read_marker:
event_id = room_buffer.last_event_id
last_event_id = room_buffer.last_event_id
if room_buffer.should_send_read_marker:
# A buffer may not have any events, in that case no event id is
# here returned
if event_id:
if last_event_id:
server.room_send_read_marker(
room_buffer.room.room_id, event_id)
room_buffer.last_read_event = event_id
room_buffer.room.room_id, last_event_id)
room_buffer.last_read_event = last_event_id
if room_buffer.members_fetched:
return W.WEECHAT_RC_OK
if not room_buffer.members_fetched:
room_id = room_buffer.room.room_id
print("HELLO FETCHING FOR {}".format(room_id))
server.get_joined_members(room_id)
room_id = room_buffer.room.room_id
server.get_joined_members(room_id)
# The buffer is empty and we are seeing it for the first time.
# Let us fetch some messages from the room history so it doesn't feel so
# empty.
if room_buffer.first_view and not last_event_id:
if server.room_get_messages(room_buffer.room.room_id):
room_buffer.first_view = True
break

View file

@ -861,6 +861,7 @@ class RoomBuffer(object):
self.joined = True
self.leave_event_id = None # type: Optional[str]
self.members_fetched = False
self.first_view = True
self.unhandled_users = [] # type: List[str]
self.inactive_users = []

View file

@ -938,12 +938,12 @@ def matrix_me_command_cb(data, buffer, args):
W.prnt(server.server_buffer, message)
return W.WEECHAT_RC_ERROR
room_buffer = server.find_room_from_ptr(buffer)
if not server.client.logged_in:
room_buffer.error("You are not logged in.")
return W.WEECHAT_RC_ERROR
room_buffer = server.find_room_from_ptr(buffer)
if not args:
return W.WEECHAT_RC_OK

View file

@ -42,7 +42,6 @@ SERVERS = dict() # type: Dict[str, MatrixServer]
CONFIG = None # type: Optional[MatrixConfig]
ENCRYPTION = True # type: bool
SCRIPT_NAME = "matrix" # type: str
MAX_EVENTS = 100
TYPING_NOTICE_TIMEOUT = 4000 # 4 seconds typing notice lifetime
LOGGER = Logger("weechat-matrix")
UPLOADS = OrderedDict() # type: Dict[str, Upload]

View file

@ -40,6 +40,7 @@ from uuid import UUID
from nio import (
Api,
HttpClient,
ClientConfig,
LocalProtocolError,
LoginResponse,
LoginInfoResponse,
@ -48,7 +49,6 @@ from nio import (
RoomSendResponse,
RoomSendError,
SyncResponse,
PartialSyncResponse,
ShareGroupSessionResponse,
ShareGroupSessionError,
KeysQueryResponse,
@ -80,7 +80,7 @@ from nio import (
from . import globals as G
from .buffer import OwnAction, OwnMessage, RoomBuffer
from .config import ConfigSection, Option, ServerBufferType
from .globals import SCRIPT_NAME, SERVERS, W, MAX_EVENTS, TYPING_NOTICE_TIMEOUT
from .globals import SCRIPT_NAME, SERVERS, W, TYPING_NOTICE_TIMEOUT
from .utf import utf8_decode
from .utils import create_server_buffer, key_from_value, server_buffer_prnt
from .uploads import Upload
@ -296,7 +296,6 @@ class MatrixServer(object):
self.member_request_list = [] # type: List[str]
self.rooms_with_missing_members = [] # type: List[str]
self.lazy_load_hook = None # type: Optional[str]
self.partial_sync_hook = None # type: Optional[str]
# These flags remember if we made some requests so that we don't
# make them again while we wait on a response, the flags need to be
@ -379,11 +378,14 @@ class MatrixServer(object):
self.address = homeserver.hostname
self.homeserver = homeserver
config = ClientConfig(store_sync_tokens=True)
self.client = HttpClient(
homeserver.geturl(),
self.config.username,
self.device_id,
self.get_session_path(),
config=config
)
self.client.add_to_device_callback(
self.key_verification_cb,
@ -749,13 +751,15 @@ class MatrixServer(object):
def schedule_sync(self):
self.sync_time = time.time()
def sync(self, timeout=None, sync_filter=None):
def sync(self, timeout=None, sync_filter=None, full_state=False):
# type: (Optional[int], Optional[Dict[Any, Any]]) -> None
if not self.client:
return
self.sync_time = None
_, request = self.client.sync(timeout, sync_filter)
_, request = self.client.sync(timeout, sync_filter,
full_state=full_state)
self.send_or_queue(request)
def login_info(self):
@ -880,6 +884,9 @@ class MatrixServer(object):
self.send_or_queue(request)
def room_get_messages(self, room_id):
if not self.connected or not self.client.logged_in:
return
room_buffer = self.find_room_from_id(room_id)
# We're already fetching old messages
@ -1301,7 +1308,7 @@ class MatrixServer(object):
"state": {"lazy_load_members": True}
}
}
self.sync(timeout=0, sync_filter=sync_filter)
self.sync(timeout=0, sync_filter=sync_filter, full_state=True)
def _handle_room_info(self, response):
for room_id, info in response.rooms.invite.items():
@ -1458,41 +1465,40 @@ class MatrixServer(object):
# new key
# self.decrypt_printed_messages(event)
# Full sync response handle everything.
if isinstance(response, SyncResponse):
if self.client.should_upload_keys:
self.keys_upload()
if self.client.should_upload_keys:
self.keys_upload()
if self.client.should_query_keys and not self.keys_queried:
self.keys_query()
if self.client.should_query_keys and not self.keys_queried:
self.keys_query()
for room_buffer in self.room_buffers.values():
# It's our initial sync, we need to fetch room members, so add
# the room to the missing members queue.
if not self.next_batch:
if (not G.CONFIG.network.lazy_load_room_users
or room_buffer.room.encrypted):
self.rooms_with_missing_members.append(
room_buffer.room.room_id
)
if room_buffer.unhandled_users:
self._hook_lazy_user_adding()
break
for room_buffer in self.room_buffers.values():
# It's our initial sync, we need to fetch room members, so add
# the room to the missing members queue.
# 3 reasons we fetch room members here:
# * If the lazy load room users setting is off, otherwise we will
# fetch them when we switch to the buffer
# * If the room is encrypted, encryption needs the full member
# list for it to work.
# * If we are the only member, it is unlikely really an empty
# room and since we don't want a bunch of "Empty room?"
# buffers in our buffer list we fetch members here.
if not self.next_batch:
if (not G.CONFIG.network.lazy_load_room_users
or room_buffer.room.encrypted
or room_buffer.room.member_count <= 1):
self.rooms_with_missing_members.append(
room_buffer.room.room_id
)
if room_buffer.unhandled_users:
self._hook_lazy_user_adding()
break
self.next_batch = response.next_batch
self.schedule_sync()
W.bar_item_update("matrix_typing_notice")
self.next_batch = response.next_batch
self.schedule_sync()
W.bar_item_update("matrix_typing_notice")
if self.rooms_with_missing_members:
self.get_joined_members(self.rooms_with_missing_members.pop())
else:
if not self.partial_sync_hook:
hook = W.hook_timer(1 * 100, 0, 0, "matrix_partial_sync_cb",
self.name)
self.partial_sync_hook = hook
self.busy = True
W.bar_item_update("buffer_modes")
W.bar_item_update("matrix_modes")
if self.rooms_with_missing_members:
self.get_joined_members(self.rooms_with_missing_members.pop())
def handle_delete_device_auth(self, response):
device_id = self.device_deletion_queue.pop(response.uuid, None)
@ -1567,7 +1573,7 @@ class MatrixServer(object):
elif isinstance(response, LoginInfoResponse):
self._handle_login_info(response)
elif isinstance(response, (SyncResponse, PartialSyncResponse)):
elif isinstance(response, SyncResponse):
self._handle_sync(response)
elif isinstance(response, RoomSendResponse):
@ -1694,8 +1700,10 @@ class MatrixServer(object):
room = self.client.rooms[room_id]
buf = RoomBuffer(room, self.name, self.homeserver, prev_batch)
if room.members_synced:
buf.members_fetched = True
# We sadly don't get a correct summary on full_state from synapse so we
# can't trust it that the members are fully synced
# if room.members_synced:
# buf.members_fetched = True
self.room_buffers[room_id] = buf
self.buffers[room_id] = buf.weechat_buffer._ptr
@ -1842,30 +1850,6 @@ def matrix_config_server_change_cb(server_name, option):
return 1
@utf8_decode
def matrix_partial_sync_cb(server_name, remaining_calls):
start = time.time()
server = SERVERS[server_name]
W.unhook(server.partial_sync_hook)
server.partial_sync_hook = None
response = server.client.next_response(MAX_EVENTS)
while response:
server.handle_response(response)
current = time.time()
if current - start >= 0.1:
break
response = server.client.next_response(MAX_EVENTS)
if not server.partial_sync_hook:
server.busy = False
W.bar_item_update("buffer_modes")
W.bar_item_update("matrix_modes")
return W.WEECHAT_RC_OK
@utf8_decode
def matrix_load_users_cb(server_name, remaining_calls):
server = SERVERS[server_name]