# blob: 7dd54efdf9a99ac22a5a49c3c958be7b7a5f6749 [file] [log] [blame]
#!/usr/bin/python
import gobject
import sys
import os
import dbus
import dbus.service
import dbus.mainloop.glib
class Transfer:
    """Record describing one requested phonebook transfer.

    Attributes:
        callback_func: callable supplied by the requester; stored here so it
            can be invoked later with the transfer's result.
        path: transfer identifier, ``None`` until it is assigned.
        filename: name of the file backing the transfer, ``None`` until it
            is assigned.
    """

    def __init__(self, callback_func):
        """Create a record that has a callback but no path or file yet."""
        # Both identifiers are unknown at construction time.
        self.path = self.filename = None
        self.callback_func = callback_func
class PbapClient(object):
    """Helper around the obex client's PhonebookAccess D-Bus interface.

    The client issues asynchronous pulls, tracks the resulting transfers and
    hands the content of each received file to the callback that was given
    when the pull was requested.

    Attributes:
        transfers: number of pulls issued whose transfer has not completed.
        props: maps a transfer object path to its ``Transfer`` record.
        flush_func: callable to run once every pending transfer is done, or
            ``None`` when no flush has been requested.
        session: proxy for the ``org.bluez.obex.Session`` interface.
        pbap: proxy for the ``org.bluez.obex.PhonebookAccess`` interface.

    NOTE: ``error`` and ``transfer_error`` stop the module-level ``mainloop``
    object, so that global must exist before either handler can fire.
    """

    def __init__(self, session_path):
        """Bind to the session at *session_path* and watch transfer signals."""
        self.transfers = 0
        self.props = dict()
        self.flush_func = None
        bus = dbus.SessionBus()
        obj = bus.get_object("org.bluez.obex.client", session_path)
        self.session = dbus.Interface(obj, "org.bluez.obex.Session")
        self.pbap = dbus.Interface(obj, "org.bluez.obex.PhonebookAccess")
        # path_keyword makes dbus pass the emitting object's path as the
        # ``path`` keyword argument, which is how a signal is matched to
        # the transfer recorded in self.props.
        bus.add_signal_receiver(self.transfer_complete,
                                dbus_interface="org.bluez.obex.Transfer",
                                signal_name="Complete",
                                path_keyword="path")
        bus.add_signal_receiver(self.transfer_error,
                                dbus_interface="org.bluez.obex.Transfer",
                                signal_name="Error",
                                path_keyword="path")

    def register(self, reply, transfer):
        """Record a newly created transfer.

        Args:
            reply: ``(path, properties)`` pair returned by a pull call;
                ``properties`` must contain a ``"Filename"`` entry.
            transfer: the ``Transfer`` record created for that pull.
        """
        (path, properties) = reply
        transfer.path = path
        transfer.filename = properties["Filename"]
        self.props[path] = transfer
        print("Transfer created: %s (file %s)" % (path, transfer.filename))

    def error(self, err):
        """Report a failed pull request and stop the main loop."""
        print(err)
        mainloop.quit()

    def transfer_complete(self, path):
        """Handle the ``Complete`` signal of the transfer at *path*.

        Reads the received file, deletes it, passes the list of lines to the
        transfer's callback and, when nothing else is pending, runs the
        flush callback once. Signals for unknown paths are ignored.
        """
        req = self.props.get(path)
        if req is None:
            # Not a transfer started by this client.
            return
        self.transfers -= 1
        print("Transfer %s finished" % path)
        # Read everything and close the handle before unlinking, so the
        # descriptor is released deterministically instead of leaking until
        # garbage collection.
        with open(req.filename, "r") as received:
            lines = received.readlines()
        os.remove(req.filename)
        del self.props[path]
        req.callback_func(lines)
        if not self.props and self.transfers == 0:
            if self.flush_func is not None:
                # Clear the hook before calling it: the callback may
                # register a new flush function for the next batch.
                flush, self.flush_func = self.flush_func, None
                flush()

    def transfer_error(self, code, message, path):
        """Handle the ``Error`` signal of the transfer at *path*.

        Errors for unknown paths are ignored; otherwise the error is printed
        and the main loop is stopped.
        """
        req = self.props.get(path)
        if req is None:
            return
        print("Transfer finished with error %s: %s" % (code, message))
        mainloop.quit()

    def pull(self, vcard, params, func):
        """Asynchronously pull one *vcard*; *func* receives its lines."""
        req = Transfer(func)
        self.pbap.Pull(vcard, "", params,
                       reply_handler=lambda r: self.register(r, req),
                       error_handler=self.error)
        self.transfers += 1

    def pull_all(self, params, func):
        """Asynchronously pull the whole phonebook; *func* receives its lines."""
        req = Transfer(func)
        self.pbap.PullAll("", params,
                          reply_handler=lambda r: self.register(r, req),
                          error_handler=self.error)
        self.transfers += 1

    def flush_transfers(self, func):
        """Arrange for *func* to run once all pending transfers complete.

        NOTE: when nothing is pending, *func* is neither stored nor called.
        """
        if not self.props and self.transfers == 0:
            return
        self.flush_func = func

    def interface(self):
        """Return the PhonebookAccess interface proxy."""
        return self.pbap
if __name__ == '__main__':
    # Script entry point: create a PBAP session with the device given on the
    # command line, then walk a fixed list of phonebooks, printing the size,
    # the listing, every individual vCard and finally the full phonebook.

    # Validate the command line before touching D-Bus so the usage text is
    # shown even when the session bus or the obex client is unavailable.
    if len(sys.argv) < 2:
        print("Usage: %s <device>" % (sys.argv[0]))
        sys.exit(1)

    # The GLib main loop must be installed as the default before the bus
    # connection is created, otherwise signals and async replies are not
    # dispatched.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SessionBus()
    # ``mainloop`` stays a module-level name: PbapClient's error handlers
    # call mainloop.quit().
    mainloop = gobject.MainLoop()
    client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"),
                            "org.bluez.obex.Client")

    print("Creating Session")
    session_path = client.CreateSession(sys.argv[1], {"Target": "PBAP"})
    pbap_client = PbapClient(session_path)

    def process_result(lines, header):
        """Print *header* (if any) followed by the received *lines*."""
        if header is not None:
            print(header)
        for line in lines:
            # Lines already carry their own terminator; write them verbatim.
            sys.stdout.write(line)
        print("")

    def test_paths(paths):
        """Exercise the first phonebook in *paths*, then recurse on the rest.

        The recursion is driven by flush_transfers: the next phonebook is
        only selected after every pull issued for the current one is done.
        An empty list ends the test and stops the main loop.
        """
        if not paths:
            print("")
            print("FINISHED")
            mainloop.quit()
            return

        path = paths[0]

        print("\n--- Select Phonebook %s ---\n" % (path))
        pbap_client.interface().Select("int", path)

        print("\n--- GetSize ---\n")
        ret = pbap_client.interface().GetSize()
        print("Size = %d\n" % (ret))

        print("\n--- List vCard ---\n")
        ret = pbap_client.interface().List(dbus.Dictionary())
        params = dbus.Dictionary({"Format": "vcard30",
                                  "Fields": ["VERSION", "FN", "TEL"]})
        for item in ret:
            print("%s : %s" % (item[0], item[1]))
            pbap_client.pull(item[0], params,
                             lambda x: process_result(x, None))

        pbap_client.pull_all(params, lambda x: process_result(x,
                             "\n--- PullAll ---\n"))

        pbap_client.flush_transfers(lambda: test_paths(paths[1:]))

    test_paths(["PB", "ICH", "OCH", "MCH", "CCH"])

    mainloop.run()