blob: af158b74b60eea1ef103648bb9c5c6318184695d [file] [log] [blame]
#!/usr/bin/python
"""Sets up a file for fs-verity."""
from __future__ import print_function
import argparse
import binascii
import ctypes
import hashlib
import io
import math
import os
import subprocess
import sys
import tempfile
import zlib
# Size in bytes of each data block covered by one leaf hash of the Merkle tree.
DATA_BLOCK_SIZE = 4096
# Size in bytes of each hash block in the Merkle tree.
HASH_BLOCK_SIZE = 4096
# 8-byte magic stored in fsverity_footer.magic.
FS_VERITY_MAGIC = b'TrueBrew'
# Extension type codes, stored in fsverity_extension.type.
FS_VERITY_EXT_ELIDE = 1
FS_VERITY_EXT_PATCH = 2
FS_VERITY_EXT_SALT = 3
FS_VERITY_EXT_PKCS7_SIGNATURE = 4
# Hash algorithm codes, stored in the footer's meta/data algorithm fields.
FS_VERITY_ALG_SHA256 = 1
FS_VERITY_ALG_CRC32 = 2
class CRC32Hash(object):
  """hashlib-compatible wrapper around zlib.crc32()."""

  digest_size = 4

  # Serialized in big endian byte order, to be compatible with
  # veritysetup --hash=crc32, which uses libgcrypt's big endian CRC-32.
  class Digest(ctypes.BigEndianStructure):
    _fields_ = [('remainder', ctypes.c_uint32)]

    def __init__(self, remainder=0):
      self.remainder = remainder

  def __init__(self, remainder=0):
    self.remainder = remainder

  def update(self, string):
    # zlib.crc32() takes the running remainder as its second argument, which
    # lets updates chain just like hashlib's update().
    self.remainder = zlib.crc32(bytes(string), self.remainder)

  def digest(self):
    state = CRC32Hash.Digest(remainder=self.remainder)
    return bytes(ctypes.string_at(ctypes.addressof(state),
                                  ctypes.sizeof(state)))

  def hexdigest(self):
    return binascii.hexlify(self.digest()).decode('ascii')

  def copy(self):
    return CRC32Hash(self.remainder)
class HashAlgorithm(object):
  """A hash algorithm supported by fs-verity."""

  def __init__(self, code, name, digest_size):
    self.code = code                # on-disk algorithm code (FS_VERITY_ALG_*)
    self.name = name                # lowercase algorithm name, e.g. 'sha256'
    self.digest_size = digest_size  # digest length in bytes

  def create(self):
    """Returns a fresh hashlib-style hash object for this algorithm."""
    # CRC-32 is not provided by hashlib, so it gets a dedicated wrapper;
    # every other algorithm goes through hashlib.
    if self.name != 'crc32':
      return hashlib.new(self.name)
    return CRC32Hash()
# All hash algorithms this script can use for the tree and the measurement.
HASH_ALGORITHMS = [
    HashAlgorithm(FS_VERITY_ALG_SHA256, 'sha256', 32),
    HashAlgorithm(FS_VERITY_ALG_CRC32, 'crc32', 4),
]
class fsverity_footer(ctypes.LittleEndianStructure):
  """Fixed-size portion of the on-disk fs-verity footer (64 bytes)."""
  _fields_ = [
      ('magic', ctypes.c_char * 8),  # must be FS_VERITY_MAGIC
      ('major_version', ctypes.c_uint8),
      ('minor_version', ctypes.c_uint8),
      ('log_blocksize', ctypes.c_uint8),  # log2(DATA_BLOCK_SIZE)
      ('log_arity', ctypes.c_uint8),  # log2(DATA_BLOCK_SIZE / digest_size)
      ('meta_algorithm', ctypes.c_uint16),  # algorithm for hash blocks
      ('data_algorithm', ctypes.c_uint16),  # algorithm for data blocks
      ('flags', ctypes.c_uint32),
      ('reserved1', ctypes.c_uint32),
      ('size', ctypes.c_uint64),  # original (pre-padding) file size in bytes
      ('authenticated_ext_count', ctypes.c_uint8),
      ('unauthenticated_ext_count', ctypes.c_uint8),
      ('reserved2', ctypes.c_char * 30)
  ]
