| #!/usr/bin/python |
| |
| from gi.repository import GLib |
| |
| import sys |
| |
| import dbus |
| import dbus.service |
| import dbus.mainloop.glib |
| |
| import traceback |
| |
def extract_list(list):
    """Format the items of *list* as a bracketed, space-separated string.

    Each item is converted with ``str()`` and preceded by one space, so
    ``[1, 2]`` becomes ``"[ 1 2 ]"`` and an empty sequence becomes ``"[ ]"``.

    The parameter name shadows the builtin ``list``; it is kept unchanged
    because it is part of the function's signature.
    """
    rendered_items = "".join(" " + str(item) for item in list)
    return "[" + rendered_items + " ]"
| |
def extract_values(values):
    """Format a settings mapping as ``{ key=value key=value }``.

    Formatting depends on the key:

    * ``PrefixLength`` is converted with ``int()`` before printing.
    * ``Servers`` and ``Excludes`` are rendered with ``extract_list``.
    * Every other value is rendered with ``str()``.

    Keys are concatenated directly into the output, so they must be
    strings.
    """
    pieces = ["{"]
    for name in values:
        # Build the "key=" prefix first so a non-string key fails before
        # the value is looked at, exactly as the concatenation order implies.
        prefix = " " + name + "="
        raw = values[name]
        if name in ("PrefixLength",):
            text = "%s" % (int(raw))
        elif name in ("Servers", "Excludes"):
            text = extract_list(raw)
        else:
            text = str(raw)
        pieces.append(prefix + text)
    pieces.append(" }")
    return "".join(pieces)
| |
class Notification(dbus.service.Object):
    """D-Bus object implementing ``net.connman.Notification`` for one session.

    The constructor does not export the object: the base class is
    initialised without a connection or path. The code that creates the
    instance exports it afterwards with ``add_to_connection()``.
    """

    def __init__(self, bus, app, notify_path):
        """Remember the owning application.

        ``bus`` and ``notify_path`` are accepted but not used here.
        ``app`` must provide ``release(session_name)``, which ``Release``
        calls.
        """
        dbus.service.Object.__init__(self)
        self.app = app

    @dbus.service.method("net.connman.Notification",
                         in_signature='', out_signature='')
    def Release(self):
        """Ask the application to release the session behind this object.

        The session name is taken from the last component of the path
        this object is exported at.
        """
        print("Release %s" % (self._object_path))
        session_name = self._object_path.split('/')[-1]
        self.app.release(session_name)

    @dbus.service.method("net.connman.Notification",
                         in_signature='a{sv}', out_signature='')
    def Update(self, settings):
        """Print every entry of the received ``settings`` dictionary.

        ``IPv4`` and ``IPv6`` values are formatted with ``extract_values``,
        ``AllowedBearers`` with ``extract_list``; anything else is printed
        as received.
        """
        print("Update called at %s" % (self._object_path))

        try:
            for key, raw in settings.items():
                if key in ("IPv4", "IPv6"):
                    val = extract_values(raw)
                elif key in ("AllowedBearers",):
                    val = extract_list(raw)
                else:
                    val = raw
                print(" %s = %s" % (key, val))
        except Exception:
            # Printing is best effort: a malformed entry must not turn into
            # a D-Bus error reply. Only ordinary exceptions are swallowed, so
            # KeyboardInterrupt and SystemExit still propagate.
            print("Exception:")
            traceback.print_exc()
