blob: dbb77946bfd11a3a12ef5c350a3f579156b6cc90 [file] [log] [blame]
/*
* simoop.c
*
* Copyright (C) 2016 Facebook
* Chris Mason <clm@fb.com>
*
* GPLv2, portions copied from the kernel and from Jens Axboe's fio
*/
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#include <getopt.h>
#include <sys/time.h>
#include <time.h>
#include <string.h>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <libgen.h>
#include <locale.h>
#include <ctype.h>
#include <limits.h>
#include <stdint.h>
#include "xxhash.h"
/* these are part of the histogram accounting */
/* log2 of the number of buckets in one histogram group */
#define PLAT_BITS 8
/* number of buckets in one histogram group */
#define PLAT_VAL (1 << PLAT_BITS)
/* number of histogram groups */
#define PLAT_GROUP_NR 19
/* total bucket count, this is the size of stats.plat[] */
#define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL)
/* maximum number of entries in the plist[] percentile table */
#define PLAT_LIST_MAX 20
/*
* fan-out of the directory tree: join_path() spreads files over
* DIR_LEVEL first level dirs, each holding DIR_LEVEL second level dirs
*/
#define DIR_LEVEL 64
/* buffer size for reads and writes during filler */
#define BUF_SIZE (10 * 1024 * 1024)
/* size of the on-stack buffers used to build path names */
#define NAME_LEN 256
/*
* number of filler threads started per path; -n is rounded up to a
* multiple of this so the files divide evenly between them
*/
#define FILES_SPLIT 8
/* fallback for when the system headers did not define O_DIRECT */
#ifndef O_DIRECT
# define O_DIRECT 00040000
#endif
/*
* we make a few different kinds of files, these are appended onto the
* file name to separate them.  DATA_FILE is NULL, which join_path()
* treats as "no postfix".
*/
#define DATA_FILE NULL
#define RESULT_FILE "extra"
#define TMP_FILE "tmp"
/*
* each path in the paths array gets a thread pool hammering on it.
* parse_options() fills this in from the non-option arguments and
* NULL terminates it.
*/
char **paths;
int total_paths = 0;
/* -t number of workers thread */
static int worker_threads = 16;
/* -r seconds */
static int runtime = 30;
/*
* -c think time per worker loop.  The option is given in seconds,
* parse_options() converts it so the value stored here is in usecs
*/
static unsigned long long cputime = 3000000;
/* -f size of the files we create (rounded up to VERIFY_ALIGNMENT) */
static unsigned long long file_size = 64 * 1024 * 1024;
/* -n number of files we create (rounded up to a multiple of FILES_SPLIT) */
static unsigned long num_files = 65536;
/* -R read size (rounded up to VERIFY_ALIGNMENT), 0 disables the readers */
static unsigned long read_size = 2 * 1024 * 1024;
/* -W write size (rounded up to VERIFY_ALIGNMENT), 0 disables the writers */
static unsigned long write_size = 2 * 1024 * 1024;
/* -T number of reader threads and writer threads each worker loop starts */
static int rw_threads = 8;
/* -D number of threads running du */
static int du_threads = 0;
/* -m memory to allocate and use during each task */
static int thinking_mem = 128 * 1024 * 1024;
/* -F should we fsync sometimes? */
static int funksync = 0;
/* -a are we just appending bytes onto the ends of the working set files */
static int append_mode = 0;
/* -o randomize the write size */
static int oddsizes = 0;
/* -O use odirect sometimes */
static int odirect = 0;
/* -V check contents at startup */
static int check_initial_files = 0;
/* -M how much memory we allocate to benchmark allocations */
static int mmap_size = 64 * 1024 * 1024;
/* -C these do nothing but spin */
static int cpu_threads = 24;
/* -s how long we sleep while processing requests (usecs) */
static int sleeptime = 10000;
/*
* seed handed to XXH32_init() and stored in every verify header
* written by write_pattern()
*/
static uint64_t global_rand_seed = 0x89ABCEF;
/*
* -w after warmup_seconds, we reset the counters to get rid of noise from
* early in the run
*/
static int warmup_seconds = 60;
/* -i reporting interval */
static int interval_seconds = 120;
/* the master thread flips this to true when runtime is up */
static volatile unsigned long stopping = 0;
/* once this is set, each worker zeros its stats a single time */
static volatile unsigned long warmup_done = 0;
/*
* latency histogram plus summary counters.  Every thread_data carries
* one of these per stat type (read, write, allocation).  add_lat()
* records a sample into it and combine_stats() folds one struct into
* another.
*/
struct stats {
/* histogram buckets, indexed by plat_val_to_idx() */
unsigned int plat[PLAT_NR];
/* number of samples recorded into plat[] */
unsigned int nr_samples;
/* largest sample seen */
unsigned int max;
/* smallest sample seen, only lowered when a sample is below it */
unsigned int min;
};
/*
* this defines which latency profiles get printed.  The PLIST_*
* values are indexes into plist[] and into the array handed back by
* calc_percentiles().  calc_percentiles() treats the first 0.0 entry
* in plist[] as the end of the list.
*/
#define PLIST_P99 2
#define PLIST_P95 1
#define PLIST_P50 0
static double plist[PLAT_LIST_MAX] = { 50.0, 95.0, 99.0, };
/* value getopt_long() returns for --help, which has no short option */
enum {
HELP_LONG_OPT = 1,
};
/*
* this enum needs to match up with the labels array below.  The values
* index thread_data.stats[], TOTAL_STATS is the array size.
*/
enum {
READ_STATS = 0,
WRITE_STATS,
ALLOC_STATS,
TOTAL_STATS,
};
/* printable names for the stats above, NULL terminated */
char *stat_labels[] = {
"Read latency",
"Write latency",
"Allocation latency",
NULL,
};
/* match this with vmstat_labels */
enum {
ALLOCSTALLS,
VMSCAN_NR_WRITE,
TOTAL_VMSTATS,
};
/*
* names of the counters we track, NULL terminated.
* NOTE(review): these look like /proc/vmstat field names, the code
* that reads them is not in this part of the file -- confirm.
*/
char *vmstat_labels[] = {
"allocstall",
"nr_vmscan_write",
NULL,
};
/*
* per-counter bookkeeping, every array is indexed by the vmstat enum
* above.
* NOTE(review): how the three rate arrays differ is decided by code
* outside this part of the file -- confirm before relying on the names.
*/
struct vmstat_info {
double instant_rate[TOTAL_VMSTATS];
double last_rate[TOTAL_VMSTATS];
double rate[TOTAL_VMSTATS];
struct stats stats[TOTAL_VMSTATS];
};
/* magic string stored in every verify header */
#define VERIFY_MAGIC "simoopv1"
/* length of VERIFY_MAGIC, the trailing NUL is not stored */
#define VERIFY_MAGIC_LEN 8
/* our verify blocks are pretty small, which allows us to sub-page writes */
#define VERIFY_ALIGNMENT ((loff_t)1024)
/*
* one block worth of filler data, write_pattern() copies it into every
* block before stamping the header on top.
* NOTE(review): presumably filled in once at startup with
* init_pattern_buffer(), that call is not in this part of the file.
*/
char pattern_buffer[VERIFY_ALIGNMENT];
/*
 * header stamped onto the start of every VERIFY_ALIGNMENT sized block
 * we write.  crc_block() fills it in and check_block_headers()
 * validates it on the way back in.
 */
struct verify_header {
	/* checksum of everything in the block after this field */
	uint64_t crc;
	/* length of the whole block in bytes, header included */
	uint64_t length;
	/* starting offset of this block in the file */
	uint64_t offset;
	/* seed for recreating the random data */
	uint64_t rand_seed;
	uint64_t spare[2];
	/* VERIFY_MAGIC above (zero filled first) */
	char magic[VERIFY_MAGIC_LEN];
};

/*
 * returns the first byte covered by the checksum, which is everything
 * in the block after the crc field itself
 */
void *verify_crc_start(struct verify_header *verify_header)
{
	return &verify_header->length;
}

/*
 * number of bytes at the front of a block that the checksum skips.
 * offsetof() gives the same answer as the old NULL pointer arithmetic
 * without dereferencing a NULL pointer.
 */
unsigned long verify_crc_offset(void)
{
	return offsetof(struct verify_header, length);
}
/*
 * checksum one verify block.  The hash is reseeded with the header's
 * rand_seed and then run over the block from the length field to the
 * end (header->length bytes minus the skipped crc field).  Any error
 * from xxhash is fatal.
 */
static uint64_t __calc_crc(void *xxhash_state,
			   struct verify_header *verify_header)
{
	int err = XXH32_resetState(xxhash_state, verify_header->rand_seed);

	if (err) {
		fprintf(stderr, "error %d during XXH32_resetState\n", err);
		exit(1);
	}

