blob: 4e00b77f200a535f310f3b578bf3e261811db8ad [file] [log] [blame]
#!/usr/bin/env python3
#
# This script will check random content published on www.kernel.org/pub against
# authorized signatures to identify when corruption or substitution happens. The name
# comes from the Russian word /proveryat/, meaning "to verify".
#
# The script is supposed to be fire-and-forget, running in a screen session, as
# a background task, or as a systemd service, with reports sent to admin@kernel.org.
#
# E.g. (after you play with it to verify that it's doing the right thing):
# ./sig-prover -c sig-prover.conf -q &
#
# CAUTION:
# This script is not a guaranteed mechanism to detect intrusion -- an
# attacker can defeat it by analyzing access patterns/IPs and serving
# different content when it suspects that someone is running an automated
# signature verification check. The script can probably be improved by
# adding random delays between retrieving the tarball and the detached
# signature, setting a referrer value, etc. However, even with added
# measures, it will always act fairly predictably, so there will always
# remain a way to detect and defeat it.
#
# If you download tarballs from kernel.org for any purpose, you should
# always run your own verification on each downloaded file.
# https://www.kernel.org/signature.html
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
# -*- coding: utf-8 -*-
#
__author__ = 'Konstantin Ryabitsev <konstantin@linuxfoundation.org>'
import sys
import os
import logging
import argparse
import requests
import random
import subprocess
import tempfile
import re
import time
import json
import email
import email.message
import email.utils
import smtplib
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
logger = logging.getLogger(__name__)
# Shared requests.Session, created lazily by get_requests_session()
REQSESSION = None
# Path to the gpg binary; may be overridden per config section via 'gpgbin'
# (see get_random_sect)
GPGBIN = '/usr/bin/gpg'
# Per-section sets of URLs already checked in this run, so we don't keep
# re-testing the same candidates (see get_random_target)
SEEN = dict()
__VERSION__ = '0.1'
def get_requests_session(useragent=None):
    """Return the process-wide requests session, creating it on first use.

    The session retries failed connections (3 attempts with backoff) and
    carries a User-Agent header — either the one passed in, or a default
    identifying this tool and its version.
    """
    global REQSESSION
    if REQSESSION is None:
        session = requests.session()
        retry_policy = Retry(connect=3, backoff_factor=1)
        # Mount the same retry behavior for both plain and TLS URLs
        for scheme in ('http://', 'https://'):
            session.mount(scheme, HTTPAdapter(max_retries=retry_policy))
        if useragent is None:
            useragent = f'Sig-Prover/{__VERSION__}'
        session.headers.update({'User-Agent': useragent})
        REQSESSION = session
    return REQSESSION
