Make the send functions methods of the server class.

This commit is contained in:
poljar (Damir Jelić) 2018-02-02 10:13:56 +01:00
parent 007b2b5cad
commit c63551d37e
5 changed files with 114 additions and 121 deletions

View file

@ -53,9 +53,7 @@ from matrix.server import (
MatrixServer, MatrixServer,
create_default_server, create_default_server,
matrix_server_connect, matrix_server_connect,
send,
send_cb, send_cb,
send_or_queue,
matrix_server_disconnect, matrix_server_disconnect,
matrix_server_reconnect, matrix_server_reconnect,
matrix_server_reconnect_schedule, matrix_server_reconnect_schedule,
@ -428,7 +426,7 @@ def room_input_cb(server_name, buffer, input_data):
data=body, room_id=room_id, data=body, room_id=room_id,
extra_data=extra_data) extra_data=extra_data)
send_or_queue(server, message) server.send_or_queue(message)
return W.WEECHAT_RC_OK return W.WEECHAT_RC_OK

View file

@ -23,7 +23,6 @@ from enum import Enum, unique
from matrix.globals import OPTIONS from matrix.globals import OPTIONS
from matrix.server import send_or_queue
from matrix.http import RequestType, HttpRequest from matrix.http import RequestType, HttpRequest
MATRIX_API_PATH = "/_matrix/client/r0" # type: str MATRIX_API_PATH = "/_matrix/client/r0" # type: str
@ -257,4 +256,4 @@ def matrix_login(server):
MessageType.LOGIN, MessageType.LOGIN,
data=post_data data=post_data
) )
send_or_queue(server, message) server.send_or_queue(message)

View file

@ -30,7 +30,6 @@ from matrix.server import (
MatrixServer, MatrixServer,
matrix_server_connect, matrix_server_connect,
matrix_server_disconnect, matrix_server_disconnect,
send_or_queue
) )
@ -117,7 +116,7 @@ def matrix_fetch_old_messages(server, room_id):
message = MatrixMessage(server, GLOBAL_OPTIONS, MessageType.ROOM_MSG, message = MatrixMessage(server, GLOBAL_OPTIONS, MessageType.ROOM_MSG,
room_id=room_id, extra_id=prev_batch) room_id=room_id, extra_id=prev_batch)
send_or_queue(server, message) server.send_or_queue(message)
return return
@ -195,7 +194,7 @@ def matrix_command_join_cb(data, buffer, command):
MessageType.JOIN, MessageType.JOIN,
room_id=room_id room_id=room_id
) )
send_or_queue(server, message) server.send_or_queue(message)
for server in SERVERS.values(): for server in SERVERS.values():
if buffer in server.buffers.values(): if buffer in server.buffers.values():
@ -236,7 +235,7 @@ def matrix_command_part_cb(data, buffer, command):
MessageType.PART, MessageType.PART,
room_id=room_id room_id=room_id
) )
send_or_queue(server, message) server.send_or_queue(message)
for server in SERVERS.values(): for server in SERVERS.values():
if buffer in server.buffers.values(): if buffer in server.buffers.values():
@ -274,7 +273,7 @@ def matrix_command_invite_cb(data, buffer, command):
room_id=room_id, room_id=room_id,
data=body data=body
) )
send_or_queue(server, message) server.send_or_queue(message)
for server in SERVERS.values(): for server in SERVERS.values():
if buffer in server.buffers.values(): if buffer in server.buffers.values():
@ -365,7 +364,7 @@ def matrix_redact_command_cb(data, buffer, args):
room_id=room_id, room_id=room_id,
extra_id=event_id extra_id=event_id
) )
send_or_queue(server, message) server.send_or_queue(message)
return W.WEECHAT_RC_OK return W.WEECHAT_RC_OK
@ -933,7 +932,7 @@ def matrix_command_topic_cb(data, buffer, command):
room_id=room_id, room_id=room_id,
extra_id="m.room.topic" extra_id="m.room.topic"
) )
send_or_queue(server, message) server.send_or_queue(message)
return W.WEECHAT_RC_OK_EAT return W.WEECHAT_RC_OK_EAT

View file

