refactor smal => mxsmal

This commit is contained in:
saces 2026-04-04 15:07:31 +02:00
parent 3f6be78685
commit 7e984ef129
37 changed files with 142 additions and 72 deletions

View file

@ -0,0 +1 @@
from .demobot import main

View file

@ -0,0 +1,6 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import sys
from .demobot import main
sys.exit(main())

View file

@ -0,0 +1,90 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import logging
from mxsmal.bot import SMALBot
# setup logging, we want timestamps
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s.%(msecs)03d %(levelname)s %(name)s - %(funcName)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
DEFAULT_PREFIX = "!"
class DemoBot(SMALBot):
async def on_sys(self, ntf):
print("Got a system notification: ", ntf)
async def on_event(self, evt):
print("Got an event: ", evt)
async def on_message(self, msg):
if msg["type"] != "m.room.message":
# not a room message
logger.error(f"not a room message: {msg}")
return
if msg["sender"] == self.UserID:
# ignore own messages
logger.info(f"ignore own message: {msg}")
return
if "msgtype" in msg["content"].keys() and msg["content"]["msgtype"] != "m.text":
# only react to messages, not emotes
logger.debug(f"ignore unknown message type: {msg}")
return
if msg["content"]["body"] == "!stop":
logger.info("stopping the bot")
self.stop()
return
if msg["content"]["body"] == "!leave":
logger.info(f"leaving room {msg['roomid']}")
await self.leaveroom(msg["roomid"])
return
if msg["content"]["body"].startswith("!echo"):
txt = msg["content"]["body"][5:].strip()
if txt == "":
txt = "Empty text? Are you kidding me?"
if msg["is_direct"]:
await self.sendmessage(msg["roomid"], txt)
else:
await self.sendmessagereply(
msg["roomid"], msg["id"], msg["sender"], txt
)
return
logger.info(f"ignored a message: {msg}")
async def on_startup_run(self):
roomlist = await self.joinedrooms()
for room in roomlist:
if room["is_direct"]:
txt = "Hey, I'm back for secret talk :)"
else:
txt = "I'm back online."
await self.sendnotice(room["roomid"], txt)
def main():
# create and initialize the bot
bot = DemoBot(DEFAULT_PREFIX)
# start the asyncio event loop and sync forever (listen for incommmig messages/events)
bot.run()
if __name__ == "__main__":
main()

View file

@ -0,0 +1 @@
from .demobot import main

View file

@ -0,0 +1,6 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import sys
from .demobot import main
sys.exit(main())

View file

@ -0,0 +1,92 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import logging
from mxsmal.simple.bot import SMALBot
# setup logging, we want timestamps
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s.%(msecs)03d %(levelname)s %(name)s - %(funcName)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
logger.setLevel(level=logging.INFO)
DEFAULT_PREFIX = "!"
class SimpleDemoBot(SMALBot):
def on_sys(self, ntf):
print("Got a system notification: ", ntf)
def on_event(self, evt):
print("Got an event: ", evt)
def on_message(self, msg):
if msg["type"] != "m.room.message":
# not a room message
logger.error(f"not a room message: {msg}")
return
if msg["sender"] == self.UserID:
# ignore own messages
logger.info(f"ignore own message: {msg}")
return
if "msgtype" in msg["content"].keys() and msg["content"]["msgtype"] != "m.text":
# only react to messages, not emotes
logger.debug(f"ignore unknown message type: {msg}")
return
if msg["content"]["body"] == "!stop":
logger.info("stopping the bot")
self.stop()
return
if msg["content"]["body"] == "!leave":
logger.info(f"leaving room {msg['roomid']}")
self.leaveroom(msg["roomid"])
return
if msg["content"]["body"].startswith("!echo"):
txt = msg["content"]["body"][5:].strip()
if txt == "":
txt = "Empty text? Are you kidding me?"
if msg["is_direct"]:
self.sendmessage(msg["roomid"], txt)
else:
self.sendmessagereply(msg["roomid"], msg["id"], msg["sender"], txt)
return
logger.info(f"ignored a message: {msg}")
def listjoinedrooms(self):
roomlist = self.joinedrooms()
for room in roomlist:
if room["is_direct"]:
txt = "Hey, I'm back for secret talk :)"
else:
txt = "I'm back online."
self.sendnotice(room["roomid"], txt)
def main():
# create and initialize the bot
bot = SimpleDemoBot(DEFAULT_PREFIX)
# the bot's matrix client is ready to use now
# request the list of joined rooms
bot.listjoinedrooms()
# start syncing forever (listen for incommmig messages/events)
bot.run()
if __name__ == "__main__":
main()