def get_random_target(config, rsect):
    """Pick a random, not-yet-checked downloadable file URL for section rsect.

    Two modes, depending on the section's config:
      - 'json' set: fetch a releases.json feed and use each PGP-signed
        release's source URL as a candidate.
      - otherwise: combine a random 'hosts' entry with a random 'paths'
        entry, fetch and verify that directory's sha256sums.asc, and pick
        candidates from checksum lines matching a random 'masks' regex.

    Returns a URL string, or None when no suitable candidate exists.
    Already-checked URLs are tracked in the module-level SEEN dict; once all
    choices are exhausted the set is recycled so checking continues.
    """
    global SEEN
    if rsect not in SEEN:
        SEEN[rsect] = set()
    ua = config[rsect].get('useragent')
    if ua:
        # 'useragent' may list several values one per line; pick one at random
        ua = random.choice(ua.split('\n'))
    rses = get_requests_session(useragent=ua)
    candidates = list()
    # Is it a releases.json, or a collection of hosts and paths?
    jurl = config[rsect].get('json')
    if jurl:
        logger.info(' retrieving %s', jurl)
        resp = rses.get(jurl)
        resp.raise_for_status()
        rels = json.loads(resp.content)
        for release in rels['releases']:
            # Only consider releases that carry a PGP signature
            if not release['pgp']:
                continue
            candidate = release['source']
            # Do we define hosts?
            hosts = config[rsect].get('hosts')
            if hosts and candidate.find('https://cdn') == 0:
                # Swap in the CDN URL with an actual host URL, as it doesn't
                # really make sense to check things over cdn cache which we don't
                # control and can't do anything about.
                for rhost in config[rsect].get('hosts').split('\n'):
                    hostcand = candidate.replace('https://cdn.kernel.org', rhost)
                    if hostcand not in SEEN[rsect]:
                        candidate = hostcand
                        break
            if candidate in SEEN[rsect]:
                logger.debug('Already checked %s in this session', candidate)
                continue
            candidates.append(candidate)
    else:
        # Grab a random host
        rhost = random.choice(config[rsect].get('hosts').split('\n'))
        # Grab a random path
        rpath = random.choice(config[rsect].get('paths').split('\n'))
        rurl = rhost + rpath
        # Now we grab the sha256sums.txt file from there
        shapath = rurl + 'sha256sums.asc'
        logger.info(' retrieving %s', shapath)
        resp = rses.get(shapath)
        resp.raise_for_status()
        # The checksums file is clearsigned by the directory signer key
        keyring = os.path.join(config[rsect].get('keyringdir'), config[rsect].get('dirsigner_keyring'))
        logger.info(' verifying with %s', keyring)
        gpgargs = ['--verify', '--status-fd=2', '-']
        ecode, out, err = gpg_run_command(gpgargs, keyring, stdin=resp.content)
        if ecode == 0:
            good, valid, created, errors = validate_gpg_signature(err.decode())
            if good and valid:
                logger.info(' checksums signature is good and valid (created: %s)', created)
        else:
            # gpg itself failed; report its raw stderr lines
            errors = err.decode().split('\n')
        if errors:
            report_badness(config[rsect], shapath, errors)
        # Pick candidate files from the checksum listing by a random mask regex
        rmask = random.choice(config[rsect].get('masks').split('\n'))
        for line in resp.content.split(b'\n'):
            if re.search(rmask.encode(), line):
                # sha256sums lines are "<digest> <filename>"
                filen = line.split()[1].decode()
                candidate = rurl + filen
                if candidate in SEEN[rsect]:
                    logger.debug('Already checked %s in this session', candidate)
                    continue
                candidates.append(rurl + filen)
    if not candidates:
        # Everything was already checked; recycle the seen set and start over
        logger.debug('Already tried all possible choices for %s', rsect)
        candidates = list(SEEN[rsect])
        SEEN[rsect] = set()
    if not candidates:
        logger.info('No suitable candidates found for %s', rsect)
        return None
    candidate = random.choice(candidates)
    SEEN[rsect].add(candidate)
    return candidate
def _run_command(cmdargs, stdin=None):
    """Run an external command, capturing its output.

    :param cmdargs: command and arguments as a list (no shell involved)
    :param stdin: optional bytes to feed to the command's standard input
    :returns: tuple of (exit code, stdout bytes, stderr bytes)
    """
    # Lazy %-style logging args: the join only happens if debug is enabled
    logger.debug('Running %s', ' '.join(cmdargs))
    sp = subprocess.Popen(cmdargs,
                          stdout=subprocess.PIPE,
                          stdin=subprocess.PIPE,
                          stderr=subprocess.PIPE)
    (output, error) = sp.communicate(input=stdin)
    return sp.returncode, output, error
def gpg_run_command(args, keyring, stdin=None):
    """Invoke gpg with our standard batch options and a dedicated keyring.

    :param args: extra gpg arguments (e.g. ['--verify', ...])
    :param keyring: path to the keyring file to verify against
    :param stdin: optional bytes passed to gpg's standard input
    :returns: (exit code, stdout bytes, stderr bytes) from _run_command
    """
    base = [
        GPGBIN,
        '--batch',
        '--no-auto-key-retrieve',
        '--no-auto-check-trustdb',
        '--no-default-keyring',
        '--keyring', keyring,
    ]
    return _run_command(base + list(args), stdin=stdin)
def validate_gpg_signature(output):
    """Parse gpg --status-fd output and classify the signature result.

    :param output: decoded status-fd text produced by a gpg --verify run
    :returns: tuple (good, valid, created, errors) — good/valid are booleans
              for GOODSIG/VALIDSIG, created is the signature date string
              (YYYY-MM-DD) or None, errors is a set of problem descriptions
    """
    good = False
    valid = False
    created = None
    errors = set()
    goodsig = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
    if goodsig is None:
        # No good signature at all; look for the common failure markers
        nokey = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
        if nokey:
            errors.add('Missing public key: %s' % nokey.group(1))
        expired = re.search(r'^\[GNUPG:] EXPKEYSIG (.*)$', output, re.M)
        if expired:
            errors.add('Expired key: %s' % expired.group(1))
        return good, valid, created, errors
    logger.debug(' GOODSIG')
    good = True
    keyid = goodsig.group(1)
    validsig = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
    if validsig:
        logger.debug(' VALIDSIG')
        valid = True
        created = validsig.group(2)
    else:
        errors.add('Signature not valid from key: %s' % keyid)
    return good, valid, created, errors