	err = XXH32_update(xxhash_state, verify_crc_start(verify_header),
			   verify_header->length - verify_crc_offset());
	if (err) {
		fprintf(stderr, "error %d from XXH32_update\n", err);
		exit(1);
	}

	return XXH32_intermediateDigest(xxhash_state);
}
/*
 * fill a VERIFY_ALIGNMENT sized buffer with a repeating 128 byte
 * pattern generated from seed.  This reseeds the libc rand() stream
 * as a side effect.
 */
static void init_pattern_buffer(char *buffer, uint64_t seed)
{
	char pattern[128];
	char *dst = buffer;
	int remaining = VERIFY_ALIGNMENT;
	int chunk;
	int i;

	srand(seed);
	for (i = 0; i < 128; i++)
		pattern[i] = rand();

	/* stamp the pattern down until the block is full */
	for (; remaining > 0; remaining -= chunk, dst += chunk) {
		chunk = remaining > 128 ? 128 : remaining;
		memcpy(dst, pattern, chunk);
	}
}
/*
 * stamp a verify header onto the front of block and checksum the
 * result.  The rest of the block must already hold its final contents
 * because the crc covers it.  Exits if the block can't hold a header.
 */
static void crc_block(void *xxhash_state, char *block, uint64_t offset,
		      uint64_t length, uint64_t rand_seed)
{
	struct verify_header *hdr = (struct verify_header *)block;

	if (length < sizeof(*hdr)) {
		fprintf(stderr, "blocksize too small for crc header\n");
		exit(1);
	}

	memset(hdr, 0, sizeof(*hdr));
	hdr->rand_seed = rand_seed;
	hdr->offset = offset;
	hdr->length = length;
	/* the magic fills its field exactly, no NUL is stored */
	memcpy(hdr->magic, VERIFY_MAGIC, VERIFY_MAGIC_LEN);

	/* the crc goes in last, it covers all of the fields set above */
	hdr->crc = __calc_crc(xxhash_state, hdr);
}
/* round value down to a multiple of VERIFY_ALIGNMENT */
static loff_t verify_align(loff_t value)
{
	return value - (value % VERIFY_ALIGNMENT);
}
/* round value up to a multiple of VERIFY_ALIGNMENT */
static loff_t verify_align_up(loff_t value)
{
	return verify_align(value + VERIFY_ALIGNMENT - 1);
}
/*
 * called after a crc mismatch.  Rebuilds what the block should have
 * held (the pattern generated from the seed in the header, with the
 * header we read laid over the top) and reports the first byte that
 * differs.  offset is the position of the block in the file.
 *
 * init_pattern_buffer() reseeds rand() as a side effect; the only
 * caller exits right after this returns.
 */
static void dump_bad_block(char *block, uint64_t offset)
{
	struct verify_header *verify_header = (struct verify_header *)block;
	uint64_t seed = verify_header->rand_seed;
	char *tmp = malloc(VERIFY_ALIGNMENT);
	int i;

	if (!tmp) {
		fprintf(stderr, "malloc failed\n");
		exit(1);
	}

	init_pattern_buffer(tmp, seed);
	memcpy(tmp, verify_header, sizeof(*verify_header));

	for (i = 0; i < VERIFY_ALIGNMENT; i++) {
		if (tmp[i] == block[i])
			continue;
		/*
		 * go through unsigned char so bytes >= 0x80 print as two
		 * hex digits instead of a sign extended 0xffffffXX
		 */
		fprintf(stderr, "bad_block offset %llu (index %d) found 0x%x wanted 0x%x\n",
			(unsigned long long)(offset + i), i,
			(unsigned int)(unsigned char)block[i],
			(unsigned int)(unsigned char)tmp[i]);
		break;
	}
	free(tmp);
}
/*
 * validate one block that was read back from filename.  The magic,
 * the offset and the length recorded in the header have to match what
 * we expect, and the crc has to match the contents.  Any mismatch is
 * reported on stderr and is fatal.
 *
 * The length is checked before the crc is computed because
 * __calc_crc() trusts the length stored in the header.
 */
static void check_block_headers(void *xxhash_state, char *filename,
				char *block, uint64_t offset, uint64_t length)
{
	struct verify_header *verify_header = (struct verify_header *)block;
	uint64_t crc;

	if (strncmp(verify_header->magic, VERIFY_MAGIC, VERIFY_MAGIC_LEN)) {
		fprintf(stderr, "bad magic file %s offset %llu\n", filename,
			(unsigned long long)offset);
		exit(1);
	}
	if (verify_header->offset != offset) {
		fprintf(stderr, "bad offset file %s offset %llu found %llu\n",
			filename, (unsigned long long)offset,
			(unsigned long long)verify_header->offset);
		exit(1);
	}
	if (verify_header->length != length) {
		fprintf(stderr, "bad buffer length file %s length %llu found %llu\n",
			filename, (unsigned long long)length,
			(unsigned long long)verify_header->length);
		exit(1);
	}

	crc = __calc_crc(xxhash_state, verify_header);
	if (crc != verify_header->crc) {
		fprintf(stderr, "bad crc file %s crc %llx found %llx\n",
			filename, (unsigned long long)crc,
			(unsigned long long)verify_header->crc);
		dump_bad_block(block, offset);
		exit(1);
	}
}
/*
 * find last set: returns the 1-based position of the highest set bit
 * in x, or 0 when x is 0.  Nothing clever, parse_size is the only user.
 */
static int fls64(unsigned long long x)
{
	int bits = 0;

	while (x) {
		bits++;
		x >>= 1;
	}
	return bits;
}
/*
 * parse a size given as a decimal number with an optional single
 * letter suffix (b, k, m, g, t, p or e, case insensitive, powers of
 * 1024).  Returns the size in bytes.  Anything malformed, negative or
 * too big for an unsigned long long is reported on stderr and is fatal.
 */
unsigned long long parse_size(char *s)
{
	char c;
	char *endptr;
	unsigned long long mult = 1;
	unsigned long long ret;

	if (!s) {
		fprintf(stderr, "size value is empty\n");
		exit(1);
	}
	if (s[0] == '-') {
		fprintf(stderr, "size value '%s' is less equal than 0\n", s);
		exit(1);
	}

	/* strtoull only ever sets errno, clear out anything stale first */
	errno = 0;
	ret = strtoull(s, &endptr, 10);
	if (endptr == s) {
		fprintf(stderr, "size value '%s' is invalid\n", s);
		exit(1);
	}
	if (endptr[0] && endptr[1]) {
		fprintf(stderr, "illegal suffix contains character '%c' in wrong position\n",
			endptr[1]);
		exit(1);
	}
	/* strtoull returns ULLONG_MAX and sets ERANGE on overflow */
	if (errno == ERANGE && ret == ULLONG_MAX) {
		fprintf(stderr, "size value '%s' is too large for unsigned long long\n", s);
		exit(1);
	}

	if (endptr[0]) {
		/* the ctype functions want an unsigned char value */
		c = tolower((unsigned char)endptr[0]);
		switch (c) {
		case 'e':
			mult *= 1024;
			/* fallthrough */
		case 'p':
			mult *= 1024;
			/* fallthrough */
		case 't':
			mult *= 1024;
			/* fallthrough */
		case 'g':
			mult *= 1024;
			/* fallthrough */
		case 'm':
			mult *= 1024;
			/* fallthrough */
		case 'k':
			mult *= 1024;
			/* fallthrough */
		case 'b':
			break;
		default:
			fprintf(stderr, "unknown size descriptor '%c'\n", c);
			exit(1);
		}
	}

	/*
	 * Check whether ret * mult overflows.  mult is a power of two,
	 * so counting the bits of both operands is exact.
	 */
	if (fls64(ret) + fls64(mult) - 1 > 64) {
		fprintf(stderr, "size value '%s' is too large for unsigned long long\n", s);
		exit(1);
	}
	ret *= mult;
	return ret;
}
/*
 * short options for getopt_long(), a ':' after a letter means that
 * option takes an argument
 */
char *option_string = "t:s:C:c:r:n:f:FR:T:m:W:M:w:i:D:oaOV";

/*
 * long options, each one maps onto the short option handled in
 * parse_options().  The argument requirements here have to agree with
 * option_string.
 */