View file

46
mxsmal/src/mxsmal/app.py Normal file
View file

@ -0,0 +1,46 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import logging
import asyncio
from pygomx.client import _AsyncClient
logger = logging.getLogger(__name__)
"""
"""
class SMALApp(_AsyncClient):
"""
implement 'async def self.on_startup()'
async_client is logged in & ready.
time to setup extra things & hooks not covered by this class
sync_loop will not start til we return
implement 'async def self.on_startup_run()'
async_client is logged in & ready.
this will not wait for return
do your even long running startup code here
"""
def __init__(self):
super().__init__()
def run(self):
asyncio.run(self.main_loop())
async def main_loop(self):
if hasattr(self, "on_startup") and callable(self.on_startup):
await self.on_startup()
if hasattr(self, "on_startup_run") and callable(self.on_startup_run):
await asyncio.ensure_future(self.on_startup_run())
await self._sync()
def stop(self):
self._stopsync()

53
mxsmal/src/mxsmal/bot.py Normal file
View file

@ -0,0 +1,53 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import logging
from .app import SMALApp
logger = logging.getLogger(__name__)
"""
"""
class SMALBot(SMALApp):
""" """
def __init__(self, sigil):
super().__init__()
self._sigil = sigil
async def sendmessage(self, roomid, text):
data = {}
data["roomid"] = roomid
data["content"] = {}
data["content"]["body"] = text
data["content"]["msgtype"] = "m.text"
await self._sendmessage(data)
async def sendmessagereply(self, roomid, msgid, mxid, text):
data = {}
data["roomid"] = roomid
data["content"] = {}
data["content"]["body"] = text
data["content"]["msgtype"] = "m.text"
data["content"]["m.mentions"] = {}
data["content"]["m.mentions"]["user_ids"] = [
mxid,
]
data["content"]["m.relates_to"] = {}
data["content"]["m.relates_to"]["m.in_reply_to"] = {}
data["content"]["m.relates_to"]["m.in_reply_to"]["event_id"] = msgid
await self._sendmessage(data)
async def sendnotice(self, roomid, text):
data = {}
data["roomid"] = roomid
data["content"] = {}
data["content"]["body"] = text
data["content"]["msgtype"] = "m.notice"
await self._sendmessage(data)

View file

View file

@ -0,0 +1,18 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import logging
from pygomx.simple import _SimpleClient
logger = logging.getLogger(__name__)
"""
"""
class SMALApp(_SimpleClient):
""" """
def __init__(self):
super().__init__()

View file

@ -0,0 +1,59 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import logging
from .app import SMALApp
logger = logging.getLogger(__name__)
"""
"""
class SMALBot(SMALApp):
""" """
def __init__(self, sigil):
super().__init__()
self._sigil = sigil
def run(self):
self._sync()
def stop(self):
self._stopsync()
def sendmessage(self, roomid, text):
data = {}
data["roomid"] = roomid
data["content"] = {}
data["content"]["body"] = text
data["content"]["msgtype"] = "m.text"
self._sendmessage(data)
def sendmessagereply(self, roomid, msgid, mxid, text):
data = {}
data["roomid"] = roomid
data["content"] = {}
data["content"]["body"] = text
data["content"]["msgtype"] = "m.text"
data["content"]["m.mentions"] = {}
data["content"]["m.mentions"]["user_ids"] = [
mxid,
]
data["content"]["m.relates_to"] = {}
data["content"]["m.relates_to"]["m.in_reply_to"] = {}
data["content"]["m.relates_to"]["m.in_reply_to"]["event_id"] = msgid
self._sendmessage(data)
def sendnotice(self, roomid, text):
data = {}
data["roomid"] = roomid
data["content"] = {}
data["content"]["body"] = text
data["content"]["msgtype"] = "m.notice"
self._sendmessage(data)