def report_badness(config, furl, errors):
    """Report a signature verification failure.

    If the section has no 'notify' address, log the failure and exit(1).
    Otherwise build a plain-text report mail and send it via SMTP using the
    section's mailhost/mailtls/mailuser/mailpass settings; mail delivery
    failures are logged but not fatal.

    :param config: the config section for the target being checked
    :param furl: the URL that failed verification
    :param errors: iterable of human-readable error strings
    """
    if not config.get('notify'):
        logger.critical('ERROR: failed verifying: %s', furl)
        for entry in errors:
            logger.critical(' %s', entry)
        logger.debug('WARNING: notify not set, not sending a mail report')
        # Without a notify address there is nobody to tell, so bail out hard
        sys.exit(1)
    logger.info('ERROR: failed verifying: %s', furl)
    msg = email.message.Message()
    # Set to and cc
    msg['To'] = config.get('notify')
    targets = [msg['To']]
    ccs = config.get('notify_cc', '')
    if ccs:
        msg['Cc'] = ccs
        # SMTP envelope recipients include the Cc addresses as well
        targets += [x.strip() for x in ccs.split(',')]
    msg['Subject'] = f'SIGFAIL: {furl}'
    msg['From'] = config.get('mailfrom', 'devnull@kernel.org')
    msg['Message-Id'] = email.utils.make_msgid('sig-prover')
    msg['Date'] = email.utils.formatdate(localtime=True)
    body = list()
    body.append('Hello:')
    body.append('')
    body.append('The following URL failed signature verification:')
    body.append(f' {furl}')
    body.append('')
    body.append('Errors:')
    for error in errors:
        body.append(f' {error}')
    msg.set_payload('\r\n'.join(body))
    logger.debug('Message follows')
    logger.debug(msg.as_string())
    mailhost = config.get('mailhost', 'localhost')
    try:
        server = smtplib.SMTP(mailhost)
        if config.getboolean('mailtls'):
            server.starttls()
        muser = config.get('mailuser')
        mpass = config.get('mailpass')
        # Only authenticate when both credentials are configured
        if muser and mpass:
            server.login(muser, mpass)
        logger.info('Sending mail to %s', ', '.join(targets))
        server.sendmail(msg['From'], targets, msg.as_string())
        server.close()
    except Exception as ex:  # noqa
        # Best-effort delivery: log the failure instead of crashing the loop
        logger.critical('Unable to send mail to %s', ', '.join(targets))
        logger.critical('Attempting to use %s returned:', mailhost)
        logger.critical(ex)
def verify_tarball(config, turl):
    """Download a file and verify it against its detached .sign signature.

    First tries '<turl>.sign'; if that is not retrievable, assumes turl ends
    in a compression suffix (e.g. .xz), fetches the signature for the
    uncompressed name, and uncompresses the download with the section's
    'un<ext>' command before verifying. On verification failure the errors
    are passed to report_badness().

    :param config: the config section for this target
    :param turl: the full URL of the tarball/file to check
    """
    # Try the exact filename + .sign first
    signurl = turl + '.sign'
    rses = get_requests_session()
    resp = rses.get(signurl)
    zext = None
    zbin = None
    # NOTE(review): any status above 200 is treated as "no such signature";
    # redirects would land here too, presumably the mirrors don't redirect
    if resp.status_code > 200:
        # Try dropping the last .foo and trying again
        parts = turl.rsplit('.', 1)
        signurl = parts[0] + '.sign'
        zext = parts[1]
        # Are we capable of dealing with zext?
        zbin = config.get(f'un{zext}')
        if not zbin:
            logger.critical('Not aware of how to deal with %s compression', zext)
            sys.exit(1)
        logger.debug('Will use %s for uncompression', zbin)
        resp = rses.get(signurl)
        resp.raise_for_status()
    logger.info(' retrieving %s', signurl)
    # Work in a throwaway directory that is cleaned up automatically
    with tempfile.TemporaryDirectory(suffix='.sig-prover', dir=config.get('tempdir', '/tmp')) as td:
        signfile = os.path.join(td, 'content.sig')
        with open(signfile, 'wb') as sfh:
            sfh.write(resp.content)
        resp.close()
        logger.info(' retrieving %s', turl)
        # Stream the (potentially large) download to disk in chunks
        resp = rses.get(turl, stream=True)
        resp.raise_for_status()
        contentfile = os.path.join(td, 'content')
        if zext:
            contentfile = f'{contentfile}.{zext}'
        with open(contentfile, 'wb') as cfh:
            for chunk in resp.iter_content(chunk_size=8192):
                cfh.write(chunk)
        resp.close()
        if zext:
            logger.info(' uncompressing %s', zext)
            # e.g. 'unxz content.xz' -> leaves 'content' behind
            cmdargs = [zbin, contentfile]
            ecode, out, err = _run_command(cmdargs)
            if ecode > 0:
                logger.critical('Error uncompressing %s', turl)
                sys.exit(1)
            contentfile = os.path.join(td, 'content')
        gpgargs = ['--verify', '--status-fd=2', signfile, contentfile]
        keyring = os.path.join(config.get('keyringdir'), config.get('keyring'))
        logger.info(' verifying with %s', keyring)
        ecode, out, err = gpg_run_command(gpgargs, keyring=keyring)
        if ecode == 0:
            good, valid, created, errors = validate_gpg_signature(err.decode())
            if good and valid:
                logger.info(' signature is good and valid (created: %s)', created)
                return
        else:
            # gpg itself failed; report its raw stderr lines
            errors = err.decode().split('\n')
        report_badness(config, turl, errors)
