From d5b768e7e09c136f6e84c2a44045a1b7f0c015e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?poljar=20=28Damir=20Jeli=C4=87=29?= Date: Wed, 16 May 2018 22:46:22 +0200 Subject: [PATCH] encryption: Port the olm command parser to argparse. --- matrix/encryption.py | 139 ++++++++++++++++++++++++++++--------------- 1 file changed, 90 insertions(+), 49 deletions(-) diff --git a/matrix/encryption.py b/matrix/encryption.py index 37111ab..7734838 100644 --- a/matrix/encryption.py +++ b/matrix/encryption.py @@ -21,6 +21,7 @@ import os import json import sqlite3 import pprint +import argparse # pylint: disable=redefined-builtin from builtins import str, bytes @@ -53,6 +54,22 @@ from matrix.utils import sanitize_id from matrix.utf import utf8_decode +class ParseError(Exception): + pass + + +class WeechatArgParse(argparse.ArgumentParser): + def print_usage(self, file): + pass + + def error(self, message): + m = ("{prefix}Error: {message} for command {command} " + "(see /help {command})").format(prefix=W.prefix("error"), + message=message, command=self.prog) + W.prnt("", m) + raise ParseError + + def own_buffer_or_error(f): @wraps(f) @@ -74,7 +91,6 @@ def own_buffer_or_error(f): def encrypt_enabled(f): - @wraps(f) def wrapper(*args, **kwds): if matrix.globals.ENCRYPTION: @@ -92,9 +108,9 @@ def matrix_hook_olm_command(): "Matrix olm encryption command", # Synopsis ("info all|blacklisted|private|unverified|verified ||" - "blacklist ||" - "unverify ||" - "verify "), + "blacklist ||" + "unverify ||" + "verify "), # Description (" info: show info about known devices and their keys\n" "blacklist: blacklist a device\n" @@ -104,21 +120,38 @@ def matrix_hook_olm_command(): # Completions ('info all|blacklisted|private|unverified|verified ||' 'blacklist %(device_ids) ||' - 'unverify %(device_ids) ||' - 'verify %(device_ids)'), + 'unverify %(user_ids) %(device_ids) ||' + 'verify %(user_ids) %(device_ids)'), # Function name 'matrix_olm_command_cb', '') def olm_cmd_parse_args(args): - split_args = args.split() + parser = WeechatArgParse(prog="olm") + subparsers = parser.add_subparsers(dest="subcommand") - command = split_args.pop(0) if split_args else "info" + info_parser = subparsers.add_parser("info") + info_parser.add_argument( + "category", nargs="?", default="private", + choices=[ + "all", + "blacklisted", + "private", + "unverified", + "verified" + ]) + info_parser.add_argument("filter", nargs="?") - rest_args = split_args if split_args else [] + verify_parser = subparsers.add_parser("verify") + verify_parser.add_argument("user_filter") + verify_parser.add_argument("device_filter", nargs="?") - return command, rest_args + try: + parsed_args = parser.parse_args(args.split()) + return parsed_args + except ParseError: + return None def grouper(iterable, n, fillvalue=None): @@ -133,51 +166,58 @@ def partition_key(key): return ' '.join(''.join(g) for g in groups) +def olm_info_command(server, args): + olm = server.olm + + if args.category == "private": + device_msg = (" - Device ID: {}\n".format(server.device_id) + if server.device_id else "") + id_key = partition_key(olm.account.identity_keys()["curve25519"]) + fp_key = partition_key(olm.account.identity_keys()["ed25519"]) + message = ("{prefix}matrix: Identity keys:\n" + " - User: {user}\n" + "{device_msg}" + " - Identity key: {id_key}\n" + " - Fingerprint key: {fp_key}\n").format( + prefix=W.prefix("network"), + user=server.user, + device_msg=device_msg, + id_key=id_key, + fp_key=fp_key) + W.prnt(server.server_buffer, message) + elif args.category == "all": + for user, keys in olm.device_keys.items(): + message = ("{prefix}matrix: Identity keys:\n" + " - User: {user}\n").format( + prefix=W.prefix("network"), + user=user) + W.prnt(server.server_buffer, message) + + for key in keys: + id_key = partition_key(key.keys["curve25519"]) + fp_key = partition_key(key.keys["ed25519"]) + device_msg = (" - Device ID: {}\n".format( + key.device_id) if key.device_id else "") + message = ("{device_msg}" + " - Identity key: {id_key}\n" + " - Fingerprint key: {fp_key}\n\n").format( + device_msg=device_msg, + id_key=id_key, + fp_key=fp_key) + W.prnt(server.server_buffer, message) + + @own_buffer_or_error @utf8_decode def matrix_olm_command_cb(server_name, buffer, args): server = SERVERS[server_name] - command, args = olm_cmd_parse_args(args) + parsed_args = olm_cmd_parse_args(args) - if not command or command == "info": - olm = server.olm + if not parsed_args: + return W.WEECHAT_RC_OK - if not args or args[0] == "private": - device_msg = (" - Device ID: {}\n".format(server.device_id) - if server.device_id else "") - id_key = partition_key(olm.account.identity_keys()["curve25519"]) - fp_key = partition_key(olm.account.identity_keys()["ed25519"]) - message = ("{prefix}matrix: Identity keys:\n" - " - User: {user}\n" - "{device_msg}" - " - Identity key: {id_key}\n" - " - Fingerprint key: {fp_key}\n").format( - prefix=W.prefix("network"), - user=server.user, - device_msg=device_msg, - id_key=id_key, - fp_key=fp_key) - W.prnt(server.server_buffer, message) - elif args[0] == "all": - for user, keys in olm.device_keys.items(): - message = ("{prefix}matrix: Identity keys:\n" - " - User: {user}\n").format( - prefix=W.prefix("network"), - user=user) - W.prnt(server.server_buffer, message) - - for key in keys: - id_key = partition_key(key.keys["curve25519"]) - fp_key = partition_key(key.keys["ed25519"]) - device_msg = (" - Device ID: {}\n".format( - key.device_id) if key.device_id else "") - message = ("{device_msg}" - " - Identity key: {id_key}\n" - " - Fingerprint key: {fp_key}\n\n").format( - device_msg=device_msg, - id_key=id_key, - fp_key=fp_key) - W.prnt(server.server_buffer, message) + if not parsed_args.subcommand or parsed_args.subcommand == "info": + olm_info_command(server, parsed_args) else: message = ("{prefix}matrix: Command not implemented.".format( prefix=W.prefix("error"))) @@ -192,6 +232,7 @@ class EncryptionError(Exception): class DeviceStore(object): def __init__(self, filename): + # type: (str) -> None self._entries = [] self._filename = filename