blob: 064a81ae7e1dc785c201d9b10f5ab848b0c2f0da [file] [log] [blame]
#!/usr/bin/python
import gobject
import sys
import dbus
import dbus.service
import dbus.mainloop.glib
import os.path
from optparse import OptionParser
def parse_options():
    """Register the command-line options and parse the command line.

    The options are added to the module-level ``parser`` object, which
    must already exist when this function is called.

    Returns:
        The ``(options, args)`` pair produced by ``parser.parse_args()``.
    """
    # One entry per option, in the order they appear in the help output:
    # (short flag, long flag, keyword arguments for add_option).
    option_specs = (
        ("-d", "--device",
         dict(dest="device", help="Device to connect", metavar="DEVICE")),
        ("-c", "--chdir",
         dict(dest="new_dir", help="Change current directory to DIR",
              metavar="DIR")),
        ("-l", "--list",
         dict(action="store_true", dest="list_dir",
              help="List the current directory")),
        ("-g", "--get",
         dict(dest="get_file", help="Get FILE", metavar="FILE")),
        ("-p", "--put",
         dict(dest="put_file", help="Put FILE", metavar="FILE")),
        ("-y", "--copy",
         dict(dest="copy_file", help="Copy FILE", metavar="FILE")),
        ("-m", "--move",
         dict(dest="move_file", help="Move FILE", metavar="FILE")),
        ("-n", "--destname",
         dict(dest="dest_file", help="Destination FILE", metavar="FILE")),
        ("-r", "--remove",
         dict(dest="remove_file", help="Remove FILE", metavar="FILE")),
        ("-v", "--verbose",
         dict(action="store_true", dest="verbose")),
    )

    for short_flag, long_flag, settings in option_specs:
        parser.add_option(short_flag, long_flag, **settings)

    return parser.parse_args()
class FtpClient:
    """Drive an OBEX file-transfer session through the obexd D-Bus API.

    The instance wraps the session object at ``session_path`` on the
    ``org.bluez.obex.client`` bus name and offers one method per FTP
    operation.  Folder operations are blocking D-Bus calls; file
    operations are issued asynchronously, and their callbacks stop the
    module-level ``mainloop`` once the requested work has finished or
    failed.
    """

    def __init__(self, session_path, verbose=False):
        """Bind to the session object and subscribe to transfer signals.

        Args:
            session_path: D-Bus object path of the session to use.
            verbose: When true, print status messages and also subscribe
                to ``PropertyChanged`` signals to report progress.
        """
        self.progress = 0
        self.transfer_path = None
        self.transfer_size = 0
        self.verbose = verbose
        # Bookkeeping used by generic_reply() to decide when the main
        # loop may stop: number of delete/move/copy requests still
        # waiting for a reply, and whether a get/put was requested.
        self.pending_ops = 0
        self.transfer_requested = False

        bus = dbus.SessionBus()
        obj = bus.get_object("org.bluez.obex.client", session_path)
        self.session = dbus.Interface(obj, "org.bluez.obex.Session")
        self.ftp = dbus.Interface(obj, "org.bluez.obex.FileTransfer")

        # path_keyword makes dbus-python pass the emitting object's path
        # to the handler, so signals of unrelated transfers can be ignored.
        bus.add_signal_receiver(self.transfer_complete,
                                dbus_interface="org.bluez.obex.Transfer",
                                signal_name="Complete",
                                path_keyword="path")
        bus.add_signal_receiver(self.transfer_error,
                                dbus_interface="org.bluez.obex.Transfer",
                                signal_name="Error",
                                path_keyword="path")
        if self.verbose:
            bus.add_signal_receiver(self.transfer_progress,
                                    dbus_interface="org.bluez.obex.Transfer",
                                    signal_name="PropertyChanged",
                                    path_keyword="path")

    def create_transfer_reply(self, reply):
        """Remember the transfer created by a get/put request.

        Args:
            reply: ``(path, properties)`` pair; ``properties`` must
                contain a ``"Size"`` entry.
        """
        (path, properties) = reply
        self.transfer_path = path
        self.transfer_size = properties["Size"]
        if self.verbose:
            print("Transfer created: %s" % path)

    def generic_reply(self):
        """Handle the success reply of a delete, move or copy request.

        Stops the main loop once the last outstanding request of that
        kind has been answered.  When a file transfer was requested as
        well, the loop is left running so that the transfer callbacks
        can end it instead.
        """
        if self.verbose:
            print("Operation succeeded")
        if self.pending_ops > 0:
            self.pending_ops -= 1
        # Without this the script would wait in mainloop.run() forever
        # after a successful delete/move/copy.
        if self.pending_ops == 0 and not self.transfer_requested:
            mainloop.quit()

    def error(self, err):
        """Print the error of a failed asynchronous call and stop the loop."""
        print(err)
        mainloop.quit()

    def transfer_complete(self, path):
        """Stop the main loop when the tracked transfer has completed.

        Args:
            path: Object path of the transfer that emitted the signal.
        """
        if path != self.transfer_path:
            return
        if self.verbose:
            print("Transfer finished")
        mainloop.quit()

    def transfer_error(self, code, message, path):
        """Report a failure of the tracked transfer and stop the main loop.

        Args:
            code: Error code carried by the signal.
            message: Error text carried by the signal.
            path: Object path of the transfer that emitted the signal.
        """
        if path != self.transfer_path:
            return
        print("Transfer finished with error %s: %s" % (code, message))
        mainloop.quit()

    def transfer_progress(self, prop, value, path):
        """Print a progress line for ``Progress`` updates of the transfer.

        Args:
            prop: Name of the property that changed.
            value: New value of that property.
            path: Object path of the transfer that emitted the signal.
        """
        if path != self.transfer_path:
            return
        if prop != "Progress":
            return
        # NOTE(review): labelling the delta as kBps presumes updates
        # arrive about once per second -- confirm against the daemon.
        speed = (value - self.progress) / 1000
        print("Transfer progress %d/%d at %d kBps" % (value,
                                                      self.transfer_size,
                                                      speed))
        self.progress = value

    def change_folder(self, new_dir):
        """Change folder one ``/``-separated component at a time.

        Args:
            new_dir: Path whose components are passed to ``ChangeFolder``
                in order; empty components are passed through unchanged.
        """
        for node in new_dir.split("/"):
            self.ftp.ChangeFolder(node)

    def list_folder(self):
        """Print the entries of the current folder, folders with a ``/``."""
        for i in self.ftp.ListFolder():
            if i["Type"] == "folder":
                print("%s/" % (i["Name"]))
            else:
                print("%s" % (i["Name"]))

    def put_file(self, filename):
        """Start an asynchronous ``PutFile`` request for ``filename``.

        The absolute form of ``filename`` and its base name are the two
        arguments passed to the D-Bus method.
        """
        self.transfer_requested = True
        self.ftp.PutFile(os.path.abspath(filename),
                         os.path.basename(filename),
                         reply_handler=self.create_transfer_reply,
                         error_handler=self.error)

    def get_file(self, filename):
        """Start an asynchronous ``GetFile`` request for ``filename``.

        The absolute form of ``filename`` and its base name are the two
        arguments passed to the D-Bus method.
        """
        self.transfer_requested = True
        self.ftp.GetFile(os.path.abspath(filename),
                         os.path.basename(filename),
                         reply_handler=self.create_transfer_reply,
                         error_handler=self.error)

    def remove_file(self, filename):
        """Start an asynchronous ``Delete`` request for ``filename``."""
        self.pending_ops += 1
        self.ftp.Delete(filename,
                        reply_handler=self.generic_reply,
                        error_handler=self.error)

    def move_file(self, filename, destname):
        """Start an asynchronous ``MoveFile`` from ``filename`` to ``destname``."""
        self.pending_ops += 1
        self.ftp.MoveFile(filename, destname,
                          reply_handler=self.generic_reply,
                          error_handler=self.error)

    def copy_file(self, filename, destname):
        """Start an asynchronous ``CopyFile`` from ``filename`` to ``destname``."""
        self.pending_ops += 1
        self.ftp.CopyFile(filename, destname,
                          reply_handler=self.generic_reply,
                          error_handler=self.error)