static struct option long_options[] = {
	/* -a is a plain flag, the long form must not demand an argument */
	{"appendmode", no_argument, 0, 'a'},
	{"mmapsize", required_argument, 0, 'M'},
	{"filesize", required_argument, 0, 'f'},
	{"numfiles", required_argument, 0, 'n'},
	{"readsize", required_argument, 0, 'R'},
	{"writesize", required_argument, 0, 'W'},
	{"readthreads", required_argument, 0, 'T'},
	{"duthreads", required_argument, 0, 'D'},
	{"threads", required_argument, 0, 't'},
	{"runtime", required_argument, 0, 'r'},
	{"warmuptime", required_argument, 0, 'w'},
	{"sleeptime", required_argument, 0, 's'},
	{"interval", required_argument, 0, 'i'},
	{"cputime", required_argument, 0, 'c'},
	{"cputhreads", required_argument, 0, 'C'},
	{"memory", required_argument, 0, 'm'},
	{"funksync", no_argument, 0, 'F'},
	{"oddsizes", no_argument, 0, 'o'},
	{"odirect", no_argument, 0, 'O'},
	{"verifystartup", no_argument, 0, 'V'},
	{"help", no_argument, 0, HELP_LONG_OPT},
	{0, 0, 0, 0}
};
/*
 * print the option summary on stderr and exit with status 1.  The long
 * option names listed here are the ones in long_options[].
 */
static void print_usage(void)
{
	fprintf(stderr, "simoop usage:\n"
		"\t-a (--appendmode): append onto working files\n"
		"\t-t (--threads): worker threads (def: 16)\n"
		"\t-m (--memory): memory to allocate during think time in each worker (def 128m)\n"
		"\t-M (--mmapsize): amount to mmap to time allocator (64M)\n"
		"\t-r (--runtime): How long to run before exiting (seconds, def: 30)\n"
		"\t-w (--warmuptime): How long to warmup before resetting the stats (seconds, def: 60)\n"
		"\t-i (--interval): Sleep time in seconds between latency reports (sec, def: 120)\n"
		"\t-V (--verifystartup): Verify all files on startup (def: no)\n"
		"\t-s (--sleeptime): Sleep time in usecs between worker loops (usec, def: 10000)\n"
		"\t-c (--cputime): How long to think during each worker loop (seconds, def: 3)\n"
		"\t-C (--cputhreads): How many threads do the cpu time loop (24)\n"
		"\t-n (--numfiles): Number of files per directory tree (65536)\n"
		"\t-f (--filesize): Size of each file (64M)\n"
		"\t-R (--readsize): amount to read from each file (2M)\n"
		"\t-W (--writesize): amount to write to tmp files (2M)\n"
		"\t-T (--readthreads): how many threads to read/write (8)\n"
		"\t-D (--duthreads): how many threads scan the working dirs (0)\n"
		"\t-F (--funksync): fsync sometimes\n"
		"\t-o (--oddsizes): randomize sizes to unaligned values\n"
		"\t-O (--odirect): use O_DIRECT sometimes\n"
		"\t dir1 [dir2 ... dirN]\n"
		"\nall sizes are in bytes k,m,g,t modifiers can be used\n"
		);
	exit(1);
}
/*
 * parse the command line into the option globals at the top of the
 * file.  Every non-option argument is taken as a directory to work in
 * and is copied into the NULL terminated paths[] array.  Running with
 * no directories prints the usage and exits.
 */
static void parse_options(int ac, char **av)
{
	int c;
	/* -1 means the option was not given on the command line */
	int found_sleeptime = -1;
	int found_cputime = -1;
	int i;

	while (1) {
		int option_index = 0;

		c = getopt_long(ac, av, option_string,
				long_options, &option_index);
		if (c == -1)
			break;

		switch (c) {
		case 'a':
			append_mode = 1;
			break;
		case 's':
			found_sleeptime = atoi(optarg);
			break;
		case 'm':
			thinking_mem = parse_size(optarg);
			break;
		case 'M':
			mmap_size = parse_size(optarg);
			break;
		case 'c':
			found_cputime = atoi(optarg);
			break;
		case 'C':
			cpu_threads = atoi(optarg);
			break;
		case 't':
			worker_threads = atoi(optarg);
			break;
		case 'r':
			runtime = atoi(optarg);
			break;
		case 'w':
			warmup_seconds = atoi(optarg);
			break;
		case 'i':
			interval_seconds = atoi(optarg);
			break;
		case 'F':
			funksync = 1;
			break;
		case 'f':
			file_size = parse_size(optarg);
			file_size = verify_align_up(file_size);
			break;
		case 'n':
			num_files = atoi(optarg);
			/* round up so the filler threads get equal shares */
			num_files = ((num_files + FILES_SPLIT - 1)/ FILES_SPLIT) * FILES_SPLIT;
			break;
		case 'R':
			read_size = parse_size(optarg);
			read_size = verify_align_up(read_size);
			break;
		case 'W':
			write_size = parse_size(optarg);
			write_size = verify_align_up(write_size);
			break;
		case 'T':
			rw_threads = atoi(optarg);
			break;
		case 'D':
			du_threads = atoi(optarg);
			break;
		case 'o':
			oddsizes = 1;
			break;
		case 'O':
			odirect = 1;
			break;
		case 'V':
			check_initial_files = 1;
			break;
		case '?':
		case HELP_LONG_OPT:
			print_usage();
			break;
		default:
			break;
		}
	}

	/* everything left over is a directory to work in */
	total_paths = ac - optind;
	if (total_paths <= 0) {
		fprintf(stderr, "No directories specified\n");
		print_usage();
		exit(1);
	}

	paths = malloc(sizeof(char *) * (total_paths + 1));
	if (!paths) {
		perror("malloc");
		exit(1);
	}
	paths[total_paths] = NULL;
	for (i = 0; i < total_paths; i++) {
		paths[i] = strdup(av[optind++]);
		if (!paths[i]) {
			perror("strdup");
			exit(1);
		}
		fprintf(stderr, "adding path %s\n", paths[i]);
	}

	/* only override the defaults with values that were passed in */
	if (found_sleeptime >= 0)
		sleeptime = found_sleeptime;
	/*
	 * -c is in seconds, cputime is kept in usecs.  Widen before
	 * multiplying so large values don't overflow an int.
	 */
	if (found_cputime >= 0)
		cputime = (unsigned long long)found_cputime * 1000000ULL;

	/* no think time or no cpu threads turns the cpu load off entirely */
	if (cputime == 0 || cpu_threads == 0) {
		cputime = 0;
		cpu_threads = 0;
		/*
		 * NOTE(review): this only fires for an explicit -s 0,
		 * which has already zeroed sleeptime above.  It may have
		 * been meant as found_sleeptime < 0 -- confirm the intent.
		 */
		if (!found_sleeptime)
			sleeptime = 0;
	}
}
/*
 * tdiff = t1 - t0.  A negative usec difference borrows from the
 * seconds when there is a whole second to borrow from.  If t1 is
 * before t0 the result is clamped to zero.  A usec field that is
 * still negative after borrowing is fatal.
 */
void tvsub(struct timeval * tdiff, struct timeval * t1, struct timeval * t0)
{
	tdiff->tv_sec = t1->tv_sec - t0->tv_sec;
	tdiff->tv_usec = t1->tv_usec - t0->tv_usec;

	/* borrow one second to pull the usecs back above zero */
	if (tdiff->tv_sec > 0 && tdiff->tv_usec < 0) {
		tdiff->tv_sec -= 1;
		tdiff->tv_usec += 1000000;
		if (tdiff->tv_usec < 0) {
			fprintf(stderr, "lat_fs: tvsub shows test time ran backwards!\n");
			exit(1);
		}
	}

	/* time shouldn't go backwards!!! */
	if (t1->tv_sec < t0->tv_sec || tdiff->tv_usec < 0) {
		tdiff->tv_usec = 0;
		tdiff->tv_sec = 0;
	}
}
/*
 * usecs elapsed between start and stop.  tvsub() clamps a negative
 * interval to zero, so this never goes backwards.
 */
unsigned long long tvdelta(struct timeval *start, struct timeval *stop)
{
	struct timeval elapsed;

	tvsub(&elapsed, stop, start);
	return (unsigned long long)elapsed.tv_sec * 1000000 + elapsed.tv_usec;
}
/*
 * mr axboe's magic latency histogram: map a sample onto its bucket in
 * stats.plat[].  Small samples get a bucket each, larger ones share
 * buckets inside a group picked by their most significant bit.
 */
static unsigned int plat_val_to_idx(unsigned int val)
{
	unsigned int top_bit = 0;
	unsigned int dropped;
	unsigned int idx;

	/* position of the highest set bit, counting from bit 0 */
	if (val != 0)
		top_bit = sizeof(val)*8 - __builtin_clz(val) - 1;

	/* small enough that the value itself is the index */
	if (top_bit <= PLAT_BITS)
		return val;

	/* the low bits we throw away to fit the value into its group */
	dropped = top_bit - PLAT_BITS;

	/* buckets in front of this group, plus the slot inside the group */
	idx = (dropped + 1) << PLAT_BITS;
	idx += (val >> dropped) & (PLAT_VAL - 1);

	/* never index past the last bucket */
	if (idx < (PLAT_NR - 1))
		return idx;
	return PLAT_NR - 1;
}
/*
 * the reverse of plat_val_to_idx(): turn a bucket index back into a
 * representative sample value.  An out of range index is fatal.
 */
