more click, more api, even more fixes

This commit is contained in:
saces 2026-03-17 18:16:21 +01:00
parent 8fcf9b4785
commit cc8c77e780
18 changed files with 318 additions and 189 deletions

View file

@ -4,9 +4,11 @@ package mxapi
import ( import (
"context" "context"
"crypto/sha256" "crypto/rand"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"os"
"time" "time"
"maunium.net/go/mautrix" "maunium.net/go/mautrix"
@ -18,6 +20,11 @@ type login_data struct {
Mxid string `json:"mxid"` Mxid string `json:"mxid"`
Loginname string `json:"loginname"` Loginname string `json:"loginname"`
Password string `json:"password"` Password string `json:"password"`
DeviceID string `json:"deviceid"`
DeviceName string `json:"devicename"`
MXPassFile string `json:"mxpassfile"`
MakeMasterKey bool `json:"make_master_key"`
MakeRecoveryKey bool `json:"make_recovery_key"`
} }
func Login(data string) (string, error) { func Login(data string) (string, error) {
@ -27,12 +34,27 @@ func Login(data string) (string, error) {
return "", err return "", err
} }
if ld.MXPassFile != "" {
if _, err := os.Stat(ld.MXPassFile); err == nil {
return "", fmt.Errorf("mxpassfile '%s' already exists", ld.MXPassFile)
} else if !errors.Is(err, os.ErrNotExist) {
return "", fmt.Errorf("error while checking mxpassfile: %v", err)
}
}
mauclient, err := mautrix.NewClient(ld.Homeserver, id.UserID(ld.Mxid), "") mauclient, err := mautrix.NewClient(ld.Homeserver, id.UserID(ld.Mxid), "")
if err != nil { if err != nil {
return "", err return "", err
} }
deviceName := fmt.Sprintf("smalbot-%d", time.Now().Unix()) now := time.Now()
if ld.DeviceID == "" {
ld.DeviceID = fmt.Sprintf("libmxclient-%d", now.Unix())
}
if ld.DeviceName == "" {
ld.DeviceName = fmt.Sprintf("libmxclient-%s", now.Format(time.RFC3339))
}
resp, err := mauclient.Login(context.Background(), &mautrix.ReqLogin{ resp, err := mauclient.Login(context.Background(), &mautrix.ReqLogin{
Type: "m.login.password", Type: "m.login.password",
@ -41,8 +63,8 @@ func Login(data string) (string, error) {
User: ld.Loginname, User: ld.Loginname,
}, },
Password: ld.Password, Password: ld.Password,
DeviceID: id.DeviceID(deviceName), DeviceID: id.DeviceID(ld.DeviceID),
InitialDeviceDisplayName: deviceName, InitialDeviceDisplayName: ld.DeviceName,
StoreCredentials: false, StoreCredentials: false,
StoreHomeserverURL: false, StoreHomeserverURL: false,
RefreshToken: false, RefreshToken: false,
@ -51,8 +73,27 @@ func Login(data string) (string, error) {
return "", err return "", err
} }
tpl := "%s | %s | %s | %s\nrecovery | | | %x\nmaster | | | %x\n" res := fmt.Sprintf("%s | %s | %s | %s\n", ld.Homeserver, ld.Loginname, id.UserID(ld.Mxid).Homeserver(), resp.AccessToken)
res := fmt.Sprintf(tpl, ld.Homeserver, ld.Loginname, id.UserID(ld.Mxid).Homeserver(), resp.AccessToken, sha256.Sum256([]byte(ld.Password)), sha256.Sum256([]byte(deviceName)))
if ld.MakeMasterKey {
masterkey := make([]byte, 32)
rand.Read(masterkey)
res = fmt.Sprintf("%smaster | | | %x\n", res, masterkey)
}
if ld.MakeRecoveryKey {
recoverykey := make([]byte, 32)
rand.Read(recoverykey)
res = fmt.Sprintf("%srecovery | | | %x\n", res, recoverykey)
}
if ld.MXPassFile != "" {
err := os.WriteFile(ld.MXPassFile, []byte(res), 0600)
if err != nil {
return "", fmt.Errorf("unable to write file: %w", err)
}
return "SUCCESS.", nil
}
return res, nil return res, nil
} }

1
pygomx/.gitignore vendored
View file

@ -6,3 +6,4 @@ build/
*.c *.c
*.o *.o
*.a *.a
__pycache__

View file

@ -1 +1,2 @@
from .cliv0 import CliV0Api, CliV0 from .cliv0 import CliV0Api, CliV0
from .client import _AsyncClient

View file

@ -0,0 +1,72 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
import logging
from _pygomx import lib
from .util import _stringresult, _autostring, _autodict
from .errors import CheckApiError, CheckApiErrorOnly, CheckApiResult
logger = logging.getLogger(__name__)
class ApiV0Api:
"""api_v0 api
some c-api wrappers & helpers
inputs: str or bytes
output: str (utf8)
"""
@staticmethod
def discover(mxid):
return _stringresult(lib.apiv0_discover(_autostring(mxid)))
@staticmethod
def login(data):
return _stringresult(lib.apiv0_login(_autodict(data)))
@staticmethod
def joinedrooms(cid):
return _stringresult(lib.apiv0_joinedrooms(cid))
@staticmethod
def sendmessage(cid, data):
return _stringresult(lib.apiv0_sendmessage(cid, _autodict(data)))
@staticmethod
def startclient(cid):
return _stringresult(lib.apiv0_startclient(cid))
@staticmethod
def stopclient(cid):
return _stringresult(lib.apiv0_stopclient(cid))
@staticmethod
def leaveroom(cid, roomid):
return _stringresult(lib.apiv0_leaveroom(cid, _autostring(roomid)))
@staticmethod
def createroom(cid, data):
return _stringresult(lib.apiv0_createroom(cid, _autodict(data)))
class ApiV0:
"""ApiV0"""
@staticmethod
def Discover(mxid):
res = ApiV0Api.discover(mxid)
return CheckApiResult(res)
@staticmethod
def Login(data: dict, mxpassfile: str):
withpass = False
if mxpassfile is not None and len(mxpassfile.strip()) > 0:
data["mxpassfile"] = mxpassfile
withpass = True
res = ApiV0Api.login(data)
if withpass:
CheckApiError(res)
else:
CheckApiErrorOnly(res)
return res

View file

@ -1,13 +1,14 @@
# Copyright (C) 2026 saces@c-base.org # Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
import asyncio
import json import json
import logging import logging
import threading
from _pygomx import ffi, lib from _pygomx import ffi, lib
from .errors import APIError, CheckApiError, CheckApiResult from .apiv0 import ApiV0Api
import asyncio from .errors import CheckApiError, CheckApiResult, PygomxAPIError
import threading
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -29,7 +30,7 @@ class _AsyncClient:
result = ffi.string(r) result = ffi.string(r)
lib.FreeCString(r) lib.FreeCString(r)
if result.startswith(b"ERR:"): if result.startswith(b"ERR:"):
raise APIError(result) raise PygomxAPIError(result)
r = lib.apiv0_set_on_message_handler( r = lib.apiv0_set_on_message_handler(
self.client_id, on_message_callback, self._ffi_selfhandle self.client_id, on_message_callback, self._ffi_selfhandle
@ -37,7 +38,7 @@ class _AsyncClient:
result = ffi.string(r) result = ffi.string(r)
lib.FreeCString(r) lib.FreeCString(r)
if result.startswith(b"ERR:"): if result.startswith(b"ERR:"):
raise APIError(result) raise PygomxAPIError(result)
r = lib.apiv0_set_on_sys_handler( r = lib.apiv0_set_on_sys_handler(
self.client_id, on_sys_callback, self._ffi_selfhandle self.client_id, on_sys_callback, self._ffi_selfhandle
@ -45,7 +46,7 @@ class _AsyncClient:
result = ffi.string(r) result = ffi.string(r)
lib.FreeCString(r) lib.FreeCString(r)
if result.startswith(b"ERR:"): if result.startswith(b"ERR:"):
raise APIError(result) raise PygomxAPIError(result)
def _createMXClient(self): def _createMXClient(self):
r = lib.apiv0_createclient_pass(b".mxpass", b".", b"*", b"*", b"*") r = lib.apiv0_createclient_pass(b".mxpass", b".", b"*", b"*", b"*")
@ -53,7 +54,7 @@ class _AsyncClient:
result = ffi.string(r) result = ffi.string(r)
lib.FreeCString(r) lib.FreeCString(r)
if result.startswith(b"ERR:"): if result.startswith(b"ERR:"):
raise APIError(result) raise PygomxAPIError(result)
result_dict = json.loads(result) result_dict = json.loads(result)
self.client_id = result_dict["id"] self.client_id = result_dict["id"]
@ -61,29 +62,27 @@ class _AsyncClient:
self.DeviceID = result_dict["deviceid"] self.DeviceID = result_dict["deviceid"]
async def _sync(self): async def _sync(self):
r = lib.apiv0_startclient(self.client_id) r = ApiV0Api.startclient(self.client_id)
CheckApiError(r) CheckApiError(r)
def _stopsync(self): def _stopsync(self):
r = lib.apiv0_stopclient(self.client_id) r = ApiV0Api.stopclient(self.client_id)
CheckApiError(r) CheckApiError(r)
async def _sendmessage(self, data_dict): async def _sendmessage(self, data_dict):
data = json.dumps(data_dict).encode(encoding="utf-8") r = ApiV0Api.sendmessage(self.client_id, data_dict)
r = lib.apiv0_sendmessage(self.client_id, data)
return CheckApiResult(r) return CheckApiResult(r)
def leaveroom(self, roomid): def leaveroom(self, roomid):
r = lib.apiv0_leaveroom(self.client_id, roomid.encode(encoding="utf-8")) r = ApiV0Api.leaveroom(self.client_id, roomid)
CheckApiError(r) CheckApiError(r)
async def joinedrooms(self): async def joinedrooms(self):
r = lib.apiv0_joinedrooms(self.client_id) r = ApiV0Api.joinedrooms(self.client_id)
return CheckApiResult(r) return CheckApiResult(r)
def _createroom(self, data_dict): def _createroom(self, data_dict):
data = json.dumps(data_dict).encode(encoding="utf-8") r = ApiV0Api.createroom(self.client_id, data_dict)
r = lib.apiv0_createroom(self.client_id, data)
return CheckApiError(r) return CheckApiError(r)
def process_event(self, evt): def process_event(self, evt):

View file

@ -1,57 +1,9 @@
# Copyright (C) 2026 saces@c-base.org # Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
from _pygomx import ffi, lib from _pygomx import lib
import json from .errors import CheckApiResult
from .errors import APIError
from .util import _stringresult, _autostring, _autodict
def _stringresult(cstr):
result = ffi.string(cstr)
lib.FreeCString(cstr)
return result.decode("utf-8")
def _autostring(xstr):
match xstr:
case bytes():
return xstr
case str():
return xstr.encode(encoding="utf-8")
case _:
raise TypeError("only str or bytes allowed")
def _autodict(xdict):
match xdict:
case bytes():
return xdict
case str():
return xdict.encode(encoding="utf-8")
case dict():
return json.dumps(xdict).encode(encoding="utf-8")
case _:
raise TypeError("only str or bytes or dict allowed")
def CheckApiError(rstr):
if rstr.startswith("ERR:"):
raise APIError(rstr)
if rstr == "SUCCESS.":
return None
raise ValueError(f"unexpected result: {rstr[:20]}")
def CheckApiResult(rstr):
if rstr.startswith("ERR:"):
raise APIError(rstr)
if rstr == "SUCCESS.":
return None
result_dict = json.loads(rstr)
return result_dict
class CliV0Api: class CliV0Api:
@ -86,6 +38,14 @@ class CliV0Api:
) )
) )
@staticmethod
def discover(domain):
return _stringresult(
lib.cliv0_discoverhs(
_autostring(domain),
)
)
class CliV0: class CliV0:
"""cli_v0 api class """cli_v0 api class
@ -102,6 +62,16 @@ class CliV0:
result_dict = CheckApiResult(res) result_dict = CheckApiResult(res)
return cls(result_dict["Matrixhost"], result_dict["Token"]) return cls(result_dict["Matrixhost"], result_dict["Token"])
@staticmethod
def Discover(domain):
res = CliV0Api.discover(domain)
return CheckApiResult(res)
@staticmethod
def MXPassItem(mxpassfile, hs_url, localpart, domain):
res = CliV0Api.mxpassitem(mxpassfile, hs_url, localpart, domain)
return CheckApiResult(res)
def Whoami(self): def Whoami(self):
res = CliV0Api.whoami(self.hs_url, self.token) res = CliV0Api.whoami(self.hs_url, self.token)
return CheckApiResult(res) return CheckApiResult(res)

View file

@ -1,10 +1,9 @@
# Copyright (C) 2026 saces@c-base.org # Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
from _pygomx import ffi, lib
import json import json
class APIError(Exception): class PygomxAPIError(Exception):
"""Exception raised for api usage errors. """Exception raised for api usage errors.
Attributes: Attributes:
@ -16,27 +15,27 @@ class APIError(Exception):
super().__init__(self.message) super().__init__(self.message)
def apiResult(cstr): def CheckApiErrorOnly(rstr):
result = ffi.string(cstr).decode("utf-8") if rstr.startswith("ERR:"):
lib.FreeCString(cstr) raise PygomxAPIError(rstr)
return result
def CheckApiError(cstr): def CheckApiError(rstr):
result = apiResult(cstr) if rstr.startswith("ERR:"):
raise PygomxAPIError(rstr)
if result.startswith("ERR:"): if rstr == "SUCCESS.":
raise APIError(result)
def CheckApiResult(cstr):
result = apiResult(cstr)
if result.startswith("ERR:"):
raise APIError(result)
if result == "SUCCESS.":
return None return None
result_dict = json.loads(result) raise ValueError(f"unexpected result: {rstr[:60]}")
def CheckApiResult(rstr):
if rstr.startswith("ERR:"):
raise PygomxAPIError(rstr)
if rstr == "SUCCESS.":
return None
result_dict = json.loads(rstr)
return result_dict return result_dict

View file

@ -0,0 +1 @@
from .client import _SimpleClient

View file

@ -5,7 +5,8 @@ import logging
from _pygomx import ffi, lib from _pygomx import ffi, lib
from .errors import APIError, CheckApiError from ..apiv0 import ApiV0Api
from ..errors import CheckApiError, CheckApiResult, PygomxAPIError
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -27,7 +28,7 @@ class _SimpleClient:
result = ffi.string(r) result = ffi.string(r)
lib.FreeCString(r) lib.FreeCString(r)
if result.startswith(b"ERR:"): if result.startswith(b"ERR:"):
raise APIError(result) raise PygomxAPIError(result)
r = lib.apiv0_set_on_message_handler( r = lib.apiv0_set_on_message_handler(
self.client_id, on_message_callback, self._ffi_selfhandle self.client_id, on_message_callback, self._ffi_selfhandle
@ -35,7 +36,7 @@ class _SimpleClient:
result = ffi.string(r) result = ffi.string(r)
lib.FreeCString(r) lib.FreeCString(r)
if result.startswith(b"ERR:"): if result.startswith(b"ERR:"):
raise APIError(result) raise PygomxAPIError(result)
r = lib.apiv0_set_on_sys_handler( r = lib.apiv0_set_on_sys_handler(
self.client_id, on_sys_callback, self._ffi_selfhandle self.client_id, on_sys_callback, self._ffi_selfhandle
@ -43,7 +44,7 @@ class _SimpleClient:
result = ffi.string(r) result = ffi.string(r)
lib.FreeCString(r) lib.FreeCString(r)
if result.startswith(b"ERR:"): if result.startswith(b"ERR:"):
raise APIError(result) raise PygomxAPIError(result)
def _createMXClient(self): def _createMXClient(self):
r = lib.apiv0_createclient_pass(b".mxpass", b".", b"*", b"*", b"*") r = lib.apiv0_createclient_pass(b".mxpass", b".", b"*", b"*", b"*")
@ -51,7 +52,7 @@ class _SimpleClient:
result = ffi.string(r) result = ffi.string(r)
lib.FreeCString(r) lib.FreeCString(r)
if result.startswith(b"ERR:"): if result.startswith(b"ERR:"):
raise APIError(result) raise PygomxAPIError(result)
result_dict = json.loads(result) result_dict = json.loads(result)
self.client_id = result_dict["id"] self.client_id = result_dict["id"]
@ -59,30 +60,27 @@ class _SimpleClient:
self.DeviceID = result_dict["deviceid"] self.DeviceID = result_dict["deviceid"]
def _sync(self): def _sync(self):
r = lib.apiv0_startclient(self.client_id) r = ApiV0Api.startclient(self.client_id)
CheckApiError(r) CheckApiError(r)
def _stopsync(self): def _stopsync(self):
r = lib.apiv0_stopclient(self.client_id) r = ApiV0Api.stopclient(self.client_id)
CheckApiError(r) CheckApiError(r)
def _sendmessage(self, data_dict): def _sendmessage(self, data_dict):
data = json.dumps(data_dict).encode(encoding="utf-8") r = ApiV0Api.sendmessage(self.client_id, data_dict)
r = lib.apiv0_sendmessage(self.client_id, data) return CheckApiResult(r)
result = CheckApiError(r)
return result
def leaveroom(self, roomid): def leaveroom(self, roomid):
r = lib.apiv0_leaveroom(self.client_id, roomid.encode(encoding="utf-8")) r = ApiV0Api.leaveroom(self.client_id, roomid)
CheckApiError(r) CheckApiError(r)
def joinedrooms(self): def joinedrooms(self):
r = lib.apiv0_joinedrooms(self.client_id) r = ApiV0Api.joinedrooms(self.client_id)
return CheckApiError(r) return CheckApiResult(r)
def _createroom(self, data_dict): def _createroom(self, data_dict):
data = json.dumps(data_dict).encode(encoding="utf-8") r = ApiV0Api.createroom(self.client_id, data_dict)
r = lib.apiv0_createroom(self.client_id, data)
return CheckApiError(r) return CheckApiError(r)
def process_event(self, evt): def process_event(self, evt):

32
pygomx/src/pygomx/util.py Normal file
View file

@ -0,0 +1,32 @@
# Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only
from _pygomx import ffi, lib
import json
def _stringresult(cstr):
result = ffi.string(cstr)
lib.FreeCString(cstr)
return result.decode("utf-8")
def _autostring(xstr):
match xstr:
case bytes():
return xstr
case str():
return xstr.encode(encoding="utf-8")
case _:
raise TypeError("only str or bytes allowed")
def _autodict(xdict):
match xdict:
case bytes():
return xdict
case str():
return xdict.encode(encoding="utf-8")
case dict():
return json.dumps(xdict).encode(encoding="utf-8")
case _:
raise TypeError("only str or bytes or dict allowed")

1
smal/.gitignore vendored
View file

@ -1,2 +1,3 @@
__pycache__ __pycache__
*.egg-info *.egg-info
build

View file

@ -0,0 +1 @@
from .demobot import main

View file

@ -0,0 +1 @@
from .demobot import main

View file

@ -1,9 +1,11 @@
# Copyright (C) 2026 saces@c-base.org # Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
import sys
from _pygomx import lib, ffi
import click import click
import json from pygomx.errors import PygomxAPIError
from pygomx import CliV0
from .click import click_catch_exception
@click.command() @click.command()
@ -11,18 +13,12 @@ import json
"--json", "show_json", is_flag=True, help="show json as returned from server." "--json", "show_json", is_flag=True, help="show json as returned from server."
) )
@click.argument("domain", metavar="string") @click.argument("domain", metavar="string")
@click_catch_exception(handle=(PygomxAPIError))
def discoverhs(domain, show_json): def discoverhs(domain, show_json):
"""Attempts to discover the homeserver from the given string""" """Attempts to discover the homeserver from the given string"""
mxid = domain.encode(encoding="utf-8") result = CliV0.Discover(domain)
r = lib.cliv0_discoverhs(mxid)
result = ffi.string(r).decode("utf-8")
lib.FreeCString(r)
if result.startswith("ERR:"):
print(result)
sys.exit(1)
if show_json: if show_json:
print(result) click.echo(result)
else: else:
result_dict = json.loads(result) click.echo(result["m.homeserver"]["base_url"])
print(result_dict["m.homeserver"]["base_url"])

View file

@ -4,9 +4,12 @@ import getpass
from datetime import datetime from datetime import datetime
import click import click
from pygomx.errors import PygomxAPIError
from pygomx import CliV0 from pygomx import CliV0
from .click import click_catch_exception
@click.command() @click.command()
@click.option("-u", "--url", "hs_url", metavar="url", help="homeserver url") @click.option("-u", "--url", "hs_url", metavar="url", help="homeserver url")
@ -22,17 +25,18 @@ from pygomx import CliV0
type=click.Choice(["all", "other", "self"]), type=click.Choice(["all", "other", "self"]),
help="logout devices", help="logout devices",
) )
@click_catch_exception(handle=(PygomxAPIError))
def logout(hs_url, token, devices, logout_type, show_json): def logout(hs_url, token, devices, logout_type, show_json):
"""List or logout devices. """List or logout devices.
\b \b
mxlogout [--json] mxlogout [--json]
list all devices list all devices
mxlogout --all mxlogout --logout all
logout all devices logout all devices
mxlogout --self mxlogout --logout self
logout this device logout this device
mxlogout --other mxlogout --logout other
logout all other devices (requires auth) logout all other devices (requires auth)
mxlogout deviceid [deviceid]... mxlogout deviceid [deviceid]...
logout given devices (requires auth) logout given devices (requires auth)
@ -79,7 +83,7 @@ def logout(hs_url, token, devices, logout_type, show_json):
return return
if show_json: if show_json:
print(raw_device_dict) click.echo(raw_device_dict)
return return
max_len = 0 max_len = 0
@ -88,12 +92,16 @@ def logout(hs_url, token, devices, logout_type, show_json):
for device in raw_device_dict["devices"]: for device in raw_device_dict["devices"]:
date_object = datetime.fromtimestamp(device["last_seen_ts"] / 1000) date_object = datetime.fromtimestamp(device["last_seen_ts"] / 1000)
print( click.echo(
" ".join(
[
device["device_id"], device["device_id"],
" " * (max_len - len(device["device_id"])), " " * (max_len - len(device["device_id"])),
date_object, str(date_object),
device["last_seen_ip"], device["last_seen_ip"],
device["display_name"], device["display_name"],
]
)
) )
date_object = datetime.fromtimestamp(device["last_seen_ts"] / 1000) date_object = datetime.fromtimestamp(device["last_seen_ts"] / 1000)
@ -106,7 +114,7 @@ def do_logout(cli, all):
if all: if all:
reqData["path"] += ["all"] reqData["path"] += ["all"]
res = cli.Generic(reqData) res = cli.Generic(reqData)
print(res) click.echo(res)
def do_logout_devices(cli, devices, user_id): def do_logout_devices(cli, devices, user_id):
@ -126,4 +134,4 @@ def do_logout_devices(cli, devices, user_id):
}, },
} }
res = cli.Generic(reqData) res = cli.Generic(reqData)
print(res) click.echo(res)

