samples: add a Python 3 script to read quadlet data with blocking API As a basic demonstration. Signed-off-by: Takashi Sakamoto <o-takashi@sakamocchi.jp>
diff --git a/README.rst b/README.rst index 2124f85..fcf4923 100644 --- a/README.rst +++ b/README.rst
@@ -102,6 +102,7 @@ - gtk3 - PyGObject is required. - gtk4 - PyGObject is required. - qt5 - PyQt5 is required. +- read-quadlet - demonstration to read quadlet data from node in IEEE 1394 bus Example of Python3 with PyGobject =================================
diff --git a/samples/common/__init__.py b/samples/common/__init__.py index 0a5c5af..75c46f1 100644 --- a/samples/common/__init__.py +++ b/samples/common/__init__.py
@@ -41,6 +41,26 @@ return path +def print_transaction_result( + addr: int, + payload: list[int], + initiate_cycle: list[2], + sent_cycle: list[2], + recv_cycle: list[2], + finish_cycle: list[2], +): + quadlet = unpack(">I", payload)[0] + + print("Read quadlet transaction:") + print(" addr 0x{:012x}, quadlet: 0x{:08x}".format(addr, quadlet)) + print( + " initiate at: {} sec {} cycle".format(initiate_cycle[0], initiate_cycle[1]) + ) + print(" sent at: {} sec {} cycle".format(sent_cycle[0], sent_cycle[1])) + print(" received at: {} sec {} cycle".format(recv_cycle[0], recv_cycle[1])) + print(" finish at: {} sec {} cycle".format(finish_cycle[0], finish_cycle[1])) + + def print_generation_information(node: Hinawa.FwNode): print(' Topology:') print(' self: {:04x}'.format(node.get_property('node-id')))
diff --git a/samples/read-quadlet b/samples/read-quadlet new file mode 100755 index 0000000..dc3204a --- /dev/null +++ b/samples/read-quadlet
@@ -0,0 +1,111 @@ +#!/usr/bin/env python3 + +from pathlib import Path +from sys import argv, exit +import traceback +import common + +from threading import Thread +from contextlib import contextmanager + +import gi + +gi.require_versions({"GLib": "2.0", "Hinawa": "4.0"}) +from gi.repository import GLib, Hinawa + +CLOCK_MONOTONIC_RAW = 4 + + +@contextmanager +def run_dispatcher(src: GLib.Source): + ctx = GLib.MainContext.new() + src.attach(ctx) + + dispatcher = GLib.MainLoop.new(ctx, False) + th = Thread(target=lambda d: d.run(), args=(dispatcher,)) + th.start() + + try: + yield + finally: + dispatcher.quit() + th.join() + + +def sync_main(path: Path): + node = Hinawa.FwNode.new() + _ = node.open(str(path), 0) + _, src = node.create_source() + + with run_dispatcher(src): + req = Hinawa.FwReq.new() + addr = 0xFFFFF0000404 + + _, cycle_time = node.read_cycle_time( + CLOCK_MONOTONIC_RAW, Hinawa.CycleTime.new() + ) + initiate_cycle = cycle_time.get_fields()[:2] + + ( + _, + payload, + (request_tstamp, response_tstamp), + ) = req.transaction_with_tstamp( + node, Hinawa.FwTcode.READ_QUADLET_REQUEST, addr, 4, [0] * 4, 100 + ) + + sent_cycle = cycle_time.compute_tstamp(request_tstamp) + recv_cycle = cycle_time.compute_tstamp(response_tstamp) + + _, cycle_time = node.read_cycle_time( + CLOCK_MONOTONIC_RAW, Hinawa.CycleTime.new() + ) + finish_cycle = cycle_time.get_fields()[:2] + + common.print_transaction_result( + addr, payload, initiate_cycle, sent_cycle, recv_cycle, finish_cycle + ) + + +def main() -> int: + if len(argv) < 2: + msg = ( + "One argument is required for path to special file of Linux FireWire character " + "device" + ) + common.print_help_with_msg(Path(__file__).name, msg) + return 1 + cmd, literal = argv[:2] + + try: + path = common.detect_fw_cdev(literal) + except Exception as e: + common.print_help_with_msg(cmd, str(e)) + return 1 + + try: + sync_main(path) + except GLib.Error as e: + error_domain_map = { + GLib.file_error_quark(): GLib.FileError, + Hinawa.fw_node_error_quark(): Hinawa.FwNodeError, + Hinawa.fw_req_error_quark(): Hinawa.FwReqError, + } + quark = GLib.quark_from_string(e.domain) + if quark in error_domain_map: + code_nick = error_domain_map[quark](e.code).value_nick + print( + f"GLib.Error exception: '{e.message}' due to '{code_nick}' in '{e.domain}'" + ) + print() + traceback.print_exception(e) + return 1 + except Exception as e: + traceback.print_exception(e) + return 1 + + return 0 + + +if __name__ == "__main__": + exit(main())