View file

@ -0,0 +1 @@
from .smalsetup import smalsetup

View file

@ -0,0 +1,6 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import sys
from .smalsetup import smalsetup
sys.exit(smalsetup())

View file

@ -0,0 +1,60 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import datetime
import getpass
import os
import time
from functools import partial, wraps
import click
from pygomx.errors import PygomxAPIError
from pygomx import ApiV0
def catch_exception(func=None, *, handle):
if not func:
return partial(catch_exception, handle=handle)
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except handle as e:
raise click.ClickException(e)
return wrapper
@click.command()
@click.option(
"--mxpass",
"mxpassfile",
metavar="filepath",
default=".mxpass",
help="mxpass file name",
)
@click.argument("mxid", metavar="MatrixID")
@catch_exception(handle=(PygomxAPIError))
def smalsetup(mxid, mxpassfile):
"""Utility for creating smalbot mxpass files"""
create_mxpass = len(mxpassfile.strip()) > 0
if create_mxpass:
if os.path.exists(mxpassfile):
raise click.ClickException(f"file {mxpassfile} exists.")
result_dict = ApiV0.Discover(mxid)
result_dict["password"] = getpass.getpass(prompt="Password: ")
result_dict["make_master_key"] = True
result_dict["make_recovery_key"] = True
now = int(time.time())
result_dict["deviceid"] = f"smalbot-{now}"
result_dict["devicename"] = f"smalbot-{datetime.fromtimestamp(now)}"
ApiV0.Login(result_dict, ".mxpass")
click.echo("login created. start your bot now.")

View file

@ -0,0 +1,8 @@
from .discoverhs import discoverhs as discoverhs
from .mktoken import mktoken as mktoken
from .whoami import whoami as whoami
from .accountinfo import accountinfo as accountinfo
from .clearaccount import clearaccount as clearaccount
from .serverinfo import serverinfo as serverinfo
from .passitem import passitem as passitem
from .logout import logout as logout

View file

@ -0,0 +1,29 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import click
import json
from _pygomx import lib, ffi
@click.command()
@click.option("-u", "--url", "hs_url", metavar="url", help="homeserver url")
@click.option("-t", "--token", "token", metavar="token", help="access token")
def accountinfo(hs_url, token):
"""print info about this account devices"""
if hs_url is None and token is None:
r = lib.cliv0_mxpassitem(b".mxpass", b"*", b"*", b"*")
result = ffi.string(r).decode("utf-8")
lib.FreeCString(r)
result_dict = json.loads(result)
hs_url = result_dict["Matrixhost"]
token = result_dict["Token"]
r = lib.cliv0_accountinfo(
hs_url.encode(encoding="utf-8"), token.encode(encoding="utf-8")
)
result = ffi.string(r)
lib.FreeCString(r)
print(result.decode("utf-8"))

View file