static unsigned int plat_idx_to_val(unsigned int idx)
{
	unsigned int dropped;
	unsigned int group_base;
	unsigned int slot;

	if (idx >= PLAT_NR) {
		fprintf(stderr, "idx %u is too large\n", idx);
		exit(1);
	}

	/* these buckets hold a single value each, the index is the value */
	if (idx < (PLAT_VAL << 1))
		return idx;

	/* which group are we in, and what is its smallest value */
	dropped = (idx >> PLAT_BITS) - 1;
	group_base = 1 << (dropped + PLAT_BITS);

	/* which bucket inside the group */
	slot = idx % PLAT_VAL;

	/* report the middle of the range the bucket covers */
	return group_base + ((slot + 0.5) * (1 << dropped));
}
/*
 * compute the percentiles listed in plist[] from a histogram.
 *
 * io_u_plat: histogram buckets (PLAT_NR entries)
 * nr:        number of samples the histogram holds
 * output:    set to a malloc()ed array with one value per percentile,
 *            the caller frees it
 *
 * Returns the number of percentiles.  When plist[] is empty this
 * returns 0 and *output is left untouched.
 *
 * The array is allocated up front and zero filled.  Workers update the
 * buckets and the sample count separately, so the buckets can briefly
 * add up to less than nr; any percentile we never reach then reads
 * back as 0 instead of being uninitialized.
 */
static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr,
				     unsigned int **output)
{
	unsigned long sum = 0;
	unsigned int len = 0, i, j = 0;
	unsigned int *ovals;
	int is_last = 0;

	while (len < PLAT_LIST_MAX && plist[len] != 0.0)
		len++;

	if (!len)
		return 0;

	ovals = calloc(len, sizeof(*ovals));
	if (!ovals) {
		perror("calloc");
		exit(1);
	}

	/*
	 * walk the buckets keeping a running total, each time the
	 * total crosses the next percentile record the bucket's value
	 */
	for (i = 0; i < PLAT_NR && !is_last; i++) {
		sum += io_u_plat[i];
		while (sum >= (plist[j] / 100.0 * nr)) {
			ovals[j] = plat_idx_to_val(i);
			is_last = (j == len - 1);
			if (is_last)
				break;
			j++;
		}
	}

	*output = ovals;
	return len;
}
/*
 * pull the 50th, 95th and 99th percentile out of a stats histogram.
 * Each result is 0 when its percentile could not be computed.
 */
static void calc_p99(struct stats *s, unsigned int *p50,
		     unsigned int *p95, unsigned int *p99)
{
	unsigned int *ovals = NULL;
	int len;

	*p50 = 0;
	*p95 = 0;
	*p99 = 0;

	len = calc_percentiles(s->plat, s->nr_samples, &ovals);
	if (ovals) {
		/* each slot is checked against its own index */
		if (len > PLIST_P99)
			*p99 = ovals[PLIST_P99];
		if (len > PLIST_P95)
			*p95 = ovals[PLIST_P95];
		if (len > PLIST_P50)
			*p50 = ovals[PLIST_P50];
	}
	free(ovals);
}
/*
 * fold latency info from s into d: the buckets and sample counts are
 * added together, max and min keep the more extreme of the two
 */
void combine_stats(struct stats *d, struct stats *s)
{
	int bucket = 0;

	while (bucket < PLAT_NR) {
		d->plat[bucket] += s->plat[bucket];
		bucket++;
	}
	d->nr_samples += s->nr_samples;

	if (d->max < s->max)
		d->max = s->max;
	if (d->min > s->min)
		d->min = s->min;
}
/*
 * record a latency result into the histogram.  The bucket and the
 * sample count are bumped with atomic adds, max and min are plain
 * stores.
 */
static void add_lat(struct stats *s, unsigned int us)
{
	int bucket;

	if (s->max < us)
		s->max = us;
	if (s->min > us)
		s->min = us;

	bucket = plat_val_to_idx(us);
	__sync_fetch_and_add(&s->plat[bucket], 1);
	__sync_fetch_and_add(&s->nr_samples, 1);
}
/*
* every thread has one of these.  Nearly all of the size is the stats
* array: each struct stats holds PLAT_NR histogram counters and there
* are TOTAL_STATS of them (roughly 19K apiece with a 4 byte unsigned
* int, so about 57K in total)
*/
struct thread_data {
pthread_t tid;
/* per-thread count of worker loops over the life of the run */
unsigned long long work_done;
char *read_buf;
char *write_buf;
/* latency histograms, indexed by READ_STATS, WRITE_STATS and ALLOC_STATS */
struct stats stats[TOTAL_STATS];
};
#define nop __asm__ __volatile__("rep;nop": : :"memory")

/*
 * burn cpu until more than spin_time usecs have gone by.  A spin_time
 * of zero returns right away.
 */
static void usec_spin(unsigned long spin_time)
{
	struct timeval begin;
	struct timeval cur;

	if (!spin_time)
		return;

	gettimeofday(&begin, NULL);
	for (;;) {
		gettimeofday(&cur, NULL);
		if (tvdelta(&begin, &cur) > spin_time)
			break;
		nop;
	}
}
/*
 * runs during initial file creation to create one dir in the tree:
 * path/a when b is negative, path/a/b otherwise.  A directory that is
 * already there is fine, any other failure is fatal.
 */
static void make_one_dir(char *path, int a, int b)
{
	char subdir[NAME_LEN];
	int ret;

	if (b >= 0)
		ret = snprintf(subdir, NAME_LEN, "%s/%d/%d", path, a, b);
	else
		ret = snprintf(subdir, NAME_LEN, "%s/%d", path, a);

	if (ret >= NAME_LEN || ret < 0) {
		/* truncation doesn't set errno, so perror() is no help here */
		fprintf(stderr, "file name too long\n");
		exit(1);
	}

	ret = mkdir(subdir, 0700);
	if (ret && errno != EEXIST) {
		perror("mkdir");
		exit(1);
	}
}
/* create the subdir tree (no files) */
static void make_dirs(char *path)
{
int first;
int second;
for (first = 0; first < 64; first++) {
make_one_dir(path, first, -1);
for (second = 0; second < 64; second++) {
make_one_dir(path, first, second);
}
}
}
/*
 * helper to form pathnames, if postfix isn't NULL, it'll be tossed
 * onto the end of the filename.  The two directory levels come from
 * seq, so a given file always lands in the same place.
 *
 * name has to point at NAME_LEN bytes.  A path that doesn't fit is
 * fatal.
 */
static void join_path(char *name, char *path, int seq, char *postfix)
{
	int a;
	int b;
	int ret;

	a = seq % DIR_LEVEL;
	b = (seq / DIR_LEVEL) % DIR_LEVEL;

	if (postfix)
		ret = snprintf(name, NAME_LEN, "%s/%d/%d/%d-%s", path, a, b, seq, postfix);
	else
		ret = snprintf(name, NAME_LEN, "%s/%d/%d/%d", path, a, b, seq);

	if (ret >= NAME_LEN || ret < 0) {
		/* truncation doesn't set errno, so perror() is no help here */
		fprintf(stderr, "file name too long\n");
		exit(1);
	}
}
static void read_whole_file(char *path, int seq, char *postfix,
			    char *buf, size_t buf_size);

/*
 * unlink working files not part of the main dataset for a given
 * filename.  With -V each file is read back and verified before it is
 * removed.  A file that isn't there is not an error.
 */
static void unlink_extra(char *path, int seq, char *buf, size_t buf_size)
{
	char *postfixes[] = { RESULT_FILE, TMP_FILE };
	char name[NAME_LEN];
	unsigned int k;

	for (k = 0; k < sizeof(postfixes) / sizeof(postfixes[0]); k++) {
		join_path(name, path, seq, postfixes[k]);
		if (check_initial_files)
			read_whole_file(path, seq, postfixes[k], buf, buf_size);
		if (unlink(name) < 0 && errno != ENOENT) {
			perror("unlink");
			exit(1);
		}
	}
}
/*
 * construct a filename and return the fd.  The file is opened read
 * write and created if it isn't there, flags is or'd into the open
 * flags.  Failure to open is fatal.
 */
static int open_path(char *path, int seq, char *postfix, int flags)
{
	char name[NAME_LEN];
	int fd;

	join_path(name, path, seq, postfix);

	fd = open(name, O_RDWR | O_CREAT | flags, 0600);
	if (fd >= 0)
		return fd;

	perror("open");
	exit(1);
}
/*
 * with -o this returns a random size in [0, sz), without it sz is
 * handed back untouched.  A size of zero or less is also returned as
 * is, since there is nothing to pick from (and rand() % 0 would trap).
 */
static loff_t randomize_size(int sz)
{
	if (!oddsizes || sz <= 0)
		return sz;
	return rand() % sz;
}
/*
 * does nothing unless -F was given.  With it, writeback is started on
 * the whole file every time and about one call in five goes on to a
 * full fsync.  Errors are fatal.
 */
