blob: 9bc038dcd8cc236045cc8972e7cc047b5ded7bd1 [file] [log] [blame]
#!/usr/bin/python
import gobject
import sys
import dbus
import dbus.service
import dbus.mainloop.glib
import os.path
from optparse import OptionParser
class Agent(dbus.service.Object):
def __init__(self, conn=None, obj_path=None, verbose=False):
dbus.service.Object.__init__(self, conn, obj_path)
self.verbose = verbose
@dbus.service.method("org.openobex.Agent",
in_signature="o", out_signature="s")
def Request(self, path):
return ""
@dbus.service.method("org.openobex.Agent",
in_signature="ot", out_signature="")
def Progress(self, path, transferred):
if self.verbose:
print "Transfer progress (%d bytes)" % (transferred)
return
@dbus.service.method("org.openobex.Agent",
in_signature="o", out_signature="")
def Complete(self, path):
if self.verbose:
print "Transfer finished"
mainloop.quit()
@dbus.service.method("org.openobex.Agent",
in_signature="os", out_signature="")
def Error(self, path, error):
print "Transfer finished with an error: %s" % (error)
mainloop.quit()
@dbus.service.method("org.openobex.Agent",
in_signature="", out_signature="")
def Release(self):
mainloop.quit()
def parse_options():
parser.add_option("-d", "--device", dest="device",
help="Device to connect", metavar="DEVICE")
parser.add_option("-c", "--chdir", dest="new_dir",
help="Change current directory to DIR", metavar="DIR")
parser.add_option("-l", "--list", action="store_true", dest="list_dir",
help="List the current directory")
parser.add_option("-g", "--get", dest="get_file",
help="Get FILE", metavar="FILE")
parser.add_option("-p", "--put", dest="put_file",
help="Put FILE", metavar="FILE")
parser.add_option("-y", "--copy", dest="copy_file",
help="Copy FILE", metavar="FILE")
parser.add_option("-m", "--move", dest="move_file",
help="Move FILE", metavar="FILE")
parser.add_option("-n", "--destname", dest="dest_file",
help="Destination FILE", metavar="FILE")
parser.add_option("-r", "--remove", dest="remove_file",
help="Remove FILE", metavar="FILE")
parser.add_option("-v", "--verbose", action="store_true", dest="verbose")
return parser.parse_args()
def error(err):
print err
def void_reply():
pass
def change_folder(session, new_dir):
for node in new_dir.split("/"):
session.ChangeFolder(node)
def list_folder(session):
for i in session.ListFolder():
if i["Type"] == "folder":
print "%s/" % (i["Name"])
else:
print "%s" % (i["Name"])
def put_file(session, filename):
session.PutFile(os.path.abspath(filename),
os.path.basename(filename),
reply_handler=void_reply,
error_handler=error)
def get_file(session, filename):
session.GetFile(os.path.abspath(filename),
os.path.basename(filename),
reply_handler=void_reply,
error_handler=error)
def remove_file(session, filename):
session.Delete(filename,
reply_handler=void_reply,
error_handler=error)
def move_file(session, filename, destname):
session.MoveFile(filename,
destname,
reply_handler=void_reply,
error_handler=error)
def copy_file(session, filename, destname):
session.CopyFile(filename,
destname,
reply_handler=void_reply,
error_handler=error)
if __name__ == '__main__':
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
parser = OptionParser()
(options, args) = parse_options()
if not options.device:
parser.print_help()
sys.exit(0)
bus = dbus.SessionBus()
mainloop = gobject.MainLoop()
path = "/test/agent"
agent = Agent(bus, path, options.verbose)
client = dbus.Interface(bus.get_object("org.openobex.client", "/"),
"org.openobex.Client")
session_path = client.CreateSession({ "Destination": options.device,
"Target": "ftp"})
session = dbus.Interface(bus.get_object("org.openobex.client", session_path),
"org.openobex.Session")
session.AssignAgent(path)
ftp = dbus.Interface(bus.get_object("org.openobex.client", session_path),
"org.openobex.FileTransfer")
if options.new_dir:
change_folder(ftp, options.new_dir)
if options.list_dir:
list_folder(ftp)
if options.get_file:
get_file(ftp, options.get_file)
if options.put_file:
put_file(ftp, options.put_file)
if options.move_file:
move_file(ftp, options.move_file, options.dest_file)
if options.copy_file:
copy_file(ftp, options.copy_file, options.dest_file)
if options.remove_file:
remove_file(ftp, options.remove_file)
mainloop.run()