@ -0,0 +1,75 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import click
import json
from _pygomx import lib, ffi
@click.group()
@click.option("-u", "--url", "hs_url", metavar="url", help="homeserver url")
@click.option("-t", "--token", "token", metavar="token", help="access token")
def clearaccount(hs_url, token):
"""remove various things from account"""
global _hs_url
global _token
if hs_url is None and token is None:
r = lib.cliv0_mxpassitem(b".mxpass", b"*", b"*", b"*")
result = ffi.string(r).decode("utf-8")
lib.FreeCString(r)
result_dict = json.loads(result)
_hs_url = result_dict["Matrixhost"]
_token = result_dict["Token"]
else:
_hs_url = hs_url
_token = token
r = lib.cliv0_accountinfo(
_hs_url.encode(encoding="utf-8"), _token.encode(encoding="utf-8")
)
result = ffi.string(r)
lib.FreeCString(r)
print(result.decode("utf-8"))
# r = lib.cliv0_clearaccount(hs_url, token)
# result = ffi.string(r)
# lib.FreeCString(r)
# print(result)
@clearaccount.group()
def logout(ctx):
"""Logout devices"""
pass
@logout.command("others")
def logout_others():
"""Logout all other devices"""
pass
@logout.command("all")
def logout_all():
"""Logout all devices"""
pass
@logout.command("self")
def logout_self():
"""Logout this device"""
pass
@clearaccount.command()
def sub1():
"""sub1"""
pass
@clearaccount.command()
def sub2():
"""sub2"""
pass

View file

@ -0,0 +1,19 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
from functools import partial, wraps
import click
def click_catch_exception(func=None, *, handle):
if not func:
return partial(click_catch_exception, handle=handle)
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except handle as e:
raise click.ClickException(e)
return wrapper

View file

@ -0,0 +1,24 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import click
from pygomx.errors import PygomxAPIError
from pygomx import CliV0
from .click import click_catch_exception
@click.command()
@click.option(
"--json", "show_json", is_flag=True, help="show json as returned from server."
)
@click.argument("domain", metavar="string")
@click_catch_exception(handle=(PygomxAPIError))
def discoverhs(domain, show_json):
"""Attempts to discover the homeserver from the given string"""
result = CliV0.Discover(domain)
if show_json:
click.echo(result)
else:
click.echo(result["m.homeserver"]["base_url"])

View file

@ -0,0 +1,137 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import getpass
from datetime import datetime
import click
from pygomx.errors import PygomxAPIError
from pygomx import CliV0
from .click import click_catch_exception
@click.command()
@click.option("-u", "--url", "hs_url", metavar="url", help="homeserver url")
@click.option("-t", "--token", "token", metavar="token", help="access token")
@click.option(
"--json", "show_json", is_flag=True, help="show json as returned from server."
)
@click.argument("devices", metavar="DeviceID", nargs=-1)
@click.option(
"-l",
"--logout",
"logout_type",
type=click.Choice(["all", "other", "self"]),
help="logout devices",
)
@click_catch_exception(handle=(PygomxAPIError))
def logout(hs_url, token, devices, logout_type, show_json):
"""List or logout devices.
\b
mxlogout [--json]
list all devices
mxlogout --logout all
logout all devices
mxlogout --logout self
logout this device
mxlogout --logout other
logout all other devices (requires auth)
mxlogout deviceid [deviceid]...
logout given devices (requires auth)
"""
if len(devices) > 0 and logout_type is not None:
raise ValueError("you can't get both.")
if hs_url is None and token is None:
cli = CliV0.from_mxpass(".mxpass", "*", "*", "*")
else:
cli = CliV0(hs_url, token)
match logout_type:
case "self":
do_logout(cli, all=False)
return
case "all":
do_logout(cli, all=True)
return
if len(devices) > 0:
device_list = list(devices)
whoami_dict = cli.Whoami()
self_user_id = whoami_dict["user_id"]
do_logout_devices(cli, device_list, self_user_id)
return
reqData = {"method": "GET", "path": ["_matrix", "client", "v3", "devices"]}
raw_device_dict = cli.Generic(reqData)
if logout_type == "other":
whoami_dict = cli.Whoami()
self_device_id = whoami_dict["device_id"]
self_user_id = whoami_dict["user_id"]
device_list = []
for device in raw_device_dict["devices"]:
if device["device_id"] != self_device_id:
device_list += [
device["device_id"],
]
if len(device_list) > 0:
do_logout_devices(cli, device_list, self_user_id)
return
if show_json:
click.echo(raw_device_dict)
return
max_len = 0
for device in raw_device_dict["devices"]:
max_len = max(max_len, len(device["device_id"]))
for device in raw_device_dict["devices"]:
date_object = datetime.fromtimestamp(device["last_seen_ts"] / 1000)
click.echo(
" ".join(
[
device["device_id"],
" " * (max_len - len(device["device_id"])),
str(date_object),
device["last_seen_ip"],
device["display_name"],
]
)
)
date_object = datetime.fromtimestamp(device["last_seen_ts"] / 1000)
def do_logout(cli, all):
reqData = {
"method": "POST",
"path": ["_matrix", "client", "v3", "logout"],
}
if all:
reqData["path"] += ["all"]
res = cli.Generic(reqData)
click.echo(res)
def do_logout_devices(cli, devices, user_id):
reqData = {
"method": "POST",
"path": ["_matrix", "client", "v3", "delete_devices"],
"payload": {
"devices": devices,
"auth": {
"type": "m.login.password",
"password": getpass.getpass(prompt="Password: "),
"identifier": {
"type": "m.id.user",
"user": user_id,
},
},
},
}
res = cli.Generic(reqData)
click.echo(res)