| |
class SessionApplication(dbus.service.Object):
    """Test service exposing ``com.example.TestSession`` to drive ConnMan sessions.

    Sessions are tracked by name in ``self.sessions``. Each entry is a dict
    with these keys:

    * ``settings``     - settings passed to ``net.connman.Manager.CreateSession``
    * ``notify_path``  - object path of the session's ``Notification`` object
    * ``notify``       - the exported ``Notification`` instance, or ``None``
    * ``session_path`` - path returned by the manager, or ``None``
    * ``session``      - ``net.connman.Session`` interface proxy, or ``None``

    An entry created by ``Configure`` has only ``settings`` filled in; the
    other values stay ``None`` until ``CreateSession`` succeeds.

    NOTE(review): the exported methods declare ``in_signature=''`` although
    they take arguments. dbus-python appears to treat an empty signature as
    "unspecified" rather than "no arguments" - confirm before tightening.
    """

    def __init__(self, bus, object_path, mainloop):
        """Export the object on ``bus`` at ``object_path`` and watch for ConnMan.

        ``bus`` is only used for exporting this object. The ``net.connman``
        name is watched on the system bus.
        """
        dbus.service.Object.__init__(self, bus, object_path)

        self.manager = None
        self.mainloop = mainloop
        self.sessions = {}

        try:
            system_bus = dbus.SystemBus()
            system_bus.watch_name_owner('net.connman',
                                        self.connman_name_owner_changed)
        except dbus.DBusException:
            traceback.print_exc()

    def connman_name_owner_changed(self, proxy):
        """React to ``net.connman`` gaining or losing an owner.

        A truthy ``proxy`` means the name has an owner, and the manager
        proxy is (re)created. A falsy value means ConnMan is gone: the
        manager proxy is dropped, all notification objects are unexported
        and all sessions are forgotten.
        """
        try:
            if proxy:
                print("connman appeared on D-Bus ", str(proxy))

                bus = dbus.SystemBus()
                self.manager = dbus.Interface(bus.get_object("net.connman", "/"),
                                              "net.connman.Manager")
            else:
                print("connman disappeared on D-Bus")
                self.manager = None
                for s in self.sessions.values():
                    # Sessions that were only configured have no
                    # notification object to unexport.
                    if s['notify']:
                        s['notify'].remove_from_connection()
                        s['notify'] = None

                self.sessions = {}

        except dbus.DBusException:
            traceback.print_exc()

    def release(self, session_name):
        """Destroy the named session, unexport its notifier and forget it.

        Does nothing when the session is unknown. A ``dbus.DBusException``
        from ``Destroy()`` propagates to the caller.
        """
        s = self.find_session(session_name)
        if not s:
            return
        if s['session']:
            s['session'].Destroy()
            s['session'] = None
        if s['notify']:
            s['notify'].remove_from_connection()
            s['notify'] = None
        del self.sessions[session_name]

    def type_convert(self, key, value):
        """Convert a command-line value list to the type used for ``key``.

        * ``AllowedBearers``: the list is returned unchanged.
        * ``RoamingPolicy`` / ``ConnectionType``: the first element, or the
          unchanged (empty) list when there is none.
        * ``Priority`` / ``AvoidHandover`` / ``StayConnected`` /
          ``EmergencyCall``: ``dbus.Boolean``, false only for
          ``false``/``f``/``n``/``0`` (case-insensitive).
        * ``PeriodicConnect`` / ``IdleTimeout``: ``dbus.UInt32`` of the
          first element.
        * Any other key: the value unchanged.

        Raises ``IndexError`` when a boolean or integer key is given an
        empty list.
        """
        if key in ("AllowedBearers",):
            return value
        elif key in ("RoamingPolicy", "ConnectionType"):
            if len(value) > 0:
                return value[0]
        elif key in ("Priority", "AvoidHandover",
                     "StayConnected", "EmergencyCall"):
            flag = str(value[0]).strip().lower()
            return dbus.Boolean(flag not in ('false', 'f', 'n', '0'))
        elif key in ("PeriodicConnect", "IdleTimeout"):
            return dbus.UInt32(value[0])

        return value

    def find_session(self, session_name):
        """Return the bookkeeping dict for ``session_name``, or ``None``."""
        return self.sessions.get(session_name)

    def _running_session(self, session_name):
        """Return the entry of a created session, or report it and return ``None``."""
        s = self.find_session(session_name)
        if s is None or s['session'] is None:
            print("The session is not running -> drop request")
            return None
        return s

    def _report_dbus_error(self, e):
        """Print ConnMan's ``Failed`` message, or a traceback for other errors.

        Must be called from inside the ``except`` block handling ``e`` so
        that ``traceback.print_exc()`` sees the active exception.
        """
        if e.get_dbus_name() in ['net.connman.Error.Failed']:
            print(e.get_dbus_message())
            return
        traceback.print_exc()

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def CreateSession(self, session_name):
        """Create a ConnMan session named ``session_name``.

        Settings stored earlier by ``Configure`` are passed to the manager.
        The request is dropped when the session already exists or ConnMan
        is not available. If the manager call fails, the notification
        object exported for it is unexported again so the call can be
        retried.
        """
        print("Create session")

        s = self.find_session(session_name)
        if s and s['session']:
            print("Session %s already created-> drop request" % (session_name))
            return

        if self.manager is None:
            # No manager proxy yet (or ConnMan went away); calling into it
            # would raise AttributeError instead of a D-Bus error.
            print("connman is not running -> drop request")
            return

        if s is None:
            s = {}
        if 'settings' not in s:
            s['settings'] = {}

        notify_path = self._object_path + "/" + session_name
        exported = None
        try:
            bus = dbus.SystemBus()

            notify = Notification(bus, self, notify_path)
            notify.add_to_connection(bus, notify_path)
            exported = notify

            session_path = self.manager.CreateSession(s['settings'], notify_path)
            print("notify path %s" % (notify_path))
            print("session path %s" % (session_path))
            session = dbus.Interface(bus.get_object("net.connman", session_path),
                                     "net.connman.Session")
        except dbus.DBusException as e:
            if exported is not None:
                # Leaving the object exported would make the next attempt
                # collide on the same object path.
                exported.remove_from_connection()
            self._report_dbus_error(e)
            return

        # Commit the bookkeeping only once everything succeeded.
        s['notify_path'] = notify_path
        s['notify'] = exported
        s['session_path'] = session_path
        s['session'] = session
        self.sessions[session_name] = s

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def DestroySession(self, session_name):
        """Destroy a created session; dropped when it is not running."""
        print("Destroy session")

        if self._running_session(session_name) is None:
            return

        try:
            self.release(session_name)
        except dbus.DBusException:
            traceback.print_exc()

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Connect(self, session_name):
        """Call ``Connect()`` on a created session."""
        print("Connect session")

        s = self._running_session(session_name)
        if s is None:
            return

        try:
            s['session'].Connect()
        except dbus.DBusException as e:
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Disconnect(self, session_name):
        """Call ``Disconnect()`` on a created session."""
        print("Disconnect session")

        s = self._running_session(session_name)
        if s is None:
            return

        try:
            s['session'].Disconnect()
        except dbus.DBusException as e:
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Change(self, session_name, key, value):
        """Change setting ``key`` of a created session.

        ``value`` is converted with ``type_convert`` before being sent.
        """
        print("Update session settings")

        s = self._running_session(session_name)
        if s is None:
            return

        try:
            val = self.type_convert(key, value)
            s['session'].Change(key, val)
        except dbus.DBusException as e:
            self._report_dbus_error(e)

    @dbus.service.method("com.example.TestSession",
                         in_signature='', out_signature='')
    def Configure(self, session_name, key, value):
        """Store setting ``key`` for use by a later ``CreateSession``.

        An entry is created for an unknown session name. The request is
        dropped when the session has already been created; ``Change`` must
        be used instead.
        """
        print("Configure session settings")
        s = self.find_session(session_name)
        if s is None:
            s = {
                'notify_path': None,
                'notify': None,
                'settings': {},
                'session_path': None,
                'session': None,
            }
            self.sessions[session_name] = s
        if s['session']:
            print("The session is running, use change -> drop request")
            return
        s['settings'][key] = self.type_convert(key, value)
