| #!/usr/bin/python |
| |
| import gobject |
| |
| import sys |
| import dbus |
| import dbus.service |
| import dbus.mainloop.glib |
| import os.path |
| from optparse import OptionParser |
| |
| class Agent(dbus.service.Object): |
| def __init__(self, conn=None, obj_path=None, verbose=False): |
| dbus.service.Object.__init__(self, conn, obj_path) |
| self.verbose = verbose |
| |
| @dbus.service.method("org.openobex.Agent", |
| in_signature="o", out_signature="s") |
| def Request(self, path): |
| return "" |
| |
| @dbus.service.method("org.openobex.Agent", |
| in_signature="ot", out_signature="") |
| def Progress(self, path, transferred): |
| if self.verbose: |
| print "Transfer progress (%d bytes)" % (transferred) |
| return |
| |
| @dbus.service.method("org.openobex.Agent", |
| in_signature="o", out_signature="") |
| def Complete(self, path): |
| if self.verbose: |
| print "Transfer finished" |
| mainloop.quit() |
| |
| @dbus.service.method("org.openobex.Agent", |
| in_signature="os", out_signature="") |
| def Error(self, path, error): |
| print "Transfer finished with an error: %s" % (error) |
| mainloop.quit() |
| |
| @dbus.service.method("org.openobex.Agent", |
| in_signature="", out_signature="") |
| def Release(self): |
| mainloop.quit() |
| |
| |
| def parse_options(): |
| parser.add_option("-d", "--device", dest="device", |
| help="Device to connect", metavar="DEVICE") |
| parser.add_option("-c", "--chdir", dest="new_dir", |
| help="Change current directory to DIR", metavar="DIR") |
| parser.add_option("-l", "--list", action="store_true", dest="list_dir", |
| help="List the current directory") |
| parser.add_option("-g", "--get", dest="get_file", |
| help="Get FILE", metavar="FILE") |
| parser.add_option("-p", "--put", dest="put_file", |
| help="Put FILE", metavar="FILE") |
| parser.add_option("-y", "--copy", dest="copy_file", |
| help="Copy FILE", metavar="FILE") |
| parser.add_option("-m", "--move", dest="move_file", |
| help="Move FILE", metavar="FILE") |
| parser.add_option("-n", "--destname", dest="dest_file", |
| help="Destination FILE", metavar="FILE") |
| parser.add_option("-r", "--remove", dest="remove_file", |
| help="Remove FILE", metavar="FILE") |
| parser.add_option("-v", "--verbose", action="store_true", dest="verbose") |
| |
| return parser.parse_args() |
| |
| def error(err): |
| print err |
| |
| def void_reply(): |
| pass |
| |
| def change_folder(session, new_dir): |
| for node in new_dir.split("/"): |
| session.ChangeFolder(node) |
| |
| def list_folder(session): |
| for i in session.ListFolder(): |
| if i["Type"] == "folder": |
| print "%s/" % (i["Name"]) |
| else: |
| print "%s" % (i["Name"]) |
| |
| def put_file(session, filename): |
| session.PutFile(os.path.abspath(filename), |
| os.path.basename(filename), |
| reply_handler=void_reply, |
| error_handler=error) |
| |
| def get_file(session, filename): |
| session.GetFile(os.path.abspath(filename), |
| os.path.basename(filename), |
| reply_handler=void_reply, |
| error_handler=error) |
| |
| def remove_file(session, filename): |
| session.Delete(filename, |
| reply_handler=void_reply, |
| error_handler=error) |
| |
| def move_file(session, filename, destname): |
| session.MoveFile(filename, |
| destname, |
| reply_handler=void_reply, |
| error_handler=error) |
| |
| def copy_file(session, filename, destname): |
| session.CopyFile(filename, |
| destname, |
| reply_handler=void_reply, |
| error_handler=error) |
| |
| if __name__ == '__main__': |
| |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| parser = OptionParser() |
| |
| (options, args) = parse_options() |
| |
| if not options.device: |
| parser.print_help() |
| sys.exit(0) |
| |
| bus = dbus.SessionBus() |
| mainloop = gobject.MainLoop() |
| |
| path = "/test/agent" |
| agent = Agent(bus, path, options.verbose) |
| |
| client = dbus.Interface(bus.get_object("org.openobex.client", "/"), |
| "org.openobex.Client") |
| |
| session_path = client.CreateSession({ "Destination": options.device, |
| "Target": "ftp"}) |
| |
| session = dbus.Interface(bus.get_object("org.openobex.client", session_path), |
| "org.openobex.Session") |
| |
| session.AssignAgent(path) |
| |
| ftp = dbus.Interface(bus.get_object("org.openobex.client", session_path), |
| "org.openobex.FileTransfer") |
| |
| if options.new_dir: |
| change_folder(ftp, options.new_dir) |
| |
| if options.list_dir: |
| list_folder(ftp) |
| |
| if options.get_file: |
| get_file(ftp, options.get_file) |
| |
| if options.put_file: |
| put_file(ftp, options.put_file) |
| |
| if options.move_file: |
| move_file(ftp, options.move_file, options.dest_file) |
| |
| if options.copy_file: |
| copy_file(ftp, options.copy_file, options.dest_file) |
| |
| if options.remove_file: |
| remove_file(ftp, options.remove_file) |
| |
| mainloop.run() |