| #!/usr/bin/python |
| |
| import sys |
| import dbus |
| import gobject |
| import dbus.mainloop.glib |
| import os.path |
| from optparse import OptionParser |
| |
def parse_options():
    """Register the command-line options and parse the process arguments.

    The options are added to the module-level ``parser`` object, which
    must have been created before this function is called.

    Returns whatever ``parser.parse_args()`` returns (for an optparse
    parser, an ``(options, args)`` pair).
    """
    # (short flag, long flag, destination attribute, help text, metavar)
    # for every option that takes a value; registered in this order.
    valued_options = (
        ("-d", "--device", "device", "Device to connect", "DEVICE"),
        ("-p", "--pull", "pull_to_file",
         "Pull vcard and store in FILE", "FILE"),
        ("-s", "--send", "send_file", "Send FILE", "FILE"),
    )
    for short_flag, long_flag, dest, help_text, metavar in valued_options:
        parser.add_option(short_flag, long_flag, dest=dest,
                          help=help_text, metavar=metavar)

    # The only boolean switch: present -> True, absent -> None.
    parser.add_option("-v", "--verbose", action="store_true",
                      dest="verbose")

    return parser.parse_args()
| |
class OppClient:
    """Object Push helper bound to one session object on the session bus.

    The constructor looks up ``session_path`` on the
    ``org.bluez.obex.client`` service, wraps it in the
    ``org.bluez.obex.Session`` and ``org.bluez.obex.ObjectPush``
    interfaces and subscribes to ``org.bluez.obex.Transfer`` signals.

    Every callback that ends a transfer (or reports a call failure)
    stops the module-level ``mainloop``, so that name must exist before
    any of those callbacks can fire.
    """

    def __init__(self, session_path, verbose=False):
        """Bind to the session at ``session_path`` and hook up signals.

        session_path -- D-Bus object path of the session to use.
        verbose      -- when true, also report creation, progress and
                        completion of the transfer on stdout.
        """
        self.progress = 0           # last "Progress" value that was reported
        self.transfer_path = None   # set once the transfer has been created
        self.verbose = verbose

        session_bus = dbus.SessionBus()
        session_obj = session_bus.get_object("org.bluez.obex.client",
                                             session_path)
        self.session = dbus.Interface(session_obj, "org.bluez.obex.Session")
        self.opp = dbus.Interface(session_obj, "org.bluez.obex.ObjectPush")

        # Completion and error are always watched; progress updates are
        # only interesting in verbose mode.
        watched = [(self.transfer_complete, "Complete"),
                   (self.transfer_error, "Error")]
        if self.verbose:
            watched.append((self.transfer_progress, "PropertyChanged"))

        for handler, signal in watched:
            session_bus.add_signal_receiver(
                handler,
                dbus_interface="org.bluez.obex.Transfer",
                signal_name=signal,
                path_keyword="path")

    def create_transfer_reply(self, reply):
        """Reply handler: remember the path and size of the new transfer.

        ``reply`` is unpacked as a ``(path, properties)`` pair and
        ``properties`` must contain a ``"Size"`` entry.
        """
        transfer_path, transfer_props = reply
        self.transfer_path = transfer_path
        self.transfer_size = transfer_props["Size"]
        if self.verbose:
            print("Transfer created: %s" % transfer_path)

    def error(self, err):
        """Error handler for the asynchronous calls: print and stop."""
        print(err)
        mainloop.quit()

    def transfer_complete(self, path):
        """Signal handler for ``Complete``; stops the loop for our transfer."""
        # Signals for any other transfer object are ignored.
        if path == self.transfer_path:
            if self.verbose:
                print("Transfer finished")
            mainloop.quit()

    def transfer_error(self, code, message, path):
        """Signal handler for ``Error``; reports it and stops the loop."""
        # Signals for any other transfer object are ignored.
        if path == self.transfer_path:
            print("Transfer finished with error %s: %s" % (code, message))
            mainloop.quit()

    def transfer_progress(self, prop, value, path):
        """Signal handler for ``PropertyChanged``; prints progress updates.

        Only the ``"Progress"`` property of our own transfer is handled.
        """
        if path != self.transfer_path or prop != "Progress":
            return

        # Amount transferred since the previous update, scaled by 1000.
        # It is labelled kBps, which presumably assumes roughly one
        # update per second -- TODO confirm against the service.
        delta = value - self.progress
        speed = delta / 1000
        print("Transfer progress %d/%d at %d kBps" % (value,
                                                      self.transfer_size,
                                                      speed))
        self.progress = value

    def pull_business_card(self, filename):
        """Start an asynchronous PullBusinessCard into ``filename``.

        The file name is made absolute before being passed on.
        """
        target = os.path.abspath(filename)
        self.opp.PullBusinessCard(target,
                                  reply_handler=self.create_transfer_reply,
                                  error_handler=self.error)

    def send_file(self, filename):
        """Start an asynchronous SendFile of ``filename``.

        The file name is made absolute before being passed on.
        """
        source = os.path.abspath(filename)
        self.opp.SendFile(source,
                          reply_handler=self.create_transfer_reply,
                          error_handler=self.error)
| |
if __name__ == '__main__':
    # Script entry point: create an OPP session to the given device and
    # run the requested pull and/or send until its transfer finishes.

    # Install the GLib main loop as the default D-Bus main loop so that
    # signals and asynchronous replies are delivered while it runs.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # parse_options() registers its options on this module-level object,
    # so it has to exist before that call.
    parser = OptionParser()

    (options, args) = parse_options()

    # Without a device there is nothing to connect to. Without --pull or
    # --send no transfer would ever be started, so no Complete/Error
    # signal could stop the main loop and the script would block forever.
    # Both cases are handled the same way, before touching D-Bus.
    if not options.device or not (options.pull_to_file or options.send_file):
        parser.print_help()
        sys.exit(0)

    bus = dbus.SessionBus()
    # OppClient's callbacks stop this module-level loop by name.
    mainloop = gobject.MainLoop()

    client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"),
                            "org.bluez.obex.Client")

    print("Creating Session")
    path = client.CreateSession(options.device, {"Target": "OPP"})

    opp_client = OppClient(path, options.verbose)

    if options.pull_to_file:
        opp_client.pull_business_card(options.pull_to_file)

    if options.send_file:
        opp_client.send_file(options.send_file)

    # Blocks until one of the OppClient callbacks calls mainloop.quit().
    mainloop.run()