diff --git a/main.py b/main.py index d78076b..8848c3f 100644 --- a/main.py +++ b/main.py @@ -32,7 +32,6 @@ from matrix import colors from matrix.utf import utf8_decode from matrix.http import HttpResponse from matrix.api import MatrixMessage, MessageType, matrix_login -from matrix.socket import disconnect, send_or_queue, send, connect, send_cb from matrix.messages import handle_http_response # Weechat searches for the registered callbacks in the scope of the main script @@ -53,7 +52,13 @@ from matrix.commands import ( from matrix.server import ( MatrixServer, create_default_server, + matrix_server_connect, + send, + send_cb, + send_or_queue, + matrix_server_disconnect, matrix_server_reconnect, + matrix_server_reconnect_schedule, matrix_timer_cb, matrix_config_server_read_cb, matrix_config_server_write_cb, @@ -219,7 +224,7 @@ def receive_cb(server_name, file_descriptor): except ssl.SSLWantReadError: break except socket.error as error: - disconnect(server) + matrix_server_disconnect(server) # Queue the failed message for resending if server.receive_queue: @@ -237,7 +242,7 @@ def receive_cb(server_name, file_descriptor): message = server.receive_queue.popleft() server.send_queue.appendleft(message) - disconnect(server) + matrix_server_disconnect(server) break received = len(data) # type: int @@ -284,19 +289,11 @@ def finalize_connection(server): server.name ) - if not server.timer_hook: - server.timer_hook = W.hook_timer( - 1 * 1000, - 0, - 0, - "matrix_timer_cb", - server.name - ) - server.fd_hook = hook server.connected = True server.connecting = False - server.reconnect_count = 0 + server.reconnect_time = None + server.reconnect_delay = 0 matrix_login(server) @@ -357,15 +354,7 @@ def connect_cb(data, status, gnutls_rc, sock, error, ip_address): 'Unexpected error: {status}'.format(status=status_value) ) - matrix_server_reconnect(server) - return W.WEECHAT_RC_OK - - -@utf8_decode -def reconnect_cb(server_name, remaining): - server = SERVERS[server_name] - connect(server) - + matrix_server_reconnect_schedule(server) return W.WEECHAT_RC_OK @@ -426,7 +415,7 @@ def matrix_unload_cb(): def autoconnect(servers): for server in servers.values(): if server.autoconnect: - connect(server) + matrix_server_connect(server) if __name__ == "__main__": diff --git a/matrix/api.py b/matrix/api.py index 5940cfa..53065e7 100644 --- a/matrix/api.py +++ b/matrix/api.py @@ -23,7 +23,7 @@ from enum import Enum, unique from matrix.globals import OPTIONS -from matrix.socket import send_or_queue +from matrix.server import send_or_queue from matrix.http import RequestType, HttpRequest MATRIX_API_PATH = "/_matrix/client/r0" # type: str diff --git a/matrix/commands.py b/matrix/commands.py index a94c825..367a018 100644 --- a/matrix/commands.py +++ b/matrix/commands.py @@ -25,9 +25,13 @@ import matrix.globals from matrix.utf import utf8_decode from matrix.api import MatrixMessage, MessageType from matrix.utils import key_from_value, tags_from_line_data -from matrix.socket import send_or_queue, disconnect, connect from matrix.plugin_options import DebugType -from matrix.server import MatrixServer +from matrix.server import ( + MatrixServer, + matrix_server_connect, + matrix_server_disconnect, + send_or_queue +) W = matrix.globals.W @@ -819,17 +823,17 @@ def matrix_command_cb(data, buffer, args): for server_name in args: if check_server_existence(server_name, SERVERS): server = SERVERS[server_name] - connect(server) + matrix_server_connect(server) def disconnect_server(args): for server_name in args: if check_server_existence(server_name, SERVERS): server = SERVERS[server_name] - if server.connected: - W.unhook(server.timer_hook) - server.timer_hook = None + if server.connected or server.reconnect_time: + # W.unhook(server.timer_hook) + # server.timer_hook = None server.access_token = "" - disconnect(server) + matrix_server_disconnect(server, reconnect=False) split_args = list(filter(bool, args.split(' '))) diff --git a/matrix/messages.py b/matrix/messages.py index a6d4257..afc598a 100644 --- a/matrix/messages.py +++ b/matrix/messages.py @@ -36,9 +36,9 @@ from matrix.api import ( MatrixUser ) -from matrix.socket import send_or_queue, disconnect, close_socket from matrix.utils import server_buffer_prnt, tags_from_line_data, prnt_debug from matrix.plugin_options import RedactType, DebugType +from matrix.server import send_or_queue, matrix_server_disconnect def strip_matrix_server(string): # type: (str) -> str @@ -809,8 +809,7 @@ def handle_http_response(server, message): W.unhook(server.timer_hook) server.timer_hook = None - close_socket(server) - disconnect(server) + matrix_server_disconnect(server) elif message.type == MessageType.STATE: response = decode_json(server, message.response.body) reason = ("." if not response or not response["error"] else diff --git a/matrix/server.py b/matrix/server.py index 0177b5e..a930048 100644 --- a/matrix/server.py +++ b/matrix/server.py @@ -16,18 +16,24 @@ from __future__ import unicode_literals -from builtins import str +from builtins import str, bytes import ssl +import socket +import time from collections import deque from http_parser.pyparser import HttpParser from matrix.plugin_options import Option, DebugType -from matrix.utils import key_from_value, prnt_debug, server_buffer_prnt +from matrix.utils import ( + key_from_value, + prnt_debug, + server_buffer_prnt, + create_server_buffer +) from matrix.utf import utf8_decode from matrix.globals import W, SERVERS -from matrix.socket import send, connect class MatrixServer: @@ -55,7 +61,8 @@ class MatrixServer: self.autoconnect = False # type: bool self.connected = False # type: bool self.connecting = False # type: bool - self.reconnect_count = 0 # type: int + self.reconnect_delay = 0 # type: int + self.reconnect_time = None # type: float self.socket = None # type: ssl.SSLSocket self.ssl_context = ssl.create_default_context() # type: ssl.SSLContext @@ -228,10 +235,14 @@ def matrix_config_server_change_cb(server_name, option): def matrix_timer_cb(server_name, remaining_calls): server = SERVERS[server_name] + current_time = time.time() + + if ((not server.connected) and + server.reconnect_time and + current_time >= (server.reconnect_time + server.reconnect_delay)): + matrix_server_reconnect(server) + if not server.connected: - if not server.connecting: - server_buffer_prnt(server, "Reconnecting timeout blaaaa") - matrix_server_reconnect(server) return W.WEECHAT_RC_OK while server.send_queue: @@ -246,11 +257,6 @@ def matrix_timer_cb(server_name, remaining_calls): server.send_queue.appendleft(message) break - for message in server.message_queue: - server_buffer_prnt( - server, - "Handling message: {message}".format(message=message)) - return W.WEECHAT_RC_OK @@ -264,17 +270,205 @@ def create_default_server(config_file): def matrix_server_reconnect(server): + message = ("{prefix}matrix: reconnecting to server...").format( + prefix=W.prefix("network")) + + server_buffer_prnt(server, message) + + server.reconnect_time = None + + if not matrix_server_connect(server): + matrix_server_reconnect_schedule(server) + + +def matrix_server_reconnect_schedule(server): # type: (MatrixServer) -> None server.connecting = True - timeout = server.reconnect_count * 5 * 1000 + server.reconnect_time = time.time() - if timeout > 0: - server_buffer_prnt( - server, - "Reconnecting in {timeout} seconds.".format( - timeout=timeout / 1000)) - W.hook_timer(timeout, 0, 1, "reconnect_cb", server.name) + if server.reconnect_delay: + server.reconnect_delay = server.reconnect_delay * 2 else: - connect(server) + server.reconnect_delay = 10 - server.reconnect_count += 1 + message = ("{prefix}matrix: reconnecting to server in {t} " + "seconds").format( + prefix=W.prefix("network"), + t=server.reconnect_delay) + + server_buffer_prnt(server, message) + + +def matrix_server_disconnect(server, reconnect=True): + # type: (MatrixServer) -> None + if server.fd_hook: + W.unhook(server.fd_hook) + + # TODO close socket + close_socket(server.socket) + + server.fd_hook = None + server.socket = None + server.connected = False + server.access_token = "" + server.receive_queue.clear() + + server.reconnect_delay = 0 + server.reconnect_time = None + + if server.server_buffer: + message = ("{prefix}matrix: disconnected from server").format( + prefix=W.prefix("network")) + server_buffer_prnt(server, message) + + if reconnect: + matrix_server_reconnect_schedule(server) + + +def matrix_server_connect(server): + # type: (MatrixServer) -> int + if not server.address or not server.port: + message = "{prefix}Server address or port not set".format( + prefix=W.prefix("error")) + W.prnt("", message) + return False + + if not server.user or not server.password: + message = "{prefix}User or password not set".format( + prefix=W.prefix("error")) + W.prnt("", message) + return False + + if server.connected: + return True + + if not server.server_buffer: + create_server_buffer(server) + + if not server.timer_hook: + server.timer_hook = W.hook_timer( + 1 * 1000, + 0, + 0, + "matrix_timer_cb", + server.name + ) + + ssl_message = " (SSL)" if server.ssl_context.check_hostname else "" + + message = "{prefix}matrix: Connecting to {server}:{port}{ssl}...".format( + prefix=W.prefix("network"), + server=server.address, + port=server.port, + ssl=ssl_message) + + W.prnt(server.server_buffer, message) + + W.hook_connect("", server.address, server.port, 1, 0, "", + "connect_cb", server.name) + + return True + + +@utf8_decode +def send_cb(server_name, file_descriptor): + # type: (str, int) -> int + + server = SERVERS[server_name] + + if server.send_fd_hook: + W.unhook(server.send_fd_hook) + server.send_fd_hook = None + + if server.send_buffer: + try_send(server, send_buffer) + + return W.WEECHAT_RC_OK + + +def send_or_queue(server, message): + # type: (MatrixServer, MatrixMessage) -> None + if not send(server, message): + prnt_debug(DebugType.MESSAGING, server, + ("{prefix} Failed sending message of type {t}. " + "Adding to queue").format( + prefix=W.prefix("error"), + t=message.type)) + server.send_queue.append(message) + + +def try_send(server, message): + # type: (MatrixServer, bytes) -> bool + + socket = server.socket + total_sent = 0 + message_length = len(message) + + while total_sent < message_length: + try: + sent = socket.send(message[total_sent:]) + + except ssl.SSLWantWriteError: + hook = W.hook_fd( + server.socket.fileno(), + 0, 1, 0, + "send_cb", + server.name + ) + server.send_fd_hook = hook + server.send_buffer = message[total_sent:] + return True + + except OSError as error: + disconnect(server) + abort_send(server) + server_buffer_prnt(server, str(error)) + return False + + if sent == 0: + disconnect(server) + abort_send(server) + server_buffer_prnt(server, "Socket closed while sending data.") + return False + + total_sent = total_sent + sent + + finalize_send(server) + return True + + +def abort_send(server): + server.send_queue.appendleft(server.current_message) + server.current_message = None + server.send_buffer = "" + + +def finalize_send(server): + # type: (MatrixServer) -> None + server.current_message.send_time = time.time() + server.receive_queue.append(server.current_message) + + server.send_buffer = "" + server.current_message = None + + +def send(server, message): + # type: (MatrixServer, MatrixMessage) -> bool + if server.current_message: + return False + + server.current_message = message + + request = message.request.request + payload = message.request.payload + + bytes_message = bytes(request, 'utf-8') + bytes(payload, 'utf-8') + + try_send(server, bytes_message) + + return True + +def close_socket(sock): + # type: (socket.socket) -> None + sock.shutdown(socket.SHUT_RDWR) + sock.close() diff --git a/matrix/socket.py b/matrix/socket.py deleted file mode 100644 index ebe5400..0000000 --- a/matrix/socket.py +++ /dev/null @@ -1,186 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright © 2018 Damir Jelić -# -# Permission to use, copy, modify, and/or distribute this software for -# any purpose with or without fee is hereby granted, provided that the -# above copyright notice and this permission notice appear in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY -# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER -# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF -# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN -# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -from __future__ import unicode_literals - -import time -import ssl -import socket -import pprint - -from builtins import bytes, str - -import matrix.globals -from matrix.plugin_options import DebugType -from matrix.utils import prnt_debug, server_buffer_prnt, create_server_buffer -from matrix.utf import utf8_decode - - -W = matrix.globals.W - - -def close_socket(server): - # type: (MatrixServer) -> None - server.socket.shutdown(socket.SHUT_RDWR) - server.socket.close() - - -def disconnect(server): - # type: (MatrixServer) -> None - if server.fd_hook: - W.unhook(server.fd_hook) - - server.fd_hook = None - server.socket = None - server.connected = False - server.receive_queue.clear() - - server_buffer_prnt(server, "Disconnected") - - -def connect(server): - # type: (MatrixServer) -> int - if not server.address or not server.port: - message = "{prefix}Server address or port not set".format( - prefix=W.prefix("error")) - W.prnt("", message) - return False - - if not server.user or not server.password: - message = "{prefix}User or password not set".format( - prefix=W.prefix("error")) - W.prnt("", message) - return False - - if server.connected: - return True - - if not server.server_buffer: - create_server_buffer(server) - - ssl_message = " (SSL)" if server.ssl_context.check_hostname else "" - - message = "{prefix}matrix: Connecting to {server}:{port}{ssl}...".format( - prefix=W.prefix("network"), - server=server.address, - port=server.port, - ssl=ssl_message) - - W.prnt(server.server_buffer, message) - - W.hook_connect("", server.address, server.port, 1, 0, "", - "connect_cb", server.name) - - return W.WEECHAT_RC_OK - - -@utf8_decode -def send_cb(server_name, file_descriptor): - # type: (str, int) -> int - - server = SERVERS[server_name] - - if server.send_fd_hook: - W.unhook(server.send_fd_hook) - server.send_fd_hook = None - - if server.send_buffer: - try_send(server, send_buffer) - - return W.WEECHAT_RC_OK - - -def send_or_queue(server, message): - # type: (MatrixServer, MatrixMessage) -> None - if not send(server, message): - prnt_debug(DebugType.MESSAGING, server, - ("{prefix} Failed sending message of type {t}. " - "Adding to queue").format( - prefix=W.prefix("error"), - t=message.type)) - server.send_queue.append(message) - - -def try_send(server, message): - # type: (MatrixServer, bytes) -> bool - - socket = server.socket - total_sent = 0 - message_length = len(message) - - while total_sent < message_length: - try: - sent = socket.send(message[total_sent:]) - - except ssl.SSLWantWriteError: - hook = W.hook_fd( - server.socket.fileno(), - 0, 1, 0, - "send_cb", - server.name - ) - server.send_fd_hook = hook - server.send_buffer = message[total_sent:] - return True - - except OSError as error: - disconnect(server) - abort_send(server) - server_buffer_prnt(server, str(error)) - return False - - if sent == 0: - disconnect(server) - abort_send(server) - server_buffer_prnt(server, "Socket closed while sending data.") - return False - - total_sent = total_sent + sent - - finalize_send(server) - return True - - -def abort_send(server): - server.send_queue.appendleft(server.current_message) - server.current_message = None - server.send_buffer = "" - - -def finalize_send(server): - # type: (MatrixServer) -> None - server.current_message.send_time = time.time() - server.receive_queue.append(server.current_message) - - server.send_buffer = "" - server.current_message = None - - -def send(server, message): - # type: (MatrixServer, MatrixMessage) -> bool - if server.current_message: - return False - - server.current_message = message - - request = message.request.request - payload = message.request.payload - - bytes_message = bytes(request, 'utf-8') + bytes(payload, 'utf-8') - - try_send(server, bytes_message) - - return True