static void maybe_fsync(int fd)
{
	if (!funksync)
		return;

	if (sync_file_range(fd, 0, 0, SYNC_FILE_RANGE_WRITE) < 0) {
		perror("sync_file_range");
		exit(1);
	}

	if ((rand() % 5) == 0 && fsync(fd) < 0) {
		perror("fsync");
		exit(1);
	}
}
/*
 * does nothing unless -O was given.  With it, an IO that is not 512
 * byte aligned (start or len) always gets O_DIRECT turned off.  For
 * an aligned IO we flip O_DIRECT about one call in three.  fcntl
 * errors are fatal.
 */
static void maybe_toggle_odirect(int fd, unsigned long start,
				 unsigned long len)
{
	int flags;
	int ret;

	if (!odirect)
		return;

	flags = fcntl(fd, F_GETFL);
	if (flags < 0) {
		/* don't feed -1 back into F_SETFL as if it was a flag set */
		perror("fcntl");
		exit(1);
	}

	/*
	 * if we're doing an unaligned IO, turn off O_DIRECT and
	 * exit
	 */
	if ((start & 511) || (len & 511)) {
		if (flags & O_DIRECT) {
			ret = fcntl(fd, F_SETFL, flags & (~O_DIRECT));
			if (ret) {
				perror("fcntl");
				exit(1);
			}
		}
		return;
	}

	if ((rand() % 3) != 0)
		return;

	/* flip whatever state we're currently in */
	ret = fcntl(fd, F_SETFL, flags ^ O_DIRECT);
	if (ret) {
		perror("fcntl");
		exit(1);
	}
}
/*
 * write bytes from buf into fd starting at offset start.  The front
 * of the IO is chopped into as many as three randomly sized pieces,
 * each a whole number of verify blocks, and whatever is left goes down
 * in one final write.  O_DIRECT may be flipped ahead of each piece and
 * an fsync may follow it.  Write errors are fatal.
 */
static void send_pwrite(int fd, char *buf, loff_t start, ssize_t bytes)
{
	ssize_t this_write;
	ssize_t ret;
	int num_headers = bytes / VERIFY_ALIGNMENT;
	int index;
	int i;

	/* an IO smaller than one block has nothing to split up */
	for (i = 0; i < 3 && num_headers > 0; i++) {
		/*
		 * the goal here is to break up our huge IO into
		 * something that isn't completely page aligned.
		 */
		index = rand() % num_headers;
		num_headers -= index;
		this_write = index * VERIFY_ALIGNMENT;
		bytes -= this_write;

		/* the size of this piece has to be known before we toggle */
		maybe_toggle_odirect(fd, start, this_write);

		while (this_write > 0) {
			ret = pwrite(fd, buf, this_write, start);
			if (ret <= 0) {
				perror("pwrite");
				exit(1);
			}
			start += ret;
			this_write -= ret;
			buf += ret;
		}
		maybe_fsync(fd);
		if (bytes == 0)
			break;
	}

	/* send down whatever the random pieces didn't cover */
	while (bytes > 0) {
		ret = pwrite(fd, buf, bytes, start);
		if (ret <= 0) {
			perror("pwrite");
			exit(1);
		}
		start += ret;
		bytes -= ret;
		buf += ret;
	}
	maybe_fsync(fd);
}
/*
 * write verifiable data over [start, start + length) of fd.  The range
 * is widened to whole verify blocks first.  buf is scratch space of
 * buffer_len bytes; it gets filled with pattern blocks, each with its
 * own header and crc, and is then handed to send_pwrite().
 */
static void write_pattern(int fd, void *xxhash_state, char *buf,
			  int buffer_len, loff_t start, off_t length)
{
	loff_t file_pos = verify_align(start);
	ssize_t chunk;
	ssize_t filled;
	char *block;

	/* grow the length by what rounding the start down added, then round up */
	length += start - file_pos;
	length = verify_align_up(length);

	for (; length > 0; length -= chunk, file_pos += chunk) {
		if (length > buffer_len)
			chunk = buffer_len;
		else
			chunk = length;

		/* fill the buffer with the pattern and headers */
		for (filled = 0, block = buf; filled < chunk;
		     filled += VERIFY_ALIGNMENT, block += VERIFY_ALIGNMENT) {
			memcpy(block, pattern_buffer, VERIFY_ALIGNMENT);
			crc_block(xxhash_state, block, file_pos + filled,
				  VERIFY_ALIGNMENT, global_rand_seed);
		}

		send_pwrite(fd, buf, file_pos, chunk);
	}
}
/*
 * read [start, start + length) back from fd and verify every block in
 * it.  The range is widened to whole verify blocks first.  buf is
 * scratch space of buffer_len bytes, filename is only used in error
 * messages.
 *
 * Nothing is checked when read_size is 0.  A short or failed read
 * silently ends the check, a block that fails verification is fatal
 * (see check_block_headers).
 *
 * xxhash_state is the opaque state from XXH32_init(), the same plain
 * pointer every caller holds and check_block_headers() takes.
 */
static void read_and_crc(int fd, char *filename,
			 void *xxhash_state, char *buf,
			 int buffer_len, loff_t start, off_t length)
{
	ssize_t ret;
	loff_t aligned_start;
	char *p;
	ssize_t this_read;
	ssize_t cur_len;

	if (!read_size)
		return;

	aligned_start = verify_align(start);
	length += start - aligned_start;
	length = verify_align_up(length);

	while (length > 0) {
		if (length > buffer_len)
			this_read = buffer_len;
		else
			this_read = length;

		maybe_toggle_odirect(fd, aligned_start, this_read);
		ret = pread(fd, buf, this_read, aligned_start);
		if (ret != this_read) {
			/* someone is deleting this file at the same time
			 * we're reading it
			 */
			break;
		}

		p = buf;
		cur_len = 0;
		while (cur_len < this_read) {
			check_block_headers(xxhash_state, filename,
					    p, aligned_start + cur_len,
					    VERIFY_ALIGNMENT);
			cur_len += VERIFY_ALIGNMENT;
			p += VERIFY_ALIGNMENT;
		}
		aligned_start += this_read;
		length -= this_read;
	}
}
/*
 * helper for startup, do initial writes to a given fd.  The target
 * size is file_size (randomized with -o); a file that is already that
 * big is left alone, otherwise pattern data is added from the current
 * end of the file up to the target.
 */
static void fill_one_file(int fd, void *xxhash_state, char *buf, size_t buf_size)
{
	loff_t target = randomize_size(file_size);
	struct stat st;

	if (fstat(fd, &st) < 0) {
		perror("stat");
		exit(1);
	}

	if (st.st_size < target)
		write_pattern(fd, xxhash_state, buf, buf_size, st.st_size,
			      target - st.st_size);
}
/*
 * The du thread runs every so often and stats every single file in a
 * given path (arg).  Per the original notes this is here to stress the
 * slab caches and the shrinker, and on XFS it sets the radix bits that
 * track which allocation groups need their inodes cleaned.
 *
 * It keeps going until stopping is set.
 */
static void *du_thread(void *arg)
{
	char *path = arg;
	struct stat st;
	unsigned long seq;
	int fd;

	while (!stopping) {
		fprintf(stderr, "du thread is running %s\n", path);

		seq = 0;
		while (seq < num_files) {
			fd = open_path(path, seq, DATA_FILE, 0);
			if (fstat(fd, &st) < 0 && errno != ENOENT) {
				perror("fstat");
				exit(1);
			}
			close(fd);
			seq++;
		}

		fprintf(stderr, "du thread is done %s\n", path);

		/*
		 * we need some jitter in here so all the du threads are
		 * staggered
		 */
		sleep(45 + (rand() % 90));
	}
	return NULL;
}
/*
 * create a temporary file and dirty it: a random tmp file under path
 * is opened, stretched to 100 bytes and cut back to nothing.  The
 * results of the truncates are not checked.
 */
static void dirty_an_inode(char *path)
{
	static const off_t sizes[] = { 100, 0 };
	int seq = rand() % num_files;
	int fd = open_path(path, seq, TMP_FILE, 0);
	unsigned int k;

	for (k = 0; k < sizeof(sizes) / sizeof(sizes[0]); k++)
		ftruncate(fd, sizes[k]);
	close(fd);
}
/*
 * add the time between start and finish to a histogram.  An interval
 * that comes out as zero usecs is not recorded.
 */
static void record_one_lat(struct stats *stat, struct timeval *start,
			   struct timeval *finish)
{
	unsigned long long usecs = tvdelta(start, finish);

	if (!usecs)
		return;
	add_lat(stat, usecs);
}
/*
 * reads from a random (well aligned) offset in one of the main data
 * files and verifies what comes back.  Up to read_size bytes are read
 * into buf.  If the read would run off the end of the file it starts
 * from offset 0 instead, and is trimmed to the file size if that
 * still doesn't fit.
 */
