blob: d5c55c50786d6bda0a335abf9cd7e98b3f8a1a3b [file] [log] [blame]
# SPDX-License-Identifier: MIT
"""Kernel log analysis"""
import logging
import re
import os
import subprocess
from datetime import timedelta
from amd_debug.common import systemd_in_use, read_file, fatal_error
def get_kernel_command_line() -> str:
"""Get the kernel command line"""
cmdline = read_file(os.path.join("/proc", "cmdline"))
# borrowed from https://github.com/fwupd/fwupd/blob/1.9.5/libfwupdplugin/fu-common-linux.c#L95
filtered = [
"apparmor",
"audit",
"auto",
"bluetooth.disable_ertm",
"boot",
"BOOT_IMAGE",
"console",
"crashkernel",
"cryptdevice",
"cryptkey",
"dm",
"earlycon",
"earlyprintk",
"ether",
"init",
"initrd",
"ip",
"LANG",
"loglevel",
"luks.key",
"luks.name",
"luks.options",
"luks.uuid",
"mitigations",
"mount.usr",
"mount.usrflags",
"mount.usrfstype",
"netdev",
"netroot",
"nfsaddrs",
"nfs.nfs4_unique_id",
"nfsroot",
"noplymouth",
"nowatchdog",
"ostree",
"preempt",
"quiet",
"rd.dm.uuid",
"rd.luks.allow-discards",
"rd.luks.key",
"rd.luks.name",
"rd.luks.options",
"rd.luks.uuid",
"rd.lvm.lv",
"rd.lvm.vg",
"rd.md.uuid",
"rd.systemd.mask",
"rd.systemd.wants",
"resume",
"resumeflags",
"rhgb",
"ro",
"root",
"rootflags",
"rootfstype",
"roothash",
"rw",
"security",
"selinux",
"showopts",
"splash",
"swap",
"systemd.machine_id",
"systemd.mask",
"systemd.show_status",
"systemd.unit",
"systemd.verity_root_data",
"systemd.verity_root_hash",
"systemd.wants",
"udev.log_priority",
"verbose",
"vt.handoff",
"zfs",
"zswap.enabled",
]
# remove anything that starts with something in filtered from cmdline
return " ".join([x for x in cmdline.split() if not x.startswith(tuple(filtered))])
def sscanf_bios_args(line):
"""Extracts the format string and arguments from a BIOS trace line"""
if re.search(r"ex_trace_point", line):
return True
elif re.search(r"ex_trace_args", line):
parts = line.split(": ", 1)
if len(parts) < 2:
return None
t = parts[1].strip()
match = re.match(r'"(.*?)"(,.*)', t)
if match:
format_string = match.group(1).strip().replace("\\n", "")
args_part = match.group(2).strip(", ")
arguments = [arg.strip() for arg in args_part.split(",")]
format_specifiers = re.findall(r"%([xXdD])", format_string)
converted_args = []
arg_index = 0
for _specifier in format_specifiers:
if arg_index < len(arguments):
value = arguments[arg_index]
if value == "Unknown":
converted_args.append(-1)
else:
try:
converted_args.append(int(value, 16))
except ValueError:
return None
arg_index += 1
else:
break
try:
return format_string % tuple(converted_args)
except TypeError:
return None
else:
# If no format string is found, assume no format modifiers and return True
return True
# evmisc-0132 ev_queue_notify_reques: Dispatching Notify on [UBTC] (Device) Value 0x80 (Status Change) Node 00000000851b15c1
elif re.search(r"ev_queue_notify_reques", line):
parts = line.split(": ", 1)
if len(parts) < 2:
return None
return parts[1].split("Node")[0].strip()
return None
class KernelLogger:
"""Base class for kernel loggers"""
def __init__(self):
pass
def seek(self):
"""Seek to the beginning of the log"""
def seek_tail(self, tim=None):
"""Seek to the end of the log"""
def process_callback(self, callback, priority=None):
"""Process the log"""
def match_line(self, _matches) -> str:
"""Find lines that match all matches"""
return ""
def match_pattern(self, _pattern) -> str:
"""Find lines that match a pattern"""
return ""
class InputFile(KernelLogger):
"""Class for input file parsing"""
def __init__(self, fname):
self.since_support = False
self.buffer = ""
self.seeked = False
self.buffer = read_file(fname)
def process_callback(self, callback, priority=None):
"""Process the log"""
for entry in self.buffer.split("\n"):
callback(entry, priority)
class DmesgLogger(KernelLogger):
"""Class for dmesg logging"""
def __init__(self):
self.since_support = False
self.buffer = ""
self.seeked = False
cmd = ["dmesg", "-h"]
result = subprocess.run(cmd, check=True, capture_output=True)
for line in result.stdout.decode("utf-8").split("\n"):
if "--since" in line:
self.since_support = True
logging.debug("dmesg since support: %d", self.since_support)
self.command = ["dmesg", "-t", "-k"]
self._refresh_head()
def _refresh_head(self):
self.buffer = ""
self.seeked = False
result = subprocess.run(self.command, check=True, capture_output=True)
if result.returncode == 0:
self.buffer = result.stdout.decode("utf-8")
def seek(self):
"""Seek to the beginning of the log"""
if self.seeked:
self._refresh_head()
def seek_tail(self, tim=None):
"""Seek to the end of the log"""
if tim:
if self.since_support:
# look 10 seconds back because dmesg time isn't always accurate
fuzz = tim - timedelta(seconds=10)
cmd = self.command + [
"--time-format=iso",
f"--since={fuzz.strftime('%Y-%m-%dT%H:%M:%S')}",
]
else:
cmd = self.command
result = subprocess.run(cmd, check=True, capture_output=True)
if result.returncode == 0:
self.buffer = result.stdout.decode("utf-8")
if self.since_support:
self.seeked = True
def process_callback(self, callback, _priority=None):
"""Process the log"""
for entry in self.buffer.split("\n"):
callback(entry, _priority)
def match_line(self, matches):
"""Find lines that match all matches"""
for entry in self.buffer.split("\n"):
for match in matches:
if match not in entry:
break
return entry
return ""
def match_pattern(self, pattern) -> str:
for entry in self.buffer.split("\n"):
if re.search(pattern, entry):
return entry
return ""
def capture_header(self):
"""Capture the header of the log"""
return self.buffer.split("\n")[0]
class CySystemdLogger(KernelLogger):
"""Class for logging using systemd journal using cython"""
def __init__(self):
from cysystemd.reader import (
JournalReader,
JournalOpenMode,
Rule,
) # pylint: disable=import-outside-toplevel
boot_reader = JournalReader()
boot_reader.open(JournalOpenMode.SYSTEM)
boot_reader.seek_tail()
boot_reader.skip_previous(1)
current_boot_id = None
for entry in boot_reader:
if hasattr(entry, "data") and "_BOOT_ID" in entry.data:
current_boot_id = entry.data["_BOOT_ID"]
break
if not current_boot_id:
raise RuntimeError("Unable to find current boot ID")
rules = Rule("_BOOT_ID", current_boot_id) & Rule("_TRANSPORT", "kernel")
self.journal = JournalReader()
self.journal.open(JournalOpenMode.SYSTEM)
self.journal.add_filter(rules)
def seek(self):
"""Seek to the beginning of the log"""
self.journal.seek_head()
def seek_tail(self, tim=None):
"""Seek to the end of the log"""
if tim:
timestamp_usec = int(tim.timestamp() * 1_000_000)
self.journal.seek_realtime_usec(timestamp_usec)
else:
self.journal.seek_tail()
def process_callback(self, callback, _priority=None):
"""Process the log"""
for entry in self.journal:
callback(entry["MESSAGE"], entry["PRIORITY"])
def match_line(self, matches):
"""Find lines that match all matches"""
for entry in self.journal:
for match in matches:
if match not in entry["MESSAGE"]:
break
return entry["MESSAGE"]
return None
def match_pattern(self, pattern):
"""Find lines that match a pattern"""
for entry in self.journal:
if re.search(pattern, entry["MESSAGE"]):
return entry["MESSAGE"]
return None
class SystemdLogger(KernelLogger):
"""Class for logging using systemd journal"""
def __init__(self):
from systemd import journal # pylint: disable=import-outside-toplevel
self.journal = journal.Reader()
self.journal.this_boot()
self.journal.log_level(journal.LOG_INFO)
self.journal.add_match(_TRANSPORT="kernel")
self.journal.add_match(PRIORITY=journal.LOG_DEBUG)
def seek(self):
"""Seek to the beginning of the log"""
self.journal.seek_head()
def seek_tail(self, tim=None):
if tim:
self.journal.seek_realtime(tim)
else:
self.journal.seek_tail()
def process_callback(self, callback, _priority=None):
"""Process the log"""
for entry in self.journal:
callback(entry["MESSAGE"], entry["PRIORITY"])
def match_line(self, matches):
"""Find lines that match all matches"""
for entry in self.journal:
for match in matches:
if match not in entry["MESSAGE"]:
break
return entry["MESSAGE"]
return ""
def match_pattern(self, pattern):
"""Find lines that match a pattern"""
for entry in self.journal:
if re.search(pattern, entry["MESSAGE"]):
return entry["MESSAGE"]
return ""
def get_kernel_log(input_file=None) -> KernelLogger:
"""Get the kernel log provider"""
kernel_log = None
if input_file:
kernel_log = InputFile(input_file)
elif systemd_in_use():
try:
kernel_log = CySystemdLogger()
except ImportError:
kernel_log = None
except RuntimeError as e:
logging.debug(e)
kernel_log = None
if not kernel_log:
try:
kernel_log = SystemdLogger()
except ModuleNotFoundError:
pass
if not kernel_log:
try:
kernel_log = DmesgLogger()
except subprocess.CalledProcessError as e:
fatal_error(f"{e}")
kernel_log = KernelLogger()
logging.debug("Kernel log provider: %s", kernel_log.__class__.__name__)
return kernel_log