| #!/usr/bin/env python3 |
| |
| from pathlib import Path |
| from sys import argv, exit |
| import traceback |
| import common |
| |
| from threading import Thread |
| from contextlib import contextmanager |
| |
| import gi |
| |
| gi.require_versions({"GLib": "2.0", "Hinawa": "4.0"}) |
| from gi.repository import GLib, Hinawa |
| |
# Clock identifier passed to Hinawa.FwNode.read_cycle_time() in sync_main().
# NOTE(review): 4 is the value of CLOCK_MONOTONIC_RAW in the Linux UAPI headers
# (time.CLOCK_MONOTONIC_RAW on Linux) -- confirm before running on another platform.
CLOCK_MONOTONIC_RAW = 4
| |
| |
@contextmanager
def run_dispatcher(src: GLib.Source):
    """Dispatch events of *src* in a background thread for the duration of a ``with`` block.

    A private ``GLib.MainContext`` is created, *src* is attached to it, and a
    ``GLib.MainLoop`` bound to that context is run in a worker thread. On exit
    from the ``with`` block (normally or through an exception) the loop is asked
    to quit and the worker thread is joined.

    Args:
        src: The event source to attach to the private main context.

    Yields:
        None. Control returns to the caller once the loop is running, or once the
        worker thread has already terminated.
    """
    from time import sleep

    ctx = GLib.MainContext.new()
    src.attach(ctx)

    dispatcher = GLib.MainLoop.new(ctx, False)
    th = Thread(target=dispatcher.run)
    th.start()

    # quit() only clears the loop's running flag. If it were called before the
    # worker thread had entered run(), run() would set the flag again afterwards
    # and never return, so join() below would block forever. Wait until the loop
    # is really running; stop waiting if the thread has died so we cannot spin.
    while th.is_alive() and not dispatcher.is_running():
        sleep(0.001)

    try:
        yield
    finally:
        dispatcher.quit()
        th.join()
| |
| |
def sync_main(path: Path):
    """Run one read-quadlet transaction against the node at *path* and print its timing.

    The node is opened from ``str(path)``, its event source is dispatched in the
    background via ``run_dispatcher``, and the cycle time is sampled before and
    after a single ``transaction_with_tstamp`` call. The request and response
    time stamps are converted with the cycle time sampled *before* the
    transaction, and everything is handed to ``common.print_transaction_result``.

    Args:
        path: Path of the character device to open.

    Exceptions raised by the Hinawa/GLib calls are not handled here.
    """
    # NOTE(review): presumably an address inside the node's configuration ROM
    # region -- confirm against the IEEE 1394 CSR layout.
    target_addr = 0xFFFFF0000404

    fw_node = Hinawa.FwNode.new()
    fw_node.open(str(path), 0)
    _, event_src = fw_node.create_source()

    def sample_cycle_time():
        # Each sample uses a fresh Hinawa.CycleTime instance, as in a direct call.
        _, sampled = fw_node.read_cycle_time(
            CLOCK_MONOTONIC_RAW, Hinawa.CycleTime.new()
        )
        return sampled

    with run_dispatcher(event_src):
        request = Hinawa.FwReq.new()

        before = sample_cycle_time()
        initiate_cycle = before.get_fields()[:2]

        # NOTE(review): arguments look like (node, tcode, address, length, frame
        # buffer, timeout) -- confirm units of the trailing 100 in the Hinawa docs.
        outcome = request.transaction_with_tstamp(
            fw_node,
            Hinawa.FwTcode.READ_QUADLET_REQUEST,
            target_addr,
            4,
            [0] * 4,
            100,
        )
        _, payload, tstamps = outcome
        request_tstamp, response_tstamp = tstamps

        # Both time stamps are resolved against the pre-transaction sample.
        sent_cycle = before.compute_tstamp(request_tstamp)
        recv_cycle = before.compute_tstamp(response_tstamp)

        after = sample_cycle_time()
        finish_cycle = after.get_fields()[:2]

        common.print_transaction_result(
            target_addr,
            payload,
            initiate_cycle,
            sent_cycle,
            recv_cycle,
            finish_cycle,
        )
| |
| |
def main() -> int:
    """Command-line entry point; return the process exit status.

    Requires one command-line argument, which is resolved to a device path with
    ``common.detect_fw_cdev`` and passed to ``sync_main``.

    Returns:
        0 when ``sync_main`` completes; 1 when the argument is missing, when the
        device cannot be resolved, or when ``sync_main`` raises.
    """
    if len(argv) < 2:
        common.print_help_with_msg(
            Path(__file__).name,
            "One argument is required for path to special file of Linux FireWire character "
            "device",
        )
        return 1

    cmd = argv[0]
    literal = argv[1]

    try:
        path = common.detect_fw_cdev(literal)
    except Exception as err:
        common.print_help_with_msg(cmd, str(err))
        return 1

    try:
        sync_main(path)
    except GLib.Error as e:
        # Map the known error domains to their enumerations so the numeric code
        # can be reported by nickname; unknown domains get the traceback only.
        domain_enums = {
            GLib.file_error_quark(): GLib.FileError,
            Hinawa.fw_node_error_quark(): Hinawa.FwNodeError,
            Hinawa.fw_req_error_quark(): Hinawa.FwReqError,
        }
        enum_cls = domain_enums.get(GLib.quark_from_string(e.domain))
        if enum_cls is not None:
            code_nick = enum_cls(e.code).value_nick
            print(
                f"GLib.Error exception: '{e.message}' due to '{code_nick}' in '{e.domain}'"
            )
            print()
        traceback.print_exception(e)
        return 1
    except Exception as err:
        traceback.print_exception(err)
        return 1

    return 0
| |
| |
# Run main() only when executed as a script and use its return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())