static void read_from_file(char *path, int seq, char *buf)
{
	void *xxhash_state = XXH32_init(global_rand_seed);
	ssize_t read_bytes = read_size;
	char name[NAME_LEN];
	struct stat st;
	off_t offset;
	int fd;
	int i;

	join_path(name, path, seq, DATA_FILE);
	fd = open_path(path, seq, DATA_FILE, 0);

	if (fstat(fd, &st) < 0) {
		perror("stat");
		exit(1);
	}

	/* pick a spot some whole percentage of the way into the file */
	offset = rand() % 100;
	offset = verify_align((offset * st.st_size) / 100);

	/* we are too big? */
	if (offset + read_bytes > st.st_size) {
		offset = 0;
		if (read_bytes > st.st_size)
			read_bytes = verify_align(st.st_size);
	}

	read_and_crc(fd, name, xxhash_state, buf, read_bytes, offset,
		     read_bytes);

	/* if we don't have writers making dirty inodes, make some here */
	if (!write_size) {
		i = 0;
		while (i < 8) {
			dirty_an_inode(path);
			i++;
		}
	}

	close(fd);
	/* the digest call is only here to free the state */
	XXH32_digest(xxhash_state);
}
/*
 * read and verify an entire file, buf_size bytes at a time.  Nothing
 * is done when read_size is 0.  A file that can't be opened or stat'd
 * is quietly skipped, which covers files that were never created.
 */
static void read_whole_file(char *path, int seq, char *postfix,
			    char *buf, size_t buf_size)
{
	int fd;
	int ret;
	int i;
	ssize_t read_bytes = buf_size;
	struct stat st;
	void *xxhash_state;
	char name[NAME_LEN];

	if (read_size == 0)
		return;

	join_path(name, path, seq, postfix);
	fd = open(name, O_RDONLY, 0600);
	if (fd < 0)
		return;

	ret = fstat(fd, &st);
	if (ret < 0) {
		/* don't leak the fd on the way out */
		close(fd);
		return;
	}

	xxhash_state = XXH32_init(global_rand_seed);
	read_and_crc(fd, name, xxhash_state, buf, read_bytes, 0,
		     st.st_size);

	/* if we don't have writers making dirty inodes, make some here */
	if (!write_size) {
		for (i = 0; i < 8; i++)
			dirty_an_inode(path);
	}

	close(fd);
	/* the digest call is only here to free the state */
	XXH32_digest(xxhash_state);
}
/*
 * write verifiable data into one file and read it straight back to
 * check it.  buf is scratch space of write_size bytes.
 *
 * In append mode the data goes onto the end of the main data file,
 * and about one call in ten then truncates that file to nothing.
 * Otherwise the data is written at the start of the "extra" result
 * file and a few tmp inodes are dirtied afterwards.
 */
static void write_to_file(char *path, int seq, char *buf)
{
	int fd;
	int i;
	int write_bytes = randomize_size(write_size);
	loff_t offset;
	void *xxhash_state = XXH32_init(global_rand_seed);
	char *postfix;
	char name[NAME_LEN];

	if (append_mode) {
		postfix = DATA_FILE;
		fd = open_path(path, seq, DATA_FILE, O_APPEND);
		/*
		 * a freshly opened fd sits at offset 0 even with
		 * O_APPEND, so ask for the end of the file explicitly.
		 * The offset is stored in the block headers and used for
		 * the read back, it has to be where the data really lands.
		 */
		offset = lseek(fd, 0, SEEK_END);
		if (offset < 0) {
			perror("lseek");
			exit(1);
		}
	} else {
		postfix = RESULT_FILE;
		fd = open_path(path, seq, RESULT_FILE, 0);
		offset = 0;
	}

	write_pattern(fd, xxhash_state, buf, write_size, offset, write_bytes * 4);

	join_path(name, path, seq, postfix);
	read_and_crc(fd, name, xxhash_state, buf, write_size, offset,
		     write_bytes * 4);
	/* the digest call is only here to free the state */
	XXH32_digest(xxhash_state);

	/*
	 * delete some files.  This has to happen while the fd is still
	 * open: after close() the number can be handed to another
	 * thread's file.
	 */
	if (append_mode && rand() % 10 == 0) {
		if (ftruncate(fd, 0) < 0)
			perror("ftruncate");
	}
	close(fd);

	/* make some dirty inodes */
	if (!append_mode) {
		for (i = 0; i < 8; i++)
			dirty_an_inode(path);
	}
}
/*
 * make all the worker files under a main path, covering file numbers
 * seq_start up to (not including) seq_start + seq_num.  Each file is
 * verified first when -V is on, topped up to its target size, and has
 * the extra and tmp files from the last run removed.
 */
static void make_files(char *path, unsigned long seq_start,
		       unsigned long seq_num)
{
	void *xxhash_state = XXH32_init(global_rand_seed);
	unsigned long seq_end = seq_start + seq_num;
	unsigned long seq;
	char *buf;
	int fd;

	if (posix_memalign((void **)(&buf), getpagesize(), BUF_SIZE)) {
		perror("posix_memalign");
		exit(1);
	}

	for (seq = seq_start; seq < seq_end; seq++) {
		if (read_size && check_initial_files)
			read_whole_file(path, seq, DATA_FILE, buf, BUF_SIZE);

		fd = open_path(path, seq, DATA_FILE, O_APPEND);
		fill_one_file(fd, xxhash_state, buf, BUF_SIZE);
		close(fd);

		/* cleanup from the last run */
		unlink_extra(path, seq, buf, BUF_SIZE);
	}

	free(buf);

	/* just to free the state */
	XXH32_digest(xxhash_state);
}
/*
* the work handed to one filler thread.  run_filler_threads() mallocs
* it and filler_thread() frees it.
*/
struct filler {
/* directory to fill, this points into paths[] and is not owned here */
char *path;
/* first file number this thread creates */
unsigned long seq_start;
/* how many files this thread creates */
unsigned long seq_num;
};
/*
 * thread body for the initial fill.  arg is a malloc()ed struct filler
 * that this thread frees.  The directory tree is (re)created first,
 * then this thread's share of the files.
 */
void *filler_thread(void *arg)
{
	struct filler *work = arg;

	fprintf(stderr, "filling %s start %lu num %lu\n", work->path, work->seq_start, work->seq_num);

	make_dirs(work->path);
	make_files(work->path, work->seq_start, work->seq_num);

	free(work);
	return NULL;
}
/*
 * create the directory trees and the working files.  Each path gets
 * FILES_SPLIT threads and each thread fills an equal slice of the
 * file numbers.  This returns once every thread has finished.
 */
void run_filler_threads(void)
{
	int i;
	int ret;
	int j;
	int total = total_paths * FILES_SPLIT;
	pthread_t *tids;

	tids = malloc(sizeof(*tids) * total);
	if (!tids) {
		perror("malloc");
		exit(1);
	}

	fprintf(stderr, "Creating working files\n");
	for (i = 0; i < total_paths; i++) {
		for (j = 0; j < FILES_SPLIT; j++) {
			struct filler *filler;

			/* freed by filler_thread() */
			filler = malloc(sizeof(*filler));
			if (!filler) {
				perror("malloc");
				exit(1);
			}
			filler->path = paths[i];
			filler->seq_start = j * (num_files / FILES_SPLIT);
			filler->seq_num = num_files / FILES_SPLIT;

			/*
			 * every thread needs a slot of its own, so that
			 * each one is joined exactly once below
			 */
			ret = pthread_create(&tids[i * FILES_SPLIT + j], NULL,
					     filler_thread, filler);
			if (ret) {
				fprintf(stderr, "error %d from pthread_create\n", ret);
				exit(1);
			}
		}
	}

	for (i = 0; i < total; i++)
		pthread_join(tids[i], NULL);

	fprintf(stderr, "done creating working files\n");
	free(tids);
}
void *read_thread(void *arg)
{
int index = rand() % total_paths;
int seq = rand() % num_files;
char *path = paths[index];
char *buf = arg;
read_from_file(path, seq, buf);
return NULL;
}
/*
 * startup reader threads, returns the tids for later waiting.  There
 * are rw_threads of them and each gets its own read_size slice of
 * buf.  The caller does the joining.
 */
void read_some_files(char *buf, pthread_t *tids)
{
	int nr;
	int err;

	for (nr = 0; nr < rw_threads; nr++) {
		err = pthread_create(&tids[nr], NULL, read_thread,
				     buf + nr * read_size);
		if (err) {
			fprintf(stderr, "error %d from pthread_create\n", err);
			exit(1);
		}
	}
}
void *write_thread(void *arg)
{
int index = rand() % total_paths;
int seq = rand() % num_files;
char *path = paths[index];
char *buf = arg;
write_to_file(path, seq, buf);
return NULL;
}
/*
 * startup writer threads, returns the tids for later waiting.  There
 * are rw_threads of them and each gets its own write_size slice of
 * buf.  The caller does the joining.
 */
