blob: 1ac4c17988567b580187d6faabc83ce78dbc4b0d [file] [log] [blame]
#!/usr/bin/python
import gobject
import sys
import dbus
import dbus.service
import dbus.mainloop.glib
class Agent(dbus.service.Object):
    """Console-driven agent exported on the ``org.openobex.Agent`` interface.

    Incoming transfers are authorized interactively: the user is prompted on
    stdin to accept or reject, and then to supply the destination filename.
    """

    def __init__(self, conn=None, obj_path=None):
        """Export the agent on bus connection ``conn`` at ``obj_path``."""
        dbus.service.Object.__init__(self, conn, obj_path)
        # True while Authorize() is waiting for the Y/n answer; the main
        # script reads this flag when it handles Ctrl-C.
        self.pending_auth = False

    @dbus.service.method("org.openobex.Agent",
                         in_signature="osssii", out_signature="s")
    def Authorize(self, dpath, device, filename, ftype, length, time):
        """Ask the user whether to accept an incoming transfer.

        Arguments:
            dpath: D-Bus object path of the transfer being authorized.
            device: string identifying the sending device (only displayed).
            filename: name of the file as announced by the sender.
            ftype: type string from the sender (unused here).
            length: integer recorded as the total size of the transfer.
            time: integer supplied by the caller (unused here).

        Returns the string typed by the user at the filename prompt.

        Raises dbus.DBusException when the user answers "n" or "N".
        """
        self.pending_auth = True
        # Show the path of the transfer being authorized. The original code
        # printed the unrelated module-level ``path`` (the agent's own object
        # path) here.
        print("Authorize (%s, %s, %s) Y/n" % (dpath, device, filename))
        answer = raw_input().strip("\n ")
        if answer in ("n", "N"):
            self.pending_auth = False
            raise dbus.DBusException("org.openobex.Error.Rejected: "
                                     "Not Autorized")

        print("Full filename (including path):")
        self.pending_auth = False
        # ``transfers`` is the module-level list created by the main script;
        # it is only mutated here, so no ``global`` statement is required.
        transfers.append(Transfer(dpath, filename, 0, length))
        return raw_input().strip("\n ")

    @dbus.service.method("org.openobex.Agent",
                         in_signature="", out_signature="")
    def Cancel(self):
        """Report that the pending authorization was canceled and clear it."""
        print("Authorization Canceled")
        self.pending_auth = False
class Transfer(object):
    """Progress record for one transfer, keyed by its D-Bus object path."""

    def __init__(self, dpath, filename=None, transfered=-1, size=-1):
        """Create a record.

        Arguments:
            dpath: D-Bus object path of the transfer.
            filename: name shown for the transfer, or None if not known.
            transfered: amount transferred so far.
            size: total amount expected.
        """
        self.dpath = dpath
        self.filename = filename
        self.transfered = transfered
        self.size = size

    def update(self, filename=None, transfered=-1, total=-1):
        """Store new progress figures.

        The filename is replaced only when a non-empty one is given;
        ``transfered`` and ``total`` always overwrite the stored values.
        """
        if filename:
            self.filename = filename
        self.transfered = transfered
        self.size = total

    def cancel(self):
        """Call ``Cancel`` on the transfer's ``org.openobex.Transfer`` interface.

        Relies on the module-level ``bus`` connection set up by the main
        script.
        """
        transfer_obj = bus.get_object("org.openobex", self.dpath)
        transfer_iface = dbus.Interface(transfer_obj, "org.openobex.Transfer")
        transfer_iface.Cancel()

    def __str__(self):
        """Return ``"<filename> (<dpath>) (<percent>%)"``."""
        total = float(self.size)
        # A total of zero would make the division raise ZeroDivisionError;
        # report 0% in that case instead of crashing the progress display.
        percent = float(self.transfered) / total * 100 if total else 0.0
        return "%s (%s) (%.2f%%)" % (self.filename, self.dpath, percent)

    __repr__ = __str__
if __name__ == '__main__':
    # Script entry point: register a console agent with the org.openobex
    # manager on the session bus and show transfer progress until the user
    # interrupts with Ctrl-C.
    #
    # ``transfers``, ``bus`` and ``path`` are deliberately module-level names:
    # Agent.Authorize() and Transfer.cancel() look them up as globals.

    def new_transfer(dpath):
        """Handle the manager's TransferStarted signal."""
        print("new transfer")

    def transfer_completed(dpath, success):
        """Handle TransferCompleted: report the result and drop the record."""
        global transfers
        print("\ntransfer completed => %s"
              % ("Success" if success else "Fail"))
        transfers = [t for t in transfers if t.dpath != dpath]

    def progress(total, current, dpath):
        """Handle a Progress signal; ``dpath`` is the emitting object's path.

        Updates the matching record and rewrites the single status line with
        every known transfer.
        """
        status = []
        for t in transfers:
            if t.dpath == dpath:
                t.update(None, current, total)
            status.append("%s" % (t))
        # "\r" returns to the start of the line so that the next update
        # overwrites this one.
        sys.stdout.write(" ".join(status) + "\r")
        sys.stdout.flush()

    transfers = []

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    manager = dbus.Interface(bus.get_object("org.openobex", "/"),
                             "org.openobex.Manager")

    bus.add_signal_receiver(new_transfer,
                            dbus_interface="org.openobex.Manager",
                            signal_name="TransferStarted")

    bus.add_signal_receiver(transfer_completed,
                            dbus_interface="org.openobex.Manager",
                            signal_name="TransferCompleted")

    # Register the Progress receiver exactly once. It was previously added
    # inside new_transfer(), so every started transfer installed one more
    # identical, path-unfiltered match and progress() then ran several times
    # for each signal.
    bus.add_signal_receiver(progress,
                            dbus_interface="org.openobex.Transfer",
                            signal_name="Progress",
                            path_keyword="dpath")

    path = "/test/agent"
    agent = Agent(bus, path)

    mainloop = gobject.MainLoop()

    manager.RegisterAgent(path)
    print("Agent registered")

    # Ctrl-C is interpreted in stages: cancel a pending authorization first,
    # otherwise cancel the active transfers, and quit only when idle.
    cont = True
    while cont:
        try:
            mainloop.run()
        except KeyboardInterrupt:
            if agent.pending_auth:
                agent.Cancel()
            elif transfers:
                for a in transfers:
                    a.cancel()
            else:
                cont = False

    # manager.UnregisterAgent(path)
    # print "Agent unregistered"