if __name__ == '__main__':
    # Script entry point: create an FTP session on the requested device
    # and carry out every operation selected on the command line.

    # Make the GLib main loop the default for the D-Bus connections
    # created below, so signals and asynchronous replies get dispatched.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # parse_options() registers its options on this module-level parser.
    parser = OptionParser()
    (options, args) = parse_options()

    if not options.device:
        parser.print_help()
        sys.exit(0)

    # Move and copy pass the destination name straight to D-Bus; reject a
    # missing one here instead of failing after the session is created.
    if (options.move_file or options.copy_file) and not options.dest_file:
        parser.error("--move and --copy require --destname")

    bus = dbus.SessionBus()
    # FtpClient's callbacks call quit() on this module-level main loop.
    mainloop = gobject.MainLoop()

    client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"),
                            "org.bluez.obex.Client")

    print("Creating Session")
    path = client.CreateSession(options.device, {"Target": "ftp"})

    ftp_client = FtpClient(path, options.verbose)

    # Blocking operations: finished by the time the call returns.
    if options.new_dir:
        ftp_client.change_folder(options.new_dir)

    if options.list_dir:
        ftp_client.list_folder()

    # Asynchronous operations: their replies are only delivered while the
    # main loop is running.
    async_requested = False

    if options.get_file:
        ftp_client.get_file(options.get_file)
        async_requested = True

    if options.put_file:
        ftp_client.put_file(options.put_file)
        async_requested = True

    if options.move_file:
        ftp_client.move_file(options.move_file, options.dest_file)
        async_requested = True

    if options.copy_file:
        ftp_client.copy_file(options.copy_file, options.dest_file)
        async_requested = True

    if options.remove_file:
        ftp_client.remove_file(options.remove_file)
        async_requested = True

    # With only blocking operations nothing would ever stop the loop, so
    # enter it only when there is a reply or signal to wait for.
    if async_requested:
        mainloop.run()