| from sys import stderr |
| from pathlib import Path |
| from struct import unpack |
| from contextlib import contextmanager |
| from threading import Thread |
| |
| import gi |
| gi.require_versions({'GLib': '2.0', 'Hinawa': '4.0'}) |
| from gi.repository import GLib, Hinawa |
| |
| CLOCK_MONOTONIC_RAW = 4 |
| |
| |
| def print_help_with_msg(cmd: str, msg: str): |
| print('Error:', file=stderr) |
| print(' {}'.format(msg), file=stderr) |
| print('', file=stderr) |
| print('Usage:', file=stderr) |
| print(' {} PATH'.format(cmd), file=stderr) |
| print('', file=stderr) |
| print(' where', file=stderr) |
| print(' PATH: path to special file for Linux FireWire character device (/dev/fw[0-9]+)', |
| file=stderr) |
| |
| |
| def detect_fw_cdev(literal: str) -> Path: |
| path = Path(literal) |
| |
| if not path.exists(): |
| msg = '"{}" not exists'.format(path) |
| raise ValueError(msg) |
| |
| if not path.is_char_device(): |
| msg = '"{}" is not for special file of any character device'.format(path) |
| raise ValueError(msg) |
| |
| if path.name.find('fw') != 0: |
| msg = '"{}" is not for special file of Linux Firewire character device'.format(path) |
| raise ValueError(msg) |
| |
| return path |
| |
| |
| def print_transaction_result( |
| addr: int, |
| payload: list[int], |
| initiate_cycle: list[2], |
| sent_cycle: list[2], |
| recv_cycle: list[2], |
| finish_cycle: list[2], |
| ): |
| quadlet = unpack(">I", payload)[0] |
| |
| print("Read quadlet transaction:") |
| print(" addr 0x{:012x}, quadlet: 0x{:08x}".format(addr, quadlet)) |
| print( |
| " initiate at: {} sec {} cycle".format(initiate_cycle[0], initiate_cycle[1]) |
| ) |
| print(" sent at: {} sec {} cycle".format(sent_cycle[0], sent_cycle[1])) |
| print(" received at: {} sec {} cycle".format(recv_cycle[0], recv_cycle[1])) |
| print(" finish at: {} sec {} cycle".format(finish_cycle[0], finish_cycle[1])) |
| |
| |
| def print_generation_information(node: Hinawa.FwNode): |
| print(' Topology:') |
| print(' self: {:04x}'.format(node.get_property('node-id'))) |
| print(' local: {:04x}'.format(node.get_property('local-node-id'))) |
| print(' root: {:04x}'.format(node.get_property('root-node-id'))) |
| print(' bus-manager: {:04x}'.format(node.get_property('bus-manager-node-id'))) |
| print(' ir-manager: {:04x}'.format(node.get_property('ir-manager-node-id'))) |
| print(' generation: {}'.format(node.get_property('generation'))) |
| |
| |
| def print_fw_node_information(node: Hinawa.FwNode): |
| print('IEEE1394 node info:') |
| |
| print_generation_information(node) |
| |
| print(' Config ROM:') |
| _, image = node.get_config_rom() |
| quads = unpack('>{}I'.format(len(image) // 4), image) |
| for i, q in enumerate(quads): |
| print(' 0xfffff00004{:02x}: 0x{:08x}'.format(i * 4, q)) |
| |
| |
| def read_quadlet(node: Hinawa.FwNode, req: Hinawa.FwReq, addr: int) -> int: |
| cycle_time = Hinawa.CycleTime.new() |
| |
| try: |
| _, cycle_time = node.read_cycle_time(CLOCK_MONOTONIC_RAW, cycle_time) |
| except Exception as e: |
| print(e) |
| return 0 |
| initiate_cycle = cycle_time.get_fields()[:2] |
| |
| frame = [0] * 4 |
| try: |
| _, frame, tstamp = req.transaction_with_tstamp( |
| node, |
| Hinawa.FwTcode.READ_QUADLET_REQUEST, |
| addr, |
| len(frame), |
| frame, |
| 100 |
| ) |
| except Exception as e: |
| print(e) |
| |
| sent_cycle = cycle_time.compute_tstamp(tstamp[0]) |
| recv_cycle = cycle_time.compute_tstamp(tstamp[1]) |
| |
| try: |
| _, cycle_time = node.read_cycle_time(CLOCK_MONOTONIC_RAW, cycle_time) |
| except Exception as e: |
| print(e) |
| return 0 |
| finish_cycle = cycle_time.get_fields()[:2] |
| |
| quadlet = unpack('>I', frame)[0] |
| |
| print('Read quadlet transaction:') |
| print(' addr 0x{:012x}, quadlet: 0x{:08x}'.format(addr, quadlet)) |
| print(' initiate at: sec {} cycle {}'.format(initiate_cycle[0], initiate_cycle[1])) |
| print(' sent at: sec {} cycle {}'.format(sent_cycle[0], sent_cycle[1])) |
| print(' received at: sec {} cycle {}'.format(recv_cycle[0], recv_cycle[1])) |
| print(' finish at: sec {} cycle {}'.format(finish_cycle[0], finish_cycle[1])) |
| |
| return quadlet |
| |
| |
| @contextmanager |
| def run_dispatcher(node: Hinawa.FwNode): |
| ctx = GLib.MainContext.new() |
| _, src = node.create_source() |
| src.attach(ctx) |
| |
| dispatcher = GLib.MainLoop.new(ctx, False) |
| th = Thread(target=lambda d: d.run(), args=(dispatcher,)) |
| th.start() |
| |
| yield |
| |
| dispatcher.quit() |
| th.join() |
| |
| |
| def handle_bus_update(node: Hinawa.FwNode): |
| print('Event: bus-update:') |
| print_generation_information(node) |
| |
| |
| @contextmanager |
| def listen_bus_update(node: Hinawa.FwNode): |
| handler = node.connect('bus-update', handle_bus_update) |
| yield |
| node.disconnect(handler) |
| |
| |
| def print_frame(frame: list): |
| for i in range(len(frame)): |
| print(' [{:02d}]: 0x{:02x}'.format(i, frame[i])) |
| |
| |
| def handle_requested(resp: Hinawa.FwResp, tcode: Hinawa.FwRcode, offset: int, |
| src: int, dst: int, card: int, generation: int, tstamp: int, |
| frame: list, length: int, args: tuple[Hinawa.FwNode, Hinawa.CycleTime]): |
| node, cycle_time = args |
| print('Event requested: {0}'.format(tcode.value_nick)) |
| try: |
| _, cycle_time = node.read_cycle_time(CLOCK_MONOTONIC_RAW, cycle_time) |
| isoc_cycle = cycle_time.compute_tstamp(tstamp) |
| except Exception as e: |
| print(e) |
| isoc_cycle = [0] * 2 |
| pass |
| print_frame(frame) |
| print(' received at: sec {0[0]} cycle {0[1]}'.format(isoc_cycle)) |
| return Hinawa.FwRcode.COMPLETE |
| |
| |
| @contextmanager |
| def listen_region(node: Hinawa.FwNode): |
| resp = Hinawa.FwResp() |
| cycle_time = Hinawa.CycleTime.new() |
| handler = resp.connect('requested', handle_requested, (node, cycle_time)) |
| try: |
| _ = resp.reserve(node, 0xfffff0000d00, 0x100) |
| yield |
| except Exception as e: |
| print(e) |
| |
| resp.disconnect(handler) |
| resp.release() |
| |
| |
| def handle_responded(fcp: Hinawa.FwFcp, generation: int, tstamp: int, frame: list, |
| length: int, args: tuple[Hinawa.FwNode, Hinawa.CycleTime]): |
| node, cycle_time = args |
| print('Event responded: length {}'.format(length)) |
| try: |
| _, cycle_time = node.read_cycle_time(CLOCK_MONOTONIC_RAW, cycle_time) |
| isoc_cycle = cycle_time.compute_tstamp(tstamp) |
| except Exception as e: |
| print(e) |
| isoc_cycle = [0] * 2 |
| pass |
| print_frame(frame) |
| print(' received at: sec {0[0]} cycle {0[1]}'.format(isoc_cycle)) |
| |
| |
| @contextmanager |
| def listen_fcp(node: Hinawa.FwNode): |
| cycle_time = Hinawa.CycleTime.new() |
| |
| fcp = Hinawa.FwFcp() |
| handler = fcp.connect('responded', handle_responded, (node, cycle_time)) |
| try: |
| _ = fcp.bind(node) |
| |
| _, cycle_time = node.read_cycle_time(CLOCK_MONOTONIC_RAW, cycle_time) |
| initiate_cycle = cycle_time.get_fields()[:2] |
| |
| request = bytes([0x01, 0xff, 0x19, 0x00, 0xff, 0xff, 0xff, 0xff]) |
| _, response, tstamp = fcp.avc_transaction_with_tstamp(request, [0] * len(request), 100) |
| |
| req_sent_cycle = cycle_time.compute_tstamp(tstamp[0]) |
| req_responded_cycle = cycle_time.compute_tstamp(tstamp[1]) |
| resp_sent_cycle = cycle_time.compute_tstamp(tstamp[2]) |
| |
| _, cycle_time = node.read_cycle_time(CLOCK_MONOTONIC_RAW, cycle_time) |
| finish_cycle = cycle_time.get_fields()[:2] |
| |
| print('FCP request:') |
| print_frame(request) |
| print(' initiate at: sec {0[0]} cycle {0[1]}'.format(initiate_cycle)) |
| print(' sent at: sec {0[0]} cycle {0[1]}'.format(req_sent_cycle)) |
| print(' received at: sec {0[0]} cycle {0[1]}'.format(req_responded_cycle)) |
| |
| print('FCP response:') |
| print_frame(response) |
| print(' received at: sec {0[0]} cycle {0[1]}'.format(resp_sent_cycle)) |
| print(' finished at: sec {0[0]} cycle {0[1]}'.format(finish_cycle)) |
| |
| yield |
| except Exception as e: |
| print(e) |
| |
| fcp.disconnect(handler) |
| fcp.unbind() |
| |
| |
| @contextmanager |
| def listen_node_event(node: Hinawa.FwNode, path: Path): |
| root = Path.cwd().parents[-1] |
| sysfs_path = root.joinpath('sys', 'bus', 'firewire', 'devices', path.name, 'units') |
| |
| # Linux FireWire subsystem exports all of pairs of specifier_id and version in unit directory |
| # via sysfs, thus not need to parse the content of configuration ROM. |
| with sysfs_path.open('r') as f: |
| content = f.read() |
| |
| # The specifier_id for 1394TA is expected to express the device implements FCP. |
| has_fcp = content.find('0x00a02d') >= 0 |
| |
| if has_fcp: |
| with run_dispatcher(node), listen_bus_update(node), listen_region(node), listen_fcp(node): |
| yield |
| else: |
| with run_dispatcher(node), listen_bus_update(node): |
| yield |
| |
| |
| __all__ = ['print_help_with_msg', 'detect_fw_cdev', 'run_async_transaction', |
| 'dump_fw_node_information', 'listen_node_event'] |