encryption: Initial device verification support.

This commit is contained in:
poljar (Damir Jelić) 2018-05-17 12:59:07 +02:00
parent d5b768e7e0
commit bb06293031

View file

@ -27,6 +27,7 @@ import argparse
from builtins import str, bytes
from collections import defaultdict
from itertools import chain
from functools import wraps
from future.moves.itertools import zip_longest
@ -207,6 +208,45 @@ def olm_info_command(server, args):
W.prnt(server.server_buffer, message)
def olm_verify_command(server, args):
olm = server.olm
devices = olm.device_keys
filtered_devices = []
if args.user_filter == "*":
filtered_devices = devices.values()
else:
device_keys = filter(lambda x: args.user_filter in x, devices)
filtered_devices = [devices[x] for x in device_keys]
filtered_devices = chain.from_iterable(filtered_devices)
if args.device_filter and args.device_filter != "*":
filtered_devices = filter(
lambda x: args.device_filter in x.device_id,
filtered_devices
)
key_list = []
for device in list(filtered_devices):
if olm.verify_device(device):
key_list.append(str(device))
if not key_list:
message = ("{prefix}matrix: No matching unverified devices "
"found.").format(prefix=W.prefix("error"))
W.prnt(server.server_buffer, message)
return
key_str = "\n - ".join(key_list)
message = ("{prefix}matrix: Verified keys:\n"
" - {key_str}").format(prefix=W.prefix("join"),
key_str=key_str)
W.prnt(server.server_buffer, message)
@own_buffer_or_error
@utf8_decode
def matrix_olm_command_cb(server_name, buffer, args):
@ -218,6 +258,8 @@ def matrix_olm_command_cb(server_name, buffer, args):
if not parsed_args.subcommand or parsed_args.subcommand == "info":
olm_info_command(server, parsed_args)
elif parsed_args.subcommand == "verify":
olm_verify_command(server, parsed_args)
else:
message = ("{prefix}matrix: Command not implemented.".format(
prefix=W.prefix("error")))
@ -351,7 +393,7 @@ class StoreEntry(object):
def to_line(self):
# type: () -> str
key_type = "matrix-{}".format(self.key_type)
line = "{} {} {} {}".format(
line = "{} {} {} {}\n".format(
self.user_id,
self.device_id,
key_type,
@ -391,6 +433,15 @@ class OlmDeviceKey():
self.device_id = device_id
self.keys = key_dict
def __str__(self):
# type: () -> str
return "{} {} {}".format(
self.user_id, self.device_id, self.keys["ed25519"])
def __repr__(self):
# type: () -> str
return str(self)
class OneTimeKey():
def __init__(self, user_id, device_id, key):
@ -443,6 +494,9 @@ class Olm():
self.inbound_group_sessions = inbound_group_sessions
self.outbound_group_sessions = {}
trust_file_path = "{}_{}.trusted_devices".format(user, device_id)
self.trust_db = DeviceStore(os.path.join(session_path, trust_file_path))
def _create_session(self, sender, sender_key, message):
W.prnt("", "matrix: Creating session for {}".format(sender))
session = InboundSession(self.account, message, sender_key)
@ -452,6 +506,16 @@ class Olm():
return session
def verify_device(self, device):
if self.trust_db.check(device):
return False
self.trust_db.add(device)
return True
def unverify_device(self, device):
self.trust_db.remove(device)
def create_session(self, user_id, device_id, one_time_key):
W.prnt("", "matrix: Creating session for {}".format(user_id))
id_key = None