commands: Add initial olm command.

This commit is contained in:
Damir Jelić 2018-10-02 13:01:15 +02:00
parent 69db90dd4d
commit 1208c9d4a2
2 changed files with 310 additions and 3 deletions

View file

@ -44,7 +44,8 @@ from matrix.commands import (hook_commands, hook_page_up,
matrix_command_pgup_cb, matrix_invite_command_cb, matrix_command_pgup_cb, matrix_invite_command_cb,
matrix_join_command_cb, matrix_kick_command_cb, matrix_join_command_cb, matrix_kick_command_cb,
matrix_me_command_cb, matrix_part_command_cb, matrix_me_command_cb, matrix_part_command_cb,
matrix_redact_command_cb, matrix_topic_command_cb) matrix_redact_command_cb, matrix_topic_command_cb,
matrix_olm_command_cb)
from matrix.completion import (init_completion, matrix_command_completion_cb, from matrix.completion import (init_completion, matrix_command_completion_cb,
matrix_debug_completion_cb, matrix_debug_completion_cb,
matrix_message_completion_cb, matrix_message_completion_cb,

View file

@ -14,11 +14,11 @@
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals
import argparse import argparse
import re import re
from builtins import str from builtins import str
from future.moves.itertools import zip_longest
from collections import defaultdict
from . import globals as G from . import globals as G
from .colors import Formatted from .colors import Formatted
@ -90,6 +90,45 @@ class WeechatCommandParser(object):
parser.add_argument("room_id", nargs="?") parser.add_argument("room_id", nargs="?")
return WeechatCommandParser._run_parser(parser, args) return WeechatCommandParser._run_parser(parser, args)
@staticmethod
def olm(args):
parser = WeechatArgParse(prog="olm")
subparsers = parser.add_subparsers(dest="subcommand")
info_parser = subparsers.add_parser("info")
info_parser.add_argument(
"category", nargs="?", default="private",
choices=[
"all",
"blacklisted",
"private",
"unverified",
"verified"
])
info_parser.add_argument("filter", nargs="?")
verify_parser = subparsers.add_parser("verify")
verify_parser.add_argument("user_filter")
verify_parser.add_argument("device_filter", nargs="?")
unverify_parser = subparsers.add_parser("unverify")
unverify_parser.add_argument("user_filter")
unverify_parser.add_argument("device_filter", nargs="?")
return WeechatCommandParser._run_parser(parser, args)
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
def partition_key(key):
groups = grouper(key, 4, " ")
return ' '.join(''.join(g) for g in groups)
def hook_commands(): def hook_commands():
W.hook_command( W.hook_command(
@ -246,12 +285,279 @@ def hook_commands():
"", "",
) )
W.hook_command(
# Command name and short description
"olm",
"Matrix olm encryption configuration command",
# Synopsis
("info all|blacklisted|private|unverified|verified <filter>||"
"blacklist <user-id> <device-id> ||"
"unverify <user-id> <device-id> ||"
"verify <user-id> <device-id>"),
# Description
(" info: show info about known devices and their keys\n"
"blacklist: blacklist a device\n"
" unverify: unverify a device\n"
" verify: verify a device\n\n"
"Examples:\n"),
# Completions
('info all|blacklisted|private|unverified|verified ||'
'blacklist %(device_ids) ||'
'unverify %(olm_user_ids) %(olm_devices) ||'
'verify %(olm_user_ids) %(olm_devices)'),
# Function name
'matrix_olm_command_cb',
'')
W.hook_command_run("/buffer clear", "matrix_command_buf_clear_cb", "") W.hook_command_run("/buffer clear", "matrix_command_buf_clear_cb", "")
if G.CONFIG.network.fetch_backlog_on_pgup: if G.CONFIG.network.fetch_backlog_on_pgup:
hook_page_up() hook_page_up()
def format_device(device_id, fp_key):
fp_key = partition_key(fp_key)
message = (" - Device ID: {device_color}{device_id}{ncolor}\n"
" - Device key: {key_color}{fp_key}{ncolor}").format(
device_color=W.color("chat_channel"),
device_id=device_id,
ncolor=W.color("reset"),
key_color=W.color("chat_server"),
fp_key=fp_key)
return message
def olm_info_command(server, args):
def print_devices(
device_store,
filter_regex,
device_category="Device",
predicate=None,
):
user_strings = []
try:
filter_regex = re.compile(args.filter) if args.filter else None
except re.error as e:
server.error("Invalid regular expression: {}.".format(e.args[0]))
return
for user_id in sorted(device_store.users):
device_strings = []
for device in device_store[user_id].values():
if filter_regex:
if (not filter_regex.search(user_id) and
not filter_regex.search(device.id)):
continue
if predicate:
if not predicate(device):
continue
device_strings.append(format_device(
device.id,
device.ed25519
))
if not device_strings:
continue
d_string = "\n".join(device_strings)
message = (" - User: {user_color}{user}{ncolor}\n").format(
user_color=W.color("chat_nick"),
user=user_id,
ncolor=W.color("reset"))
message += d_string
user_strings.append(message)
if not user_strings:
message = ("{prefix}matrix: No matching devices "
"found.").format(prefix=W.prefix("error"))
W.prnt(server.server_buffer, message)
return
W.prnt(server.server_buffer,
"{}matrix: {} keys:\n".format(
W.prefix("network"),
device_category
))
W.prnt(server.server_buffer, "\n".join(user_strings))
olm = server.client.olm
if args.category == "private":
fp_key = partition_key(olm.account.identity_keys["ed25519"])
message = ("{prefix}matrix: Identity keys:\n"
" - User: {user_color}{user}{ncolor}\n"
" - Device ID: {device_color}{device_id}{ncolor}\n"
" - Device key: {key_color}{fp_key}{ncolor}\n"
"").format(
prefix=W.prefix("network"),
user_color=W.color("chat_self"),
ncolor=W.color("reset"),
user=olm.user_id,
device_color=W.color("chat_channel"),
device_id=olm.device_id,
key_color=W.color("chat_server"),
fp_key=fp_key)
W.prnt(server.server_buffer, message)
elif args.category == "all":
print_devices(olm.device_store, args.filter)
elif args.category == "verified":
print_devices(
olm.device_store,
args.filter,
"Verified",
olm.is_device_verified
)
elif args.category == "unverified":
def predicate(device):
return not olm.is_device_verified(device)
print_devices(
olm.device_store,
args.filter,
"Unverified",
predicate
)
elif args.category == "blacklisted":
print_devices(
olm.device_store,
args.filter,
"Blacklisted",
olm.is_device_blacklisted
)
def olm_action_command(server, args, category, error_category, prefix, action):
device_store = server.client.olm.device_store
users = []
if args.user_filter == "*":
users = device_store.users
else:
users = [x for x in device_store.users if args.user_filter in x]
user_devices = {user: device_store[user].values() for user in users}
if args.device_filter and args.device_filter != "*":
filtered_user_devices = {}
for user, device_list in user_devices.items():
filtered_devices = filter(
lambda x: args.device_filter in x.id,
device_list
)
filtered_user_devices[user] = list(filtered_devices)
user_devices = filtered_user_devices
changed_devices = defaultdict(list)
for user, device_list in user_devices.items():
for device in device_list:
if action(device):
changed_devices[user].append(device)
if not changed_devices:
message = ("{prefix}matrix: No matching {error_category} devices "
"found.").format(
prefix=W.prefix("error"),
error_category=error_category
)
W.prnt(server.server_buffer, message)
return
user_strings = []
for user_id, device_list in changed_devices.items():
device_strings = []
message = (" - User: {user_color}{user}{ncolor}\n").format(
user_color=W.color("chat_nick"),
user=user_id,
ncolor=W.color("reset"))
for device in device_list:
device_strings.append(format_device(
device.id,
device.ed25519
))
if not device_strings:
continue
d_string = "\n".join(device_strings)
message += d_string
user_strings.append(message)
W.prnt(server.server_buffer,
"{}matrix: {} key(s):\n".format(W.prefix("prefix"), category))
W.prnt(server.server_buffer, "\n".join(user_strings))
pass
def olm_verify_command(server, args):
olm_action_command(
server,
args,
"Verified",
"unverified",
"join",
server.client.olm.verify_device
)
def olm_unverify_command(server, args):
olm_action_command(
server,
args,
"Unverified",
"verified",
"quit",
server.client.olm.unverify_device
)
@utf8_decode
def matrix_olm_command_cb(data, buffer, args):
def command(server, data, buffer, args):
parsed_args = WeechatCommandParser.olm(args)
if not parsed_args:
return W.WEECHAT_RC_OK
if not parsed_args:
return W.WEECHAT_RC_OK
if not server.client.olm:
W.prnt(server.server_buffer, "{}matrix: Olm account isn't "
"loaded.".format(W.prefix("error")))
return W.WEECHAT_RC_OK
if not parsed_args.subcommand or parsed_args.subcommand == "info":
olm_info_command(server, parsed_args)
elif parsed_args.subcommand == "verify":
olm_verify_command(server, parsed_args)
elif parsed_args.subcommand == "unverify":
olm_unverify_command(server, parsed_args)
else:
message = ("{prefix}matrix: Command not implemented.".format(
prefix=W.prefix("error")))
W.prnt(server.server_buffer, message)
return W.WEECHAT_RC_OK
for server in SERVERS.values():
if buffer in server.buffers.values():
return command(server, data, buffer, args)
elif buffer == server.server_buffer:
return command(server, data, buffer, args)
W.prnt("", "{prefix}matrix: command \"olm\" must be executed on a "
"matrix buffer (server or channel)".format(
prefix=W.prefix("error")
))
return W.WEECHAT_RC_OK
@utf8_decode @utf8_decode
def matrix_me_command_cb(data, buffer, args): def matrix_me_command_cb(data, buffer, args):
for server in SERVERS.values(): for server in SERVERS.values():