commands: Add initial olm command.
This commit is contained in:
parent
69db90dd4d
commit
1208c9d4a2
2 changed files with 310 additions and 3 deletions
3
main.py
3
main.py
|
@ -44,7 +44,8 @@ from matrix.commands import (hook_commands, hook_page_up,
|
|||
matrix_command_pgup_cb, matrix_invite_command_cb,
|
||||
matrix_join_command_cb, matrix_kick_command_cb,
|
||||
matrix_me_command_cb, matrix_part_command_cb,
|
||||
matrix_redact_command_cb, matrix_topic_command_cb)
|
||||
matrix_redact_command_cb, matrix_topic_command_cb,
|
||||
matrix_olm_command_cb)
|
||||
from matrix.completion import (init_completion, matrix_command_completion_cb,
|
||||
matrix_debug_completion_cb,
|
||||
matrix_message_completion_cb,
|
||||
|
|
|
@ -14,11 +14,11 @@
|
|||
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
||||
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import argparse
|
||||
import re
|
||||
from builtins import str
|
||||
from future.moves.itertools import zip_longest
|
||||
from collections import defaultdict
|
||||
|
||||
from . import globals as G
|
||||
from .colors import Formatted
|
||||
|
@ -90,6 +90,45 @@ class WeechatCommandParser(object):
|
|||
parser.add_argument("room_id", nargs="?")
|
||||
return WeechatCommandParser._run_parser(parser, args)
|
||||
|
||||
@staticmethod
|
||||
def olm(args):
|
||||
parser = WeechatArgParse(prog="olm")
|
||||
subparsers = parser.add_subparsers(dest="subcommand")
|
||||
|
||||
info_parser = subparsers.add_parser("info")
|
||||
info_parser.add_argument(
|
||||
"category", nargs="?", default="private",
|
||||
choices=[
|
||||
"all",
|
||||
"blacklisted",
|
||||
"private",
|
||||
"unverified",
|
||||
"verified"
|
||||
])
|
||||
info_parser.add_argument("filter", nargs="?")
|
||||
|
||||
verify_parser = subparsers.add_parser("verify")
|
||||
verify_parser.add_argument("user_filter")
|
||||
verify_parser.add_argument("device_filter", nargs="?")
|
||||
|
||||
unverify_parser = subparsers.add_parser("unverify")
|
||||
unverify_parser.add_argument("user_filter")
|
||||
unverify_parser.add_argument("device_filter", nargs="?")
|
||||
|
||||
return WeechatCommandParser._run_parser(parser, args)
|
||||
|
||||
|
||||
def grouper(iterable, n, fillvalue=None):
|
||||
"Collect data into fixed-length chunks or blocks"
|
||||
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
|
||||
args = [iter(iterable)] * n
|
||||
return zip_longest(*args, fillvalue=fillvalue)
|
||||
|
||||
|
||||
def partition_key(key):
|
||||
groups = grouper(key, 4, " ")
|
||||
return ' '.join(''.join(g) for g in groups)
|
||||
|
||||
|
||||
def hook_commands():
|
||||
W.hook_command(
|
||||
|
@ -246,12 +285,279 @@ def hook_commands():
|
|||
"",
|
||||
)
|
||||
|
||||
W.hook_command(
|
||||
# Command name and short description
|
||||
"olm",
|
||||
"Matrix olm encryption configuration command",
|
||||
# Synopsis
|
||||
("info all|blacklisted|private|unverified|verified <filter>||"
|
||||
"blacklist <user-id> <device-id> ||"
|
||||
"unverify <user-id> <device-id> ||"
|
||||
"verify <user-id> <device-id>"),
|
||||
# Description
|
||||
(" info: show info about known devices and their keys\n"
|
||||
"blacklist: blacklist a device\n"
|
||||
" unverify: unverify a device\n"
|
||||
" verify: verify a device\n\n"
|
||||
"Examples:\n"),
|
||||
# Completions
|
||||
('info all|blacklisted|private|unverified|verified ||'
|
||||
'blacklist %(device_ids) ||'
|
||||
'unverify %(olm_user_ids) %(olm_devices) ||'
|
||||
'verify %(olm_user_ids) %(olm_devices)'),
|
||||
# Function name
|
||||
'matrix_olm_command_cb',
|
||||
'')
|
||||
|
||||
W.hook_command_run("/buffer clear", "matrix_command_buf_clear_cb", "")
|
||||
|
||||
if G.CONFIG.network.fetch_backlog_on_pgup:
|
||||
hook_page_up()
|
||||
|
||||
|
||||
def format_device(device_id, fp_key):
|
||||
fp_key = partition_key(fp_key)
|
||||
message = (" - Device ID: {device_color}{device_id}{ncolor}\n"
|
||||
" - Device key: {key_color}{fp_key}{ncolor}").format(
|
||||
device_color=W.color("chat_channel"),
|
||||
device_id=device_id,
|
||||
ncolor=W.color("reset"),
|
||||
key_color=W.color("chat_server"),
|
||||
fp_key=fp_key)
|
||||
return message
|
||||
|
||||
|
||||
def olm_info_command(server, args):
|
||||
def print_devices(
|
||||
device_store,
|
||||
filter_regex,
|
||||
device_category="Device",
|
||||
predicate=None,
|
||||
):
|
||||
user_strings = []
|
||||
try:
|
||||
filter_regex = re.compile(args.filter) if args.filter else None
|
||||
except re.error as e:
|
||||
server.error("Invalid regular expression: {}.".format(e.args[0]))
|
||||
return
|
||||
|
||||
for user_id in sorted(device_store.users):
|
||||
device_strings = []
|
||||
for device in device_store[user_id].values():
|
||||
if filter_regex:
|
||||
if (not filter_regex.search(user_id) and
|
||||
not filter_regex.search(device.id)):
|
||||
continue
|
||||
|
||||
if predicate:
|
||||
if not predicate(device):
|
||||
continue
|
||||
|
||||
device_strings.append(format_device(
|
||||
device.id,
|
||||
device.ed25519
|
||||
))
|
||||
|
||||
if not device_strings:
|
||||
continue
|
||||
|
||||
d_string = "\n".join(device_strings)
|
||||
message = (" - User: {user_color}{user}{ncolor}\n").format(
|
||||
user_color=W.color("chat_nick"),
|
||||
user=user_id,
|
||||
ncolor=W.color("reset"))
|
||||
message += d_string
|
||||
user_strings.append(message)
|
||||
|
||||
if not user_strings:
|
||||
message = ("{prefix}matrix: No matching devices "
|
||||
"found.").format(prefix=W.prefix("error"))
|
||||
W.prnt(server.server_buffer, message)
|
||||
return
|
||||
|
||||
W.prnt(server.server_buffer,
|
||||
"{}matrix: {} keys:\n".format(
|
||||
W.prefix("network"),
|
||||
device_category
|
||||
))
|
||||
W.prnt(server.server_buffer, "\n".join(user_strings))
|
||||
|
||||
olm = server.client.olm
|
||||
|
||||
if args.category == "private":
|
||||
fp_key = partition_key(olm.account.identity_keys["ed25519"])
|
||||
message = ("{prefix}matrix: Identity keys:\n"
|
||||
" - User: {user_color}{user}{ncolor}\n"
|
||||
" - Device ID: {device_color}{device_id}{ncolor}\n"
|
||||
" - Device key: {key_color}{fp_key}{ncolor}\n"
|
||||
"").format(
|
||||
prefix=W.prefix("network"),
|
||||
user_color=W.color("chat_self"),
|
||||
ncolor=W.color("reset"),
|
||||
user=olm.user_id,
|
||||
device_color=W.color("chat_channel"),
|
||||
device_id=olm.device_id,
|
||||
key_color=W.color("chat_server"),
|
||||
fp_key=fp_key)
|
||||
W.prnt(server.server_buffer, message)
|
||||
|
||||
elif args.category == "all":
|
||||
print_devices(olm.device_store, args.filter)
|
||||
|
||||
elif args.category == "verified":
|
||||
print_devices(
|
||||
olm.device_store,
|
||||
args.filter,
|
||||
"Verified",
|
||||
olm.is_device_verified
|
||||
)
|
||||
|
||||
elif args.category == "unverified":
|
||||
def predicate(device):
|
||||
return not olm.is_device_verified(device)
|
||||
|
||||
print_devices(
|
||||
olm.device_store,
|
||||
args.filter,
|
||||
"Unverified",
|
||||
predicate
|
||||
)
|
||||
|
||||
elif args.category == "blacklisted":
|
||||
print_devices(
|
||||
olm.device_store,
|
||||
args.filter,
|
||||
"Blacklisted",
|
||||
olm.is_device_blacklisted
|
||||
)
|
||||
|
||||
|
||||
def olm_action_command(server, args, category, error_category, prefix, action):
|
||||
device_store = server.client.olm.device_store
|
||||
users = []
|
||||
|
||||
if args.user_filter == "*":
|
||||
users = device_store.users
|
||||
else:
|
||||
users = [x for x in device_store.users if args.user_filter in x]
|
||||
|
||||
user_devices = {user: device_store[user].values() for user in users}
|
||||
|
||||
if args.device_filter and args.device_filter != "*":
|
||||
filtered_user_devices = {}
|
||||
for user, device_list in user_devices.items():
|
||||
filtered_devices = filter(
|
||||
lambda x: args.device_filter in x.id,
|
||||
device_list
|
||||
)
|
||||
filtered_user_devices[user] = list(filtered_devices)
|
||||
user_devices = filtered_user_devices
|
||||
|
||||
changed_devices = defaultdict(list)
|
||||
|
||||
for user, device_list in user_devices.items():
|
||||
for device in device_list:
|
||||
if action(device):
|
||||
changed_devices[user].append(device)
|
||||
|
||||
if not changed_devices:
|
||||
message = ("{prefix}matrix: No matching {error_category} devices "
|
||||
"found.").format(
|
||||
prefix=W.prefix("error"),
|
||||
error_category=error_category
|
||||
)
|
||||
W.prnt(server.server_buffer, message)
|
||||
return
|
||||
|
||||
user_strings = []
|
||||
for user_id, device_list in changed_devices.items():
|
||||
device_strings = []
|
||||
message = (" - User: {user_color}{user}{ncolor}\n").format(
|
||||
user_color=W.color("chat_nick"),
|
||||
user=user_id,
|
||||
ncolor=W.color("reset"))
|
||||
for device in device_list:
|
||||
device_strings.append(format_device(
|
||||
device.id,
|
||||
device.ed25519
|
||||
))
|
||||
if not device_strings:
|
||||
continue
|
||||
|
||||
d_string = "\n".join(device_strings)
|
||||
message += d_string
|
||||
user_strings.append(message)
|
||||
|
||||
W.prnt(server.server_buffer,
|
||||
"{}matrix: {} key(s):\n".format(W.prefix("prefix"), category))
|
||||
W.prnt(server.server_buffer, "\n".join(user_strings))
|
||||
pass
|
||||
|
||||
|
||||
def olm_verify_command(server, args):
|
||||
olm_action_command(
|
||||
server,
|
||||
args,
|
||||
"Verified",
|
||||
"unverified",
|
||||
"join",
|
||||
server.client.olm.verify_device
|
||||
)
|
||||
|
||||
|
||||
def olm_unverify_command(server, args):
|
||||
olm_action_command(
|
||||
server,
|
||||
args,
|
||||
"Unverified",
|
||||
"verified",
|
||||
"quit",
|
||||
server.client.olm.unverify_device
|
||||
)
|
||||
|
||||
|
||||
@utf8_decode
|
||||
def matrix_olm_command_cb(data, buffer, args):
|
||||
def command(server, data, buffer, args):
|
||||
parsed_args = WeechatCommandParser.olm(args)
|
||||
if not parsed_args:
|
||||
return W.WEECHAT_RC_OK
|
||||
|
||||
if not parsed_args:
|
||||
return W.WEECHAT_RC_OK
|
||||
|
||||
if not server.client.olm:
|
||||
W.prnt(server.server_buffer, "{}matrix: Olm account isn't "
|
||||
"loaded.".format(W.prefix("error")))
|
||||
return W.WEECHAT_RC_OK
|
||||
|
||||
if not parsed_args.subcommand or parsed_args.subcommand == "info":
|
||||
olm_info_command(server, parsed_args)
|
||||
elif parsed_args.subcommand == "verify":
|
||||
olm_verify_command(server, parsed_args)
|
||||
elif parsed_args.subcommand == "unverify":
|
||||
olm_unverify_command(server, parsed_args)
|
||||
else:
|
||||
message = ("{prefix}matrix: Command not implemented.".format(
|
||||
prefix=W.prefix("error")))
|
||||
W.prnt(server.server_buffer, message)
|
||||
|
||||
return W.WEECHAT_RC_OK
|
||||
|
||||
for server in SERVERS.values():
|
||||
if buffer in server.buffers.values():
|
||||
return command(server, data, buffer, args)
|
||||
elif buffer == server.server_buffer:
|
||||
return command(server, data, buffer, args)
|
||||
|
||||
W.prnt("", "{prefix}matrix: command \"olm\" must be executed on a "
|
||||
"matrix buffer (server or channel)".format(
|
||||
prefix=W.prefix("error")
|
||||
))
|
||||
|
||||
return W.WEECHAT_RC_OK
|
||||
|
||||
|
||||
@utf8_decode
|
||||
def matrix_me_command_cb(data, buffer, args):
|
||||
for server in SERVERS.values():
|
||||
|
|
Loading…
Reference in a new issue