void write_some_files(char *buf, pthread_t *tids)
{
	int nr;
	int err;

	for (nr = 0; nr < rw_threads; nr++) {
		err = pthread_create(&tids[nr], NULL, write_thread,
				     buf + nr * write_size);
		if (err) {
			fprintf(stderr, "error %d from pthread_create\n", err);
			exit(1);
		}
	}
}
/*
 * main work loop
 *
 * arg is the struct thread_data for this worker.  Until 'stopping' is
 * set, each pass of the loop:
 *   - allocates page aligned read and write buffers
 *   - touches the thinking memory (if any)
 *   - starts the reader threads, spins for cputime usec, joins them
 *   - starts the writer threads and joins them
 *   - maps, faults in and unmaps mmap_size bytes of anonymous memory
 *   - spins for cputime usec again and bumps td->work_done
 *
 * The read, write and allocation stages are timed individually and
 * recorded into td->stats.  Any allocation or mmap failure is fatal.
 * Always returns NULL.
 */
void *worker_thread(void *arg)
{
	struct timeval now;
	struct timeval start;
	struct thread_data *td = arg;
	char *read_buf = NULL;
	char *write_buf = NULL;
	char *mem = NULL;
	pthread_t *read_tids;
	pthread_t *write_tids;
	char *mmap_ptr;
	int warmup_zerod = 0;
	int i;
	int ret;

	read_tids = malloc(sizeof(*read_tids) * rw_threads);
	write_tids = malloc(sizeof(*write_tids) * rw_threads);
	if (thinking_mem)
		mem = malloc(thinking_mem);

	if (!read_tids || !write_tids || (thinking_mem && !mem)) {
		perror("allocation failed");
		exit(1);
	}

	while (!stopping) {
		/*
		 * reset our stats after warmup so we don't have noise
		 * from initial thread creation
		 */
		if (warmup_done && !warmup_zerod) {
			memset(td->stats, 0, sizeof(*td->stats) * TOTAL_STATS);
			warmup_zerod = 1;
		}

		/*
		 * posix_memalign hands back an error number instead of
		 * setting errno, so copy it over before calling perror
		 */
		ret = posix_memalign((void **)(&read_buf), getpagesize(),
				     rw_threads * read_size);
		if (!ret)
			ret = posix_memalign((void **)(&write_buf),
					     getpagesize(),
					     rw_threads * write_size);
		if (ret) {
			errno = ret;
			perror("allocation");
			exit(1);
		}

		/* if someone swapped out our thinking mem, bring it back */
		if (thinking_mem)
			memset(mem, 0, thinking_mem);

		gettimeofday(&start, NULL);

		/* Start the threads to read files */
		if (read_size) {
			read_some_files(read_buf, read_tids);

			/* think in parallel */
			usec_spin(cputime);

			/* wait for our reads to finish */
			for (i = 0; i < rw_threads; i++) {
				pthread_join(read_tids[i], NULL);
			}
			gettimeofday(&now, NULL);

			/*
			 * record how long the reading stage took.  This
			 * includes all of the latencies for thread creation,
			 * doing the reads and waiting for completion
			 */
			record_one_lat(&td->stats[READ_STATS], &start, &now);
		}

		/* write out the (pretend) results */
		if (write_size) {
			gettimeofday(&start, NULL);
			write_some_files(write_buf, write_tids);
			for (i = 0; i < rw_threads; i++) {
				pthread_join(write_tids[i], NULL);
			}
			gettimeofday(&now, NULL);
			record_one_lat(&td->stats[WRITE_STATS], &start, &now);
		}

		/*
		 * we also track the latency to allocate and fault in
		 * a chunk of pages.  This is basically the user-visible
		 * impact of allocation stalls
		 */
		if (mmap_size) {
			size_t offset;

			gettimeofday(&start, NULL);

			mmap_ptr = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE,
					MAP_ANONYMOUS | MAP_PRIVATE,
					-1, 0);
			if (mmap_ptr == MAP_FAILED) {
				perror("mmap");
				exit(1);
			}

			/*
			 * fault in all those pages.  The offset is a size_t
			 * so mappings of 2GB and larger don't overflow it
			 */
			for (offset = 0; offset < mmap_size; offset += 4096) {
				mmap_ptr[offset] = 'a';
			}

			/* measure how long all of this took */
			gettimeofday(&now, NULL);
			record_one_lat(&td->stats[ALLOC_STATS], &start, &now);

			munmap(mmap_ptr, mmap_size);
		}

		/* think again, pretending we've done something useful */
		usec_spin(cputime);

		free(read_buf);
		free(write_buf);

		td->work_done++;
		if (sleeptime)
			usleep(sleeptime);
	}

	free(mem);
	free(read_tids);
	free(write_tids);
	return NULL;
}
/*
 * cpu threads exist to keep the CPUs busy so kswapd has to compete for
 * CPU time.  They never do IO, they just alternate between spinning for
 * cputime usec and a 1 usec sleep until 'stopping' is set.
 */
static void *cpu_thread(void *arg)
{
	/* the argument is not needed, only the global settings are used */
	(void)arg;

	for (;;) {
		if (stopping)
			break;
		usec_spin(cputime);
		usleep(1);
	}
	return NULL;
}
/* copy every current vmstat value in rate[] into last_rate[] */
static void save_vmstat_rates(struct vmstat_info *vmstat_info)
{
	int nr = TOTAL_VMSTATS;

	while (nr-- > 0)
		vmstat_info->last_rate[nr] = vmstat_info->rate[nr];
}
/* copy every current vmstat value in rate[] into instant_rate[] */
static void save_instant_vmstat_rates(struct vmstat_info *vmstat_info)
{
	int nr = TOTAL_VMSTATS;

	while (nr-- > 0)
		vmstat_info->instant_rate[nr] = vmstat_info->rate[nr];
}
/*
 * read in /proc/vmstat so we can sum the allocation stall lines and
 * print them out.
 *
 * vmstat_info->rate[] is zeroed and then, for every line of the file,
 * the number after the first space is added to each slot whose label
 * from vmstat_labels[] appears in that line.  If /proc/vmstat cannot be
 * opened, rate[] is left untouched.
 */
static void read_vmstat(struct vmstat_info *vmstat_info)
{
	FILE *fp;
	char *line = NULL;
	size_t len = 0;
	int i;

	fp = fopen("/proc/vmstat", "r");
	if (fp == NULL)
		return;

	memset(vmstat_info->rate, 0, sizeof(double) * TOTAL_VMSTATS);

	while (getline(&line, &len, fp) != -1) {
		char *p = strchr(line, ' ');
		unsigned long long val;

		/* no value on this line, nothing to add */
		if (!p || p[1] == '\0')
			continue;

		/*
		 * the counters can be much larger than an int, so parse
		 * them with strtoull instead of atoi
		 */
		val = strtoull(p + 1, NULL, 10);

		/*
		 * newer kernels break out different types of allocstall,
		 * just add them all together
		 */
		for (i = 0; i < TOTAL_VMSTATS; i++) {
			if (strstr(line, vmstat_labels[i]))
				vmstat_info->rate[i] += val;
		}
	}

	free(line);
	fclose(fp);
}
/*
 * every worker thread tracks latencies individually.  This pulls them all
 * into a single destination stat array for printing.
 *
 * dest must have room for TOTAL_STATS entries; it is zeroed first and
 * then the matching entry from each of the worker_threads workers is
 * combined into it.  One line with the p50/p95/p99 values from
 * calc_p99() is printed per stat.
 */
static void collect_stats(struct stats *dest, struct thread_data *worker_threads_mem)
{
	int i;
	int j;

	memset(dest, 0, sizeof(*dest) * TOTAL_STATS);
	for (i = 0; i < TOTAL_STATS; i++) {
		for (j = 0; j < worker_threads; j++)
			combine_stats(&dest[i], &worker_threads_mem[j].stats[i]);
	}

	for (i = 0; i < TOTAL_STATS; i++) {
		unsigned int p50 = 0, p95 = 0, p99 = 0;

		calc_p99(&dest[i], &p50, &p95, &p99);
		/* the percentiles are unsigned, print them that way */
		printf("%s (p50: %'u) (p95: %'u) (p99: %'u)\n",
		       stat_labels[i], p50, p95, p99);
	}
}
/*
 * print out the current stats, along with averages and latency histogram
 * numbers
 *
 * worker_threads_mem:	per-thread data, handed to collect_stats()
 * stats:		scratch array that collect_stats() fills in
 * work_done_stats:	histogram of the average work rate, one sample is
 *			added per call
 * vmstat_info:		current, last and instant vmstat values
 * work_done:		work completed during delta
 * instant_work_done:	work completed during instant_delta
 * delta:		usec covered by the averages
 * instant_delta:	usec covered by the instant numbers
 *
 * Rates are stored in the histograms multiplied by 100 and divided back
 * down when printed, which keeps two decimal places.  A zero length
 * interval is reported as a rate of zero.
 */