class fsverity_extension(ctypes.LittleEndianStructure):
  """Header preceding each serialized fs-verity extension item."""
  _fields_ = [
      # sizeof(this header) + payload length; does NOT include the padding
      # that aligns the next extension to an 8-byte boundary.
      ('length', ctypes.c_uint32),
      ('type', ctypes.c_uint16),  # FS_VERITY_EXT_* code
      ('reserved', ctypes.c_uint16)
  ]
class fsverity_extension_patch(ctypes.LittleEndianStructure):
  """Payload of a patch extension (FS_VERITY_EXT_PATCH)."""
  _fields_ = [
      ('offset', ctypes.c_uint64),  # file offset at which the patch applies
      # followed by variable-length 'databytes'
  ]
class fsverity_extension_elide(ctypes.LittleEndianStructure):
  """Payload of an elide extension (FS_VERITY_EXT_ELIDE)."""
  _fields_ = [
      ('offset', ctypes.c_uint64),  # file offset of the unverified region
      ('length', ctypes.c_uint64)  # length of that region in bytes
  ]
class fsverity_measurement(ctypes.LittleEndianStructure):
  """Header prepended to the measurement digest before signing.

  See _sign_measurement(): the serialized header plus the raw digest is
  what gets fed to 'openssl smime -sign'.
  """
  _fields_ = [
      ('digest_algorithm', ctypes.c_uint16),  # FS_VERITY_ALG_* code
      ('digest_size', ctypes.c_uint16),  # digest length in bytes
      ('reserved1', ctypes.c_uint32),
      ('reserved2', ctypes.c_uint64 * 3)
      # followed by variable-length 'digest'
  ]
class FooterOffset(ctypes.LittleEndianStructure):
  """Trailing 32-bit value written at the very end of the output file.

  Holds the distance in bytes (including this field itself) from the start
  of the footer to the end of the file; see generate().
  """
  _fields_ = [('ftr_offset', ctypes.c_uint32)]
def copy_bytes(src, dst, n):
  """Copies 'n' bytes from the 'src' file to the 'dst' file.

  Raises:
    ValueError: 'n' is negative.
    EOFError: 'src' ended before 'n' bytes could be read.
  """
  if n < 0:
    raise ValueError('Negative copy count: {}'.format(n))
  remaining = n
  while remaining:
    chunk = src.read(min(remaining, io.DEFAULT_BUFFER_SIZE))
    if not chunk:
      raise EOFError('Unexpected end of src file')
    dst.write(chunk)
    remaining -= len(chunk)
def copy(src, dst):
  """Copies from the 'src' file to the 'dst' file until EOF on 'src'."""
  while True:
    chunk = src.read(io.DEFAULT_BUFFER_SIZE)
    if not chunk:
      break
    dst.write(chunk)
def pad_to_block_boundary(f, block_size=None):
  """Pads the file with zeroes to the next block boundary.

  Args:
    f: a file opened for writing, positioned at its current end.
    block_size: alignment in bytes; defaults to DATA_BLOCK_SIZE.  Exposed as
      a parameter so callers can align to other block sizes.
  """
  if block_size is None:
    block_size = DATA_BLOCK_SIZE
  # (-tell) % block_size is the byte count up to the next multiple of
  # block_size, and 0 if the position is already aligned.
  f.write(b'\0' * (-f.tell() % block_size))
