From fc4c879e0dd4c0dbd863742501e2a8722874241d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Fri, 20 Jul 2018 19:14:32 +0200 Subject: [PATCH] matrix: Switch to the nio http client. --- main.py | 46 +-- matrix/api.py | 720 ---------------------------------- matrix/encryption.py | 913 ------------------------------------------- matrix/events.py | 386 ------------------ matrix/http.py | 93 ----- matrix/server.py | 379 ++---------------- 6 files changed, 55 insertions(+), 2482 deletions(-) delete mode 100644 matrix/api.py delete mode 100644 matrix/encryption.py delete mode 100644 matrix/events.py delete mode 100644 matrix/http.py diff --git a/main.py b/main.py index ff9db4e..781d643 100644 --- a/main.py +++ b/main.py @@ -32,6 +32,8 @@ from future.utils import bytes_to_native_str as n # pylint: disable=unused-import from typing import (List, Set, Dict, Tuple, Text, Optional, AnyStr, Deque, Any) +from nio import TransportType + from matrix.colors import Formatted from matrix.utf import utf8_decode from matrix.http import HttpResponse @@ -288,49 +290,31 @@ def receive_cb(server_name, file_descriptor): server.disconnect() break - received = len(data) # type: int - parsed_bytes = server.http_parser.execute(data, received) + server.client.receive(data) - assert parsed_bytes == received + response = server.client.next_response() - if server.http_parser.is_partial_body(): - server.http_buffer.append(server.http_parser.recv_body()) - - if server.http_parser.is_message_complete(): - status = server.http_parser.get_status_code() - headers = server.http_parser.get_headers() - body = b"".join(server.http_buffer) - - message = server.receive_queue.popleft() - message.response = HttpResponse(status, headers, body) - receive_time = time.time() - server.lag = (receive_time - message.send_time) * 1000 - server.lag_done = True - W.bar_item_update("lag") - message.receive_time = receive_time - - prnt_debug(DebugType.MESSAGING, server, - ("{prefix}Received message of type {t} and " - "status {s}").format( - prefix=W.prefix("error"), - t=message.__class__.__name__, - s=status)) - - # Message done, reset the parser state. - server.reset_parser() - - server.handle_response(message) + if response: + server.handle_response(response) break return W.WEECHAT_RC_OK def finalize_connection(server): - hook = W.hook_fd(server.socket.fileno(), 1, 0, 0, "receive_cb", server.name) + hook = W.hook_fd( + server.socket.fileno(), + 1, + 0, + 0, + "receive_cb", + server.name + ) server.fd_hook = hook server.connected = True server.connecting = False + server.client.connect(TransportType.HTTP) server.login() diff --git a/matrix/api.py b/matrix/api.py deleted file mode 100644 index 0919bbb..0000000 --- a/matrix/api.py +++ /dev/null @@ -1,720 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright © 2018 Damir Jelić -# -# Permission to use, copy, modify, and/or distribute this software for -# any purpose with or without fee is hereby granted, provided that the -# above copyright notice and this permission notice appear in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY -# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER -# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF -# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN -# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -from __future__ import unicode_literals -from builtins import str - -import time -import json -from enum import Enum, unique -from functools import partial - -try: - from json.decoder import JSONDecodeError -except ImportError: - JSONDecodeError = ValueError - -try: - from urllib import quote, urlencode - from urlparse import urlparse -except ImportError: - from urllib.parse import quote, urlencode, urlparse - -from matrix.http import RequestType, HttpRequest -import matrix.events as MatrixEvents - -MATRIX_API_PATH = "/_matrix/client/r0" # type: str - - -class MatrixClient: - - def __init__( - self, - host, # type: str - access_token="", # type: str - user_agent="" # type: str - ): - # type: (...) -> None - self.host = host - self.user_agent = user_agent - self.access_token = access_token - self.txn_id = 0 # type: int - - def _get_txn_id(self): - txn_id = self.txn_id - self.txn_id = self.txn_id + 1 - return txn_id - - def login(self, user, password, device_name="", device_id=""): - # type (str, str, str) -> HttpRequest - path = ("{api}/login").format(api=MATRIX_API_PATH) - - post_data = { - "type": "m.login.password", - "user": user, - "password": password - } - - if device_id: - post_data["device_id"] = device_id - - if device_name: - post_data["initial_device_display_name"] = device_name - - return HttpRequest(RequestType.POST, self.host, path, post_data) - - def sync(self, next_batch="", sync_filter=None): - # type: (str, Dict[Any, Any]) -> HttpRequest - assert self.access_token - - query_parameters = {"access_token": self.access_token} - - if sync_filter: - query_parameters["filter"] = json.dumps( - sync_filter, separators=(",", ":")) - - if next_batch: - query_parameters["since"] = next_batch - - path = ("{api}/sync?{query_params}").format( - api=MATRIX_API_PATH, query_params=urlencode(query_parameters)) - - return HttpRequest(RequestType.GET, self.host, path) - - def room_encrypted_message(self, room_id, content): - # type: (str, Dict[Any, Any]) -> HttpRequest - query_parameters = {"access_token": self.access_token} - - path = ("{api}/rooms/{room}/send/m.room.encrypted/" - "{tx_id}?{query_parameters}").format( - api=MATRIX_API_PATH, - room=quote(room_id), - tx_id=quote(str(self._get_txn_id())), - query_parameters=urlencode(query_parameters)) - - return HttpRequest(RequestType.PUT, self.host, path, content) - - def room_send_message(self, - room_id, - message_type, - content, - formatted_content=None): - # type: (str, str, str) -> HttpRequest - query_parameters = {"access_token": self.access_token} - - body = {"msgtype": message_type, "body": content} - - if formatted_content: - body["format"] = "org.matrix.custom.html" - body["formatted_body"] = formatted_content - - path = ("{api}/rooms/{room}/send/m.room.message/" - "{tx_id}?{query_parameters}").format( - api=MATRIX_API_PATH, - room=quote(room_id), - tx_id=quote(str(self._get_txn_id())), - query_parameters=urlencode(query_parameters)) - - return HttpRequest(RequestType.PUT, self.host, path, body) - - def room_topic(self, room_id, topic): - # type: (str, str) -> HttpRequest - query_parameters = {"access_token": self.access_token} - - content = {"topic": topic} - - path = ("{api}/rooms/{room}/state/m.room.topic?" - "{query_parameters}").format( - api=MATRIX_API_PATH, - room=quote(room_id), - query_parameters=urlencode(query_parameters)) - - return HttpRequest(RequestType.PUT, self.host, path, content) - - def room_redact(self, room_id, event_id, reason=None): - # type: (str, str, str) -> HttpRequest - query_parameters = {"access_token": self.access_token} - content = {} - - if reason: - content["reason"] = reason - - path = ("{api}/rooms/{room}/redact/{event_id}/{tx_id}?" - "{query_parameters}").format( - api=MATRIX_API_PATH, - room=quote(room_id), - event_id=quote(event_id), - tx_id=quote(str(self._get_txn_id())), - query_parameters=urlencode(query_parameters)) - - return HttpRequest(RequestType.PUT, self.host, path, content) - - def room_get_messages(self, - room_id, - start_token, - end_token="", - limit=10, - direction='b'): - query_parameters = { - "access_token": self.access_token, - "from": start_token, - "dir": direction, - "limit": str(limit) - } - - if end_token: - query_parameters["to"] = end_token - - path = ("{api}/rooms/{room}/messages?{query_parameters}").format( - api=MATRIX_API_PATH, - room=quote(room_id), - query_parameters=urlencode(query_parameters)) - - return HttpRequest(RequestType.GET, self.host, path) - - def room_join(self, room_id): - query_parameters = {"access_token": self.access_token} - - path = ("{api}/join/{room_id}?" - "{query_parameters}").format( - api=MATRIX_API_PATH, - room_id=quote(room_id), - query_parameters=urlencode(query_parameters)) - - return HttpRequest(RequestType.POST, self.host, path) - - def room_leave(self, room_id): - query_parameters = {"access_token": self.access_token} - - path = ("{api}/rooms/{room_id}/leave?" - "{query_parameters}").format( - api=MATRIX_API_PATH, - room_id=quote(room_id), - query_parameters=urlencode(query_parameters)) - - return HttpRequest(RequestType.POST, self.host, path) - - def room_invite(self, room_id, user_id): - query_parameters = {"access_token": self.access_token} - - content = {"user_id": user_id} - - path = ("{api}/rooms/{room_id}/invite?" - "{query_parameters}").format( - api=MATRIX_API_PATH, - room_id=quote(room_id), - query_parameters=urlencode(query_parameters)) - - return HttpRequest(RequestType.POST, self.host, path, content) - - def room_kick(self, room_id, user_id, reason=None): - query_parameters = {"access_token": self.access_token} - - content = {"user_id": user_id} - - if reason: - content["reason"] = reason - - path = ("{api}/rooms/{room_id}/kick?" - "{query_parameters}").format( - api=MATRIX_API_PATH, - room_id=quote(room_id), - query_parameters=urlencode(query_parameters)) - - h = HttpRequest(RequestType.POST, self.host, path, content) - return h - - def keys_upload(self, user_id, device_id, olm, keys=None, - one_time_keys=None): - query_parameters = {"access_token": self.access_token} - - path = ("{api}/keys/upload?" - "{query_parameters}").format( - api=MATRIX_API_PATH, - query_parameters=urlencode(query_parameters)) - - content = {} - - if keys: - device_keys = { - "algorithms": [ - "m.olm.v1.curve25519-aes-sha2", - "m.megolm.v1.aes-sha2" - ], - "device_id": device_id, - "user_id": user_id, - "keys": { - "curve25519:" + device_id: keys["curve25519"], - "ed25519:" + device_id: keys["ed25519"] - } - } - - signature = olm.sign_json(device_keys) - - device_keys["signatures"] = { - user_id: { - "ed25519:" + device_id: signature - } - } - - content["device_keys"] = device_keys - - if one_time_keys: - one_time_key_dict = {} - - for key_id, key in one_time_keys.items(): - key_dict = {"key": key} - signature = olm.sign_json(key_dict) - - one_time_key_dict["signed_curve25519:" + key_id] = { - "key": key_dict.pop("key"), - "signatures": { - user_id: { - "ed25519:" + device_id: signature - } - } - } - - content["one_time_keys"] = one_time_key_dict - - return HttpRequest(RequestType.POST, self.host, path, content) - - def keys_query(self, users): - query_parameters = {"access_token": self.access_token} - - path = ("{api}/keys/query?" - "{query_parameters}").format( - api=MATRIX_API_PATH, - query_parameters=urlencode(query_parameters)) - - content = { - "device_keys": {user: {} for user in users} - } - - return HttpRequest(RequestType.POST, self.host, path, content) - - def keys_claim(self, key_dict): - query_parameters = {"access_token": self.access_token} - - path = ("{api}/keys/claim?" - "{query_parameters}").format( - api=MATRIX_API_PATH, - query_parameters=urlencode(query_parameters)) - - content = { - "one_time_keys": key_dict, - "timeout": 10000 - } - - return HttpRequest(RequestType.POST, self.host, path, content) - - def to_device(self, event_type, content): - query_parameters = {"access_token": self.access_token} - - path = ("{api}/sendToDevice/{event_type}/{tx_id}?" - "{query_parameters}").format( - api=MATRIX_API_PATH, - event_type=event_type, - tx_id=quote(str(self._get_txn_id())), - query_parameters=urlencode(query_parameters)) - - return HttpRequest(RequestType.PUT, self.host, path, content) - - @staticmethod - def mxc_to_http(mxc): - # type: (str) -> str - url = urlparse(mxc) - - if url.scheme != "mxc": - return None - - if not url.netloc or not url.path: - return None - - return "https://{}/_matrix/media/r0/download/{}{}".format( - url.netloc, - url.netloc, - url.path - ) - - -class MatrixMessage(): - - def __init__( - self, - request_func, # type: Callable[[...], HttpRequest] - func_args, - ): - # type: (...) -> None - # yapf: disable - - self.request = None # type: HttpRequest - self.response = None # type: HttpResponse - self.decoded_response = None # type: Dict[Any, Any] - - self.creation_time = time.time() # type: float - self.send_time = None # type: float - self.receive_time = None # type: float - self.event = None - - self.request = request_func(**func_args) - # yapf: enable - - def decode_body(self, server): - try: - self.decoded_response = json.loads( - self.response.body, encoding='utf-8') - return (True, None) - except Exception as error: - return (False, error) - - def _decode(self, server, object_hook): - try: - parsed_dict = json.loads( - self.response.body, - encoding='utf-8', - ) - - self.event = object_hook(parsed_dict) - - return (True, None) - - except JSONDecodeError as error: - return (False, error) - - -class MatrixLoginMessage(MatrixMessage): - - def __init__(self, client, user, password, device_name, device_id=None): - data = {"user": user, "password": password, "device_name": device_name} - - if device_id: - data["device_id"] = device_id - - MatrixMessage.__init__(self, client.login, data) - - def decode_body(self, server): - object_hook = partial(MatrixEvents.MatrixLoginEvent.from_dict, server) - - return self._decode(server, object_hook) - - -class MatrixSyncMessage(MatrixMessage): - - def __init__(self, client, next_batch=None, limit=None): - data = {} - - if next_batch: - data["next_batch"] = next_batch - - if limit: - data["sync_filter"] = {"room": {"timeline": {"limit": limit}}} - - MatrixMessage.__init__(self, client.sync, data) - - def decode_body(self, server): - object_hook = partial(MatrixEvents.MatrixSyncEvent.from_dict, server) - - return self._decode(server, object_hook) - - -class MatrixSendMessage(MatrixMessage): - - def __init__(self, - client, - room_id, - formatted_message, - message_type="m.text"): - self.room_id = room_id - self.formatted_message = formatted_message - - data = { - "room_id": self.room_id, - "message_type": message_type, - "content": self.formatted_message.to_plain() - } - - if self.formatted_message.is_formatted(): - data["formatted_content"] = self.formatted_message.to_html() - - MatrixMessage.__init__(self, client.room_send_message, data) - - def decode_body(self, server): - object_hook = partial( - MatrixEvents.MatrixSendEvent.from_dict, - server, - self.room_id, - self.formatted_message, - ) - - return self._decode(server, object_hook) - - -class MatrixEmoteMessage(MatrixSendMessage): - - def __init__(self, client, room_id, formatted_message): - MatrixSendMessage.__init__(self, client, room_id, formatted_message, - "m.emote") - - def decode_body(self, server): - object_hook = partial( - MatrixEvents.MatrixEmoteEvent.from_dict, - server, - self.room_id, - self.formatted_message, - ) - - return self._decode(server, object_hook) - - -class MatrixTopicMessage(MatrixMessage): - - def __init__(self, client, room_id, topic): - self.room_id = room_id - self.topic = topic - - data = {"room_id": self.room_id, "topic": self.topic} - - MatrixMessage.__init__(self, client.room_topic, data) - - def decode_body(self, server): - object_hook = partial( - MatrixEvents.MatrixTopicEvent.from_dict, - server, - self.room_id, - self.topic, - ) - - return self._decode(server, object_hook) - - -class MatrixRedactMessage(MatrixMessage): - - def __init__(self, client, room_id, event_id, reason=None): - self.room_id = room_id - self.event_id = event_id - self.reason = reason - - data = {"room_id": self.room_id, "event_id": self.event_id} - - if reason: - data["reason"] = reason - - MatrixMessage.__init__(self, client.room_redact, data) - - def decode_body(self, server): - object_hook = partial( - MatrixEvents.MatrixRedactEvent.from_dict, - server, - self.room_id, - self.reason, - ) - - return self._decode(server, object_hook) - - -class MatrixBacklogMessage(MatrixMessage): - - def __init__(self, client, room_id, token, limit): - self.room_id = room_id - - data = { - "room_id": self.room_id, - "start_token": token, - "direction": "b", - "limit": limit - } - - MatrixMessage.__init__(self, client.room_get_messages, data) - - def decode_body(self, server): - object_hook = partial(MatrixEvents.MatrixBacklogEvent.from_dict, server, - self.room_id) - - return self._decode(server, object_hook) - - -class MatrixJoinMessage(MatrixMessage): - - def __init__(self, client, room_id): - self.room_id = room_id - - data = {"room_id": self.room_id} - - MatrixMessage.__init__(self, client.room_join, data) - - def decode_body(self, server): - object_hook = partial(MatrixEvents.MatrixJoinEvent.from_dict, server, - self.room_id) - - return self._decode(server, object_hook) - - -class MatrixPartMessage(MatrixMessage): - - def __init__(self, client, room_id): - self.room_id = room_id - - data = {"room_id": self.room_id} - - MatrixMessage.__init__(self, client.room_leave, data) - - def decode_body(self, server): - object_hook = partial(MatrixEvents.MatrixPartEvent.from_dict, server, - self.room_id) - - return self._decode(server, object_hook) - - -class MatrixInviteMessage(MatrixMessage): - - def __init__(self, client, room_id, user_id): - self.room_id = room_id - self.user_id = user_id - - data = {"room_id": self.room_id, "user_id": self.user_id} - - MatrixMessage.__init__(self, client.room_invite, data) - - def decode_body(self, server): - object_hook = partial(MatrixEvents.MatrixInviteEvent.from_dict, server, - self.room_id, self.user_id) - - return self._decode(server, object_hook) - - -class MatrixKickMessage(MatrixMessage): - - def __init__(self, client, room_id, user_id, reason=None): - self.room_id = room_id - self.user_id = user_id - self.reason = reason - - data = {"room_id": self.room_id, - "user_id": self.user_id, - "reason": reason} - - MatrixMessage.__init__(self, client.room_kick, data) - - def decode_body(self, server): - object_hook = partial( - MatrixEvents.MatrixKickEvent.from_dict, - server, - self.room_id, - self.user_id, - self.reason) - - return self._decode(server, object_hook) - - -class MatrixKeyUploadMessage(MatrixMessage): - - def __init__(self, client, user_id, device_id, olm, keys=None, - one_time_keys=None): - data = { - "device_id": device_id, - "user_id": user_id, - "olm": olm, - "keys": keys, - "one_time_keys": one_time_keys - } - - self.device_keys = True if keys else False - - MatrixMessage.__init__(self, client.keys_upload, data) - - def decode_body(self, server): - object_hook = partial(MatrixEvents.MatrixKeyUploadEvent.from_dict, - server, self.device_keys) - - return self._decode(server, object_hook) - - -class MatrixKeyQueryMessage(MatrixMessage): - - def __init__(self, client, users): - data = { - "users": users, - } - - MatrixMessage.__init__(self, client.keys_query, data) - - def decode_body(self, server): - object_hook = partial(MatrixEvents.MatrixKeyQueryEvent.from_dict, - server) - - return self._decode(server, object_hook) - - -class MatrixKeyClaimMessage(MatrixMessage): - - def __init__(self, client, room_id, key_dict): - self.room_id = room_id - data = { - "key_dict": key_dict, - } - - MatrixMessage.__init__(self, client.keys_claim, data) - - def decode_body(self, server): - object_hook = partial(MatrixEvents.MatrixKeyClaimEvent.from_dict, - server, self.room_id) - - return self._decode(server, object_hook) - - -class MatrixToDeviceMessage(MatrixMessage): - def __init__(self, client, to_device_dict): - data = { - "content": to_device_dict, - "event_type": "m.room.encrypted" - } - - MatrixMessage.__init__(self, client.to_device, data) - - def decode_body(self, server): - object_hook = partial(MatrixEvents.MatrixToDeviceEvent.from_dict, - server) - - return self._decode(server, object_hook) - - -class MatrixEncryptedMessage(MatrixMessage): - - def __init__(self, - client, - room_id, - formatted_message, - content): - self.room_id = room_id - self.formatted_message = formatted_message - - data = { - "room_id": self.room_id, - "content": content - } - - MatrixMessage.__init__(self, client.room_encrypted_message, data) - - def decode_body(self, server): - object_hook = partial( - MatrixEvents.MatrixSendEvent.from_dict, - server, - self.room_id, - self.formatted_message, - ) - - return self._decode(server, object_hook) diff --git a/matrix/encryption.py b/matrix/encryption.py deleted file mode 100644 index 851baf7..0000000 --- a/matrix/encryption.py +++ /dev/null @@ -1,913 +0,0 @@ -# -*- coding: utf-8 -*- - -# Weechat Matrix Protocol Script -# Copyright © 2018 Damir Jelić -# -# Permission to use, copy, modify, and/or distribute this software for -# any purpose with or without fee is hereby granted, provided that the -# above copyright notice and this permission notice appear in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY -# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER -# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF -# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN -# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -from __future__ import unicode_literals - -import os -import json -import sqlite3 -import pprint -import argparse - -# pylint: disable=redefined-builtin -from builtins import str, bytes - -from collections import defaultdict -from itertools import chain -from functools import wraps -from future.moves.itertools import zip_longest - -try: - FileNotFoundError -except NameError: - FileNotFoundError = IOError - -import matrix.globals - -try: - from olm import ( - Account, - OlmAccountError, - Session, - InboundSession, - OutboundSession, - OlmSessionError, - OlmPreKeyMessage, - InboundGroupSession, - OutboundGroupSession, - OlmGroupSessionError - ) -except ImportError: - matrix.globals.ENCRYPTION = False - -from matrix.globals import W, SERVERS -from matrix.utils import sanitize_id -from matrix.utf import utf8_decode - - -class ParseError(Exception): - pass - - -class OlmTrustError(Exception): - pass - - -class WeechatArgParse(argparse.ArgumentParser): - def print_usage(self, file): - pass - - def error(self, message): - m = ("{prefix}Error: {message} for command {command} " - "(see /help {command})").format(prefix=W.prefix("error"), - message=message, - command=self.prog) - W.prnt("", m) - raise ParseError - - -def own_buffer_or_error(f): - - @wraps(f) - def wrapper(data, buffer, *args, **kwargs): - - for server in SERVERS.values(): - if buffer in server.buffers.values(): - return f(server.name, buffer, *args, **kwargs) - elif buffer == server.server_buffer: - return f(server.name, buffer, *args, **kwargs) - - W.prnt("", "{prefix}matrix: command \"olm\" must be executed on a " - "matrix buffer (server or channel)".format( - prefix=W.prefix("error"))) - - return W.WEECHAT_RC_OK - - return wrapper - - -def encrypt_enabled(f): - @wraps(f) - def wrapper(*args, **kwds): - if matrix.globals.ENCRYPTION: - return f(*args, **kwds) - return None - - return wrapper - - -@encrypt_enabled -def matrix_hook_olm_command(): - W.hook_command( - # Command name and short description - "olm", - "Matrix olm encryption command", - # Synopsis - ("info all|blacklisted|private|unverified|verified ||" - "blacklist ||" - "unverify ||" - "verify "), - # Description - (" info: show info about known devices and their keys\n" - "blacklist: blacklist a device\n" - " unverify: unverify a device\n" - " verify: verify a device\n\n" - "Examples:\n"), - # Completions - ('info all|blacklisted|private|unverified|verified ||' - 'blacklist %(device_ids) ||' - 'unverify %(user_ids) %(device_ids) ||' - 'verify %(olm_user_ids) %(olm_devices)'), - # Function name - 'matrix_olm_command_cb', - '') - - -def olm_cmd_parse_args(args): - parser = WeechatArgParse(prog="olm") - subparsers = parser.add_subparsers(dest="subcommand") - - info_parser = subparsers.add_parser("info") - info_parser.add_argument( - "category", nargs="?", default="private", - choices=[ - "all", - "blacklisted", - "private", - "unverified", - "verified" - ]) - info_parser.add_argument("filter", nargs="?") - - verify_parser = subparsers.add_parser("verify") - verify_parser.add_argument("user_filter") - verify_parser.add_argument("device_filter", nargs="?") - - try: - parsed_args = parser.parse_args(args.split()) - return parsed_args - except ParseError: - return None - - -def grouper(iterable, n, fillvalue=None): - "Collect data into fixed-length chunks or blocks" - # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" - args = [iter(iterable)] * n - return zip_longest(*args, fillvalue=fillvalue) - - -def partition_key(key): - groups = grouper(key, 4, " ") - return ' '.join(''.join(g) for g in groups) - - -def olm_info_command(server, args): - olm = server.olm - - if args.category == "private": - device_msg = (" - Device ID: {}\n".format(server.device_id) - if server.device_id else "") - id_key = partition_key(olm.account.identity_keys["curve25519"]) - fp_key = partition_key(olm.account.identity_keys["ed25519"]) - message = ("{prefix}matrix: Identity keys:\n" - " - User: {user}\n" - "{device_msg}" - " - Identity key: {id_key}\n" - " - Fingerprint key: {fp_key}\n").format( - prefix=W.prefix("network"), - user=server.user, - device_msg=device_msg, - id_key=id_key, - fp_key=fp_key) - W.prnt(server.server_buffer, message) - elif args.category == "all": - for user, keys in olm.device_keys.items(): - message = ("{prefix}matrix: Identity keys:\n" - " - User: {user}\n").format( - prefix=W.prefix("network"), - user=user) - W.prnt(server.server_buffer, message) - - for key in keys: - id_key = partition_key(key.keys["curve25519"]) - fp_key = partition_key(key.keys["ed25519"]) - device_msg = (" - Device ID: {}\n".format( - key.device_id) if key.device_id else "") - message = ("{device_msg}" - " - Identity key: {id_key}\n" - " - Fingerprint key: {fp_key}\n\n").format( - device_msg=device_msg, - id_key=id_key, - fp_key=fp_key) - W.prnt(server.server_buffer, message) - - -def olm_verify_command(server, args): - olm = server.olm - devices = olm.device_keys - filtered_devices = [] - - if args.user_filter == "*": - filtered_devices = devices.values() - else: - device_keys = filter(lambda x: args.user_filter in x, devices) - filtered_devices = [devices[x] for x in device_keys] - - filtered_devices = chain.from_iterable(filtered_devices) - - if args.device_filter and args.device_filter != "*": - filtered_devices = filter( - lambda x: args.device_filter in x.device_id, - filtered_devices - ) - - key_list = [] - - for device in list(filtered_devices): - if olm.verify_device(device): - key_list.append(str(device)) - - if not key_list: - message = ("{prefix}matrix: No matching unverified devices " - "found.").format(prefix=W.prefix("error")) - W.prnt(server.server_buffer, message) - return - - key_str = "\n - ".join(key_list) - message = ("{prefix}matrix: Verified keys:\n" - " - {key_str}").format(prefix=W.prefix("join"), - key_str=key_str) - - W.prnt(server.server_buffer, message) - - -@own_buffer_or_error -@utf8_decode -def matrix_olm_command_cb(server_name, buffer, args): - server = SERVERS[server_name] - parsed_args = olm_cmd_parse_args(args) - - if not parsed_args: - return W.WEECHAT_RC_OK - - if not parsed_args.subcommand or parsed_args.subcommand == "info": - olm_info_command(server, parsed_args) - elif parsed_args.subcommand == "verify": - olm_verify_command(server, parsed_args) - else: - message = ("{prefix}matrix: Command not implemented.".format( - prefix=W.prefix("error"))) - W.prnt(server.server_buffer, message) - - return W.WEECHAT_RC_OK - - -class EncryptionError(Exception): - pass - - -class DeviceStore(object): - def __init__(self, filename): - # type: (str) -> None - self._entries = [] - self._filename = filename - - self._load(filename) - - def _load(self, filename): - # type: (str) -> None - try: - with open(filename, "r") as f: - for line in f: - line = line.strip() - - if not line or line.startswith("#"): - continue - - entry = StoreEntry.from_line(line) - - if not entry: - continue - - self._entries.append(entry) - except FileNotFoundError: - pass - - def _save_store(f): - @wraps(f) - def decorated(*args, **kwargs): - self = args[0] - ret = f(*args, **kwargs) - self._save() - return ret - - return decorated - - def _save(self): - # type: (str) -> None - with open(self._filename, "w") as f: - for entry in self._entries: - line = entry.to_line() - f.write(line) - - @_save_store - def add(self, device): - # type: (OlmDeviceKey) -> None - new_entries = StoreEntry.from_olmdevice(device) - self._entries += new_entries - - # Remove duplicate entries - self._entries = list(set(self._entries)) - - self._save() - - @_save_store - def remove(self, device): - # type: (OlmDeviceKey) -> int - removed = 0 - entries = StoreEntry.from_olmdevice(device) - - for entry in entries: - if entry in self._entries: - self._entries.remove(entry) - removed += 1 - - self._save() - - return removed - - def check(self, device): - # type: (OlmDeviceKey) -> bool - entries = StoreEntry.from_olmdevice(device) - result = map(lambda entry: entry in self._entries, entries) - - if False in result: - return False - - return True - - -class StoreEntry(object): - def __init__(self, user_id, device_id, key_type, key): - # type: (str, str, str, str) -> None - self.user_id = user_id - self.device_id = device_id - self.key_type = key_type - self.key = key - - @classmethod - def from_line(cls, line): - # type: (str) -> StoreEntry - fields = line.split(' ') - - if len(fields) < 4: - return None - - user_id, device_id, key_type, key = fields[:4] - - if key_type == "matrix-ed25519": - return cls(user_id, device_id, "ed25519", key) - else: - return None - - @classmethod - def from_olmdevice(cls, device_key): - # type: (OlmDeviceKey) -> [StoreEntry] - entries = [] - - user_id = device_key.user_id - device_id = device_key.device_id - - for key_type, key in device_key.keys.items(): - if key_type == "ed25519": - entries.append(cls(user_id, device_id, "ed25519", key)) - - return entries - - def to_line(self): - # type: () -> str - key_type = "matrix-{}".format(self.key_type) - line = "{} {} {} {}\n".format( - self.user_id, - self.device_id, - key_type, - self.key - ) - return line - - def __hash__(self): - # type: () -> int - return hash(str(self)) - - def __str__(self): - # type: () -> str - key_type = "matrix-{}".format(self.key_type) - line = "{} {} {} {}".format( - self.user_id, - self.device_id, - key_type, - self.key - ) - return line - - def __eq__(self, value): - # type: (StoreEntry) -> bool - if (self.user_id == value.user_id - and self.device_id == value.device_id - and self.key_type == value.key_type and self.key == value.key): - return True - - return False - - -class OlmDeviceKey(): - def __init__(self, user_id, device_id, key_dict): - # type: (str, str, Dict[str, str]) - self.user_id = user_id - self.device_id = device_id - self.keys = key_dict - - def __str__(self): - # type: () -> str - return "{} {} {}".format( - self.user_id, self.device_id, self.keys["ed25519"]) - - def __repr__(self): - # type: () -> str - return str(self) - - -class OneTimeKey(): - def __init__(self, user_id, device_id, key): - # type: (str, str, str) -> None - self.user_id = user_id - self.device_id = device_id - self.key = key - - -class Olm(): - - @encrypt_enabled - def __init__( - self, - user, - device_id, - session_path, - database=None, - account=None, - sessions=None, - inbound_group_sessions=None - ): - # type: (str, str, str, Account, Dict[str, List[Session]) -> None - self.user = user - self.device_id = device_id - self.session_path = session_path - self.database = database - self.device_keys = {} - self.shared_sessions = [] - - if not database: - db_file = "{}_{}.db".format(user, device_id) - db_path = os.path.join(session_path, db_file) - self.database = sqlite3.connect(db_path) - Olm._check_db_tables(self.database) - - if account: - self.account = account - - else: - self.account = Account() - self._insert_acc_to_db() - - if not sessions: - sessions = defaultdict(lambda: defaultdict(list)) - - if not inbound_group_sessions: - inbound_group_sessions = defaultdict(dict) - - self.sessions = sessions - self.inbound_group_sessions = inbound_group_sessions - self.outbound_group_sessions = {} - - trust_file_path = "{}_{}.trusted_devices".format(user, device_id) - self.trust_db = DeviceStore(os.path.join(session_path, trust_file_path)) - - def _create_session(self, sender, sender_key, message): - W.prnt("", "matrix: Creating session for {}".format(sender)) - session = InboundSession(self.account, message, sender_key) - W.prnt("", "matrix: Created session for {}".format(sender)) - self.account.remove_one_time_keys(session) - self._update_acc_in_db() - - return session - - def verify_device(self, device): - if self.trust_db.check(device): - return False - - self.trust_db.add(device) - return True - - def unverify_device(self, device): - self.trust_db.remove(device) - - def create_session(self, user_id, device_id, one_time_key): - W.prnt("", "matrix: Creating session for {}".format(user_id)) - id_key = None - - for user, keys in self.device_keys.items(): - if user != user_id: - continue - - for key in keys: - if key.device_id == device_id: - id_key = key.keys["curve25519"] - break - - if not id_key: - W.prnt("", "ERRR not found ID key") - W.prnt("", "Found id key {}".format(id_key)) - session = OutboundSession(self.account, id_key, one_time_key) - self._update_acc_in_db() - self.sessions[user_id][device_id].append(session) - self._store_session(user_id, device_id, session) - W.prnt("", "matrix: Created session for {}".format(user_id)) - - def create_group_session(self, room_id, session_id, session_key): - W.prnt("", "matrix: Creating group session for {}".format(room_id)) - session = InboundGroupSession(session_key) - self.inbound_group_sessions[room_id][session_id] = session - self._store_inbound_group_session(room_id, session) - - def create_outbound_group_session(self, room_id): - session = OutboundGroupSession() - self.outbound_group_sessions[room_id] = session - self.create_group_session(room_id, session.id, session.session_key) - - @encrypt_enabled - def get_missing_sessions(self, users): - # type: (List[str]) -> Dict[str, Dict[str, str]] - missing = {} - - for user in users: - devices = [] - - for key in self.device_keys[user]: - # we don't need a session for our own device, skip it - if key.device_id == self.device_id: - continue - - if not self.sessions[user][key.device_id]: - W.prnt("", "Missing session for device {}".format(key.device_id)) - devices.append(key.device_id) - - if devices: - missing[user] = {device: "signed_curve25519" for - device in devices} - - return missing - - @encrypt_enabled - def decrypt(self, sender, sender_key, message): - plaintext = None - - for device_id, session_list in self.sessions[sender].items(): - for session in session_list: - W.prnt("", "Trying session for device {}".format(device_id)) - try: - if isinstance(message, OlmPreKeyMessage): - if not session.matches(message): - continue - - W.prnt("", "Decrypting using existing session") - plaintext = session.decrypt(message) - parsed_plaintext = json.loads(plaintext, encoding='utf-8') - W.prnt("", "Decrypted using existing session") - return parsed_plaintext - except OlmSessionError: - pass - - try: - session = self._create_session(sender, sender_key, message) - except OlmSessionError: - return None - - try: - plaintext = session.decrypt(message) - parsed_plaintext = json.loads(plaintext, encoding='utf-8') - - device_id = sanitize_id(parsed_plaintext["sender_device"]) - self.sessions[sender][device_id].append(session) - self._store_session(sender, device_id, session) - return parsed_plaintext - except OlmSessionError: - return None - - def group_encrypt(self, room_id, plaintext_dict, own_id, users): - # type: (str, Dict[str, str]) -> Dict[str, str], Optional[Dict[Any, Any]] - plaintext_dict["room_id"] = room_id - to_device_dict = None - - if room_id not in self.outbound_group_sessions: - self.create_outbound_group_session(room_id) - - if self.outbound_group_sessions[room_id].id not in self.shared_sessions: - to_device_dict = self.share_group_session(room_id, own_id, users) - self.shared_sessions.append( - self.outbound_group_sessions[room_id].id - ) - - session = self.outbound_group_sessions[room_id] - - ciphertext = session.encrypt(Olm._to_json(plaintext_dict)) - - payload_dict = { - "algorithm": "m.megolm.v1.aes-sha2", - "sender_key": self.account.identity_keys["curve25519"], - "ciphertext": ciphertext, - "session_id": session.id, - "device_id": self.device_id - } - - return payload_dict, to_device_dict - - @encrypt_enabled - def group_decrypt(self, room_id, session_id, ciphertext): - if session_id not in self.inbound_group_sessions[room_id]: - return None, None - - session = self.inbound_group_sessions[room_id][session_id] - - try: - return session.decrypt(ciphertext) - except OlmGroupSessionError: - pass - - return None, None - - def share_group_session(self, room_id, own_id, users): - group_session = self.outbound_group_sessions[room_id] - - key_content = { - "algorithm": "m.megolm.v1.aes-sha2", - "room_id": room_id, - "session_id": group_session.id, - "session_key": group_session.session_key, - "chain_index": group_session.message_index - } - - payload_dict = { - "type": "m.room_key", - "content": key_content, - # TODO we don't have the user_id in the Olm class - "sender": own_id, - "sender_device": self.device_id, - "keys": { - "ed25519": self.account.identity_keys["ed25519"] - } - } - - to_device_dict = { - "messages": {} - } - - for user in users: - if user not in self.device_keys: - continue - - for key in self.device_keys[user]: - if key.device_id == self.device_id: - continue - - if not self.sessions[user][key.device_id]: - continue - - if not self.trust_db.check(key): - raise OlmTrustError - - device_payload_dict = payload_dict.copy() - # TODO sort the sessions - session = self.sessions[user][key.device_id][0] - device_payload_dict["recipient"] = user - device_payload_dict["recipient_keys"] = { - "ed25519": key.keys["ed25519"] - } - - olm_message = session.encrypt( - Olm._to_json(device_payload_dict) - ) - - olm_dict = { - "algorithm": "m.olm.v1.curve25519-aes-sha2", - "sender_key": self.account.identity_keys["curve25519"], - "ciphertext": { - key.keys["curve25519"]: { - "type": (0 if isinstance( - olm_message, - OlmPreKeyMessage - ) else 1), - "body": olm_message.ciphertext - } - } - } - - if user not in to_device_dict["messages"]: - to_device_dict["messages"][user] = {} - - to_device_dict["messages"][user][key.device_id] = olm_dict - - # W.prnt("", pprint.pformat(to_device_dict)) - return to_device_dict - - @classmethod - @encrypt_enabled - def from_session_dir(cls, user, device_id, session_path): - # type: (Server) -> Olm - db_file = "{}_{}.db".format(user, device_id) - db_path = os.path.join(session_path, db_file) - database = sqlite3.connect(db_path) - Olm._check_db_tables(database) - - cursor = database.cursor() - - cursor.execute("select pickle from olmaccount where user = ?", (user,)) - row = cursor.fetchone() - account_pickle = row[0] - - cursor.execute("select user, device_id, pickle from olmsessions") - db_sessions = cursor.fetchall() - - cursor.execute("select room_id, pickle from inbound_group_sessions") - db_inbound_group_sessions = cursor.fetchall() - - cursor.close() - - sessions = defaultdict(lambda: defaultdict(list)) - inbound_group_sessions = defaultdict(dict) - - try: - try: - account_pickle = bytes(account_pickle, "utf-8") - except TypeError: - pass - - account = Account.from_pickle(account_pickle) - - for db_session in db_sessions: - session_pickle = db_session[2] - try: - session_pickle = bytes(session_pickle, "utf-8") - except TypeError: - pass - - sessions[db_session[0]][db_session[1]].append( - Session.from_pickle(session_pickle)) - - for db_session in db_inbound_group_sessions: - session_pickle = db_session[1] - try: - session_pickle = bytes(session_pickle, "utf-8") - except TypeError: - pass - - session = InboundGroupSession.from_pickle(session_pickle) - inbound_group_sessions[db_session[0]][session.id] = session - - return cls(user, device_id, session_path, database, account, - sessions, inbound_group_sessions) - except (OlmAccountError, OlmSessionError) as error: - raise EncryptionError(error) - - def _update_acc_in_db(self): - cursor = self.database.cursor() - cursor.execute("update olmaccount set pickle=? where user = ?", - (self.account.pickle(), self.user)) - self.database.commit() - cursor.close() - - def _update_sessions_in_db(self): - cursor = self.database.cursor() - - for user, session_dict in self.sessions.items(): - for device_id, session_list in session_dict.items(): - for session in session_list: - cursor.execute("""update olmsessions set pickle=? - where user = ? and session_id = ? and - device_id = ?""", - (session.pickle(), user, session.id, - device_id)) - self.database.commit() - - cursor.close() - - def _update_inbound_group_sessions(self): - cursor = self.database.cursor() - - for room_id, session_dict in self.inbound_group_sessions.items(): - for session in session_dict.values(): - cursor.execute("""update inbound_group_sessions set pickle=? - where room_id = ? and session_id = ?""", - (session.pickle(), room_id, session.id)) - self.database.commit() - - cursor.close() - - def _store_session(self, user, device_id, session): - cursor = self.database.cursor() - - cursor.execute("insert into olmsessions values(?,?,?,?)", - (user, device_id, session.id, session.pickle())) - - self.database.commit() - - cursor.close() - - def _store_inbound_group_session(self, room_id, session): - cursor = self.database.cursor() - - cursor.execute("insert into inbound_group_sessions values(?,?,?)", - (room_id, session.id, session.pickle())) - - self.database.commit() - - cursor.close() - - def _insert_acc_to_db(self): - cursor = self.database.cursor() - cursor.execute("insert into olmaccount values (?,?)", - (self.user, self.account.pickle())) - self.database.commit() - cursor.close() - - @staticmethod - def _check_db_tables(database): - cursor = database.cursor() - cursor.execute("""select name from sqlite_master where type='table' - and name='olmaccount'""") - if not cursor.fetchone(): - cursor.execute("create table olmaccount (user text, pickle text)") - database.commit() - - cursor.execute("""select name from sqlite_master where type='table' - and name='olmsessions'""") - if not cursor.fetchone(): - cursor.execute("""create table olmsessions (user text, - device_id text, session_id text, pickle text)""") - database.commit() - - cursor.execute("""select name from sqlite_master where type='table' - and name='inbound_group_sessions'""") - if not cursor.fetchone(): - cursor.execute("""create table inbound_group_sessions - (room_id text, session_id text, pickle text)""") - database.commit() - - cursor.close() - - @encrypt_enabled - def to_session_dir(self): - # type: (Server) -> None - try: - self._update_acc_in_db() - self._update_sessions_in_db() - except OlmAccountError as error: - raise EncryptionError(error) - - def sign_json(self, json_dict): - signature = self.account.sign(json.dumps( - json_dict, - ensure_ascii=False, - separators=(',', ':'), - sort_keys=True, - )) - - return signature - - @staticmethod - def _to_json(json_dict): - # type: (Dict[Any, Any]) -> str - return json.dumps( - json_dict, - ensure_ascii=False, - separators=(",", ":"), - sort_keys=True - ) - - @encrypt_enabled - def mark_keys_as_published(self): - self.account.mark_keys_as_published() diff --git a/matrix/events.py b/matrix/events.py deleted file mode 100644 index 3221516..0000000 --- a/matrix/events.py +++ /dev/null @@ -1,386 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright © 2018 Damir Jelić -# -# Permission to use, copy, modify, and/or distribute this software for -# any purpose with or without fee is hereby granted, provided that the -# above copyright notice and this permission notice appear in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY -# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER -# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF -# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN -# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -from __future__ import unicode_literals -from builtins import str - -import json -import pprint - -from collections import deque, defaultdict -from functools import partial -from operator import itemgetter - -from matrix.globals import W -from matrix.utils import (tags_for_message, sanitize_id, sanitize_token, - sanitize_text, tags_from_line_data) - -from matrix.encryption import OlmDeviceKey, OneTimeKey -from .buffer import RoomUser, OwnMessage, OwnAction - -try: - from olm.session import OlmMessage, OlmPreKeyMessage -except ImportError: - pass - - -class MatrixEvent(): - - def __init__(self, server): - self.server = server - - def execute(self): - pass - - -class MatrixErrorEvent(MatrixEvent): - - def __init__(self, server, error_message, fatal=False): - self.error_message = error_message - self.fatal = fatal - MatrixEvent.__init__(self, server) - - @classmethod - def from_dict(cls, server, error_prefix, fatal, parsed_dict): - try: - message = "{prefix}: {error}".format( - prefix=error_prefix, error=sanitize_text(parsed_dict["error"])) - return cls(server, message, fatal=fatal) - except KeyError: - return cls( - server, ("{prefix}: Invalid JSON response " - "from server.").format(prefix=error_prefix), - fatal=fatal) - - -class MatrixKeyUploadEvent(MatrixEvent): - - def __init__(self, server, device_keys): - self.device_keys = device_keys - MatrixEvent.__init__(self, server) - - def execute(self): - self.server.olm.mark_keys_as_published() - self.server.store_olm() - - if not self.device_keys: - return - - message = "{prefix}matrix: Uploaded Olm device keys.".format( - prefix=W.prefix("network")) - - W.prnt(self.server.server_buffer, message) - - @classmethod - def from_dict(cls, server, device_keys, parsed_dict): - try: - return cls(server, device_keys) - except (KeyError, TypeError, ValueError): - return MatrixErrorEvent.from_dict(server, "Error uploading device" - "keys", False, parsed_dict) - - -class MatrixSendEvent(MatrixEvent): - - def __init__(self, server, room_id, message): - self.room_id = room_id - self.message = message - MatrixEvent.__init__(self, server) - - @classmethod - def from_dict(cls, server, room_id, message, parsed_dict): - try: - event_id = sanitize_id(parsed_dict["event_id"]) - sender = server.user_id - age = 0 - formatted_message = message - - message = OwnMessage(sender, age, event_id, formatted_message) - - return cls(server, room_id, message) - except (KeyError, TypeError, ValueError): - return MatrixErrorEvent.from_dict(server, "Error sending message", - False, parsed_dict) - - -class MatrixEmoteEvent(MatrixSendEvent): - - @classmethod - def from_dict(cls, server, room_id, message, parsed_dict): - try: - event_id = sanitize_id(parsed_dict["event_id"]) - sender = server.user_id - age = 0 - formatted_message = message - - message = OwnAction(sender, age, event_id, formatted_message) - - return cls(server, room_id, message) - except (KeyError, TypeError, ValueError): - return MatrixErrorEvent.from_dict(server, "Error sending message", - False, parsed_dict) - - -class MatrixTopicEvent(MatrixEvent): - - def __init__(self, server, room_id, event_id, topic): - self.room_id = room_id - self.topic = topic - self.event_id = event_id - MatrixEvent.__init__(self, server) - - @classmethod - def from_dict(cls, server, room_id, topic, parsed_dict): - try: - return cls(server, room_id, sanitize_id(parsed_dict["event_id"]), - topic) - except (KeyError, TypeError, ValueError): - return MatrixErrorEvent.from_dict(server, "Error setting topic", - False, parsed_dict) - - -class MatrixRedactEvent(MatrixEvent): - - def __init__(self, server, room_id, event_id, reason): - self.room_id = room_id - self.topic = reason - self.event_id = event_id - MatrixEvent.__init__(self, server) - - @classmethod - def from_dict(cls, server, room_id, reason, parsed_dict): - try: - return cls(server, room_id, sanitize_id(parsed_dict["event_id"]), - reason) - except (KeyError, TypeError, ValueError): - return MatrixErrorEvent.from_dict(server, "Error redacting message", - False, parsed_dict) - - -class MatrixJoinEvent(MatrixEvent): - - def __init__(self, server, room, room_id): - self.room = room - self.room_id = room_id - MatrixEvent.__init__(self, server) - - @classmethod - def from_dict(cls, server, room_id, parsed_dict): - try: - return cls( - server, - room_id, - sanitize_id(parsed_dict["room_id"]), - ) - except (KeyError, TypeError, ValueError): - return MatrixErrorEvent.from_dict(server, "Error joining room", - False, parsed_dict) - - -class MatrixPartEvent(MatrixEvent): - - def __init__(self, server, room_id): - self.room_id = room_id - MatrixEvent.__init__(self, server) - - @classmethod - def from_dict(cls, server, room_id, parsed_dict): - try: - if parsed_dict == {}: - return cls(server, room_id) - - raise KeyError - except KeyError: - return MatrixErrorEvent.from_dict(server, "Error leaving room", - False, parsed_dict) - - -class MatrixInviteEvent(MatrixEvent): - - def __init__(self, server, room_id, user_id): - self.room_id = room_id - self.user_id = user_id - MatrixEvent.__init__(self, server) - - @classmethod - def from_dict(cls, server, room_id, user_id, parsed_dict): - try: - if parsed_dict == {}: - return cls(server, room_id, user_id) - - raise KeyError - except KeyError: - return MatrixErrorEvent.from_dict(server, "Error inviting user", - False, parsed_dict) - - -class MatrixKickEvent(MatrixEvent): - - def __init__(self, server, room_id, user_id, reason): - self.room_id = room_id - self.user_id = user_id - self.reason = reason - MatrixEvent.__init__(self, server) - - @classmethod - def from_dict(cls, server, room_id, user_id, reason, parsed_dict): - try: - if parsed_dict == {}: - return cls(server, room_id, user_id, reason) - - raise KeyError - except KeyError: - return MatrixErrorEvent.from_dict(server, "Error kicking user", - False, parsed_dict) - - -class MatrixKeyQueryEvent(MatrixEvent): - - def __init__(self, server, keys): - self.keys = keys - MatrixEvent.__init__(self, server) - - @staticmethod - def _get_keys(key_dict): - keys = {} - - for key_type, key in key_dict.items(): - key_type, _ = key_type.split(":") - keys[key_type] = key - - return keys - - @classmethod - def from_dict(cls, server, parsed_dict): - keys = defaultdict(list) - try: - for user_id, device_dict in parsed_dict["device_keys"].items(): - for device_id, key_dict in device_dict.items(): - device_keys = MatrixKeyQueryEvent._get_keys( - key_dict.pop("keys")) - keys[user_id].append(OlmDeviceKey(user_id, device_id, - device_keys)) - return cls(server, keys) - except KeyError: - # TODO error message - return MatrixErrorEvent.from_dict(server, "Error kicking user", - False, parsed_dict) - - def execute(self): - # TODO move this logic into an Olm method - olm = self.server.olm - - if olm.device_keys == self.keys: - return - - olm.device_keys = self.keys - # TODO invalidate megolm sessions for rooms that got new devices - - -class MatrixKeyClaimEvent(MatrixEvent): - - def __init__(self, server, room_id, keys): - self.keys = keys - self.room_id = room_id - MatrixEvent.__init__(self, server) - - @classmethod - def from_dict(cls, server, room_id, parsed_dict): - W.prnt("", pprint.pformat(parsed_dict)) - keys = [] - try: - for user_id, user_dict in parsed_dict["one_time_keys"].items(): - for device_id, device_dict in user_dict.items(): - for key_dict in device_dict.values(): - # TODO check the signature of the key - key = OneTimeKey(user_id, device_id, key_dict["key"]) - keys.append(key) - - return cls(server, room_id, keys) - except KeyError: - return MatrixErrorEvent.from_dict( - server, ("Error claiming onetime keys."), False, parsed_dict) - - def execute(self): - server = self.server - olm = server.olm - - for key in self.keys: - olm.create_session(key.user_id, key.device_id, key.key) - - while server.encryption_queue[self.room_id]: - formatted_message = server.encryption_queue[self.room_id].popleft() - room, _ = server.find_room_from_id(self.room_id) - server.send_room_message(room, formatted_message, True) - - -class MatrixToDeviceEvent(MatrixEvent): - - def __init__(self, server): - MatrixEvent.__init__(self, server) - - @classmethod - def from_dict(cls, server, parsed_dict): - try: - if parsed_dict == {}: - return cls(server) - - raise KeyError - except KeyError: - return MatrixErrorEvent.from_dict(server, ("Error sending to " - "device message"), - False, parsed_dict) - - -class MatrixBacklogEvent(MatrixEvent): - - def __init__(self, server, room_id, end_token, events): - self.room_id = room_id - self.end_token = end_token - self.events = events - MatrixEvent.__init__(self, server) - - @staticmethod - def _room_event_from_dict(room_id, event_dict): - if room_id != event_dict["room_id"]: - raise ValueError - - if "redacted_by" in event_dict["unsigned"]: - return RoomRedactedMessageEvent.from_dict(event_dict) - - return RoomMessageEvent.from_dict(event_dict) - - @classmethod - def from_dict(cls, server, room_id, parsed_dict): - try: - end_token = sanitize_id(parsed_dict["end"]) - - if not parsed_dict["chunk"]: - return cls(server, room_id, end_token, []) - - event_func = partial(MatrixBacklogEvent._room_event_from_dict, - room_id) - - message_events = list( - filter(lambda event: event["type"] == "m.room.message", - parsed_dict["chunk"])) - - events = [event_func(m) for m in message_events] - - return cls(server, room_id, end_token, events) - except (KeyError, ValueError, TypeError): - return MatrixErrorEvent.from_dict(server, "Error fetching backlog", - False, parsed_dict) diff --git a/matrix/http.py b/matrix/http.py deleted file mode 100644 index 8ad1ef4..0000000 --- a/matrix/http.py +++ /dev/null @@ -1,93 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright © 2018 Damir Jelić -# -# Permission to use, copy, modify, and/or distribute this software for -# any purpose with or without fee is hereby granted, provided that the -# above copyright notice and this permission notice appear in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY -# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER -# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF -# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN -# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -from __future__ import unicode_literals -from builtins import str - -import json -from enum import Enum, unique - - -@unique -class RequestType(Enum): - GET = 0 - POST = 1 - PUT = 2 - - -class HttpResponse: - - def __init__(self, status, headers, body): - self.status = status # type: int - self.headers = headers # type: Dict[str, str] - self.body = body # type: bytes - - -# yapf: disable -class HttpRequest: - def __init__( - self, - request_type, # type: RequestType - host, # type: str - location, # type: str - data=None, # type: Dict[str, Any] - user_agent='weechat-matrix/{version}'.format( - version="0.1") # type: str - ): - # type: (...) -> None - user_agent = 'User-Agent: {agent}'.format(agent=user_agent) - host_header = 'Host: {host}'.format(host=host) - keepalive = "Connection: keep-alive" - request_list = [] # type: List[str] - accept_header = 'Accept: */*' # type: str - end_separator = '\r\n' # type: str - payload = "" # type: str - # yapf: enable - - if request_type == RequestType.GET: - get = 'GET {location} HTTP/1.1'.format(location=location) - request_list = [ - get, host_header, user_agent, keepalive, accept_header, - end_separator - ] - - elif (request_type == RequestType.POST or - request_type == RequestType.PUT): - - json_data = json.dumps(data, separators=(',', ':')) - - if request_type == RequestType.POST: - method = "POST" - else: - method = "PUT" - - request_line = '{method} {location} HTTP/1.1'.format( - method=method, location=location) - - type_header = 'Content-Type: application/x-www-form-urlencoded' - length_header = 'Content-Length: {length}'.format( - length=len(json_data)) - - request_list = [ - request_line, host_header, user_agent, keepalive, - accept_header, length_header, type_header, end_separator - ] - payload = json_data - - request = '\r\n'.join(request_list) - - self.request = request - self.payload = payload diff --git a/matrix/server.py b/matrix/server.py index eb8ca05..0c28206 100644 --- a/matrix/server.py +++ b/matrix/server.py @@ -15,55 +15,23 @@ # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. from __future__ import unicode_literals -from builtins import str, bytes import os import ssl import socket import time -import datetime import pprint -import json from collections import deque, defaultdict -from http_parser.pyparser import HttpParser -from nio import Client, LoginResponse, SyncRepsponse +from nio import HttpClient, LoginResponse, SyncRepsponse from matrix.plugin_options import Option, DebugType from matrix.utils import (key_from_value, prnt_debug, server_buffer_prnt, - create_server_buffer, tags_for_message, - server_ts_to_weechat, shorten_sender) + create_server_buffer) from matrix.utf import utf8_decode -from matrix.globals import W, SERVERS, OPTIONS -import matrix.api as API +from matrix.globals import W, SERVERS from .buffer import RoomBuffer, OwnMessage, OwnAction -from matrix.api import ( - MatrixClient, - MatrixSyncMessage, - MatrixLoginMessage, - MatrixKeyUploadMessage, - MatrixKeyQueryMessage, - MatrixToDeviceMessage, - MatrixSendMessage, - MatrixEncryptedMessage, - MatrixKeyClaimMessage -) - -from .events import ( - MatrixSendEvent, - MatrixBacklogEvent, - MatrixErrorEvent, - MatrixEmoteEvent, - MatrixJoinEvent -) - -from matrix.encryption import ( - Olm, - EncryptionError, - OlmTrustError, - encrypt_enabled -) try: FileNotFoundError @@ -108,7 +76,6 @@ class MatrixServer: self.ssl_context = ssl.create_default_context() # type: ssl.SSLContext self.client = None - self.nio_client = Client() # type: Option[Client] self.access_token = None # type: str self.next_batch = None # type: str self.transaction_id = 0 # type: int @@ -117,18 +84,8 @@ class MatrixServer: self.send_fd_hook = None # type: weechat.hook self.send_buffer = b"" # type: bytes - self.current_message = None # type: MatrixMessage self.device_check_timestamp = None - self.http_parser = HttpParser() # type: HttpParser - self.http_buffer = [] # type: List[bytes] - - # Queue of messages we need to send off. - self.send_queue = deque() # type: Deque[MatrixMessage] - - # Queue of messages we send off and are waiting a response for - self.receive_queue = deque() # type: Deque[MatrixMessage] - self.event_queue_timer = None self.event_queue = deque() # type: Deque[RoomInfo] @@ -166,48 +123,6 @@ class MatrixServer: with open(path, 'w') as f: f.write(self.device_id) - def _load_olm(self): - try: - self.olm = Olm.from_session_dir( - self.user, - self.device_id, - self.get_session_path() - ) - message = ("{prefix}matrix: Loaded Olm account for {user} (device:" - "{device})").format(prefix=W.prefix("network"), - user=self.user, - device=self.device_id) - W.prnt("", message) - - except FileNotFoundError: - pass - except EncryptionError as error: - message = ("{prefix}matrix: Error loading Olm" - "account: {error}.").format( - prefix=W.prefix("error"), error=error) - W.prnt("", message) - - @encrypt_enabled - def create_olm(self): - message = ("{prefix}matrix: Creating new Olm identity for " - "{self_color}{user}{ncolor}" - " on {server_color}{server}{ncolor} for device " - "{device}.").format( - prefix=W.prefix("network"), - self_color=W.color("chat_nick_self"), - ncolor=W.color("reset"), - user=self.user_id, - server_color=W.color("chat_server"), - server=self.name, - device=self.device_id) - W.prnt(self.server_buffer, message) - self.olm = Olm(self.user, self.device_id, self.get_session_path()) - - @encrypt_enabled - def store_olm(self): - if self.olm: - self.olm.to_session_dir() - def _create_options(self, config_file): options = [ Option('autoconnect', 'boolean', '', 0, 0, 'off', @@ -243,15 +158,9 @@ class MatrixServer: option.max, option.value, option.value, 0, "", "", "matrix_config_server_change_cb", self.name, "", "") - def reset_parser(self): - self.http_parser = HttpParser() - self.http_buffer = [] - def _change_client(self): host = ':'.join([self.address, str(self.port)]) - user_agent = 'weechat-matrix/{version}'.format(version="0.1") - self.client = MatrixClient(host, user_agent=user_agent) - # self.nio_client = Client() + self.client = HttpClient(host, self.user) def update_option(self, option, option_name): if option_name == "address": @@ -280,13 +189,9 @@ class MatrixServer: value = W.config_string(option) self.user = value self.access_token = "" - self.nio_client.user = value - self.nio_client.access_token = "" - self._load_device_id() - - # if self.device_id: - # self._load_olm() + if self.client: + self.client.user = value elif option_name == "password": value = W.config_string(option) @@ -305,7 +210,6 @@ class MatrixServer: "Adding to queue").format( prefix=W.prefix("error"), t=message.__class__.__name__)) - self.send_queue.append(message) def try_send(self, message): # type: (MatrixServer, bytes) -> bool @@ -362,37 +266,22 @@ class MatrixServer: return True def _abort_send(self): - self.send_queue.appendleft(self.current_message) self.current_message = None self.send_buffer = "" def _finalize_send(self): # type: (MatrixServer) -> None - self.current_message.send_time = time.time() - self.receive_queue.append(self.current_message) - self.send_buffer = b"" - self.current_message = None - def send(self, message): - # type: (MatrixServer, MatrixMessage) -> bool - if self.current_message: - return False - - self.current_message = message - - request = message.request.request - payload = message.request.payload - - bytes_message = bytes(request, 'utf-8') + bytes(payload, 'utf-8') - - self.try_send(bytes_message) + def send(self, data): + # type: (bytes) -> bool + self.try_send(data) return True def reconnect(self): message = ("{prefix}matrix: reconnecting to server..." - ).format(prefix=W.prefix("network")) + ).format(prefix=W.prefix("network")) server_buffer_prnt(self, message) @@ -437,8 +326,6 @@ class MatrixServer: self.socket = None self.connected = False self.access_token = "" - self.send_queue.clear() - self.receive_queue.clear() self.send_buffer = b"" self.current_message = None @@ -500,124 +387,13 @@ class MatrixServer: return True def sync(self): - limit = None if self.next_batch else OPTIONS.sync_limit - message = MatrixSyncMessage(self.client, self.next_batch, limit) - self.send_queue.append(message) - - def _send_unencrypted_message(self, room_id, formatted_data): - message = MatrixSendMessage( - self.client, room_id=room_id, formatted_message=formatted_data) - self.send_or_queue(message) - - def send_room_message( - self, - room, - formatted_data, - already_claimed=False - ): - # type: (str, Formatted) -> None - if not room.encrypted: - self._send_unencrypted_message(room.room_id, formatted_data) - return - - # TODO don't send messages unless all the devices are verified - missing = self.olm.get_missing_sessions(room.users.keys()) - - if missing and not already_claimed: - W.prnt("", "{prefix}matrix: Olm session missing for room, can't" - " encrypt message.") - W.prnt("", pprint.pformat(missing)) - self.encryption_queue[room.room_id].append(formatted_data) - message = MatrixKeyClaimMessage(self.client, room.room_id, missing) - self.send_or_queue(message) - return - - body = {"msgtype": "m.text", "body": formatted_data.to_plain()} - - if formatted_data.is_formatted(): - body["format"] = "org.matrix.custom.html" - body["formatted_body"] = formatted_data.to_html() - - plaintext_dict = { - "type": "m.room.message", - "content": body - } - - W.prnt("", "matrix: Encrypting message") - - try: - payload_dict, to_device_dict = self.olm.group_encrypt( - room.room_id, - plaintext_dict, - self.user_id, - room.users.keys() - ) - - if to_device_dict: - W.prnt("", "matrix: Megolm session missing for room.") - message = MatrixToDeviceMessage(self.client, to_device_dict) - self.send_queue.append(message) - - message = MatrixEncryptedMessage( - self.client, - room.room_id, - formatted_data, - payload_dict - ) - - self.send_queue.append(message) - - except OlmTrustError: - m = ("{prefix}matrix: Untrusted devices found in room, " - "verification is needed before sending a message").format( - prefix=W.prefix("error")) - W.prnt(self.server_buffer, m) - return - - @encrypt_enabled - def upload_keys(self, device_keys=False, one_time_keys=False): - keys = self.olm.account.identity_keys if device_keys else None - - one_time_keys = (self.olm.account.one_time_keys["curve25519"] if - one_time_keys else None) - - message = MatrixKeyUploadMessage(self.client, self.user_id, - self.device_id, self.olm, - keys, one_time_keys) - self.send_queue.append(message) - - @encrypt_enabled - def check_one_time_keys(self, key_count): - max_keys = self.olm.account.max_one_time_keys - - key_count = (max_keys / 2) - key_count - - if key_count <= 0: - return - - self.olm.account.generate_one_time_keys(key_count) - self.upload_keys(device_keys=False, one_time_keys=True) - - @encrypt_enabled - def query_keys(self): - users = [] - - for room_buffer in self.room_buffers.values(): - if not room_buffer.room.encrypted: - continue - users += list(room_buffer.room.users) - - if not users: - return - - message = MatrixKeyQueryMessage(self.client, users) - self.send_queue.append(message) + request = self.client.sync() + self.send_or_queue(request) def login(self): - # type: (MatrixServer) -> None - message = MatrixLoginMessage(self.client, self.user, self.password, - self.device_name, self.device_id) - self.send_or_queue(message) + # type: () -> None + request = self.client.login(self.password) + self.send_or_queue(request) msg = "{prefix}matrix: Logging in...".format(prefix=W.prefix("network")) @@ -698,92 +474,18 @@ class MatrixServer: # self.check_one_time_keys(response.one_time_key_count) # self.handle_events() - def handle_matrix_response(self, response): - if isinstance(response, MatrixSendEvent): - room_buffer = self.find_room_from_id(response.room_id) - self.handle_own_messages(room_buffer, response.message) - - elif isinstance(response, MatrixBacklogEvent): - room_buffer = self.find_room_from_id(response.room_id) - room_buffer.handle_backlog(response.events) - W.bar_item_update("buffer_modes") - - elif isinstance(response, MatrixErrorEvent): - self._handle_erorr_response(response) - - def nio_receive(self): - response = self.nio_client.next_response() + def handle_response(self, response): + # type: (MatrixMessage) -> None if isinstance(response, LoginResponse): self._handle_login(response) elif isinstance(response, SyncRepsponse): self._handle_sync(response) - def nio_parse_response(self, response): - if isinstance(response, MatrixLoginMessage): - self.nio_client.receive("login", response.response.body) - elif isinstance(response, MatrixSyncMessage): - self.nio_client.receive("sync", response.response.body) - - self.nio_receive() - - return - - def handle_response(self, message): - # type: (MatrixMessage) -> None - - assert message.response - - if ('content-type' in message.response.headers and - message.response.headers['content-type'] == 'application/json'): - - if isinstance(message, (MatrixLoginMessage, MatrixSyncMessage)): - self.nio_parse_response(message) - - else: - ret, error = message.decode_body(self) - - if not ret: - message = ("{prefix}matrix: Error decoding json response" - " from server: {error}").format( - prefix=W.prefix("error"), error=error) - W.prnt(self.server_buffer, message) - return - - event = message.event - self.handle_matrix_response(event) - else: - status_code = message.response.status - if status_code == 504: - if isinstance(message, API.MatrixSyncMessage): - self.sync() - else: - self._print_message_error(message) - else: - self._print_message_error(message) - - creation_date = datetime.datetime.fromtimestamp(message.creation_time) - done_time = time.time() - info_message = ( - "Message of type {t} created at {c}." - "\nMessage lifetime information:" - "\n Send delay: {s} ms" - "\n Receive delay: {r} ms" - "\n Handling time: {h} ms" - "\n Total time: {total} ms").format( - t=message.__class__.__name__, - c=creation_date, - s=(message.send_time - message.creation_time) * 1000, - r=(message.receive_time - message.send_time) * 1000, - h=(done_time - message.receive_time) * 1000, - total=(done_time - message.creation_time) * 1000, - ) - prnt_debug(DebugType.TIMING, self, info_message) - return def create_room_buffer(self, room_id): - room = self.nio_client.rooms[room_id] + room = self.client.rooms[room_id] buf = RoomBuffer(room, self.name) # TODO this should turned into a propper class self.room_buffers[room_id] = buf @@ -867,31 +569,31 @@ def matrix_timer_cb(server_name, remaining_calls): if not server.connected: return W.WEECHAT_RC_OK - # check lag, disconnect if it's too big - if server.receive_queue: - message = server.receive_queue.popleft() - server.lag = (current_time - message.send_time) * 1000 - server.receive_queue.appendleft(message) - server.lag_done = False - W.bar_item_update("lag") + # # check lag, disconnect if it's too big + # if server.receive_queue: + # message = server.receive_queue.popleft() + # server.lag = (current_time - message.send_time) * 1000 + # server.receive_queue.appendleft(message) + # server.lag_done = False + # W.bar_item_update("lag") - # TODO print out message, make timeout configurable - if server.lag > 300000: - server.disconnect() - return W.WEECHAT_RC_OK + # # TODO print out message, make timeout configurable + # if server.lag > 300000: + # server.disconnect() + # return W.WEECHAT_RC_OK - while server.send_queue: - message = server.send_queue.popleft() - prnt_debug( - DebugType.MESSAGING, - server, ("Timer hook found message of type {t} in queue. Sending " - "out.".format(t=message.__class__.__name__))) + # while server.send_queue: + # message = server.send_queue.popleft() + # prnt_debug( + # DebugType.MESSAGING, + # server, ("Timer hook found message of type {t} in queue. Sending " + # "out.".format(t=message.__class__.__name__))) - if not server.send(message): - # We got an error while sending the last message return the message - # to the queue and exit the loop - server.send_queue.appendleft(message) - break + # if not server.send(message): + # # We got an error while sending the last message return the message + # # to the queue and exit the loop + # server.send_queue.appendleft(message) + # break if not server.next_batch: return W.WEECHAT_RC_OK @@ -904,7 +606,6 @@ def matrix_timer_cb(server_name, remaining_calls): "{prefix}matrix: Querying user devices.".format( prefix=W.prefix("networ"))) - server.query_keys() server.device_check_timestamp = current_time return W.WEECHAT_RC_OK