From 484d91b1664d3e5be1a068087146adc609723d0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?poljar=20=28Damir=20Jeli=C4=87=29?= Date: Wed, 31 Jan 2018 13:55:55 +0100 Subject: [PATCH] Make the message sending non blocking. --- main.py | 2 +- matrix/http.py | 2 +- matrix/server.py | 4 ++ matrix/socket.py | 133 ++++++++++++++++++++++++++++++++--------------- 4 files changed, 98 insertions(+), 43 deletions(-) diff --git a/main.py b/main.py index 0c9cdb2..92391cb 100644 --- a/main.py +++ b/main.py @@ -32,7 +32,7 @@ from matrix import colors from matrix.utf import utf8_decode from matrix.http import HttpResponse from matrix.api import MatrixMessage, MessageType, matrix_login -from matrix.socket import disconnect, send_or_queue, send, connect +from matrix.socket import disconnect, send_or_queue, send, connect, send_cb from matrix.messages import handle_http_response # Weechat searches for the registered callbacks in the scope of the main script diff --git a/matrix/http.py b/matrix/http.py index a14b51e..6b4d881 100644 --- a/matrix/http.py +++ b/matrix/http.py @@ -54,7 +54,7 @@ class HttpRequest: request_list = [] # type: List[str] accept_header = 'Accept: */*' # type: str end_separator = '\r\n' # type: str - payload = None # type: str + payload = "" # type: str if request_type == RequestType.GET: get = 'GET {location} HTTP/1.1'.format(location=location) diff --git a/matrix/server.py b/matrix/server.py index b4b8eac..0177b5e 100644 --- a/matrix/server.py +++ b/matrix/server.py @@ -64,6 +64,10 @@ class MatrixServer: self.transaction_id = 0 # type: int self.lag = 0 # type: int + self.send_fd_hook = None # type: weechat.hook + self.send_buffer = b"" # type: bytes + self.current_message = None # type: MatrixMessage + self.http_parser = HttpParser() # type: HttpParser self.http_buffer = [] # type: List[bytes] diff --git a/matrix/socket.py b/matrix/socket.py index 672caca..208c0cb 100644 --- a/matrix/socket.py +++ b/matrix/socket.py @@ -17,13 +17,16 @@ from __future__ import unicode_literals import time +import ssl import socket +import pprint from builtins import bytes, str import matrix.globals from matrix.plugin_options import DebugType from matrix.utils import prnt_debug, server_buffer_prnt, create_server_buffer +from matrix.utf import utf8_decode W = matrix.globals.W @@ -47,17 +50,6 @@ def disconnect(server): server_buffer_prnt(server, "Disconnected") -def send_or_queue(server, message): - # type: (MatrixServer, MatrixMessage) -> None - if not send(server, message): - prnt_debug(DebugType.MESSAGING, server, - ("{prefix} Failed sending message of type {t}. " - "Adding to queue").format( - prefix=W.prefix("error"), - t=message.type)) - server.send_queue.append(message) - - def connect(server): # type: (MatrixServer) -> int if not server.address or not server.port: @@ -94,41 +86,100 @@ def connect(server): return W.WEECHAT_RC_OK +@utf8_decode +def send_cb(server_name, file_descriptor): + # type: (str, int) -> int + + server = SERVERS[server_name] + + if server.send_fd_hook: + W.unhook(server.send_fd_hook) + server.send_fd_hook = None + + if server.send_buffer: + try_send(server, send_buffer) + + return W.WEECHAT_RC_OK + + +def send_or_queue(server, message): + # type: (MatrixServer, MatrixMessage) -> None + if not send(server, message): + prnt_debug(DebugType.MESSAGING, server, + ("{prefix} Failed sending message of type {t}. " + "Adding to queue").format( + prefix=W.prefix("error"), + t=message.type)) + server.send_queue.append(message) + + +def try_send(server, message): + # type: (MatrixServer, bytes) -> bool + + socket = server.socket + total_sent = 0 + message_length = len(message) + + while total_sent < message_length: + try: + sent = socket.send(message[total_sent:]) + + except ssl.SSLWantWriteError: + hook = W.hook_fd( + server.socket.fileno(), + 0, 1, 0, + "send_cb", + server.name + ) + server.send_fd_hook = hook + server.send_buffer = message[total_sent:] + return True + + except OSError as error: + disconnect(server) + abort_send(server) + server_buffer_prnt(server, str(error)) + return False + + if sent == 0: + disconnect(server) + abort_send(server) + server_buffer_prnt(server, "Socket closed while sending data.") + return False + + total_sent = total_sent + sent + + finalize_send(server) + return True + + +def abort_send(server): + server.send_queue.appendleft(server.current_message) + server.current_message = None + server.send_buffer = "" + + +def finalize_send(server): + # type: (MatrixServer) -> None + server.current_message.send_time = time.time() + server.receive_queue.append(server.current_message) + + server.send_buffer = "" + server.current_message = None + + def send(server, message): # type: (MatrixServer, MatrixMessage) -> bool + if server.current_message: + return False + + server.current_message = message request = message.request.request payload = message.request.payload - prnt_debug(DebugType.MESSAGING, server, - "{prefix} Sending message of type {t}.".format( - prefix=W.prefix("error"), - t=message.type)) + bytes_message = bytes(request, 'utf-8') + bytes(payload, 'utf-8') - try: - start = time.time() + try_send(server, bytes_message) - # TODO we probably shouldn't use sendall here. - server.socket.sendall(bytes(request, 'utf-8')) - if payload: - server.socket.sendall(bytes(payload, 'utf-8')) - - end = time.time() - message.send_time = end - - send_lag = (end - start) * 1000 - lag_string = "{0:.3f}" if send_lag < 1000 else "{0:.1f}" - - prnt_debug(DebugType.NETWORK, server.server_buffer, - ("{prefix}matrix: Message done sending (Lag: {t}s), putting" - " message in the receive queue.").format( - prefix=W.prefix("network"), - t=lag_string.format(send_lag))) - - server.receive_queue.append(message) - return True - - except OSError as error: - disconnect(server) - server_buffer_prnt(server, str(error)) - return False + return True