Initial old messages backfill functionality.

This commit is contained in:
poljar (Damir Jelić) 2018-01-16 13:23:45 +01:00
parent 3ab1defc92
commit 594a8d42c4

View file

@ -13,6 +13,7 @@ import re
from builtins import bytes from builtins import bytes
from collections import deque, Mapping, Iterable, namedtuple from collections import deque, Mapping, Iterable, namedtuple
from operator import itemgetter
from enum import Enum, unique from enum import Enum, unique
from functools import wraps from functools import wraps
@ -117,6 +118,7 @@ class MessageType(Enum):
SEND = 2 SEND = 2
STATE = 3 STATE = 3
REDACT = 4 REDACT = 4
ROOM_MSG = 5
@unique @unique
@ -264,7 +266,7 @@ class MatrixMessage:
# TODO the limit should be configurable matrix.network.sync_limit # TODO the limit should be configurable matrix.network.sync_limit
sync_filter = { sync_filter = {
"room": { "room": {
"timeline": {"limit": 1000} "timeline": {"limit": 10}
} }
} }
@ -334,6 +336,22 @@ class MatrixMessage:
data data
) )
elif message_type == MessageType.ROOM_MSG:
path = ("{api}/rooms/{room}/messages?from={prev_batch}&"
"dir=b&limit={message_limit}&"
"access_token={access_token}").format(
api=MATRIX_API_PATH,
room=room_id,
prev_batch=extra_id,
message_limit=10,
access_token=server.access_token)
self.request = HttpRequest(
RequestType.GET,
server.address,
server.port,
path,
)
class MatrixRoom: class MatrixRoom:
def __init__(self, room_id): def __init__(self, room_id):
@ -343,6 +361,7 @@ class MatrixRoom:
self.topic = "" # type: unicode self.topic = "" # type: unicode
self.topic_author = "" # type: unicode self.topic_author = "" # type: unicode
self.topic_date = None # type: datetime.datetime self.topic_date = None # type: datetime.datetime
self.prev_batch = "" # type: unicode
def key_from_value(dictionary, value): def key_from_value(dictionary, value):
@ -619,8 +638,9 @@ def date_from_age(age):
return date return date
def matrix_handle_room_text_message(server, room_id, event): def matrix_handle_room_text_message(server, room_id, event, old=False):
# type: (MatrixServer, unicode, Dict[unicode, Any]) -> None # type: (MatrixServer, unicode, Dict[unicode, Any], bool) -> None
tag = ""
msg = event['content']['body'] msg = event['content']['body']
msg_author = strip_matrix_server(event['sender'])[1:] msg_author = strip_matrix_server(event['sender'])[1:]
@ -632,8 +652,16 @@ def matrix_handle_room_text_message(server, room_id, event):
msg_date = date_from_age(event['unsigned']['age']) msg_date = date_from_age(event['unsigned']['age'])
# TODO if this is an initial sync tag the messages as backlog # TODO if this is an initial sync tag the messages as backlog
tag = "nick_{a},matrix_id_{event_id},matrix_message,notify_message".format( if old:
a=msg_author, event_id=event_id) tag = ("nick_{a},matrix_id_{event_id},"
"matrix_message,notify_message,no_log,no_highlight").format(
a=msg_author,
event_id=event_id)
else:
tag = ("nick_{a},matrix_id_{event_id},"
"matrix_message,notify_message,log1").format(
a=msg_author,
event_id=event_id)
buf = server.buffers[room_id] buf = server.buffers[room_id]
W.prnt_date_tags(buf, msg_date, tag, data) W.prnt_date_tags(buf, msg_date, tag, data)
@ -673,15 +701,15 @@ def matrix_handle_redacted_message(server, room_id, event):
W.prnt_date_tags(buf, msg_date, tag, data) W.prnt_date_tags(buf, msg_date, tag, data)
def matrix_handle_room_messages(server, room_id, event): def matrix_handle_room_messages(server, room_id, event, old=False):
# type: (MatrixServer, unicode, Dict[unicode, Any]) -> None # type: (MatrixServer, unicode, Dict[unicode, Any], bool) -> None
if event['type'] == 'm.room.message': if event['type'] == 'm.room.message':
if 'redacted_by' in event['unsigned']: if 'redacted_by' in event['unsigned']:
matrix_handle_redacted_message(server, room_id, event) matrix_handle_redacted_message(server, room_id, event)
return return
if event['content']['msgtype'] == 'm.text': if event['content']['msgtype'] == 'm.text':
matrix_handle_room_text_message(server, room_id, event) matrix_handle_room_text_message(server, room_id, event, old)
# TODO handle different content types here # TODO handle different content types here
else: else:
@ -734,7 +762,7 @@ def matrix_handle_room_events(server, room_id, room_events):
topic=topic) topic=topic)
tags = "matrix_topic,log3" tags = "matrix_topic,log3"
date = int(time.time()) date = date_from_age(topic_age)
W.prnt_date_tags(buf, date, tags, message) W.prnt_date_tags(buf, date, tags, message)
@ -742,7 +770,7 @@ def matrix_handle_room_events(server, room_id, room_events):
pass pass
else: else:
message = ("{prefix}Handling of message type " message = ("{prefix}Handling of room event type "
"{type} not implemented").format( "{type} not implemented").format(
type=event['type'], type=event['type'],
prefix=W.prefix("error")) prefix=W.prefix("error"))
@ -758,10 +786,98 @@ def matrix_handle_room_info(server, room_info):
if room_id not in server.buffers: if room_id not in server.buffers:
matrix_create_room_buffer(server, room_id) matrix_create_room_buffer(server, room_id)
if not server.rooms[room_id].prev_batch:
server.rooms[room_id].prev_batch = room['timeline']['prev_batch']
matrix_handle_room_events(server, room_id, room['state']['events']) matrix_handle_room_events(server, room_id, room['state']['events'])
matrix_handle_room_events(server, room_id, room['timeline']['events']) matrix_handle_room_events(server, room_id, room['timeline']['events'])
def matrix_sort_old_messages(server, room_id):
lines = []
buf = server.buffers[room_id]
own_lines = W.hdata_pointer(W.hdata_get('buffer'), buf, 'own_lines')
if own_lines:
hdata_line = W.hdata_get('line')
hdata_line_data = W.hdata_get('line_data')
line = W.hdata_pointer(
W.hdata_get('lines'),
own_lines,
'first_line'
)
while line:
data = W.hdata_pointer(hdata_line, line, 'data')
line_data = {}
if data:
date = W.hdata_time(hdata_line_data, data, 'date')
print_date = W.hdata_time(hdata_line_data, data,
'date_printed')
tags = tags_from_line_data(data)
prefix = W.hdata_string(hdata_line_data, data, 'prefix')
message = W.hdata_string(hdata_line_data, data,
'message')
line_data = {'date': date,
'date_printed': print_date,
'tags_array': ','.join(tags),
'prefix': prefix,
'message': message}
lines.append(line_data)
line = W.hdata_move(hdata_line, line, 1)
sorted_lines = sorted(lines, key=itemgetter('date'))
lines = []
# We need to convert the dates to a string for hdata_update(), this
# will reverse the list at the same time
while sorted_lines:
line = sorted_lines.pop()
new_line = {k: unicode(v) for k, v in line.items()}
lines.append(new_line)
matrix_update_buffer_lines(lines, own_lines)
def matrix_update_buffer_lines(new_lines, own_lines):
hdata_line = W.hdata_get('line')
hdata_line_data = W.hdata_get('line_data')
line = W.hdata_pointer(
W.hdata_get('lines'),
own_lines,
'first_line'
)
while line:
data = W.hdata_pointer(hdata_line, line, 'data')
if data:
W.hdata_update(hdata_line_data, data, new_lines.pop())
line = W.hdata_move(hdata_line, line, 1)
def matrix_handle_old_messages(server, room_id, events):
for event in events:
if event['type'] == 'm.room.message':
matrix_handle_room_messages(server, room_id, event, old=True)
# TODO do we wan't to handle topics joins/quits here?
else:
pass
matrix_sort_old_messages(server, room_id)
# matrix_handle_room_events(server, room_id, events)
# TODO sort messages
def matrix_handle_message( def matrix_handle_message(
server, # type: MatrixServer server, # type: MatrixServer
message_type, # type: MessageType message_type, # type: MessageType
@ -806,6 +922,19 @@ def matrix_handle_message(
buf = server.buffers[room_id] buf = server.buffers[room_id]
W.prnt_date_tags(buf, date, tag, data) W.prnt_date_tags(buf, date, tag, data)
elif message_type == MessageType.ROOM_MSG:
# Response has no messages, that is we already got the oldest message
# in a previous request, nothing to do
if not response['chunk']:
return
room_id = response['chunk'][0]['room_id']
room = server.rooms[room_id]
matrix_handle_old_messages(server, room_id, response['chunk'])
room.prev_batch = response['end']
# Nothing to do here, we'll handle state changes and redactions in the sync # Nothing to do here, we'll handle state changes and redactions in the sync
elif (message_type == MessageType.STATE or elif (message_type == MessageType.STATE or
message_type == MessageType.REDACT): message_type == MessageType.REDACT):
@ -1897,13 +2026,46 @@ def matrix_command_topic_cb(data, buffer, command):
return W.WEECHAT_RC_OK return W.WEECHAT_RC_OK
def matrix_fetch_old_messages(server, room_id):
room = server.rooms[room_id]
prev_batch = room.prev_batch
if not prev_batch:
return
message = MatrixMessage(server, MessageType.ROOM_MSG,
room_id=room_id, extra_id=prev_batch)
send_or_queue(server, message)
return
@utf8_decode
def matrix_command_pgup_cb(data, buffer, command):
for server in SERVERS.values():
if buffer in server.buffers.values():
window = W.window_search_with_buffer(buffer)
first_line_displayed = bool(
W.window_get_integer(window, "first_line_displayed")
)
if first_line_displayed:
room_id = key_from_value(server.buffers, buffer)
matrix_fetch_old_messages(server, room_id)
return W.WEECHAT_RC_OK
return W.WEECHAT_RC_OK
def tags_from_line_data(line_data, ): def tags_from_line_data(line_data, ):
# type: (weechat.hdata) -> List[unicode] # type: (weechat.hdata) -> List[unicode]
tags_count = W.hdata_get_var_array_size( tags_count = W.hdata_get_var_array_size(
W.hdata_get('line_data'), W.hdata_get('line_data'),
line_data, line_data,
'tags_array' 'tags_array')
)
tags = [ tags = [
W.hdata_string( W.hdata_string(
@ -2143,6 +2305,7 @@ def init_hooks():
'matrix_redact_command_cb', '') 'matrix_redact_command_cb', '')
W.hook_command_run('/topic', 'matrix_command_topic_cb', '') W.hook_command_run('/topic', 'matrix_command_topic_cb', '')
W.hook_command_run('/window page_up', 'matrix_command_pgup_cb', '')
def autoconnect(servers): def autoconnect(servers):