@ -38,7 +38,7 @@ from matrix.api import (
from matrix.utils import server_buffer_prnt, tags_from_line_data, prnt_debug from matrix.utils import server_buffer_prnt, tags_from_line_data, prnt_debug
from matrix.plugin_options import RedactType, DebugType from matrix.plugin_options import RedactType, DebugType
from matrix.server import send_or_queue, matrix_server_disconnect from matrix.server import matrix_server_disconnect
def strip_matrix_server(string): def strip_matrix_server(string):
# type: (str) -> str # type: (str) -> str
@ -687,7 +687,7 @@ def matrix_handle_message(
server.access_token = response["access_token"] server.access_token = response["access_token"]
server.user_id = response["user_id"] server.user_id = response["user_id"]
message = MatrixMessage(server, OPTIONS, MessageType.SYNC) message = MatrixMessage(server, OPTIONS, MessageType.SYNC)
send_or_queue(server, message) server.send_or_queue(message)
elif message_type is MessageType.SYNC: elif message_type is MessageType.SYNC:
next_batch = response['next_batch'] next_batch = response['next_batch']

View file

@ -174,6 +174,108 @@ class MatrixServer:
else: else:
pass pass
def send_or_queue(self, message):
# type: (MatrixServer, MatrixMessage) -> None
if not self.send(message):
prnt_debug(DebugType.MESSAGING, self,
("{prefix} Failed sending message of type {t}. "
"Adding to queue").format(
prefix=W.prefix("error"),
t=message.type))
self.send_queue.append(message)
def try_send(self, message):
# type: (MatrixServer, bytes) -> bool
sock = self.socket
total_sent = 0
message_length = len(message)
while total_sent < message_length:
try:
sent = sock.send(message[total_sent:])
except ssl.SSLWantWriteError:
hook = W.hook_fd(
sock.fileno(),
0, 1, 0,
"send_cb",
self.name
)
self.send_fd_hook = hook
self.send_buffer = message[total_sent:]
return True
except socket.error as error:
self.abort_send()
errno = "error" + str(error.errno) + " " if error.errno else ""
strerr = error.strerror if error.strerror else "Unknown reason"
strerr = errno + strerr
message = ("{prefix}Error while writing to "
"socket: {error}").format(
prefix=W.prefix("network"),
error=strerr)
server_buffer_prnt(self, message)
server_buffer_prnt(
self,
("{prefix}matrix: disconnecting from server...").format(
prefix=W.prefix("network")))
matrix_server_disconnect(self)
return False
if sent == 0:
self.abort_send()
server_buffer_prnt(
self,
"{prefix}matrix: Error while writing to socket".format(
W.prefix("network")))
server_buffer_prnt(
self,
("{prefix}matrix: disconnecting from server...").format(
prefix=W.prefix("network")))
matrix_server_disconnect(self)
return False
total_sent = total_sent + sent
self.finalize_send()
return True
def abort_send(self):
self.send_queue.appendleft(self.current_message)
self.current_message = None
self.send_buffer = ""
def finalize_send(self):
# type: (MatrixServer) -> None
self.current_message.send_time = time.time()
self.receive_queue.append(self.current_message)
self.send_buffer = ""
self.current_message = None
def send(self, message):
# type: (MatrixServer, MatrixMessage) -> bool
if self.current_message:
return False
self.current_message = message
request = message.request.request
payload = message.request.payload
bytes_message = bytes(request, 'utf-8') + bytes(payload, 'utf-8')
self.try_send(bytes_message)
return True
@utf8_decode @utf8_decode
def matrix_config_server_read_cb( def matrix_config_server_read_cb(
@ -250,7 +352,7 @@ def matrix_timer_cb(server_name, remaining_calls):
("Timer hook found message of type {t} in queue. Sending " ("Timer hook found message of type {t} in queue. Sending "
"out.".format(t=message.type))) "out.".format(t=message.type)))
if not send(server, message): if not server.send(message):
# We got an error while sending the last message return the message # We got an error while sending the last message return the message
# to the queue and exit the loop # to the queue and exit the loop
server.send_queue.appendleft(message) server.send_queue.appendleft(message)
@ -380,116 +482,11 @@ def send_cb(server_name, file_descriptor):
server.send_fd_hook = None server.send_fd_hook = None
if server.send_buffer: if server.send_buffer:
try_send(server, server.send_buffer) server.try_send(server, server.send_buffer)
return W.WEECHAT_RC_OK return W.WEECHAT_RC_OK
def send_or_queue(server, message):
# type: (MatrixServer, MatrixMessage) -> None
if not send(server, message):
prnt_debug(DebugType.MESSAGING, server,
("{prefix} Failed sending message of type {t}. "
"Adding to queue").format(
prefix=W.prefix("error"),
t=message.type))
server.send_queue.append(message)
def try_send(server, message):
# type: (MatrixServer, bytes) -> bool
sock = server.socket
total_sent = 0
message_length = len(message)
while total_sent < message_length:
try:
sent = sock.send(message[total_sent:])
except ssl.SSLWantWriteError:
hook = W.hook_fd(
server.sock.fileno(),
0, 1, 0,
"send_cb",
server.name
)
server.send_fd_hook = hook
server.send_buffer = message[total_sent:]
return True
except socket.error as error:
abort_send(server)
errno = "error" + str(error.errno) + " " if error.errno else ""
str_error = error.strerror if error.strerror else "Unknown reason"
str_error = errno + str_error
message = ("{prefix}Error while writing to "
"socket: {error}").format(
prefix=W.prefix("network"),
error=str_error)
server_buffer_prnt(server, message)
server_buffer_prnt(
server,
("{prefix}matrix: disconnecting from server...").format(
prefix=W.prefix("network")))
matrix_server_disconnect(server)
return False
if sent == 0:
abort_send(server)
server_buffer_prnt(
server,
"{prefix}matrix: Error while writing to socket".format(
W.prefix("network")))
server_buffer_prnt(
server,
("{prefix}matrix: disconnecting from server...").format(
prefix=W.prefix("network")))
matrix_server_disconnect(server)
return False
total_sent = total_sent + sent
finalize_send(server)
return True
def abort_send(server):
server.send_queue.appendleft(server.current_message)
server.current_message = None
server.send_buffer = ""
def finalize_send(server):
# type: (MatrixServer) -> None
server.current_message.send_time = time.time()
server.receive_queue.append(server.current_message)
server.send_buffer = ""
server.current_message = None
def send(server, message):
# type: (MatrixServer, MatrixMessage) -> bool
if server.current_message:
return False
server.current_message = message
request = message.request.request
payload = message.request.payload
bytes_message = bytes(request, 'utf-8') + bytes(payload, 'utf-8')
try_send(server, bytes_message)
return True
def close_socket(sock): def close_socket(sock):
# type: (socket.socket) -> None # type: (socket.socket) -> None
sock.shutdown(socket.SHUT_RDWR) sock.shutdown(socket.SHUT_RDWR)