| #!/usr/bin/env python3 |
| # SPDX-License-Identifier: GPL-2.0-only |
| # |
| # Crypto algorithm benchmark and testing script |
| # |
| # Userspace utility for /proc/cryptobench (CONFIG_CRYPTO_BENCHMARK). |
| # |
| # Copyright 2018 Google LLC |
| # |
| |
| import argparse |
| import subprocess |
| import sys |
| |
def error(msg):
    """Write ``msg`` plus a trailing newline to stderr and exit with status 1.

    This function never returns; it terminates by raising ``SystemExit``.
    """
    line = msg + '\n'
    sys.stderr.write(line)
    raise SystemExit(1)
| |
def megabytes_per_sec(args, ns_elapsed):
    """Convert an elapsed time in nanoseconds into a throughput figure.

    The amount of data is ``int(args.niter) * int(args.bufsize)`` bytes.
    Bytes per nanosecond times 1000 is bytes per microsecond, i.e. units of
    10**6 bytes per second.

    Raises ZeroDivisionError if ``ns_elapsed`` is zero.
    """
    iterations = int(args.niter)
    buffer_len = int(args.bufsize)
    total_bytes = iterations * buffer_len
    return (total_bytes * 1000) / ns_elapsed
| |
class BenchmarkError(Exception):
    """Exception that carries an ``error_type`` value next to its message."""

    def __init__(self, message, error_type):
        # Store the extra attribute, then let Exception record the message.
        self.error_type = error_type
        super().__init__(message)
| |
def proc_cryptobench(cmd, args):
    """Submit ``cmd`` to /proc/cryptobench and return the text read back.

    Without ``args.adb`` the local /proc/cryptobench file is written and
    the first line read back from it is returned.  With ``args.adb`` the
    command is run through ``adb shell`` (prefixed with ``taskset
    <cpu_mask>`` when ``args.cpu_mask`` is set) and the whole output,
    decoded as UTF-8, is returned.

    Raises ValueError if ``args.cpu_mask`` is given without ``args.adb``.
    """
    if not args.adb:
        if args.cpu_mask:
            raise ValueError('TODO: support --cpu-mask without --adb')
        with open('/proc/cryptobench', 'rt+', encoding='ascii') as procfile:
            procfile.write(cmd)
            # Rewind so that the read returns the result of this command.
            procfile.seek(0)
            return procfile.readline()

    prefix = ' taskset ' + args.cpu_mask if args.cpu_mask else ''
    sh_cmd = prefix + f' sh -c "echo -e \'{cmd}\' > /proc/cryptobench; cat /proc/cryptobench"'
    raw_output = subprocess.check_output(['adb', 'shell', sh_cmd])
    return raw_output.decode('utf-8')
| |
def do_kernel_benchmark(algtype, algname, keysize, args):
    """Run one benchmark via /proc/cryptobench and return its result fields.

    A single-line command is built from ``algtype``, ``algname``,
    ``args.bufsize``, the optional ``force_ahash`` / ``inplace`` flags,
    ``keysize`` and ``args.niter`` and handed to ``proc_cryptobench``.

    Returns a dict mapping each ``key=value`` field of the response (all
    whitespace-separated fields after the first one) to its string value.

    Raises:
        BenchmarkError: if the response is empty, or if its first field is
            ``ERROR``.  ``error_type`` is the field following ``ERROR``,
            or a placeholder when the response has none.
        ValueError: if a result field contains no ``=``.
    """
    params = [
        'algtype=' + algtype,
        'algname=' + algname,
        'bufsize=' + str(args.bufsize),
    ]
    if args.force_ahash:
        params.append('force_ahash')
    if args.inplace:
        params.append('inplace')
    params.append('keysize=' + str(keysize))
    params.append('niter=' + str(args.niter))
    # The leading space is kept so the command text is unchanged.
    cmd = ' ' + ' '.join(params) + '\n'

    fields = proc_cryptobench(cmd, args).split()
    if not fields:
        # Report an empty response like any other failed benchmark instead
        # of crashing with an IndexError.
        raise BenchmarkError('no output for algorithm ' + algname,
                             'EMPTY_RESPONSE')
    if fields[0] == 'ERROR':
        error_type = fields[1] if len(fields) > 1 else 'UNKNOWN'
        raise BenchmarkError('error with algorithm ' + algname, error_type)

    results = {}
    # The first field is skipped -- presumably a status token; confirm
    # against the kernel side.
    for item in fields[1:]:
        # Split on the first '=' only, so a value may itself contain '='.
        key, value = item.split('=', 1)
        results[key] = value
    return results
| |
def check_measurement(prev_measurement, results, algname):
    """Return ``results['measurement']`` after comparing it with the last one.

    When ``prev_measurement`` is not None and differs from the new value,
    the mismatch is reported through ``error()``, naming ``algname`` and
    ``results['driver_name']``.
    """
    current = results['measurement']
    if prev_measurement is None:
        # First run: nothing to compare against yet.
        return current
    if current != prev_measurement:
        driver = results['driver_name']
        error(f'Algorithm {algname} (driver: {driver}) gave inconsistent results!')
    return current