def ilog2(n):
  """Returns log2 of 'n', which must be a power of 2.

  Implemented with integer bit operations instead of math.log(): the
  floating-point log can round the wrong way for large powers of 2, which
  would make a valid input raise.  Integral floats (e.g. 128.0, as produced
  by true division) are accepted, matching the old behavior.

  Raises:
    ValueError: 'n' is not a power of 2.
  """
  value = int(n)
  # A positive power of 2 has exactly one set bit, so value & (value - 1)
  # must clear to zero; also reject non-integral and non-positive inputs.
  if value != n or value <= 0 or value & (value - 1):
    raise ValueError('{} is not a power of 2'.format(n))
  return value.bit_length() - 1
def serialize_struct(struct):
  """Serializes a ctypes.Structure to a byte array."""
  size = ctypes.sizeof(struct)
  return bytes(ctypes.string_at(ctypes.addressof(struct), size))
def veritysetup(data_filename, tree_filename, salt, algorithm):
  """Built-in Merkle tree generation algorithm.

  Builds the fs-verity Merkle tree over 'data_filename' bottom-up, writing
  every level except the root into 'tree_filename' (topmost level first in
  the file), and returns the root hash.

  Args:
    data_filename: path to the data file to hash.
      NOTE(review): assumed to be a nonempty, exact multiple of
      DATA_BLOCK_SIZE (callers pad it first) -- confirm.
    tree_filename: path receiving the hash blocks; opened 'r+b', so it must
      already exist (callers create it as a temp file).
    salt: bytes hashed before each block (may be empty).
    algorithm: the HashAlgorithm to use.

  Returns:
    The root hash as a hex string.
  """
  # Every block hash is salted identically, so hash the salt once and fork
  # per-block copies of this object below.
  salted_hash = algorithm.create()
  salted_hash.update(salt)
  hashes_per_block = HASH_BLOCK_SIZE // salted_hash.digest_size
  # level_blocks[0] is the number of data blocks; each subsequent entry is
  # the number of hash blocks needed to cover the level below it, ending
  # with a single-block topmost level.
  level_blocks = [os.stat(data_filename).st_size // DATA_BLOCK_SIZE]
  while level_blocks[-1] > 1:
    level_blocks.append(
        (level_blocks[-1] + hashes_per_block - 1) // hashes_per_block)
  # In the tree file, higher (smaller) levels come first.  At each point,
  # 'hash_block_offset' is where the blocks of the level being *read* start;
  # the level being *written* goes immediately before it.
  hash_block_offset = sum(level_blocks) - level_blocks[0]
  with open(data_filename, 'rb') as datafile:
    with open(tree_filename, 'r+b') as hashfile:
      for level, blockcount in enumerate(level_blocks):
        (i, pending) = (0, bytearray())
        for j in range(blockcount):
          h = salted_hash.copy()
          if level == 0:
            # Leaf level: hash the data blocks themselves.
            datafile.seek(j * DATA_BLOCK_SIZE)
            h.update(datafile.read(DATA_BLOCK_SIZE))
          else:
            # Interior level: hash the hash blocks of the level below.
            hashfile.seek((hash_block_offset + j) * HASH_BLOCK_SIZE)
            h.update(hashfile.read(HASH_BLOCK_SIZE))
          pending += h.digest()
          if level + 1 == len(level_blocks):
            # Topmost level: its single digest is the root hash; it is
            # returned rather than written to the tree file.
            assert len(pending) == salted_hash.digest_size
            return binascii.hexlify(pending).decode('ascii')
          if len(pending) == HASH_BLOCK_SIZE or j + 1 == blockcount:
            # Flush a full (or final, zero-padded) hash block of the level
            # currently being built, just before the level being read.
            pending += b'\0' * (HASH_BLOCK_SIZE - len(pending))
            hashfile.seek((hash_block_offset - level_blocks[level + 1] + i) *
                          HASH_BLOCK_SIZE)
            hashfile.write(pending)
            (i, pending) = (i + 1, bytearray())
        hash_block_offset -= level_blocks[level + 1]
class Extension(object):
  """An fs-verity extension item.

  Subclasses provide TYPE_CODE and _serialize_impl(), which returns the
  type-specific payload bytes.
  """

  def serialize(self):
    """Returns header + payload + padding to an 8-byte boundary."""
    payload = self._serialize_impl()
    padding = b'\0' * (-len(payload) % 8)
    hdr = fsverity_extension()
    # The on-disk length covers the header and payload, but not the
    # alignment padding.
    hdr.length = ctypes.sizeof(hdr) + len(payload)
    hdr.type = self.TYPE_CODE
    return serialize_struct(hdr) + payload + padding
class SimpleExtension(Extension):
  """An extension whose payload is an already-serialized byte string."""

  def __init__(self, raw_data):
    self.raw_data = raw_data

  def _serialize_impl(self):
    return self.raw_data
class SaltExtension(SimpleExtension):
  """Authenticated extension carrying the hashing salt."""
  TYPE_CODE = FS_VERITY_EXT_SALT
class PKCS7SignatureExtension(SimpleExtension):
  """Unauthenticated extension carrying the PKCS#7 signed measurement."""
  TYPE_CODE = FS_VERITY_EXT_PKCS7_SIGNATURE
class DataExtension(Extension):
  """An fs-verity patch or elide extension.

  Subclasses bound the region size via MIN_LENGTH/MAX_LENGTH.
  """

  def __init__(self, offset, length):
    self.offset = offset
    self.length = length
    # Validate the length against the subclass bounds first, then the offset.
    if self.length < self.MIN_LENGTH:
      raise ValueError('length too small (got {}, need >= {})'.format(
          self.length, self.MIN_LENGTH))
    elif self.length > self.MAX_LENGTH:
      raise ValueError('length too large (got {}, need <= {})'.format(
          self.length, self.MAX_LENGTH))
    elif self.offset < 0:
      raise ValueError('offset cannot be negative (got {})'.format(self.offset))

  def __str__(self):
    template = '{}(offset {}, length {})'
    return template.format(type(self).__name__, self.offset, self.length)
class ElideExtension(DataExtension):
  """An fs-verity elide extension: a region excluded from verification."""
  TYPE_CODE = FS_VERITY_EXT_ELIDE
  MIN_LENGTH = 1
  MAX_LENGTH = (1 << 64) - 1

  def __init__(self, offset, length):
    DataExtension.__init__(self, offset, length)

  def apply(self, out_file):
    # An elided region contributes nothing to the patched data stream.
    pass

  def _serialize_impl(self):
    payload = fsverity_extension_elide()
    (payload.offset, payload.length) = (self.offset, self.length)
    return serialize_struct(payload)
class PatchExtension(DataExtension):
  """An fs-verity patch extension: a region replaced for verification."""
  TYPE_CODE = FS_VERITY_EXT_PATCH
  MIN_LENGTH = 1
  MAX_LENGTH = 255

  def __init__(self, offset, data):
    # The region length is implied by the patch data itself.
    DataExtension.__init__(self, offset, len(data))
    self.data = data

  def apply(self, dst):
    # The patch bytes replace the original region in the data stream.
    dst.write(self.data)

  def _serialize_impl(self):
    fixed = fsverity_extension_patch()
    fixed.offset = self.offset
    return serialize_struct(fixed) + self.data
class BadPatchOrElisionError(Exception):
  """Raised when a patch/elide region is out of range or regions overlap."""
  pass
class FSVerityGenerator(object):
  """Sets up a file for fs-verity."""

  def __init__(self, in_filename, out_filename, algorithm, **kwargs):
    """Initializes the generator.

    Args:
      in_filename: path to the original input file.
      out_filename: path where the fs-verity formatted file is written.
      algorithm: the HashAlgorithm to use.
      **kwargs: optional settings:
        salt: bytes hashed before each block (default: empty).
        patches_and_elisions: list of PatchExtension/ElideExtension items.
        external_veritysetup: if True, shell out to the 'veritysetup'
          program instead of using the built-in tree builder.
        signing_key_file: PEM signing key; when given, the measurement is
          signed with openssl.
        signature_file: PKCS#7 DER signature path; output when
          signing_key_file is given, input otherwise.

    Raises:
      BadPatchOrElisionError: a region extends past EOF or regions overlap.
    """
    self.in_filename = in_filename
    self.original_size = os.stat(in_filename).st_size
    self.out_filename = out_filename
    self.algorithm = algorithm
    self.salt = kwargs.get('salt')
    if self.salt is None:
      self.salt = bytes()
    self.patches_and_elisions = kwargs.get('patches_and_elisions')
    if self.patches_and_elisions is None:
      self.patches_and_elisions = []
    self.external_veritysetup = kwargs.get('external_veritysetup')
    if self.external_veritysetup is None:
      self.external_veritysetup = False
    self.signing_key_file = kwargs.get('signing_key_file')
    self.signature_file = kwargs.get('signature_file')
    self.tmp_filenames = []
    # Patches and elisions must be within the file size and must not overlap.
    # Sorting by offset makes the overlap check a simple adjacent-pair scan.
    self.patches_and_elisions = sorted(
        self.patches_and_elisions, key=lambda ext: ext.offset)
    for i, ext in enumerate(self.patches_and_elisions):
      ext_end = ext.offset + ext.length
      if ext_end > self.original_size:
        raise BadPatchOrElisionError(
            '{} extends beyond end of file!'.format(ext))
      if i + 1 < len(self.patches_and_elisions
                    ) and ext_end > self.patches_and_elisions[i + 1].offset:
        raise BadPatchOrElisionError('{} overlaps {}!'.format(
            ext, self.patches_and_elisions[i + 1]))

  def _open_tmpfile(self, mode):
    """Opens a temp file, remembering its name for later cleanup."""
    f = tempfile.NamedTemporaryFile(mode, delete=False)
    self.tmp_filenames.append(f.name)
    return f

  def _delete_tmpfiles(self):
    """Deletes every temp file created via _open_tmpfile()."""
    for filename in self.tmp_filenames:
      os.unlink(filename)

  def _apply_patch_elide_extensions(self, data_filename):
    """Apply patch and elide extensions.

    Copies 'data_filename' into a temp file, substituting each extension's
    region (patches write replacement bytes, elisions write nothing), and
    returns the temp file's name.
    """
    with open(data_filename, 'rb') as src:
      with self._open_tmpfile('wb') as dst:
        src_pos = 0
        for ext in self.patches_and_elisions:
          print('Applying {}'.format(ext))
          # Copy the untouched bytes before this region, then let the
          # extension decide what (if anything) replaces the region.
          copy_bytes(src, dst, ext.offset - src_pos)
          ext.apply(dst)
          src_pos = ext.offset + ext.length
          src.seek(src_pos)
        # Copy whatever follows the last region.
        copy(src, dst)
    return dst.name

  def _generate_merkle_tree(self, data_filename):
    """Generates a file's Merkle tree for fs-verity.

    Args:
      data_filename: file for which to generate the tree.  Patches and/or
        elisions may need to be applied on top of it.

    Returns:
      (root hash as hex, name of the file containing the Merkle tree).

    Raises:
      OSError: A problem occurred when executing the 'veritysetup'
        program to generate the Merkle tree.
    """
    # If there are any patch or elide extensions, apply them to a temporary
    # file and use that to build the Merkle tree instead of the original
    # data.
    if self.patches_and_elisions:
      data_filename = self._apply_patch_elide_extensions(data_filename)

    # Pad to a data block boundary before building the Merkle tree.
    # Note: elisions may result in padding being needed, even if the
    # original file was block-aligned!
    with open(data_filename, 'ab') as f:
      pad_to_block_boundary(f)

    # File to which we'll output the Merkle tree.
    with self._open_tmpfile('wb') as f:
      tree_filename = f.name

    if self.external_veritysetup:
      # Delegate to 'veritysetup' to actually build the Merkle tree.
      cmd = [
          'veritysetup',
          'format',
          data_filename,
          tree_filename,
          '--salt=' + binascii.hexlify(self.salt).decode('ascii'),
          '--no-superblock',
          '--hash={}'.format(self.algorithm.name),
          '--data-block-size={}'.format(DATA_BLOCK_SIZE),
          '--hash-block-size={}'.format(HASH_BLOCK_SIZE),
      ]
      print(' '.join(cmd))
      output = subprocess.check_output(cmd, universal_newlines=True)
      # Extract the root hash from veritysetup's output.
      root_hash = None
      for line in output.splitlines():
        if line.startswith('Root hash'):
          root_hash = line.split(':')[1].strip()
          break
      if root_hash is None:
        raise OSError('Root hash not found in veritysetup output!')
    else:  # builtin veritysetup
      root_hash = veritysetup(data_filename, tree_filename, self.salt,
                              self.algorithm)
    return root_hash, tree_filename

  def _generate_footer(self):
    """Generates the fixed-size portion of the fs-verity footer."""
    footer = fsverity_footer()
    assert ctypes.sizeof(footer) == 64
    footer.magic = FS_VERITY_MAGIC
    footer.major_version = 1
    footer.minor_version = 0
    footer.log_blocksize = ilog2(DATA_BLOCK_SIZE)
    # NOTE(review): '/' is true division on Python 3, so ilog2() may receive
    # an integral float here; ilog2() tolerates that.
    footer.log_arity = ilog2(DATA_BLOCK_SIZE / self.algorithm.digest_size)
    footer.meta_algorithm = self.algorithm.code
    footer.data_algorithm = self.algorithm.code
    footer.size = self.original_size
    footer.authenticated_ext_count = len(self.patches_and_elisions)
    if self.salt:
      footer.authenticated_ext_count += 1
    footer.unauthenticated_ext_count = 0
    if self.signing_key_file or self.signature_file:
      footer.unauthenticated_ext_count += 1
    # NOTE(review): fsverity_footer declares no 'salt' field, so this only
    # sets a plain Python attribute and is NOT serialized below; the salt
    # actually travels in a SaltExtension.  Confirm this line is intentional.
    footer.salt = self.salt
    return serialize_struct(footer)

  def _sign_measurement(self, measurement):
    """Sign the file's measurement using the given signing_key_file.

    The signed blob is an fsverity_measurement header followed by the raw
    digest bytes; returns the resulting PKCS#7 message in DER form.
    """
    m = fsverity_measurement()
    m.digest_algorithm = self.algorithm.code
    m.digest_size = self.algorithm.digest_size
    data_to_sign = serialize_struct(m) + binascii.unhexlify(measurement)
    with self._open_tmpfile('wb') as f:
      f.write(data_to_sign)
      data_to_sign_file = f.name
    # Created only to reserve a temp filename for openssl's output.
    with self._open_tmpfile('wb') as f:
      pkcs7_msg_file = f.name
    cmd = [
        'openssl',
        'smime',
        '-sign',
        '-in',
        data_to_sign_file,
        '-signer',
        self.signing_key_file,
        '-inform',
        'pem',
        '-md',
        self.algorithm.name,
        '-out',
        pkcs7_msg_file,
        '-outform',
        'der',
        '-binary',
        '-nodetach',
        '-noattr'
    ]
    print(' '.join(cmd))
    subprocess.check_call(cmd)
    with open(pkcs7_msg_file, 'rb') as f:
      pkcs7_msg = f.read()
    return pkcs7_msg

  def generate(self):
    """Sets up a file for fs-verity.

    The input file will be copied to the output file, then have the
    fs-verity metadata appended to it.

    Returns:
      (fs-verity measurement, Merkle tree root hash), both as hex.

    Raises:
      IOError: Problem reading/writing the files.
    """
    # Copy the input file to the output file.
    with open(self.in_filename, 'rb') as infile:
      with open(self.out_filename, 'wb') as outfile:
        copy(infile, outfile)
        # Detect the input changing size underneath us.
        if outfile.tell() != self.original_size:
          raise IOError('{}: size changed!'.format(self.in_filename))
    try:
      # Generate the file's Merkle tree and calculate its root hash.
      (root_hash, tree_filename) = self._generate_merkle_tree(self.out_filename)
      with open(self.out_filename, 'ab') as outfile:
        # Pad to a block boundary and append the Merkle tree.
        pad_to_block_boundary(outfile)
        with open(tree_filename, 'rb') as treefile:
          copy(treefile, outfile)
        # Generate the fixed-size portion of the fs-verity footer.
        footer = self._generate_footer()
        # Generate authenticated extension items, if any.
        auth_extensions = bytearray()
        for ext in self.patches_and_elisions:
          auth_extensions += ext.serialize()
        if self.salt:
          auth_extensions += SaltExtension(self.salt).serialize()
        # Compute the fs-verity measurement: the hash of the footer, the
        # authenticated extensions, and the Merkle tree root hash.
        measurement = self.algorithm.create()
        measurement.update(footer)
        measurement.update(auth_extensions)
        measurement.update(binascii.unhexlify(root_hash))
        measurement = measurement.hexdigest()
        # Generate unauthenticated extension items, if any.
        unauth_extensions = bytearray()
        pkcs7_msg = None
        if self.signing_key_file:
          pkcs7_msg = self._sign_measurement(measurement)
          if self.signature_file:
            # Also save the fresh signature as a standalone file.
            with open(self.signature_file, 'wb') as f:
              f.write(pkcs7_msg)
            print('Wrote signed file measurement to "{}"'.format(
                self.signature_file))
        elif self.signature_file:
          # No signing key: read a previously created signature instead.
          with open(self.signature_file, 'rb') as f:
            pkcs7_msg = f.read()
        if pkcs7_msg:
          unauth_extensions += PKCS7SignatureExtension(pkcs7_msg).serialize()
        # Write the footer to the output file, then the trailing 32-bit
        # offset that points back to the start of the footer.
        outfile.write(footer)
        outfile.write(auth_extensions)
        outfile.write(unauth_extensions)
        ftr_offset = FooterOffset()
        ftr_offset.ftr_offset = len(footer) + len(auth_extensions) + len(
            unauth_extensions) + ctypes.sizeof(ftr_offset)
        outfile.write(serialize_struct(ftr_offset))
    finally:
      self._delete_tmpfiles()
    return (measurement, root_hash)
def convert_hash_argument(argstring):
  """argparse type for --hash: maps an algorithm name to a HashAlgorithm."""
  for candidate in HASH_ALGORITHMS:
    if candidate.name == argstring:
      return candidate
  raise argparse.ArgumentTypeError(
      'Unrecognized algorithm: "{}". Choices are: {}'.format(
          argstring, [alg.name for alg in HASH_ALGORITHMS]))
def convert_salt_argument(argstring):
  """argparse type for --salt: decodes a hex string into bytes."""
  try:
    return binascii.unhexlify(argstring)
  except (ValueError, TypeError):
    # binascii raises binascii.Error (a ValueError) for bad hex.
    raise argparse.ArgumentTypeError(
        'Must be a hex string. (Got "{}")'.format(argstring))
def convert_patch_argument(argstring):
  """Parse a --patch argument into a PatchExtension."""
  try:
    (offset, patchfile) = argstring.split(',')
    offset = int(offset)
  except ValueError:
    # Wrong comma count or non-integer offset.
    raise argparse.ArgumentTypeError(
        'Must be formatted as <offset,patchfile>. (Got "{}")'.format(
            argstring))
  try:
    with open(patchfile, 'rb') as f:
      data = f.read()
    return PatchExtension(offset, data)
  except (IOError, ValueError) as e:
    # Unreadable patch file, or a patch size outside PatchExtension bounds.
    raise argparse.ArgumentTypeError(e)
def convert_elide_argument(argstring):
  """Parse an --elide argument into an ElideExtension."""
  try:
    (offset, length) = [int(part) for part in argstring.split(',')]
  except ValueError:
    # Wrong comma count or non-integer fields.
    raise argparse.ArgumentTypeError(
        'Must be formatted as <offset,length>. (Got "{}")'.format(argstring))
  try:
    return ElideExtension(offset, length)
  except ValueError as e:
    # Offset/length outside the bounds enforced by ElideExtension.
    raise argparse.ArgumentTypeError(e)
def parse_args():
  """Parses the command-line arguments."""
  parser = argparse.ArgumentParser(
      description='Sets up a file for fs-verity (file-based integrity)')
  parser.add_argument(
      'in_filename',
      metavar='<input_file>',
      type=str,
      help='Original content input file')
  parser.add_argument(
      'out_filename',
      metavar='<output_file>',
      type=str,
      help='Output file formatted for fs-verity')
  parser.add_argument(
      '--salt',
      metavar='<hex_string>',
      type=convert_salt_argument,
      help='Salt, given as a hex string. Default is no salt.')
  # argparse applies the type converter to the string default too, so
  # args.hash is always a HashAlgorithm object.
  parser.add_argument(
      '--hash',
      type=convert_hash_argument,
      default='sha256',
      help="""Hash algorithm to use. Available algorithms: {}.
      Default is sha256.""".format([alg.name for alg in HASH_ALGORITHMS]))
  # --patch and --elide append into the same destination list so their
  # regions can be sorted and validated together.
  parser.add_argument(
      '--patch',
      metavar='<offset,patchfile>',
      type=convert_patch_argument,
      action='append',
      dest='patches_and_elisions',
      help="""Add a patch extension (not recommended). Data in the region
      beginning at <offset> in the original file and continuing for
      filesize(<patchfile>) bytes will be replaced with the contents of
      <patchfile> for verification purposes, but reads will return the original
      data.""")
  parser.add_argument(
      '--elide',
      metavar='<offset,length>',
      type=convert_elide_argument,
      action='append',
      dest='patches_and_elisions',
      help="""Add an elide extension (not recommended). Data in the region
      beginning at <offset> in the original file and continuing for <length>
      bytes will not be verified.""")
  # Defaults to None when absent; FSVerityGenerator maps that to False.
  parser.add_argument(
      '--external-veritysetup',
      action='store_const',
      const=True,
      help="""Invoke the external veritysetup program rather than using the
      built-in Merkle tree generation algorithm. They should produce the same
      result.""")
  parser.add_argument(
      '--signing-key',
      metavar='<signing_key_file>',
      type=str,
      help='File containing signing key in PEM format')
  parser.add_argument(
      '--signature',
      metavar='<signature_file>',
      type=str,
      help="""File containing signed measurement in PKCS#7 DER format. This is
      an output file if --signing-key is given, or an input file otherwise.""")
  return parser.parse_args()
def main():
  """Command-line entry point: parses arguments and formats the file."""
  args = parse_args()
  try:
    generator = FSVerityGenerator(
        args.in_filename,
        args.out_filename,
        args.hash,
        external_veritysetup=args.external_veritysetup,
        patches_and_elisions=args.patches_and_elisions,
        salt=args.salt,
        signature_file=args.signature,
        signing_key_file=args.signing_key)
  except BadPatchOrElisionError as e:
    # Invalid --patch/--elide combinations are a usage error, not a crash.
    sys.stderr.write('ERROR: {}\n'.format(e))
    sys.exit(1)
  (measurement, root_hash) = generator.generate()
  print('Merkle root hash: {}'.format(root_hash))
  print('fs-verity measurement: {}'.format(measurement))


if __name__ == '__main__':
  main()