commit 0de75c7ca3fad3b687820f8a9ca3e0aabf5dd021 Author: poljar (Damir Jelić) Date: Sat Dec 30 14:03:03 2017 +0100 Initial commit. diff --git a/weechat-matrix.py b/weechat-matrix.py new file mode 100644 index 0000000..5917f51 --- /dev/null +++ b/weechat-matrix.py @@ -0,0 +1,771 @@ +# -*- coding: utf-8 -*- + +from __future__ import unicode_literals + +import json +import socket +import ssl +import time + +from builtins import bytes +from collections import deque, Mapping, Iterable +from enum import Enum, unique +from functools import wraps +from typing import List, Set, Dict, Tuple, Text, Optional, AnyStr, Deque, Any + +from http_parser.pyparser import HttpParser + +import weechat + +WEECHAT_SCRIPT_NAME = "matrix" # type: unicode +WEECHAT_SCRIPT_DESCRIPTION = "matrix chat plugin" # type: unicode +WEECHAT_SCRIPT_AUTHOR = "Damir Jelić " # type: unicode +WEECHAT_SCRIPT_VERSION = "0.1" # type: unicode +WEECHAT_SCRIPT_LICENSE = "GPL3" # type: unicode + +SCRIPT_COMMAND = WEECHAT_SCRIPT_NAME # type: unicode + +MATRIX_API_PATH = "/_matrix/client/r0" # type: unicode + + +# Unicode handling + +def encode_to_utf8(data): + if isinstance(data, unicode): + return data.encode('utf-8') + if isinstance(data, bytes): + return data + elif isinstance(data, Mapping): + return type(data)(map(encode_to_utf8, data.iteritems())) + elif isinstance(data, Iterable): + return type(data)(map(encode_to_utf8, data)) + else: + return data + + +def decode_from_utf8(data): + if isinstance(data, bytes): + return data.decode('utf-8') + if isinstance(data, unicode): + return data + elif isinstance(data, Mapping): + return type(data)(map(decode_from_utf8, data.iteritems())) + elif isinstance(data, Iterable): + return type(data)(map(decode_from_utf8, data)) + else: + return data + + +def utf8_decode(f): + """ + Decode all arguments from byte strings to unicode strings. Use this for + functions called from outside of this script, e.g. callbacks from weechat. + """ + @wraps(f) + def wrapper(*args, **kwargs): + return f(*decode_from_utf8(args), **decode_from_utf8(kwargs)) + return wrapper + + +class WeechatWrapper(object): + def __init__(self, wrapped_class): + self.wrapped_class = wrapped_class + + # Helper method used to encode/decode method calls. + def wrap_for_utf8(self, method): + def hooked(*args, **kwargs): + result = method(*encode_to_utf8(args), **encode_to_utf8(kwargs)) + # Prevent wrapped_class from becoming unwrapped + if result == self.wrapped_class: + return self + return decode_from_utf8(result) + return hooked + + # Encode and decode everything sent to/received from weechat. We use the + # unicode type internally in wee-slack, but has to send utf8 to weechat. + def __getattr__(self, attr): + orig_attr = self.wrapped_class.__getattribute__(attr) + if callable(orig_attr): + return self.wrap_for_utf8(orig_attr) + else: + return decode_from_utf8(orig_attr) + + # Ensure all lines sent to weechat specifies a prefix. For lines after the + # first, we want to disable the prefix, which is done by specifying a space. + def prnt_date_tags(self, buffer, date, tags, message): + message = message.replace("\n", "\n \t") + return self.wrap_for_utf8(self.wrapped_class.prnt_date_tags)(buffer, date, tags, message) + + +@unique +class MessageType(Enum): + LOGIN = 1 + GET_INFO = 2 + SYNC = 3 + POST_MSG = 4 + + +@unique +class RequestType(Enum): + GET = 1 + POST = 2 + PUT = 3 + + +# TODO separate this into weechat settings and matrix settings +class Settings: + def __init__(self, + address="https://matrix.org", + port=8448, + user="", + password="", + nick="", + autoconnect=False): + # type: (unicode, int, unicode, unicode, unicode, bool) -> None + + self.address = address # type: unicode + self.nick = nick # type: unicode + self.user = user # type: unicode + self.password = password # type: unicode + self.port = port # type: int + self.autoconnect = autoconnect # type: bool + + +class RequestBuilder: + # TODO put the user agent somewhere globally + def __init__(self, host, user_agent='weechat-matrix/0.1'): + # type: (unicode, unicode) -> None + self.host = host + + self.host_header = 'Host: {host}'.format(host=host) + self.user_agent = 'User-Agent: {agent}'.format(agent=user_agent) + + # TODO we need to handle PUT as well + def request(self, location, data=None): + # type: (unicode, Dict[Any, Any]) -> (HttpRequest) + + request_list = [] # type: List[unicode] + accept_header = 'Accept: */*' # type: unicode + end_separator = '\r\n' # type: unicode + payload = None # type: unicode + + if data: + json_data = json.dumps(data, separators=(',', ':')) + + post = 'POST {location} HTTP/1.1'.format(location=location) + type_header = 'Content-Type: application/x-www-form-urlencoded' + length_header = 'Content-Length: {length}'.format(length=len(json_data)) + + request_list = [post, self.host_header, + self.user_agent, accept_header, + length_header, type_header, end_separator] + payload = json_data + else: + get = 'GET {location} HTTP/1.1'.format(location=location) + request_list = [get, self.host_header, + self.user_agent, accept_header, end_separator] + + request = '\r\n'.join(request_list) + + return HttpRequest(request, payload) + + +class HttpResponse: + def __init__(self, status, headers, body): + self.status = status # type: int + self.headers = headers # type: Dict[unicode, unicode] + self.body = body # type: bytes + + +class HttpRequest: + def __init__(self, request, payload): + # type: (unicode, unicode) -> None + self.request = request + self.payload = payload + + +class MatrixMessage: + def __init__(self, messageType, request, response): + # type: (MessageType, HttpRequest, HttpResponse) -> None + self.type = messageType + self.request = request + self.response = response + + +class Matrix: + def __init__(self, settings): + # type: (Settings) -> None + self.settings = settings # type: Settings + self.access_token = "" # type: unicode + self.next_batch = "" # type: unicode + self.rooms = {} # type: Dict[unicode, MatrixRoom] + + +class MatrixRoom: + def __init__(self, id, join_rule, alias=None): + # type: (unicode, unicode, unicode) -> None + self.id = id # type: unicode + self.alias = alias # type: unicode + self.join_rule = join_rule # type: unicode + self.users = [] # type: MatrixUsers + + +class WeechatClient: + def __init__(self, settings): + self.settings = settings + + self.buffers = dict() # type: Dict[unicode, weechat.buffer] + self.fd_hook = None + self.timer_hook = None + + self.connected = False # type: bool + self.reconnectCount = 0 # type: long + self.socket = None # type: ssl.SSLSocket + self.ssl_context = ssl.create_default_context() # type: ssl.SSLContext + + # TODO this belongs into the Matrix class + self.access_token = None # type: unicode + host_string = ':'.join([settings.address, + str(settings.port)]) # type: unicode + self.builder = RequestBuilder(host_string) # type: RequestBuilder + self.next_batch = None + + self.httpParser = HttpParser() # type: HttpParser + self.httpBodyBuffer = [] # type: List[bytes] + + # Queue of messages we need to send off. + self.sendQueue = deque() # type: Deque[MatrixMessage] + # Queue of messages we send off and are waiting a response for + self.recieveQueue = deque() # type: Deque[MatrixMessage] + # Queue for messages we got a response of and need to handle + # TODO is this needed? will we ever deffer message handling? + self.MessageQueue = deque() # type: Deque[MatrixMessage] + + # FIXME Don't set insecure + self.set_insecure() + + # TODO remove this + def set_insecure(self): + self.ssl_context.check_hostname = False + self.ssl_context.verify_mode = ssl.CERT_NONE + + +def wrap_socket(fd=None): + # type: (int) -> None + s = None # type: socket.socket + + # TODO explain why these type gymnastics are needed + if fd: + tempSocket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) + if type(tempSocket) == socket._socket.socket: + s = socket._socketobject(_sock=tempSocket) + else: + s = tempSocket + else: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + try: + ssl_socket = CLIENT.ssl_context.wrap_socket(s, + server_hostname=CLIENT.settings.address) # type: ssl.SSLSocket + CLIENT.socket = ssl_socket + # TODO add the other relevant exceptions + except ssl.SSLError as e: + print(e) + + +def handleHttpResponse(message): + # type: (MatrixMessage) -> None + + status_code = message.response.status + + # TODO handle error responses + # TODO handle try again response + if status_code == 200: + # TODO json.loads can fail + response = json.loads(message.response.body, encoding='utf-8') + handleMatrixMessage(message.type, response) + else: + print("ERROR IN HTTP RESPONSE {status_code}".format(status_code=status_code)) + print(message.request.request) + print(message.response.body) + + return + +NICK_GROUP_HERE = "0|Here" +NICK_GROUP_AWAY = "1|Away" + +def handle_room_info(room_info): + def create_buffer(roomd_id, alias=None): + if not alias: + alias = "#{id}".format(id=room_id) + + + buf = W.buffer_new( + alias, + "my_input_cb", + "", + "my_close_cb", + "" + ) + + # TODO set the buffer type dynamically + W.buffer_set(buf, "localvar_set_type", 'channel') + W.buffer_set(buf, "type", 'formated') + W.buffer_set(buf, "localvar_set_channel", alias) + + # TODO set the nick dynamically + W.buffer_set(buf, "localvar_set_nick", 'poljar') + + W.buffer_set(buf, "localvar_set_server", "matrix.org") + W.buffer_set(buf, "title", "🔐") + + # TODO put this in a function + short_name = name=alias.rsplit(":", 1)[0] + W.buffer_set(buf, "short_name", short_name) + + CLIENT.buffers[room_id] = buf + + def handle_aliases(room_id, event): + if room_id not in CLIENT.buffers: + alias = event['content']['aliases'][-1] + create_buffer(room_id, alias) + + def handle_members(room_id, event): + if event['membership'] == 'join': + try: + buf = CLIENT.buffers[room_id] + except KeyError: + event_queue.append(event) + return + + W.buffer_set(buf, "nicklist", "1") + W.buffer_set(buf, "nicklist_display_groups", "0") + # create nicklists for the current channel if they don't exist + # if they do, use the existing pointer + here = W.nicklist_search_group(buf, '', NICK_GROUP_HERE) + nick = event['content']['displayname'] + if not here: + here = W.nicklist_add_group(buf, '', NICK_GROUP_HERE, "weechat.color.nicklist_group", 1) + + W.nicklist_add_nick(buf, here, nick, "", "", "", 1) + + def handle_room_state(state_events): + for event in state_events: + if event['type'] == 'm.room.aliases': + handle_aliases(room_id, event) + elif event['type'] == 'm.room.member': + handle_members(room_id, event) + elif event['type'] == 'm.room.message': + message_queue.append(event) + + def handle_room_timeline(timeline_events): + for event in timeline_events: + if event['type'] == 'm.room.aliases': + handle_aliases(room_id, event) + + elif event['type'] == 'm.room.member': + handle_members(room_id, event) + elif event['type'] == 'm.room.message': + message_queue.append(event) + + def handle_text_message(room_id, event): + msg = event['content']['body'] + + # TODO put this in a function or lambda + msg_author = event['sender'].rsplit(":", 1)[0][1:] + + data = "{author}\t{msg}".format(author=msg_author, msg=msg) + + event_id = event['event_id'] + event_id = "matrix_id_{id}".format(id=event_id) + + msg_age = event['unsigned']['age'] + t = time.time() + t = int(t - (msg_age / 1000)) + buf = CLIENT.buffers[room_id] + + # TODO if this is an initial sync tag the messages as backlog + tag = "nick_{a},{event_id},irc_privmsg,notify_message".format( + a=msg_author, event_id=event_id) + + W.prnt_date_tags(buf, t, tag, data) + + for room_id, room in room_info['join'].iteritems(): + # TODO do we need these queues or can we just rename the buffer if and + # when we get an alias dynamically? + event_queue = deque() + message_queue = deque() + + handle_room_state(room['state']['events']) + handle_room_timeline(room['timeline']['events']) + + # The room doesn't have an alias, create it now using the room id + if room_id not in CLIENT.buffers: + create_buffer(room_id) + + # TODO we don't need a separate event/message queue here + while event_queue: + event = event_queue.popleft() + + if event['type'] == 'm.room.member': + handle_members(room_id, event) + else: + assert("Wrong event type in event queue") + + while message_queue: + event = message_queue.popleft() + + if event['type'] == 'm.room.message': + if event['content']['msgtype'] == 'm.text': + handle_text_message(room_id, event) + # TODO handle different content types here + else: + print("Handling of content type {type} not implemented".format(type=event['content']['type'])) + + +def handleMatrixMessage(messageType, matrixResponse): + # type: (MessageType, Dict[Any, Any]) -> None + + if messageType is MessageType.LOGIN: + CLIENT.access_token = matrixResponse["access_token"] + message = generate_matrix_request(MessageType.SYNC, CLIENT.builder, + CLIENT.access_token) + send_or_queue(message) + + elif messageType is MessageType.GET_INFO: + print(matrixResponse) + + elif messageType is messageType.SYNC: + next_batch = matrixResponse['next_batch'] + + # we got the same batch again, nothing to do + if next_batch == CLIENT.next_batch: + return + + room_info = matrixResponse['rooms'] + handle_room_info(room_info) + + CLIENT.next_batch = next_batch + + else: + print("Handling of message type {type} not implemented".format(type=messageType)) + + +def generate_matrix_request(type, http_builder, access_token=None, room_id=None, data=None): + # type: (MessageType, RequestBuilder, unicode, unicode, Dict[Any, Any]) -> MatrixMessage + if type == MessageType.LOGIN: + path = '/_matrix/client/r0/login' + post_data = {"type": "m.login.password", + "user": CLIENT.settings.user, + "password": CLIENT.settings.password} + + request = CLIENT.builder.request(path, post_data) + + return MatrixMessage(MessageType.LOGIN, request, None) + + elif type == MessageType.GET_INFO: + path = '/_matrix/client/r0/login' + request = CLIENT.builder.request(path) + return MatrixMessage(MessageType.GET_INFO, request, None) + + elif type == MessageType.SYNC: + path = '/_matrix/client/r0/sync?access_token={access_token}'.format(access_token=access_token) + + if CLIENT.next_batch: + path = path + '&since={next_batch}'.format(next_batch=CLIENT.next_batch) + + request = CLIENT.builder.request(path) + + return MatrixMessage(MessageType.SYNC, request, None) + + elif type == MessageType.POST_MSG: + path = '/_matrix/client/r0/rooms/{room}/send/m.room.message?access_token={access_token}'.format(room=room_id, access_token=access_token) + request = CLIENT.builder.request(path, data) + + return MatrixMessage(MessageType.POST_MSG, request, None) + + else: + assert("Incorrect message type") + return None + + +def connect(): + fd = CLIENT.socket.fileno() + hook = W.hook_fd(fd, 1, 0, 0, "recieve_cb", "") + + CLIENT.fd_hook = hook + CLIENT.connected = True + CLIENT.connecting = False + CLIENT.reconnectCount = 0 + + print("Connected") + + if not CLIENT.access_token: + matrix_login() + + +def matrix_login(): + message = generate_matrix_request(MessageType.LOGIN, CLIENT.builder) + send_or_queue(message) + + +def matrix_get_info(): + message = generate_matrix_request(MessageType.GET_INFO, CLIENT.builder) + send_or_queue(message) + + +def matrix_initial_sync(): + message = generate_matrix_request(MessageType.SYNC, CLIENT.builder, + CLIENT.access_token) + send_or_queue(message) + + +def send_or_queue(message): + # type: (MatrixMessage) -> None + if not send(message): + CLIENT.sendQueue.append(message) + +def send(message): + # type: (MatrixMessage) -> Bool + + request = message.request.request + payload = message.request.payload + + try: + CLIENT.socket.sendall(bytes(request, 'utf-8')) + if payload: + CLIENT.socket.sendall(bytes(payload, 'utf-8')) + + CLIENT.recieveQueue.append(message) + return True + + except socket.error as e: + disconnect() + print(e) + return False + +@utf8_decode +def recieve_cb(data, fd): + if not CLIENT.connected: + print("NOT CONNECTED WHILE RECEIVING") + # can this happen? + # do reconnection + pass + + while True: + try: + # print("Trying to read") + data = CLIENT.socket.recv(4096) + # TODO add the other relevant exceptions + except ssl.SSLWantReadError: + break + except socket.error as e: + disconnect() + + # Queue the failed message for resending + message = CLIENT.recieveQueue.popleft() + CLIENT.sendQueue.appendleft(message) + + print(e) + return + + if not data: + print("No data while reading") + disconnect() + break + + recieved = len(data) # type: int + nParsed = CLIENT.httpParser.execute(data, recieved) + + assert nParsed == recieved + + if CLIENT.httpParser.is_partial_body(): + CLIENT.httpBodyBuffer.append(CLIENT.httpParser.recv_body()) + + if CLIENT.httpParser.is_message_complete(): + status = CLIENT.httpParser.get_status_code() + headers = CLIENT.httpParser.get_headers() + body = b"".join(CLIENT.httpBodyBuffer) + + message = CLIENT.recieveQueue.popleft() + message.response = HttpResponse(status, headers, body) + + # Message done, reset the parser state. + CLIENT.httpParser = HttpParser() + CLIENT.httpBodyBuffer = [] + + handleHttpResponse(message) + break + + return W.WEECHAT_RC_OK + + +def disconnect(): + if CLIENT.fd_hook: + W.unhook(CLIENT.fd_hook) + + # if CLIENT.timer_hook: + # W.unhook(CLIENT.timer_hook) + + CLIENT.fd_hook = None + CLIENT.timer_hook = None + CLIENT.socket = None + CLIENT.connected = False + W.prnt("", "Disconnected") + + +# TODO if we're reconnecting we should retry even if there was an error on the +# socket creation +@utf8_decode +def connect_cb(data, status, gnutls_rc, sock, error, ip_address): + status_value = int(status) # type: long + + if status_value == W.WEECHAT_HOOK_CONNECT_OK: + fd = int(sock) # type: int + wrap_socket(fd) + + if CLIENT.socket: + connect() + else: + reconnect_cmd(None, None, None) + + elif status_value == W.WEECHAT_HOOK_CONNECT_ADDRESS_NOT_FOUND: + W.prnt("", '{address} not found'.format(address=ip_address)) + + elif status_value == W.WEECHAT_HOOK_CONNECT_IP_ADDRESS_NOT_FOUND: + W.prnt("", 'IP address not found') + + elif status_value == W.WEECHAT_HOOK_CONNECT_CONNECTION_REFUSED: + W.prnt("", 'Connection refused') + + elif status_value == W.WEECHAT_HOOK_CONNECT_PROXY_ERROR: + W.prnt("", 'Proxy fails to establish connection to server') + + elif status_value == W.WEECHAT_HOOK_CONNECT_LOCAL_HOSTNAME_ERROR: + W.prnt("", 'Unable to set local hostname') + + elif status_value == W.WEECHAT_HOOK_CONNECT_GNUTLS_INIT_ERROR: + W.prnt("", 'TLS init error') + + elif status_value == W.WEECHAT_HOOK_CONNECT_GNUTLS_HANDSHAKE_ERROR: + W.prnt("", 'TLS Handshake failed') + + elif status_value == W.WEECHAT_HOOK_CONNECT_MEMORY_ERROR: + W.prnt("", 'Not enough memory') + + elif status_value == W.WEECHAT_HOOK_CONNECT_TIMEOUT: + W.prnt("", 'Timeout') + + elif status_value == W.WEECHAT_HOOK_CONNECT_SOCKET_ERROR: + W.prnt("", 'Unable to create socket') + else: + W.prnt("", 'Unexpected error: {status}'.format(status=status_value)) + + return W.WEECHAT_RC_OK + + +@utf8_decode +def test_cb(data, buffer, args): + matrix_get_info() + return W.WEECHAT_RC_OK + + +def reconnect_cmd(data, buffer, args): + timeout = CLIENT.reconnectCount * 5 * 1000 + + if timeout > 0: + print("Reconnecting in {timeout} seconds.".format(timeout=timeout / 1000)) + W.hook_timer(timeout, 0, 1, "reconnect_cb", "") + else: + connect_cmd(None, None, None) + + CLIENT.reconnectCount += 1 + + return W.WEECHAT_RC_OK + + +@utf8_decode +def reconnect_cb(data, remaining): + connect_cmd(None, None, None) + return W.WEECHAT_RC_OK + + +@utf8_decode +def connect_cmd(data, buffer, args): + W.hook_connect("", settings.address, settings.port, 1, 0, "", + "connect_cb", "server") + CLIENT.connecting = True + return W.WEECHAT_RC_OK + + +@utf8_decode +def my_input_cb(data, buffer, input_data): + room_id = list(CLIENT.buffers.keys())[list(CLIENT.buffers.values()).index(buffer)] + body = {"msgtype": "m.text", "body": input_data} + message = generate_matrix_request(MessageType.POST_MSG, CLIENT.builder, + data=body, room_id=room_id, + access_token=CLIENT.access_token) + # print(message.request.request) + send_or_queue(message) + # W.prnt(buffer, "Text: %s" % input_data) + return W.WEECHAT_RC_OK + + +@utf8_decode +def my_close_cb(data, buffer): + W.prnt("", "Buffer '%s' will be closed!" % + W.buffer_get_string(buffer, "name")) + return W.WEECHAT_RC_OK + + +@utf8_decode +def my_timer_cb(data, remaining_calls): + if not CLIENT.connected: + if not CLIENT.connecting: + print("Reconnecting timeout blaaaa") + reconnect_cmd(None, None, None) + return W.WEECHAT_RC_OK + + while CLIENT.sendQueue: + message = CLIENT.sendQueue.popleft() + + if not send(message): + # We got an error while sending the last message return the message + # to the queue and exit the loop + CLIENT.sendQueue.appendleft(message) + break + + for message in CLIENT.MessageQueue: + print("Handling message: {message}".format(message=message)) + + if CLIENT.next_batch: + message = generate_matrix_request(MessageType.SYNC, CLIENT.builder, + CLIENT.access_token) + CLIENT.sendQueue.append(message) + + return W.WEECHAT_RC_OK + + +if __name__ == "__main__": + W = WeechatWrapper(weechat) + + if W.register(WEECHAT_SCRIPT_NAME, + WEECHAT_SCRIPT_AUTHOR, + WEECHAT_SCRIPT_VERSION, + WEECHAT_SCRIPT_LICENSE, + WEECHAT_SCRIPT_DESCRIPTION, + '', + ''): + + settings = Settings(address="localhost", + port=8448, user="example", + password="wordpass") + CLIENT = WeechatClient(settings) + + CLIENT.timer_hook = W.hook_timer(1 * 1000, 0, 0, "my_timer_cb", "") + + W.hook_command("matrix-send", "testing sending", "", "", "", + "test_cb", "") + + # TODO connect if autoconnect is enabled + connect_cmd(None, None, None)