View file

@ -1,9 +1,14 @@
# Copyright (C) 2026 saces@c-base.org # Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
from _pygomx import lib, ffi
import click
import json import json
import click
from pygomx.errors import PygomxAPIError
from pygomx import CliV0
from .click import click_catch_exception
@click.command() @click.command()
@click.option( @click.option(
@ -15,6 +20,7 @@ import json
) )
@click.option("-d", "--domain", "domain", metavar="domain", help="domain selector") @click.option("-d", "--domain", "domain", metavar="domain", help="domain selector")
@click.argument("mxpassfile", metavar="mxpassfilepath", required=False) @click.argument("mxpassfile", metavar="mxpassfilepath", required=False)
@click_catch_exception(handle=(PygomxAPIError))
def passitem(mxpassfile, show_secret, hs_url, localpart, domain): def passitem(mxpassfile, show_secret, hs_url, localpart, domain):
"""utility to get items from mxpasss files""" """utility to get items from mxpasss files"""
@ -28,19 +34,10 @@ def passitem(mxpassfile, show_secret, hs_url, localpart, domain):
if domain is None: if domain is None:
domain = "*" domain = "*"
r = lib.cliv0_mxpassitem( result_dict = CliV0.MXPassItem(mxpassfile, hs_url, localpart, domain)
mxpassfile.encode(encoding="utf-8"),
hs_url.encode(encoding="utf-8"),
localpart.encode(encoding="utf-8"),
domain.encode(encoding="utf-8"),
)
result = ffi.string(r).decode("utf-8")
lib.FreeCString(r)
result_dict = json.loads(result)
if show_secret: if show_secret:
print(result_dict["Token"]) click.echo(result_dict["Token"])
else: else:
result_dict["Token"] = "***" result_dict["Token"] = "***"
print(json.dumps(result_dict)) click.echo(json.dumps(result_dict))