def get_random_sect(config):
    """Choose a config section at random, honoring per-section weights.

    Sections without an explicit 'weight' default to 10. If the chosen
    section defines 'gpgbin', the module-level GPGBIN path is updated.

    :param config: the parsed ConfigParser object
    :returns: the name of the chosen section
    """
    global GPGBIN
    names = list(config.sections())
    weights = [config[name].getint('weight', 10) for name in names]
    chosen = random.choices(names, weights=weights, k=1)[0]
    override = config[chosen].get('gpgbin')
    if override:
        GPGBIN = override
    return chosen
def sig_verify(config):
    """Run a single verification pass against one randomly chosen section.

    :param config: the parsed ConfigParser object
    :returns: number of seconds the caller should sleep before the next
              pass (0 means stop)
    """
    section = get_random_sect(config)
    logger.info('[%s]', section)
    try:
        candidate = get_random_target(config, section)
        if candidate:
            verify_tarball(config[section], candidate)
    except requests.exceptions.RequestException as ex:
        # Treat failures as non-critical, because hosts can be intermittently
        # unreachable for various reasons.
        logger.info('Failed getting remote content:')
        logger.info(ex)
    return config[section].getint('sleep', 0)
def read_config(cfgfile):
    """Load the configuration file and return the parsed object.

    :param cfgfile: path to the config file; exits with status 1 if the
                    file does not exist
    :returns: ConfigParser with ExtendedInterpolation enabled, so sections
              may reference values from other sections
    """
    from configparser import ConfigParser, ExtendedInterpolation
    if not os.path.exists(cfgfile):
        # Fix: terminate the error message with a newline so it does not run
        # into the shell prompt
        sys.stderr.write('ERROR: config file %s does not exist\n' % cfgfile)
        sys.exit(1)
    fconfig = ConfigParser(interpolation=ExtendedInterpolation())
    fconfig.read(cfgfile)
    return fconfig
if __name__ == '__main__':
    # Command-line interface: config file is mandatory; -q/-d control console
    # verbosity; -l additionally records activity to a log file.
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config-file', dest='cfgfile', required=True,
                        help='Config file to use')
    parser.add_argument('-q', '--quiet', dest='quiet', action='store_true', default=False,
                        help='Quiet operation (cron mode)')
    parser.add_argument('-d', '--debug', dest='debug', action='store_true', default=False,
                        help='Output debug information')
    parser.add_argument('-l', '--logfile', dest='logfile',
                        help='Record activity in this log file')
    _cmdargs = parser.parse_args()
    _config = read_config(_cmdargs.cfgfile)
    # Logger accepts everything; the handlers below do the filtering
    logger.setLevel(logging.DEBUG)
    if _cmdargs.logfile:
        # File log always captures INFO and above, regardless of -q/-d
        ch = logging.FileHandler(_cmdargs.logfile)
        formatter = logging.Formatter(f'[%(asctime)s] %(message)s')
        ch.setFormatter(formatter)
        ch.setLevel(logging.INFO)
        logger.addHandler(ch)
    # Console handler level follows -q (critical only) / -d (debug) / default
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s')
    ch.setFormatter(formatter)
    if _cmdargs.quiet:
        ch.setLevel(logging.CRITICAL)
    elif _cmdargs.debug:
        ch.setLevel(logging.DEBUG)
    else:
        ch.setLevel(logging.INFO)
    logger.addHandler(ch)
    # Main loop: verify, then sleep per the chosen section's 'sleep' value;
    # a zero/absent sleep means run once and exit.
    while True:
        sleep = sig_verify(_config)
        if not sleep:
            break
        logger.info('--- sleeping %s seconds ---', sleep)
        try:
            time.sleep(sleep)
        except KeyboardInterrupt:
            logger.info('Bye')
            sys.exit(0)