blob: a357979a23ebc03f85053c66b084f3c37efb5ab8 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2018 by The Linux Foundation and contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (absolute_import,
division,
print_function,
unicode_literals)
__author__ = 'Konstantin Ryabitsev <konstantin@linuxfoundation.org>'
import sys
import subprocess
import logging
from datetime import datetime
import pydotplus.graphviz as pd
ALGOS = {
1: 'RSA',
17: 'DSA',
18: 'ECDH',
19: 'ECDSA',
22: 'EdDSA',
}
DB_VERSION = 1
GPGBIN = '/usr/bin/gpg2'
GNUPGHOME = None
logger = logging.getLogger(__name__)
# convenience caching so we avoid redundant lookups
_all_signed_by_cache = {}
_all_sigs_cache = {}
def get_logger(quiet=False):
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
formatter = logging.Formatter('%(message)s')
ch.setFormatter(formatter)
if quiet:
ch.setLevel(logging.CRITICAL)
else:
ch.setLevel(logging.DEBUG)
logger.addHandler(ch)
return logger
def gpg_run_command(args, matchonly=()):
args = [GPGBIN, '--with-colons'] + args
env = {}
if GNUPGHOME is not None:
env['GNUPGHOME'] = GNUPGHOME
logger.info('Running %s...' % ' '.join(args))
(output, error) = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env).communicate()
output = output.strip()
if len(error.strip()):
sys.stderr.write(error.decode('utf-8'))
lines = []
for line in output.split(b'\n'):
if line == b'' or line[0] == b'#':
continue
if len(matchonly):
for umatch in matchonly:
match = umatch.encode('utf-8')
if line.startswith(match):
lines.append(line)
continue
else:
lines.append(line)
return lines
def gpg_get_fields(bline):
# gpg uses \x3a to indicate an encoded colon, so we explode and de-encode
line = bline.decode('utf-8')
fields = [rawchunk.replace('\\x3a', ':') for rawchunk in line.split(':')]
# fields 5 and 6 are timestamps, so make them python datetime
if len(fields[5]):
fields[5] = datetime.fromtimestamp(int(fields[5]))
if len(fields[6]):
fields[6] = datetime.fromtimestamp(int(fields[6]))
return fields
def init_sqlite_db(c):
# Create primary keys table
logger.info('Initializing new sqlite3 db with metadata version %s' % DB_VERSION)
c.execute('''CREATE TABLE metadata (
version INTEGER
)''')
c.execute('''INSERT INTO metadata VALUES(?)''', (DB_VERSION,))
c.execute('''CREATE TABLE pub (
keyid TEXT UNIQUE,
validity TEXT,
size INTEGER,
algo INTEGER,
created TEXT,
expires TEXT,
ownertrust TEXT
)''')
c.execute('''CREATE TABLE uid (
pubrowid INTEGER,
validity TEXT,
created TEXT,
expires TEXT,
uiddata TEXT,
is_primary INTEGER,
FOREIGN KEY(pubrowid) REFERENCES pub(rowid)
)''')
c.execute('''CREATE TABLE sig (
uidrowid INTEGER,
pubrowid INTEGER,
created TEXT,
expires TEXT,
sigtype INTEGER,
FOREIGN KEY(uidrowid) REFERENCES pub(rowid),
FOREIGN KEY(pubrowid) REFERENCES uid(rowid),
PRIMARY KEY (uidrowid, pubrowid)
) WITHOUT ROWID''')
def get_all_signed_by(c, p_rowid):
if p_rowid not in _all_signed_by_cache:
c.execute('''SELECT DISTINCT uid.pubrowid
FROM uid JOIN sig ON sig.uidrowid = uid.rowid
WHERE sig.pubrowid=?''', (p_rowid,))
_all_signed_by_cache[p_rowid] = c.fetchall()
return _all_signed_by_cache[p_rowid]
def get_all_signed(c, p_rowid):
if p_rowid not in _all_sigs_cache:
c.execute('''SELECT DISTINCT sig.pubrowid
FROM sig JOIN uid ON sig.uidrowid = uid.rowid
WHERE uid.pubrowid = ?''', (p_rowid,))
_all_sigs_cache[p_rowid] = c.fetchall()
return _all_sigs_cache[p_rowid]
def get_all_full_trust(c):
c.execute('''SELECT DISTINCT pub.rowid
FROM pub
WHERE ownertrust IN ('u', 'f')''')
return c.fetchall()
def make_graph_node(c, p_rowid, show_trust=False):
c.execute('''SELECT pub.*,
uid.uiddata
FROM uid JOIN pub
ON uid.pubrowid = pub.rowid
WHERE pub.rowid=? AND uid.is_primary = 1''', (p_rowid,))
(kid, val, size, algo, cre, exp, trust, uiddata) = c.fetchone()
nodename = 'a_%s' % p_rowid
anode = pd.Node(nodename)
anode.set('shape', 'record')
anode.set('style', 'rounded')
if trust == 'u':
anode.set('color', 'purple')
elif trust == 'f':
anode.set('color', 'red')
elif trust == 'm':
anode.set('color', 'blue')
else:
anode.set('color', 'gray')
uiddata = uiddata.replace('"', '')
name = uiddata.split('<')[0].replace('"', '').strip()
name = name.split('(')[0].strip()
try:
email = uiddata.split('<')[1].replace('>', '').strip()
try:
domain = email.split('@')[1]
show = domain
except IndexError:
show = email
except IndexError:
show = ''
if algo in ALGOS.keys():
algosize = '%s %s' % (ALGOS[algo], size)
else:
algosize = 'ALGO? %s' % size
if show_trust:
anode.set('label', '{{%s\n%s|{val: %s|tru: %s}}|{%s|%s}}' % (name, show, val, trust, algosize, kid))
else:
anode.set('label', '{%s\n%s|{%s|%s}}' % (name, show, algosize, kid))
return anode
def cull_redundant_paths(paths, maxpaths=None):
paths.sort(key=len)
culled = []
chunks = []
for path in paths:
redundant = False
pos = -2
while pos > -len(path):
if path[pos:] in chunks:
redundant = True
break
chunks.append(path[pos:])
pos -= 1
if not redundant:
culled.append(path)
if maxpaths and len(culled) >= maxpaths:
break
return culled
def get_shortest_path(c, t_p_rowid, b_p_rowid, depth, maxdepth, seenkeys, ignorekeys):
depth += 1
sigs = get_all_signed_by(c, t_p_rowid)
if (b_p_rowid,) in sigs:
return [t_p_rowid, b_p_rowid]
shortest = None
if depth >= maxdepth:
return None
for (s_p_rowid,) in sigs:
if (depth, s_p_rowid) in seenkeys:
continue
if s_p_rowid in ignorekeys:
continue
subchain = get_shortest_path(c, s_p_rowid, b_p_rowid, depth, maxdepth, seenkeys, ignorekeys)
if subchain:
if shortest is None or len(shortest) > len(subchain):
shortest = subchain
seenkeys.append((depth, s_p_rowid))
# no need to go any deeper than current shortest
maxdepth = depth - 1 + len(shortest)
else:
# if we returned with None, then this key is a dead-end at this depth
seenkeys.append((depth, s_p_rowid))
if shortest is not None:
return [t_p_rowid] + shortest
return None
def draw_key_paths(c, paths, graph, show_trust):
seenactors = {}
# make a subgraph for toplevel nodes
tl_subgraph = pd.Subgraph('cluster_toplevel')
tl_subgraph.set('color', 'white')
for path in paths:
signer = None
for actor in path:
if actor not in seenactors.keys():
anode = make_graph_node(c, actor, show_trust)
seenactors[actor] = anode
if signer is None:
tl_subgraph.add_node(anode)
else:
graph.add_node(anode)
else:
anode = seenactors[actor]
if signer is not None:
graph.add_edge(pd.Edge(signer, anode))
signer = anode
graph.add_subgraph(tl_subgraph)