View file

@ -2,7 +2,7 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
import click import click
from pygomx import CliV0 from pygomx import CliV0
import pygomx from pygomx.errors import PygomxAPIError
from .click import click_catch_exception from .click import click_catch_exception
@ -10,7 +10,7 @@ from .click import click_catch_exception
@click.command() @click.command()
@click.option("-u", "--url", "hs_url", metavar="url", help="homeserver url") @click.option("-u", "--url", "hs_url", metavar="url", help="homeserver url")
@click.option("-t", "--token", "token", metavar="token", help="access token") @click.option("-t", "--token", "token", metavar="token", help="access token")
@click_catch_exception(handle=(pygomx.errors.APIError)) @click_catch_exception(handle=(PygomxAPIError))
def whoami(hs_url, token): def whoami(hs_url, token):
"""this token belongs to?""" """this token belongs to?"""

View file

@ -1,49 +1,60 @@
# Copyright (C) 2026 saces@c-base.org # Copyright (C) 2026 saces@c-base.org
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
import sys import datetime
import os
import getpass import getpass
import json import os
from _pygomx import lib, ffi import time
from functools import partial, wraps
import click
from pygomx.errors import PygomxAPIError
from pygomx import ApiV0
def smalsetup(): def catch_exception(func=None, *, handle):
if len(sys.argv) != 2: if not func:
print("usage: ", sys.argv[0], " matrixid") return partial(catch_exception, handle=handle)
return 1
mxid = sys.argv[1].encode(encoding="utf-8") @wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except handle as e:
raise click.ClickException(e)
r = lib.apiv0_discover(mxid) return wrapper
result = ffi.string(r).decode("utf-8")
lib.FreeCString(r)
if result.startswith("ERR:"):
print(result)
return 1
result_dict = json.loads(result) @click.command()
@click.option(
"--mxpass",
"mxpassfile",
metavar="filepath",
default=".mxpass",
help="mxpass file name",
)
@click.argument("mxid", metavar="MatrixID")
@catch_exception(handle=(PygomxAPIError))
def smalsetup(mxid, mxpassfile):
"""Utility for creating smalbot mxpass files"""
create_mxpass = len(mxpassfile.strip()) > 0
if create_mxpass:
if os.path.exists(mxpassfile):
raise click.ClickException(f"file {mxpassfile} exists.")
result_dict = ApiV0.Discover(mxid)
result_dict["password"] = getpass.getpass(prompt="Password: ") result_dict["password"] = getpass.getpass(prompt="Password: ")
data = json.dumps(result_dict).encode(encoding="utf-8") result_dict["make_master_key"] = True
result_dict["make_recovery_key"] = True
r = lib.apiv0_login(data) now = int(time.time())
result = ffi.string(r).decode("utf-8") result_dict["deviceid"] = f"smalbot-{now}"
lib.FreeCString(r) result_dict["devicename"] = f"smalbot-{datetime.fromtimestamp(now)}"
if result.startswith("ERR:"): ApiV0.Login(result_dict, ".mxpass")
print(result)
return 1
# Set restrictive umask (owner only) click.echo("login created. start your bot now.")
new_umask = 0o077
old_umask = os.umask(new_umask)
# Create file with new umask
with open(".mxpass", "w") as f:
f.write(result)
# Restore original umask
os.umask(old_umask)
print("login created. start your bot now.")
return 0