View file

@ -0,0 +1,18 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import sys
from _pygomx import lib, ffi
def mktoken():
if len(sys.argv) != 3:
print("usage: ", sys.argv[0], " matrixid password")
return 1
mxid = sys.argv[1].encode(encoding="utf-8")
pw = sys.argv[2].encode(encoding="utf-8")
r = lib.cliv0_mkmxtoken(mxid, pw)
result = ffi.string(r)
lib.FreeCString(r)
print(result)

View file

@ -0,0 +1,43 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import json
import click
from pygomx.errors import PygomxAPIError
from pygomx import CliV0
from .click import click_catch_exception
@click.command()
@click.option(
"-s", "--secret", "show_secret", is_flag=True, help="print only the secret"
)
@click.option("-u", "--url", "hs_url", metavar="url", help="url selector")
@click.option(
"-l", "--localpart", "localpart", metavar="localpart", help="localpart selector"
)
@click.option("-d", "--domain", "domain", metavar="domain", help="domain selector")
@click.argument("mxpassfile", metavar="mxpassfilepath", required=False)
@click_catch_exception(handle=(PygomxAPIError))
def passitem(mxpassfile, show_secret, hs_url, localpart, domain):
"""utility to get items from mxpasss files"""
# defaults
if mxpassfile is None:
mxpassfile = ".mxpass"
if hs_url is None:
hs_url = "*"
if localpart is None:
localpart = "*"
if domain is None:
domain = "*"
result_dict = CliV0.MXPassItem(mxpassfile, hs_url, localpart, domain)
if show_secret:
click.echo(result_dict["Token"])
else:
result_dict["Token"] = "***"
click.echo(json.dumps(result_dict))

View file

@ -0,0 +1,22 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import sys
import click
from _pygomx import ffi, lib
@click.command()
@click.option(
"--json", "show_json", is_flag=True, help="show json as returned from server."
)
@click.argument("domain", metavar="string")
def serverinfo(domain, show_json):
"""show server info for given server (federationstester light)"""
mxdomain = sys.argv[1].encode(encoding="utf-8")
r = lib.cliv0_serverinfo(mxdomain)
result = ffi.string(r).decode("utf-8")
lib.FreeCString(r)
print(result)

View file

@ -0,0 +1,22 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import click
from pygomx import CliV0
from pygomx.errors import PygomxAPIError
from .click import click_catch_exception
@click.command()
@click.option("-u", "--url", "hs_url", metavar="url", help="homeserver url")
@click.option("-t", "--token", "token", metavar="token", help="access token")
@click_catch_exception(handle=(PygomxAPIError))
def whoami(hs_url, token):
"""this token belongs to?"""
if hs_url is None and token is None:
cli = CliV0.from_mxpass(".mxpass", "*", "*", "*")
else:
cli = CliV0(hs_url, token)
click.echo(cli.Whoami())

View file