| |
def main():
    """Command-line entry point.

    * ``enable`` / ``disable`` set ConnMan's ``SessionMode`` property on
      the system bus.
    * ``run <app_path>`` claims the application's bus name on the session
      bus, exports a ``SessionApplication`` and runs the main loop.
    * Every other command is forwarded to an already running application
      through its ``com.example.TestSession`` interface.

    Prints a message and exits with status 1 on missing arguments or an
    unknown command.
    """
    if len(sys.argv) < 2:
        print("Usage: %s <command>" % (sys.argv[0]))
        print("")
        print(" enable")
        print(" disable")
        print(" create <app_path> <session_name>")
        print(" destroy <app_path> <session_name>")
        print(" connect <app_path> <session_name>")
        print(" disconnect <app_path> <session_name>")
        print(" change <app_path> <session_name> <key> <value>")
        print(" configure <app_path> <session_name> <key> <value>")
        print("")
        print(" run <app_path>")
        sys.exit(1)

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    command = sys.argv[1]

    if command in ("enable", "disable"):
        bus = dbus.SystemBus()
        manager = dbus.Interface(bus.get_object("net.connman", "/"),
                                 "net.connman.Manager")
        manager.SetProperty("SessionMode", command == "enable")
        return

    if len(sys.argv) < 3:
        print("Need test application path")
        sys.exit(1)

    app_path = sys.argv[2]
    bus = dbus.SessionBus()

    app_name = "com.example.SessionApplication.%s" % (app_path.strip("/"))

    if command == "run":
        # Both objects are bound to locals so they stay referenced for as
        # long as the main loop runs.
        name = dbus.service.BusName(app_name, bus)
        mainloop = GLib.MainLoop()

        app = SessionApplication(bus, app_path, mainloop)

        mainloop.run()
        return

    app = dbus.Interface(bus.get_object(app_name, app_path),
                         "com.example.TestSession")

    # Commands taking only <session_name>, mapped to the remote method.
    session_commands = {
        "create": "CreateSession",
        "destroy": "DestroySession",
        "connect": "Connect",
        "disconnect": "Disconnect",
    }
    # Commands taking <session_name> <key> <value...>.
    setting_commands = {
        "change": "Change",
        "configure": "Configure",
    }

    if command in session_commands:
        # Without this check a missing session name ends in an IndexError.
        if len(sys.argv) < 4:
            print("Arguments missing")
            sys.exit(1)

        getattr(app, session_commands[command])(sys.argv[3])

    elif command in setting_commands:
        if len(sys.argv) < 5:
            print("Arguments missing")
            sys.exit(1)

        getattr(app, setting_commands[command])(sys.argv[3], sys.argv[4],
                                                sys.argv[5:])

    else:
        print("Unknown command '%s'" % sys.argv[1])
        sys.exit(1)
| |
# Run the command-line tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()