static void print_latencies(struct thread_data *worker_threads_mem,
			    struct stats *stats,
			    struct stats *work_done_stats,
			    struct vmstat_info *vmstat_info,
			    double work_done, double instant_work_done,
			    unsigned long long delta,
			    unsigned long long instant_delta)
{
	double rate;
	double instant_rate;
	double seconds = (double)delta / 1000000;
	unsigned int p50 = 0, p95 = 0, p99 = 0;
	int i;

	printf("___\n");
	printf("Run time: %.0f seconds\n", seconds);

	/* this also prints the histogram results from the workers */
	collect_stats(stats, worker_threads_mem);

	/* calculate the work done over this period, add to histogram */
	rate = delta ? (work_done * 1000000) / delta : 0;
	instant_rate = instant_delta ?
		(instant_work_done * 1000000) / instant_delta : 0;
	add_lat(work_done_stats, rate * 100);
	calc_p99(work_done_stats, &p50, &p95, &p99);

	printf("work rate = %.2f/sec (avg %.2f/sec) (p50: %.2f) (p95: %.2f) (p99: %.2f)\n",
	       instant_rate, rate, (double)p50/100.00, (double)p95/100.00, (double)p99/100.00);

	for (i = 0; i < TOTAL_VMSTATS; i++) {
		rate = vmstat_info->rate[i] - vmstat_info->last_rate[i];
		if (rate < 0)
			rate = 0;

		instant_rate = vmstat_info->rate[i] - vmstat_info->instant_rate[i];
		if (instant_rate < 0)
			instant_rate = 0;

		/*
		 * the instant numbers only cover instant_delta, so that
		 * is what they have to be divided by
		 */
		rate = delta ? (rate * 1000000) / delta : 0;
		instant_rate = instant_delta ?
			(instant_rate * 1000000) / instant_delta : 0;

		add_lat(&vmstat_info->stats[i], rate * 100);
		calc_p99(&vmstat_info->stats[i], &p50, &p95, &p99);

		printf("%s rate = %.2f/sec (avg: %.2f) (p50: %.2f) (p95: %.2f) (p99: %.2f)\n",
		       vmstat_labels[i], instant_rate, rate,
		       (double)p50/100.00, (double)p95/100.00, (double)p99/100.00);
	}
}
/*
 * runtime from the command line is in seconds.  Sleep until it's up,
 * waking every interval_seconds to print the current numbers.
 *
 * A runtime of zero never leaves the loop.  Once warmup_seconds have
 * gone by, warmup_done is set and the averages start over from that
 * point.  When the runtime is up, 'stopping' is set, the cpu threads are
 * joined and one last report is printed.
 */
static void sleep_for_runtime(struct thread_data *worker_threads_mem)
{
	struct timeval now;
	struct timeval start;
	struct timeval rate_start;
	struct timeval instant_start;
	unsigned long long delta;
	unsigned long long rate_delta;
	unsigned long long instant_delta;
	/*
	 * widen before multiplying, otherwise anything over ~2147 seconds
	 * overflows the int math
	 */
	unsigned long long runtime_usec = (unsigned long long)runtime * 1000000;
	unsigned long long warmup_usec = (unsigned long long)warmup_seconds * 1000000;
	double work_done = 0;
	double instant_work_done = 0;
	double last_work_done = 0;
	struct stats stats[TOTAL_STATS];
	struct vmstat_info vmstat_info;
	struct stats work_done_stats;
	int i;

	gettimeofday(&start, NULL);
	rate_start = start;

	memset(&work_done_stats, 0, sizeof(work_done_stats));
	memset(&vmstat_info, 0, sizeof(vmstat_info));

	read_vmstat(&vmstat_info);
	save_vmstat_rates(&vmstat_info);
	save_instant_vmstat_rates(&vmstat_info);

	while (1) {
		gettimeofday(&now, NULL);
		instant_start = now;
		delta = tvdelta(&start, &now);

		if (!warmup_done && delta > warmup_usec) {
			printf("Warmup complete (%d seconds)\n", warmup_seconds);
			__sync_synchronize();
			warmup_done = 1;

			/* start the averages over now that warmup is done */
			memset(&work_done_stats, 0, sizeof(work_done_stats));
			memset(&vmstat_info, 0, sizeof(vmstat_info));
			read_vmstat(&vmstat_info);
			save_vmstat_rates(&vmstat_info);
			save_instant_vmstat_rates(&vmstat_info);
			last_work_done = work_done;
			rate_start = now;
		}

		instant_work_done = work_done;
		if (runtime_usec == 0 || delta < runtime_usec)
			sleep(interval_seconds);
		else
			break;

		gettimeofday(&now, NULL);
		rate_delta = tvdelta(&rate_start, &now);
		instant_delta = tvdelta(&instant_start, &now);

		/* add up the work done by all of the workers so far */
		work_done = 0;
		for (i = 0; i < worker_threads; i++)
			work_done += worker_threads_mem[i].work_done;

		read_vmstat(&vmstat_info);
		print_latencies(worker_threads_mem, stats,
				&work_done_stats,
				&vmstat_info,
				work_done - last_work_done,
				work_done - instant_work_done,
				rate_delta, instant_delta);
		save_instant_vmstat_rates(&vmstat_info);
	}

	__sync_synchronize();
	stopping = 1;

	/* the cpu threads live after the workers in worker_threads_mem */
	for (i = 0; i < cpu_threads; i++) {
		pthread_join(worker_threads_mem[i + worker_threads].tid, NULL);
	}

	work_done = 0;
	for (i = 0; i < worker_threads; i++)
		work_done += worker_threads_mem[i].work_done;

	gettimeofday(&now, NULL);
	rate_delta = tvdelta(&rate_start, &now);
	instant_delta = tvdelta(&instant_start, &now);

	read_vmstat(&vmstat_info);
	print_latencies(worker_threads_mem, stats,
			&work_done_stats,
			&vmstat_info,
			work_done - last_work_done,
			work_done - instant_work_done,
			rate_delta, instant_delta);
}
/*
 * parse the options, create the working files, start the worker, cpu
 * and du threads, then report results until the runtime is up.
 */
int main(int ac, char **av)
{
	int i;
	int ret;
	/* unsigned so stepping past a large rand() value can't go negative */
	unsigned int index;
	struct thread_data *worker_threads_mem = NULL;
	pthread_t *du_tids;

	setlocale(LC_NUMERIC, "");
	parse_options(ac, av);

	/* never run more du threads than there are paths */
	if (du_threads > total_paths)
		du_threads = total_paths;

	init_pattern_buffer(pattern_buffer, global_rand_seed);

	/* du threads might be zero */
	du_tids = calloc(du_threads + 1, sizeof(pthread_t));

	/* one array holds the workers first and the cpu threads after them */
	worker_threads_mem = calloc(worker_threads + cpu_threads,
				    sizeof(struct thread_data));
	if (!worker_threads_mem || !du_tids) {
		perror("calloc");
		exit(1);
	}

	/* fill up our directory tree.  This might take a really long time */
	run_filler_threads();

	stopping = 0;

	/* worker threads do the IO and the real stuff */
	for (i = 0; i < worker_threads; i++) {
		pthread_t tid;

		ret = pthread_create(&tid, NULL, worker_thread,
				     worker_threads_mem + i);
		if (ret) {
			/* pthread_create returns the error, errno is not set */
			fprintf(stderr, "error %d from pthread_create\n", ret);
			exit(1);
		}
		worker_threads_mem[i].tid = tid;
	}

	/* CPU threads just soak up cycles */
	for (i = 0; i < cpu_threads; i++) {
		pthread_t tid;

		ret = pthread_create(&tid, NULL, cpu_thread,
				     worker_threads_mem + i + worker_threads);
		if (ret) {
			fprintf(stderr, "error %d from pthread_create\n", ret);
			exit(1);
		}
		worker_threads_mem[i + worker_threads].tid = tid;
	}

	/*
	 * du threads read in inodes, the goal is to have it happen on just
	 * a couple of paths
	 */
	index = rand();
	for (i = 0; i < du_threads; i++) {
		ret = pthread_create(&du_tids[i], NULL, du_thread,
				     paths[index++ % total_paths]);
		if (ret) {
			fprintf(stderr, "error %d from pthread_create\n", ret);
			exit(1);
		}
	}

	/* let all the magic happen and collect results */
	sleep_for_runtime(worker_threads_mem);

	for (i = 0; i < du_threads; i++) {
		pthread_join(du_tids[i], NULL);
	}

	free(du_tids);
	free(worker_threads_mem);
	return 0;
}