blob: dc3204a2150f37d9a1a124424a17c612db3e507f [file] [log] [blame]
#!/usr/bin/env python3
from pathlib import Path
from sys import argv, exit
import traceback
import common
from threading import Thread
from contextlib import contextmanager
import gi
gi.require_versions({"GLib": "2.0", "Hinawa": "4.0"})
from gi.repository import GLib, Hinawa
CLOCK_MONOTONIC_RAW = 4
@contextmanager
def run_dispatcher(src: GLib.Source):
ctx = GLib.MainContext.new()
src.attach(ctx)
dispatcher = GLib.MainLoop.new(ctx, False)
th = Thread(target=lambda d: d.run(), args=(dispatcher,))
th.start()
try:
yield
finally:
dispatcher.quit()
th.join()
def sync_main(path: Path):
node = Hinawa.FwNode.new()
_ = node.open(str(path), 0)
_, src = node.create_source()
with run_dispatcher(src):
req = Hinawa.FwReq.new()
addr = 0xFFFFF0000404
_, cycle_time = node.read_cycle_time(
CLOCK_MONOTONIC_RAW, Hinawa.CycleTime.new()
)
initiate_cycle = cycle_time.get_fields()[:2]
(
_,
payload,
(request_tstamp, response_tstamp),
) = req.transaction_with_tstamp(
node, Hinawa.FwTcode.READ_QUADLET_REQUEST, addr, 4, [0] * 4, 100
)
sent_cycle = cycle_time.compute_tstamp(request_tstamp)
recv_cycle = cycle_time.compute_tstamp(response_tstamp)
_, cycle_time = node.read_cycle_time(
CLOCK_MONOTONIC_RAW, Hinawa.CycleTime.new()
)
finish_cycle = cycle_time.get_fields()[:2]
common.print_transaction_result(
addr, payload, initiate_cycle, sent_cycle, recv_cycle, finish_cycle
)
def main() -> int:
if len(argv) < 2:
msg = (
"One argument is required for path to special file of Linux FireWire character "
"device"
)
common.print_help_with_msg(Path(__file__).name, msg)
return 1
cmd, literal = argv[:2]
try:
path = common.detect_fw_cdev(literal)
except Exception as e:
common.print_help_with_msg(cmd, str(e))
return 1
try:
sync_main(path)
except GLib.Error as e:
error_domain_map = {
GLib.file_error_quark(): GLib.FileError,
Hinawa.fw_node_error_quark(): Hinawa.FwNodeError,
Hinawa.fw_req_error_quark(): Hinawa.FwReqError,
}
quark = GLib.quark_from_string(e.domain)
if quark in error_domain_map:
code_nick = error_domain_map[quark](e.code).value_nick
print(
f"GLib.Error exception: '{e.message}' due to '{code_nick}' in '{e.domain}'"
)
print()
traceback.print_exception(e)
return 1
except Exception as e:
traceback.print_exception(e)
return 1
return 0
if __name__ == "__main__":
exit(main())