diff --git a/main.py b/main.py index 2715d58..3c17504 100644 --- a/main.py +++ b/main.py @@ -44,7 +44,8 @@ from matrix.commands import (hook_commands, hook_page_up, matrix_command_pgup_cb, matrix_invite_command_cb, matrix_join_command_cb, matrix_kick_command_cb, matrix_me_command_cb, matrix_part_command_cb, - matrix_redact_command_cb, matrix_topic_command_cb) + matrix_redact_command_cb, matrix_topic_command_cb, + matrix_olm_command_cb) from matrix.completion import (init_completion, matrix_command_completion_cb, matrix_debug_completion_cb, matrix_message_completion_cb, diff --git a/matrix/commands.py b/matrix/commands.py index 112092c..e6f9701 100644 --- a/matrix/commands.py +++ b/matrix/commands.py @@ -14,11 +14,11 @@ # CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -from __future__ import unicode_literals - import argparse import re from builtins import str +from future.moves.itertools import zip_longest +from collections import defaultdict from . import globals as G from .colors import Formatted @@ -90,6 +90,45 @@ class WeechatCommandParser(object): parser.add_argument("room_id", nargs="?") return WeechatCommandParser._run_parser(parser, args) + @staticmethod + def olm(args): + parser = WeechatArgParse(prog="olm") + subparsers = parser.add_subparsers(dest="subcommand") + + info_parser = subparsers.add_parser("info") + info_parser.add_argument( + "category", nargs="?", default="private", + choices=[ + "all", + "blacklisted", + "private", + "unverified", + "verified" + ]) + info_parser.add_argument("filter", nargs="?") + + verify_parser = subparsers.add_parser("verify") + verify_parser.add_argument("user_filter") + verify_parser.add_argument("device_filter", nargs="?") + + unverify_parser = subparsers.add_parser("unverify") + unverify_parser.add_argument("user_filter") + unverify_parser.add_argument("device_filter", nargs="?") + + return WeechatCommandParser._run_parser(parser, args) + + +def grouper(iterable, n, fillvalue=None): + "Collect data into fixed-length chunks or blocks" + # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" + args = [iter(iterable)] * n + return zip_longest(*args, fillvalue=fillvalue) + + +def partition_key(key): + groups = grouper(key, 4, " ") + return ' '.join(''.join(g) for g in groups) + def hook_commands(): W.hook_command( @@ -246,12 +285,279 @@ def hook_commands(): "", ) + W.hook_command( + # Command name and short description + "olm", + "Matrix olm encryption configuration command", + # Synopsis + ("info all|blacklisted|private|unverified|verified ||" + "blacklist ||" + "unverify ||" + "verify "), + # Description + (" info: show info about known devices and their keys\n" + "blacklist: blacklist a device\n" + " unverify: unverify a device\n" + " verify: verify a device\n\n" + "Examples:\n"), + # Completions + ('info all|blacklisted|private|unverified|verified ||' + 'blacklist %(device_ids) ||' + 'unverify %(olm_user_ids) %(olm_devices) ||' + 'verify %(olm_user_ids) %(olm_devices)'), + # Function name + 'matrix_olm_command_cb', + '') + W.hook_command_run("/buffer clear", "matrix_command_buf_clear_cb", "") if G.CONFIG.network.fetch_backlog_on_pgup: hook_page_up() +def format_device(device_id, fp_key): + fp_key = partition_key(fp_key) + message = (" - Device ID: {device_color}{device_id}{ncolor}\n" + " - Device key: {key_color}{fp_key}{ncolor}").format( + device_color=W.color("chat_channel"), + device_id=device_id, + ncolor=W.color("reset"), + key_color=W.color("chat_server"), + fp_key=fp_key) + return message + + +def olm_info_command(server, args): + def print_devices( + device_store, + filter_regex, + device_category="Device", + predicate=None, + ): + user_strings = [] + try: + filter_regex = re.compile(args.filter) if args.filter else None + except re.error as e: + server.error("Invalid regular expression: {}.".format(e.args[0])) + return + + for user_id in sorted(device_store.users): + device_strings = [] + for device in device_store[user_id].values(): + if filter_regex: + if (not filter_regex.search(user_id) and + not filter_regex.search(device.id)): + continue + + if predicate: + if not predicate(device): + continue + + device_strings.append(format_device( + device.id, + device.ed25519 + )) + + if not device_strings: + continue + + d_string = "\n".join(device_strings) + message = (" - User: {user_color}{user}{ncolor}\n").format( + user_color=W.color("chat_nick"), + user=user_id, + ncolor=W.color("reset")) + message += d_string + user_strings.append(message) + + if not user_strings: + message = ("{prefix}matrix: No matching devices " + "found.").format(prefix=W.prefix("error")) + W.prnt(server.server_buffer, message) + return + + W.prnt(server.server_buffer, + "{}matrix: {} keys:\n".format( + W.prefix("network"), + device_category + )) + W.prnt(server.server_buffer, "\n".join(user_strings)) + + olm = server.client.olm + + if args.category == "private": + fp_key = partition_key(olm.account.identity_keys["ed25519"]) + message = ("{prefix}matrix: Identity keys:\n" + " - User: {user_color}{user}{ncolor}\n" + " - Device ID: {device_color}{device_id}{ncolor}\n" + " - Device key: {key_color}{fp_key}{ncolor}\n" + "").format( + prefix=W.prefix("network"), + user_color=W.color("chat_self"), + ncolor=W.color("reset"), + user=olm.user_id, + device_color=W.color("chat_channel"), + device_id=olm.device_id, + key_color=W.color("chat_server"), + fp_key=fp_key) + W.prnt(server.server_buffer, message) + + elif args.category == "all": + print_devices(olm.device_store, args.filter) + + elif args.category == "verified": + print_devices( + olm.device_store, + args.filter, + "Verified", + olm.is_device_verified + ) + + elif args.category == "unverified": + def predicate(device): + return not olm.is_device_verified(device) + + print_devices( + olm.device_store, + args.filter, + "Unverified", + predicate + ) + + elif args.category == "blacklisted": + print_devices( + olm.device_store, + args.filter, + "Blacklisted", + olm.is_device_blacklisted + ) + + +def olm_action_command(server, args, category, error_category, prefix, action): + device_store = server.client.olm.device_store + users = [] + + if args.user_filter == "*": + users = device_store.users + else: + users = [x for x in device_store.users if args.user_filter in x] + + user_devices = {user: device_store[user].values() for user in users} + + if args.device_filter and args.device_filter != "*": + filtered_user_devices = {} + for user, device_list in user_devices.items(): + filtered_devices = filter( + lambda x: args.device_filter in x.id, + device_list + ) + filtered_user_devices[user] = list(filtered_devices) + user_devices = filtered_user_devices + + changed_devices = defaultdict(list) + + for user, device_list in user_devices.items(): + for device in device_list: + if action(device): + changed_devices[user].append(device) + + if not changed_devices: + message = ("{prefix}matrix: No matching {error_category} devices " + "found.").format( + prefix=W.prefix("error"), + error_category=error_category + ) + W.prnt(server.server_buffer, message) + return + + user_strings = [] + for user_id, device_list in changed_devices.items(): + device_strings = [] + message = (" - User: {user_color}{user}{ncolor}\n").format( + user_color=W.color("chat_nick"), + user=user_id, + ncolor=W.color("reset")) + for device in device_list: + device_strings.append(format_device( + device.id, + device.ed25519 + )) + if not device_strings: + continue + + d_string = "\n".join(device_strings) + message += d_string + user_strings.append(message) + + W.prnt(server.server_buffer, + "{}matrix: {} key(s):\n".format(W.prefix("prefix"), category)) + W.prnt(server.server_buffer, "\n".join(user_strings)) + pass + + +def olm_verify_command(server, args): + olm_action_command( + server, + args, + "Verified", + "unverified", + "join", + server.client.olm.verify_device + ) + + +def olm_unverify_command(server, args): + olm_action_command( + server, + args, + "Unverified", + "verified", + "quit", + server.client.olm.unverify_device + ) + + +@utf8_decode +def matrix_olm_command_cb(data, buffer, args): + def command(server, data, buffer, args): + parsed_args = WeechatCommandParser.olm(args) + if not parsed_args: + return W.WEECHAT_RC_OK + + if not parsed_args: + return W.WEECHAT_RC_OK + + if not server.client.olm: + W.prnt(server.server_buffer, "{}matrix: Olm account isn't " + "loaded.".format(W.prefix("error"))) + return W.WEECHAT_RC_OK + + if not parsed_args.subcommand or parsed_args.subcommand == "info": + olm_info_command(server, parsed_args) + elif parsed_args.subcommand == "verify": + olm_verify_command(server, parsed_args) + elif parsed_args.subcommand == "unverify": + olm_unverify_command(server, parsed_args) + else: + message = ("{prefix}matrix: Command not implemented.".format( + prefix=W.prefix("error"))) + W.prnt(server.server_buffer, message) + + return W.WEECHAT_RC_OK + + for server in SERVERS.values(): + if buffer in server.buffers.values(): + return command(server, data, buffer, args) + elif buffer == server.server_buffer: + return command(server, data, buffer, args) + + W.prnt("", "{prefix}matrix: command \"olm\" must be executed on a " + "matrix buffer (server or channel)".format( + prefix=W.prefix("error") + )) + + return W.WEECHAT_RC_OK + + @utf8_decode def matrix_me_command_cb(data, buffer, args): for server in SERVERS.values():