blob: 5dd95ae94e19f61c447b2085e79a6bb7b5b65a2c [file] [log] [blame]
# SPDX-License-Identifier: GPL-2.0-or-later
''' Module to obtain network information for the rteval report '''
#
# Copyright 2022 John Kacur <jkacur@redhat.com
# SPDX-License-Identifier: GPL-2.0-or-later
#
import os
import socket
import ipaddress
import sys
import libxml2
from rteval.Log import Log
def get_active_devices():
''' returns a list of active network devices, similar to ethtool '''
ret = []
for device in socket.if_nameindex():
ret.append(device[1])
return ret
def compress_iv6(addr):
''' inserts colons into an ipv6address and returns it in compressed form '''
retaddr = ''
# Insert colons into the number
for i in range(4,33,4):
if i == 32:
retaddr += addr[i-4:i]
else:
retaddr += addr[i-4:i] + ':'
addr = ipaddress.IPv6Network(retaddr)
retaddr = str(ipaddress.IPv6Address(retaddr))
return retaddr
def get_defaultgw():
''' return the ipv4address of the default gateway '''
defaultgw = None
with open('/proc/net/route') as f:
line = f.readline().strip()
while len(line) > 0:
(iface, dest, gateway, _, _, _, _, _, _, _, _) = line.split()
if iface == 'Iface':
line = f.readline().strip()
continue
if dest == '00000000' and gateway != '00000000':
addr = int(gateway, base=16)
defaultgw = str(ipaddress.IPv4Address(socket.ntohl(addr)))
return defaultgw
line = f.readline().strip()
return defaultgw
class IPv6Addresses():
''' Obtains a list of IPv6 addresses from the proc file system '''
def __init__(self):
self.data = {}
IPv6Addresses.load(self)
def __contains__(self, dev):
return dev in self.data
def __getitem__(self, dev):
return self.data.get(dev, None)
def __iter__(self):
return iter(self.data)
def load(self):
'''
Called by init to load the self.data dictionary with device keys
and a list of ipv6addresses
'''
MYP = '/proc/net/if_inet6'
try:
with open(MYP, 'r') as f:
mystr = f.readline().strip()
while len(mystr) > 0:
ipv6addr , _, _, _, _, intf = mystr.split()
ipv6addr = compress_iv6(ipv6addr)
if intf == 'lo':
mystr = f.readline().strip()
continue
if intf not in self.data:
self.data[intf] = [ipv6addr]
else:
self.data[intf].append(ipv6addr)
mystr = f.readline().strip()
# if IPv6 is disabled, the if_net6 files does not exist, so we can pass
except FileNotFoundError:
pass
class IPv4Addresses():
''' Obtains a list of IPv4 addresses from the proc file system '''
def __init__(self):
self.data = {}
IPv4Addresses.load(self)
def __contains__(self, dev):
return dev in self.data
def __getitem__(self, dev):
return self.data[dev]
def __iter__(self):
return iter(self.data)
def load(self):
'''
Called by init to load the self.data dictionary with
device keys, and value consisting of a list of
ipv4address, netmask and broadcast address
'''
MYP = '/proc/net/route'
with open(MYP, 'r') as f:
mystr = f.readline().strip()
while len(mystr) > 0:
intf, dest, _, _, _, _, _, mask, _, _, _ = mystr.split()
# Skip over the head of the table an the gateway line
if intf == "Iface" or dest == '00000000':
mystr = f.readline().strip()
continue
d1 = int(dest, base=16)
m1 = int(mask, base=16)
addr = str(ipaddress.IPv4Address(socket.ntohl(d1)))
netmask = str(ipaddress.IPv4Address(socket.ntohl(m1)))
addr_with_mask = ipaddress.ip_network(addr + '/' + netmask)
broadcast = str(addr_with_mask.broadcast_address)
if intf not in self.data:
self.data[intf] = [(addr, netmask, broadcast)]
else:
self.data[intf].append((addr, netmask, broadcast))
mystr = f.readline().strip()
class MacAddresses():
''' Obtains a list of hardware addresses of network devices '''
def __init__(self):
self.mac_address = {}
self.path = None
MacAddresses.load(self)
def load(self):
'''
called by init to load self.mac_address as a dictionary of
device keys, and mac or hardware addresses as values
'''
nics = get_active_devices()
for nic in nics:
self.path = f'/sys/class/net/{nic}'
hwaddr = MacAddresses.set_val(self, 'address')
self.mac_address[nic] = hwaddr
def set_val(self, val):
''' Return the result of reading self.path/val '''
val_path = f'{self.path}/{val}'
if os.path.exists(val_path):
with open(val_path, 'r') as f:
return f.readline().strip()
return None
def __contains__(self, dev):
return dev in self.mac_address
def __getitem__(self, dev):
return self.mac_address[dev]
def __iter__(self):
return iter(self.mac_address)
class NetworkInfo():
''' Creates an xml report of the network for rteval '''
def __init__(self, logger):
self.defgw4 = get_defaultgw()
self.__logger = logger
def MakeReport(self):
''' Make an xml report for rteval '''
ncfg_n = libxml2.newNode("NetworkConfig")
defgw4 = self.defgw4
mads = MacAddresses()
for device in mads:
if device == 'lo':
continue
intf_n = libxml2.newNode('interface')
intf_n.newProp('device', device)
intf_n.newProp('hwaddr', mads[device])
ncfg_n.addChild(intf_n)
ipv4ads = IPv4Addresses()
ipv6ads = IPv6Addresses()
for dev in ipv4ads:
if dev != device:
continue
for lelem in ipv4ads[dev]:
ipv4_n = libxml2.newNode('IPv4')
(ipaddr, netmask, broadcast) = lelem
ipv4_n.newProp('ipaddr', ipaddr)
ipv4_n.newProp('netmask', netmask)
ipv4_n.newProp('broadcast', broadcast)
ipv4_n.newProp('defaultgw', (defgw4 == ipaddr) and '1' or '0')
intf_n.addChild(ipv4_n)
if ipv6ads[dev]:
for lelem in ipv6ads[dev]:
ipv6_n = libxml2.newNode('IPv6')
ipaddr = lelem
ipv6_n.newProp('ipaddr', ipaddr)
intf_n.addChild(ipv6_n)
return ncfg_n
if __name__ == "__main__":
try:
log = Log()
log.SetLogVerbosity(Log.DEBUG|Log.INFO)
net = NetworkInfo(logger=log)
doc = libxml2.newDoc('1.0')
cfg = net.MakeReport()
doc.setRootElement(cfg)
doc.saveFormatFileEnc('-', 'UTF-8', 1)
except Exception as e:
import traceback
traceback.print_exc(file=sys.stdout)
print(f"** EXCEPTION {str(e)}")