From c63551d37e6f11fde1d5851fbb9ed7c196a67e32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?poljar=20=28Damir=20Jeli=C4=87=29?= Date: Fri, 2 Feb 2018 10:13:56 +0100 Subject: [PATCH] Make the send functions methods of the server class. --- main.py | 4 +- matrix/api.py | 3 +- matrix/commands.py | 13 ++- matrix/messages.py | 4 +- matrix/server.py | 211 ++++++++++++++++++++++----------------------- 5 files changed, 114 insertions(+), 121 deletions(-) diff --git a/main.py b/main.py index 7b8e0e7..eb556f2 100644 --- a/main.py +++ b/main.py @@ -53,9 +53,7 @@ from matrix.server import ( MatrixServer, create_default_server, matrix_server_connect, - send, send_cb, - send_or_queue, matrix_server_disconnect, matrix_server_reconnect, matrix_server_reconnect_schedule, @@ -428,7 +426,7 @@ def room_input_cb(server_name, buffer, input_data): data=body, room_id=room_id, extra_data=extra_data) - send_or_queue(server, message) + server.send_or_queue(message) return W.WEECHAT_RC_OK diff --git a/matrix/api.py b/matrix/api.py index 53065e7..8d2799c 100644 --- a/matrix/api.py +++ b/matrix/api.py @@ -23,7 +23,6 @@ from enum import Enum, unique from matrix.globals import OPTIONS -from matrix.server import send_or_queue from matrix.http import RequestType, HttpRequest MATRIX_API_PATH = "/_matrix/client/r0" # type: str @@ -257,4 +256,4 @@ def matrix_login(server): MessageType.LOGIN, data=post_data ) - send_or_queue(server, message) + server.send_or_queue(message) diff --git a/matrix/commands.py b/matrix/commands.py index 367a018..e889388 100644 --- a/matrix/commands.py +++ b/matrix/commands.py @@ -30,7 +30,6 @@ from matrix.server import ( MatrixServer, matrix_server_connect, matrix_server_disconnect, - send_or_queue ) @@ -117,7 +116,7 @@ def matrix_fetch_old_messages(server, room_id): message = MatrixMessage(server, GLOBAL_OPTIONS, MessageType.ROOM_MSG, room_id=room_id, extra_id=prev_batch) - send_or_queue(server, message) + server.send_or_queue(message) return @@ -195,7 +194,7 @@ def matrix_command_join_cb(data, buffer, command): MessageType.JOIN, room_id=room_id ) - send_or_queue(server, message) + server.send_or_queue(message) for server in SERVERS.values(): if buffer in server.buffers.values(): @@ -236,7 +235,7 @@ def matrix_command_part_cb(data, buffer, command): MessageType.PART, room_id=room_id ) - send_or_queue(server, message) + server.send_or_queue(message) for server in SERVERS.values(): if buffer in server.buffers.values(): @@ -274,7 +273,7 @@ def matrix_command_invite_cb(data, buffer, command): room_id=room_id, data=body ) - send_or_queue(server, message) + server.send_or_queue(message) for server in SERVERS.values(): if buffer in server.buffers.values(): @@ -365,7 +364,7 @@ def matrix_redact_command_cb(data, buffer, args): room_id=room_id, extra_id=event_id ) - send_or_queue(server, message) + server.send_or_queue(message) return W.WEECHAT_RC_OK @@ -933,7 +932,7 @@ def matrix_command_topic_cb(data, buffer, command): room_id=room_id, extra_id="m.room.topic" ) - send_or_queue(server, message) + server.send_or_queue(message) return W.WEECHAT_RC_OK_EAT diff --git a/matrix/messages.py b/matrix/messages.py index afc598a..be0c3ab 100644 --- a/matrix/messages.py +++ b/matrix/messages.py @@ -38,7 +38,7 @@ from matrix.api import ( from matrix.utils import server_buffer_prnt, tags_from_line_data, prnt_debug from matrix.plugin_options import RedactType, DebugType -from matrix.server import send_or_queue, matrix_server_disconnect +from matrix.server import matrix_server_disconnect def strip_matrix_server(string): # type: (str) -> str @@ -687,7 +687,7 @@ def matrix_handle_message( server.access_token = response["access_token"] server.user_id = response["user_id"] message = MatrixMessage(server, OPTIONS, MessageType.SYNC) - send_or_queue(server, message) + server.send_or_queue(message) elif message_type is MessageType.SYNC: next_batch = response['next_batch'] diff --git a/matrix/server.py b/matrix/server.py index 6138260..d6f16e7 100644 --- a/matrix/server.py +++ b/matrix/server.py @@ -174,6 +174,108 @@ class MatrixServer: else: pass + def send_or_queue(self, message): + # type: (MatrixServer, MatrixMessage) -> None + if not self.send(message): + prnt_debug(DebugType.MESSAGING, self, + ("{prefix} Failed sending message of type {t}. " + "Adding to queue").format( + prefix=W.prefix("error"), + t=message.type)) + self.send_queue.append(message) + + def try_send(self, message): + # type: (MatrixServer, bytes) -> bool + + sock = self.socket + total_sent = 0 + message_length = len(message) + + while total_sent < message_length: + try: + sent = sock.send(message[total_sent:]) + + except ssl.SSLWantWriteError: + hook = W.hook_fd( + sock.fileno(), + 0, 1, 0, + "send_cb", + self.name + ) + self.send_fd_hook = hook + self.send_buffer = message[total_sent:] + return True + + except socket.error as error: + self.abort_send() + + errno = "error" + str(error.errno) + " " if error.errno else "" + strerr = error.strerror if error.strerror else "Unknown reason" + strerr = errno + strerr + + message = ("{prefix}Error while writing to " + "socket: {error}").format( + prefix=W.prefix("network"), + error=strerr) + + server_buffer_prnt(self, message) + server_buffer_prnt( + self, + ("{prefix}matrix: disconnecting from server...").format( + prefix=W.prefix("network"))) + + matrix_server_disconnect(self) + return False + + if sent == 0: + self.abort_send() + + server_buffer_prnt( + self, + "{prefix}matrix: Error while writing to socket".format( + W.prefix("network"))) + server_buffer_prnt( + self, + ("{prefix}matrix: disconnecting from server...").format( + prefix=W.prefix("network"))) + matrix_server_disconnect(self) + return False + + total_sent = total_sent + sent + + self.finalize_send() + return True + + def abort_send(self): + self.send_queue.appendleft(self.current_message) + self.current_message = None + self.send_buffer = "" + + def finalize_send(self): + # type: (MatrixServer) -> None + self.current_message.send_time = time.time() + self.receive_queue.append(self.current_message) + + self.send_buffer = "" + self.current_message = None + + + def send(self, message): + # type: (MatrixServer, MatrixMessage) -> bool + if self.current_message: + return False + + self.current_message = message + + request = message.request.request + payload = message.request.payload + + bytes_message = bytes(request, 'utf-8') + bytes(payload, 'utf-8') + + self.try_send(bytes_message) + + return True + @utf8_decode def matrix_config_server_read_cb( @@ -250,7 +352,7 @@ def matrix_timer_cb(server_name, remaining_calls): ("Timer hook found message of type {t} in queue. Sending " "out.".format(t=message.type))) - if not send(server, message): + if not server.send(message): # We got an error while sending the last message return the message # to the queue and exit the loop server.send_queue.appendleft(message) @@ -380,116 +482,11 @@ def send_cb(server_name, file_descriptor): server.send_fd_hook = None if server.send_buffer: - try_send(server, server.send_buffer) + server.try_send(server, server.send_buffer) return W.WEECHAT_RC_OK -def send_or_queue(server, message): - # type: (MatrixServer, MatrixMessage) -> None - if not send(server, message): - prnt_debug(DebugType.MESSAGING, server, - ("{prefix} Failed sending message of type {t}. " - "Adding to queue").format( - prefix=W.prefix("error"), - t=message.type)) - server.send_queue.append(message) - - -def try_send(server, message): - # type: (MatrixServer, bytes) -> bool - - sock = server.socket - total_sent = 0 - message_length = len(message) - - while total_sent < message_length: - try: - sent = sock.send(message[total_sent:]) - - except ssl.SSLWantWriteError: - hook = W.hook_fd( - server.sock.fileno(), - 0, 1, 0, - "send_cb", - server.name - ) - server.send_fd_hook = hook - server.send_buffer = message[total_sent:] - return True - - except socket.error as error: - abort_send(server) - - errno = "error" + str(error.errno) + " " if error.errno else "" - str_error = error.strerror if error.strerror else "Unknown reason" - str_error = errno + str_error - - message = ("{prefix}Error while writing to " - "socket: {error}").format( - prefix=W.prefix("network"), - error=str_error) - - server_buffer_prnt(server, message) - server_buffer_prnt( - server, - ("{prefix}matrix: disconnecting from server...").format( - prefix=W.prefix("network"))) - - matrix_server_disconnect(server) - return False - - if sent == 0: - abort_send(server) - - server_buffer_prnt( - server, - "{prefix}matrix: Error while writing to socket".format( - W.prefix("network"))) - server_buffer_prnt( - server, - ("{prefix}matrix: disconnecting from server...").format( - prefix=W.prefix("network"))) - matrix_server_disconnect(server) - return False - - total_sent = total_sent + sent - - finalize_send(server) - return True - - -def abort_send(server): - server.send_queue.appendleft(server.current_message) - server.current_message = None - server.send_buffer = "" - - -def finalize_send(server): - # type: (MatrixServer) -> None - server.current_message.send_time = time.time() - server.receive_queue.append(server.current_message) - - server.send_buffer = "" - server.current_message = None - - -def send(server, message): - # type: (MatrixServer, MatrixMessage) -> bool - if server.current_message: - return False - - server.current_message = message - - request = message.request.request - payload = message.request.payload - - bytes_message = bytes(request, 'utf-8') + bytes(payload, 'utf-8') - - try_send(server, bytes_message) - - return True - def close_socket(sock): # type: (socket.socket) -> None sock.shutdown(socket.SHUT_RDWR)