Make the message sending non blocking.

This commit is contained in:
poljar (Damir Jelić) 2018-01-31 13:55:55 +01:00
parent d83c25e709
commit 484d91b166
4 changed files with 98 additions and 43 deletions

View file

@ -32,7 +32,7 @@ from matrix import colors
from matrix.utf import utf8_decode
from matrix.http import HttpResponse
from matrix.api import MatrixMessage, MessageType, matrix_login
from matrix.socket import disconnect, send_or_queue, send, connect
from matrix.socket import disconnect, send_or_queue, send, connect, send_cb
from matrix.messages import handle_http_response
# Weechat searches for the registered callbacks in the scope of the main script

View file

@ -54,7 +54,7 @@ class HttpRequest:
request_list = [] # type: List[str]
accept_header = 'Accept: */*' # type: str
end_separator = '\r\n' # type: str
payload = None # type: str
payload = "" # type: str
if request_type == RequestType.GET:
get = 'GET {location} HTTP/1.1'.format(location=location)

View file

@ -64,6 +64,10 @@ class MatrixServer:
self.transaction_id = 0 # type: int
self.lag = 0 # type: int
self.send_fd_hook = None # type: weechat.hook
self.send_buffer = b"" # type: bytes
self.current_message = None # type: MatrixMessage
self.http_parser = HttpParser() # type: HttpParser
self.http_buffer = [] # type: List[bytes]

View file

@ -17,13 +17,16 @@
from __future__ import unicode_literals
import time
import ssl
import socket
import pprint
from builtins import bytes, str
import matrix.globals
from matrix.plugin_options import DebugType
from matrix.utils import prnt_debug, server_buffer_prnt, create_server_buffer
from matrix.utf import utf8_decode
W = matrix.globals.W
@ -47,17 +50,6 @@ def disconnect(server):
server_buffer_prnt(server, "Disconnected")
def send_or_queue(server, message):
# type: (MatrixServer, MatrixMessage) -> None
if not send(server, message):
prnt_debug(DebugType.MESSAGING, server,
("{prefix} Failed sending message of type {t}. "
"Adding to queue").format(
prefix=W.prefix("error"),
t=message.type))
server.send_queue.append(message)
def connect(server):
# type: (MatrixServer) -> int
if not server.address or not server.port:
@ -94,41 +86,100 @@ def connect(server):
return W.WEECHAT_RC_OK
@utf8_decode
def send_cb(server_name, file_descriptor):
# type: (str, int) -> int
server = SERVERS[server_name]
if server.send_fd_hook:
W.unhook(server.send_fd_hook)
server.send_fd_hook = None
if server.send_buffer:
try_send(server, send_buffer)
return W.WEECHAT_RC_OK
def send_or_queue(server, message):
# type: (MatrixServer, MatrixMessage) -> None
if not send(server, message):
prnt_debug(DebugType.MESSAGING, server,
("{prefix} Failed sending message of type {t}. "
"Adding to queue").format(
prefix=W.prefix("error"),
t=message.type))
server.send_queue.append(message)
def try_send(server, message):
# type: (MatrixServer, bytes) -> bool
socket = server.socket
total_sent = 0
message_length = len(message)
while total_sent < message_length:
try:
sent = socket.send(message[total_sent:])
except ssl.SSLWantWriteError:
hook = W.hook_fd(
server.socket.fileno(),
0, 1, 0,
"send_cb",
server.name
)
server.send_fd_hook = hook
server.send_buffer = message[total_sent:]
return True
except OSError as error:
disconnect(server)
abort_send(server)
server_buffer_prnt(server, str(error))
return False
if sent == 0:
disconnect(server)
abort_send(server)
server_buffer_prnt(server, "Socket closed while sending data.")
return False
total_sent = total_sent + sent
finalize_send(server)
return True
def abort_send(server):
server.send_queue.appendleft(server.current_message)
server.current_message = None
server.send_buffer = ""
def finalize_send(server):
# type: (MatrixServer) -> None
server.current_message.send_time = time.time()
server.receive_queue.append(server.current_message)
server.send_buffer = ""
server.current_message = None
def send(server, message):
# type: (MatrixServer, MatrixMessage) -> bool
if server.current_message:
return False
server.current_message = message
request = message.request.request
payload = message.request.payload
prnt_debug(DebugType.MESSAGING, server,
"{prefix} Sending message of type {t}.".format(
prefix=W.prefix("error"),
t=message.type))
bytes_message = bytes(request, 'utf-8') + bytes(payload, 'utf-8')
try:
start = time.time()
try_send(server, bytes_message)
# TODO we probably shouldn't use sendall here.
server.socket.sendall(bytes(request, 'utf-8'))
if payload:
server.socket.sendall(bytes(payload, 'utf-8'))
end = time.time()
message.send_time = end
send_lag = (end - start) * 1000
lag_string = "{0:.3f}" if send_lag < 1000 else "{0:.1f}"
prnt_debug(DebugType.NETWORK, server.server_buffer,
("{prefix}matrix: Message done sending (Lag: {t}s), putting"
" message in the receive queue.").format(
prefix=W.prefix("network"),
t=lag_string.format(send_lag)))
server.receive_queue.append(message)
return True
except OSError as error:
disconnect(server)
server_buffer_prnt(server, str(error))
return False
return True