blob: c90f8a12bcec18aad0cb4d20312d8655a3644d08 [file]
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-or-later
import os
import signal
import sys
import dbus
import dbus.service
import dbus.mainloop.glib
import gi
gi.require_version("Gst", "1.0")
gi.require_version("GLib", "2.0")
from gi.repository import GLib, Gst
import bluezutils
mainloop = None
pipeline = None
seqnum: int = 0
def signal_handler(_sig, _frame):
print("Got interrupt")
mainloop.quit()
signal.signal(signal.SIGINT, signal_handler)
def usage():
print(f"Usage: simple-asha <remote addr> <audio file name> (optional volume 0-127)")
def start_playback(fd: int, omtu: int):
global mainloop, pipeline
pktsize = 161
if omtu < pktsize:
print("Weird mtu", omtu)
outdata = bytearray(pktsize)
Gst.init(None)
pipeline = Gst.parse_launch(
f"""
filesrc location="{sys.argv[2]}" ! decodebin !
audioconvert ! audioresample !
audiobuffersplit output-buffer-duration="20/1000" ! avenc_g722 !
appsink name=sink emit-signals=true
"""
)
def on_new_sample(sink):
global seqnum
sample = sink.emit("pull-sample")
buf = sample.get_buffer()
with buf.map(Gst.MapFlags.READ) as info:
pos = 0
if info.size != pktsize - 1:
print("Unexpected buffer size: ", info.size)
outdata[pos] = seqnum % 256
pos += 1
for byte in info.data:
outdata[pos] = byte
pos += 1
try:
n = os.write(fd, outdata)
if n != pktsize:
print("Wrote less than expected: ", n)
except:
return Gst.FlowReturn.ERROR
seqnum += 1
return Gst.FlowReturn.OK
sink = pipeline.get_by_name("sink")
sink.connect("new-sample", on_new_sample)
def bus_message(_bus, message, _data) -> bool:
typ = message.type
if typ == Gst.MessageType.EOS:
print("End of stream")
mainloop.quit()
elif typ == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"Pipeline error: {err} ({debug})")
mainloop.quit()
return True
bus = pipeline.get_bus()
bus.add_watch(GLib.PRIORITY_DEFAULT, bus_message, None)
pipeline.set_state(Gst.State.PLAYING)
if __name__ == "__main__":
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
mainloop = GLib.MainLoop()
bus = dbus.SystemBus()
if (len(sys.argv) == 3) or (len(sys.argv) == 4):
device = bluezutils.find_device(sys.argv[1])
if device is None:
print("Could not find device: ", sys.argv[1])
exit(255)
else:
usage()
sys.exit(255)
asha_object_path = device.object_path + "/asha"
print("Looking up ASHA object", asha_object_path)
asha = bus.get_object("org.bluez", asha_object_path)
print("Looking up endpoint properties for", asha.object_path)
props = asha.GetAll(
"org.bluez.MediaEndpoint1",
dbus_interface="org.freedesktop.DBus.Properties",
)
path = props["Transport"]
print("Trying to acquire", path)
transport = dbus.Interface(
bus.get_object("org.bluez", path),
"org.bluez.MediaTransport1",
)
# Keep default volume at 25%
volume = 32
if len(sys.argv) == 4:
volume = int(sys.argv[3])
if volume < 0 or volume > 127:
print("Volume must be between 0 (mute) and 127 (max)")
print("Setting initial volume to", volume)
transport.Set(
"org.bluez.MediaTransport1",
"Volume",
dbus.UInt16(volume, variant_level=1),
dbus_interface="org.freedesktop.DBus.Properties",
)
print("Acquiring transport")
(fd, imtu, omtu) = transport.Acquire()
print("Starting playback, hit Ctrl-C to stop")
start_playback(fd.take(), omtu)
mainloop.run()
pipeline.set_state(Gst.State.NULL)
transport.Release()