| |
def benchmark_skcipher(algname, keysize, args):
    """Benchmark one skcipher/keysize pair and print its best throughput.

    The kernel benchmark is run ``args.ntries`` times and the smallest
    ``enc_time`` and ``dec_time`` seen are kept.  The output line holds the
    driver name followed by the encryption and decryption rates computed by
    ``megabytes_per_sec``.  If a run raises BenchmarkError, the algorithm
    name and error type are printed and no throughput line is produced.
    """
    best_enc = best_dec = 2**64
    last_measurement = None
    for _ in range(args.ntries):
        try:
            results = do_kernel_benchmark('skcipher', algname, keysize, args)
        except BenchmarkError as ex:
            print(f'{algname} {ex.error_type}')
            return
        last_measurement = check_measurement(last_measurement, results, algname)
        enc_ns = int(results['enc_time'])
        if enc_ns < best_enc:
            best_enc = enc_ns
        dec_ns = int(results['dec_time'])
        if dec_ns < best_dec:
            best_dec = dec_ns

    # Taken from the results of the last run.
    driver = results['driver_name']
    enc_rate = megabytes_per_sec(args, best_enc)
    dec_rate = megabytes_per_sec(args, best_dec)
    print(f'{driver:30} {enc_rate:4.2f} {dec_rate:4.2f}')
| |
def benchmark_hash(algname, keysize, args):
    """Benchmark one hash/keysize pair and print its best throughput.

    The kernel benchmark is run ``args.ntries`` times and the smallest
    ``time`` seen is kept.  The output line holds the driver name followed
    by the rate computed by ``megabytes_per_sec``.  If a run raises
    BenchmarkError, the algorithm name and error type are printed and no
    throughput line is produced.
    """
    best_ns = 2**64
    last_measurement = None
    for _ in range(args.ntries):
        try:
            results = do_kernel_benchmark('hash', algname, keysize, args)
        except BenchmarkError as ex:
            print(f'{algname} {ex.error_type}')
            return
        last_measurement = check_measurement(last_measurement, results, algname)
        elapsed_ns = int(results['time'])
        if elapsed_ns < best_ns:
            best_ns = elapsed_ns

    # Taken from the results of the last run.
    driver = results['driver_name']
    rate = megabytes_per_sec(args, best_ns)
    print(f'{driver:30} {rate:6.1f}')
| |
def parse_algnames(optarg):
    """Split a comma-separated list of algorithm names.

    Commas inside parentheses belong to the name they appear in, so
    ``'essiv(cbc(aes),sha256),ctr(aes)'`` yields two names.  Empty elements
    (leading, trailing or doubled commas) are ignored.

    Returns the distinct names as a sorted list.

    Raises ValueError if the parentheses in ``optarg`` are unbalanced.
    """
    names = set()
    current = []
    depth = 0
    for char in optarg:
        if char == ',' and depth == 0:
            # Top-level separator: finish the current name, if any.  An
            # empty element is dropped rather than leaking the comma into
            # the next name.
            if current:
                names.add(''.join(current))
                current = []
            continue
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
            if depth < 0:
                raise ValueError('Malformed argument: ' + optarg)
        current.append(char)
    if depth != 0:
        # An unclosed '(' is just as malformed as an unmatched ')'.
        raise ValueError('Malformed argument: ' + optarg)
    if current:
        names.add(''.join(current))
    return sorted(names)
| |
def main():
    """Parse the command line and run the requested benchmarks.

    Each cipher named by ``--ciphers`` is benchmarked once per key size in
    ``--keysizes`` (which is then mandatory).  Each hash named by
    ``--hashes`` is benchmarked once per key size, or once with key size 0
    when ``--keysizes`` is absent.  Invalid option combinations are
    reported through ``error()``, which exits with status 1.
    """
    parser = argparse.ArgumentParser(description='Run cryptographic benchmarks.')

    parser.add_argument('--adb', action='store_true', default=False,
                        help='use connected Android device')
    # type=int (as for --ntries) makes argparse reject a non-numeric value
    # up front instead of failing in the throughput computation afterwards.
    parser.add_argument('--bufsize', action='store', type=int, default=4096,
                        help='buffer size')
    parser.add_argument('--ciphers', action='store',
                        help='ciphers to benchmark')
    parser.add_argument('--cpu-mask', action='store',
                        help='CPUs to allow (default: all)')
    parser.add_argument('--force-ahash', action='store_true', default=False,
                        help='use ahash even if shash is available?')
    parser.add_argument('--hashes', action='store',
                        help='hashes to benchmark')
    parser.add_argument('--inplace', action='store_true', default=False,
                        help='crypt in place?')
    parser.add_argument('--keysizes', action='store',
                        help='keysizes to benchmark')
    parser.add_argument('--niter', action='store', type=int, default=4096,
                        help='num iterations per benchmark')
    parser.add_argument('--ntries', action='store', type=int, default=100,
                        help='num tries per benchmark')

    args = parser.parse_args()

    if args.ciphers:
        args.ciphers = parse_algnames(args.ciphers)

    if args.hashes:
        args.hashes = parse_algnames(args.hashes)

    if args.ntries <= 0:
        error('Must have ntries >= 1')

    if not (args.ciphers or args.hashes):
        error('Must specify at least one of --ciphers or --hashes')

    if args.keysizes:
        try:
            args.keysizes = sorted({int(x) for x in args.keysizes.split(',')})
        except ValueError:
            # Give a clean message instead of a traceback.
            error('--keysizes must be a comma-separated list of integers')

    if args.ciphers:
        if not args.keysizes:
            error('--keysizes must be specified')
        for cipher in args.ciphers:
            for keysize in args.keysizes:
                benchmark_skcipher(cipher, keysize, args)

    if args.hashes:
        # parse_algnames() already returned the names sorted.
        for hash_ in args.hashes:
            for keysize in args.keysizes or [0]:
                benchmark_hash(hash_, keysize, args)
| |
# Run the command-line interface only when executed as a script, so that
# importing this file does not start a benchmark.
if __name__ == '__main__':
    main()