| /* |
| * schbench.c |
| * |
| * Copyright (C) 2016 Facebook |
| * Chris Mason <clm@fb.com> |
| * |
| * GPLv2, portions copied from the kernel and from Jens Axboe's fio |
| * |
 * gcc -Wall -O0 -W schbench.c -o schbench -lpthread -lm
| */ |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <pthread.h> |
| #include <fcntl.h> |
| #include <unistd.h> |
| #include <errno.h> |
| #include <getopt.h> |
| #include <sys/time.h> |
| #include <time.h> |
| #include <string.h> |
| #include <math.h> |
| #include <linux/futex.h> |
| #include <sys/syscall.h> |
| #include <sys/sysinfo.h> |
| |
| #define PLAT_BITS 8 |
| #define PLAT_VAL (1 << PLAT_BITS) |
| #define PLAT_GROUP_NR 19 |
| #define PLAT_NR (PLAT_GROUP_NR * PLAT_VAL) |
| #define PLAT_LIST_MAX 20 |
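
/*
 * the histogram is log-linear: PLAT_GROUP_NR groups of PLAT_VAL buckets,
 * each group covering twice the range of the one before it at half the
 * resolution. With PLAT_BITS = 8 this spans values up to about 2^26 usec
 * while staying within roughly 0.4% of the true value.
 */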
| |
| /* when -p is on, how much do we send back and forth */ |
| #define PIPE_TRANSFER_BUFFER (1 * 1024 * 1024) |
| |
| #define USEC_PER_SEC (1000000) |
| |
| /* -m number of message threads */ |
| static int message_threads = 1; |
| /* -t number of workers per message thread */ |
| static int worker_threads = 0; |
| /* -r seconds */ |
| static int runtime = 30; |
| /* -w seconds */ |
| static int warmuptime = 5; |
| /* -i seconds */ |
| static int intervaltime = 10; |
| /* -z seconds */ |
| static int zerotime = 0; |
/* -F cache_footprint_kb */
| static unsigned long cache_footprint_kb = 256; |
| /* -n operations */ |
| static unsigned long operations = 5; |
/* -A, target percentage of CPU busy in auto rps mode */
| static int auto_rps = 0; |
| static int auto_rps_target_hit = 0; |
| /* -p bytes */ |
| static int pipe_test = 0; |
| /* -R requests per sec */ |
| static int requests_per_sec = 0; |
| /* -C bool for calibration mode */ |
| static int calibrate_only = 0; |
| /* -L bool no locking during CPU work */ |
| static int skip_locking = 0; |
| |
/* main() flips this to true when it decides runtime is up */
| static volatile unsigned long stopping = 0; |
| |
| /* size of matrices to multiply */ |
| static unsigned long matrix_size = 0; |
| |
| struct per_cpu_lock { |
| pthread_mutex_t lock; |
| } __attribute__((aligned)); |
| |
| static struct per_cpu_lock *per_cpu_locks; |
| static int num_cpu_locks; |
| |
/*
 * one stats struct per thread. When the workers sleep, this records the
 * latency between when they are woken up and when they actually get the
 * CPU again. The message threads sum up the stats of all the workers and
 * then bubble them up to main() for printing
 */
| struct stats { |
| unsigned int plat[PLAT_NR]; |
| unsigned long nr_samples; |
| unsigned int max; |
| unsigned int min; |
| }; |
| |
| struct stats rps_stats; |
| |
| /* this defines which latency profiles get printed */ |
| #define PLIST_20 (1 << 0) |
| #define PLIST_50 (1 << 1) |
| #define PLIST_90 (1 << 2) |
| #define PLIST_99 (1 << 3) |
| #define PLIST_99_INDEX 3 |
| #define PLIST_999 (1 << 4) |
| |
| #define PLIST_FOR_LAT (PLIST_50 | PLIST_90 | PLIST_99 | PLIST_999) |
| #define PLIST_FOR_RPS (PLIST_20 | PLIST_50 | PLIST_90) |
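/* bit N of these masks selects plist[N] below for printing */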
| |
| static double plist[PLAT_LIST_MAX] = { 20.0, 50.0, 90.0, 99.0, 99.9 }; |
| |
| enum { |
| HELP_LONG_OPT = 1, |
| }; |
| |
| char *option_string = "p:m:t:Cr:R:w:i:z:A:n:F:L"; |
| static struct option long_options[] = { |
| {"pipe", required_argument, 0, 'p'}, |
| {"message-threads", required_argument, 0, 'm'}, |
| {"threads", required_argument, 0, 't'}, |
| {"runtime", required_argument, 0, 'r'}, |
| {"rps", required_argument, 0, 'R'}, |
| {"auto-rps", required_argument, 0, 'A'}, |
| {"cache_footprint", required_argument, 0, 'f'}, |
| {"calibrate", no_argument, 0, 'C'}, |
| {"no-locking", no_argument, 0, 'L'}, |
| {"operations", required_argument, 0, 'n'}, |
| {"warmuptime", required_argument, 0, 'w'}, |
| {"intervaltime", required_argument, 0, 'i'}, |
| {"zerotime", required_argument, 0, 'z'}, |
| {"help", no_argument, 0, HELP_LONG_OPT}, |
| {0, 0, 0, 0} |
| }; |
| |
| static void print_usage(void) |
| { |
| fprintf(stderr, "schbench usage:\n" |
| "\t-C (--calibrate): run our work loop and report on timing\n" |
| "\t-L (--no-locking): don't spinlock during CPU work (def: locking on)\n" |
| "\t-m (--message-threads): number of message threads (def: 1)\n" |
| "\t-t (--threads): worker threads per message thread (def: num_cpus)\n" |
| "\t-r (--runtime): How long to run before exiting (seconds, def: 30)\n" |
| "\t-F (--cache_footprint): cache footprint (kb, def: 256)\n" |
| "\t-n (--operations): think time operations to perform (def: 5)\n" |
| "\t-A (--auto-rps): grow RPS until cpu utilization hits target (def: none)\n" |
| "\t-p (--pipe): transfer size bytes to simulate a pipe test (def: 0)\n" |
| "\t-R (--rps): requests per second mode (count, def: 0)\n" |
| "\t-w (--warmuptime): how long to warmup before resettings stats (seconds, def: 5)\n" |
| "\t-i (--intervaltime): interval for printing latencies (seconds, def: 10)\n" |
| "\t-z (--zerotime): interval for zeroing latencies (seconds, def: never)\n" |
| ); |
| exit(1); |
| } |
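
/*
 * a few example invocations:
 *
 *	./schbench -m 2 -t 16 -r 30	two message threads, 16 workers each
 *	./schbench -R 100		requests-per-second mode at 100 rps
 *	./schbench -A 90		auto-scale rps toward 90% cpu busy
 *	./schbench -C			calibration mode, time the work loop alone
 */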
| |
| static void parse_options(int ac, char **av) |
| { |
| int c; |
| int found_warmuptime = -1; |
| |
| while (1) { |
| int option_index = 0; |
| |
| c = getopt_long(ac, av, option_string, |
| long_options, &option_index); |
| |
| if (c == -1) |
| break; |
| |
| switch(c) { |
| case 'C': |
| calibrate_only = 1; |
| break; |
| case 'L': |
| skip_locking = 1; |
| break; |
| case 'A': |
| auto_rps = atoi(optarg); |
| warmuptime = 0; |
| if (requests_per_sec == 0) |
| requests_per_sec = 10; |
| break; |
| case 'p': |
| pipe_test = atoi(optarg); |
| if (pipe_test > PIPE_TRANSFER_BUFFER) { |
| fprintf(stderr, "pipe size too big, using %d\n", |
| PIPE_TRANSFER_BUFFER); |
| pipe_test = PIPE_TRANSFER_BUFFER; |
| } |
| warmuptime = 0; |
| break; |
| case 'w': |
| found_warmuptime = atoi(optarg); |
| break; |
| case 'm': |
| message_threads = atoi(optarg); |
| break; |
| case 't': |
| worker_threads = atoi(optarg); |
| break; |
| case 'r': |
| runtime = atoi(optarg); |
| break; |
| case 'i': |
| intervaltime = atoi(optarg); |
| break; |
| case 'z': |
| zerotime = atoi(optarg); |
| break; |
| case 'R': |
| requests_per_sec = atoi(optarg); |
| break; |
| case 'n': |
| operations = atoi(optarg); |
| break; |
| case 'F': |
| cache_footprint_kb = atoi(optarg); |
| break; |
| case '?': |
| case HELP_LONG_OPT: |
| print_usage(); |
| break; |
| default: |
| break; |
| } |
| } |
| |
	/*
	 * pipe and auto-rps modes zero out the warmup time by default.
	 * If -w was passed on the command line, restore it here.
	 */
| if (found_warmuptime >= 0) |
| warmuptime = found_warmuptime; |
| |
| if (calibrate_only) |
| skip_locking = 1; |
| |
| if (runtime < 30) |
| warmuptime = 0; |
| |
| if (optind < ac) { |
| fprintf(stderr, "Error Extra arguments '%s'\n", av[optind]); |
| exit(1); |
| } |
| } |
| |
| void tvsub(struct timeval * tdiff, struct timeval * t1, struct timeval * t0) |
| { |
| tdiff->tv_sec = t1->tv_sec - t0->tv_sec; |
| tdiff->tv_usec = t1->tv_usec - t0->tv_usec; |
| if (tdiff->tv_usec < 0 && tdiff->tv_sec > 0) { |
| tdiff->tv_sec--; |
| tdiff->tv_usec += USEC_PER_SEC; |
| if (tdiff->tv_usec < 0) { |
| fprintf(stderr, "lat_fs: tvsub shows test time ran backwards!\n"); |
| exit(1); |
| } |
| } |
| |
| /* time shouldn't go backwards!!! */ |
| if (tdiff->tv_usec < 0 || t1->tv_sec < t0->tv_sec) { |
| tdiff->tv_sec = 0; |
| tdiff->tv_usec = 0; |
| } |
| } |
| |
| /* |
| * returns the difference between start and stop in usecs. Negative values |
| * are turned into 0 |
| */ |
| unsigned long long tvdelta(struct timeval *start, struct timeval *stop) |
| { |
| struct timeval td; |
| unsigned long long usecs; |
| |
| tvsub(&td, stop, start); |
| usecs = td.tv_sec; |
| usecs *= USEC_PER_SEC; |
| usecs += td.tv_usec; |
| return (usecs); |
| } |
| |
| /* mr axboe's magic latency histogram */ |
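/*
 * a worked example with PLAT_BITS = 8: values 0-511 index directly. For
 * val = 1000, msb = 9, error_bits = 1, base = (1 + 1) << 8 = 512 and
 * offset = (1000 >> 1) & 255 = 244, so the bucket is 756. Mapping back,
 * plat_idx_to_val(756) returns 512 + 244.5 * 2 = 1001, within the ~0.4%
 * error of the original 1000.
 */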
| static unsigned int plat_val_to_idx(unsigned int val) |
| { |
| unsigned int msb, error_bits, base, offset; |
| |
| /* Find MSB starting from bit 0 */ |
| if (val == 0) |
| msb = 0; |
| else |
| msb = sizeof(val)*8 - __builtin_clz(val) - 1; |
| |
	/*
	 * MSB <= PLAT_BITS means val < (PLAT_VAL << 1); it is small
	 * enough to use all bits of the sample as the index directly
	 */
| if (msb <= PLAT_BITS) |
| return val; |
| |
	/* Compute the number of error bits to discard */
| error_bits = msb - PLAT_BITS; |
| |
| /* Compute the number of buckets before the group */ |
| base = (error_bits + 1) << PLAT_BITS; |
| |
| /* |
| * Discard the error bits and apply the mask to find the |
| * index for the buckets in the group |
| */ |
| offset = (PLAT_VAL - 1) & (val >> error_bits); |
| |
| /* Make sure the index does not exceed (array size - 1) */ |
| return (base + offset) < (PLAT_NR - 1) ? |
| (base + offset) : (PLAT_NR - 1); |
| } |
| |
| /* |
| * Convert the given index of the bucket array to the value |
| * represented by the bucket |
| */ |
| static unsigned int plat_idx_to_val(unsigned int idx) |
| { |
| unsigned int error_bits, k, base; |
| |
| if (idx >= PLAT_NR) { |
| fprintf(stderr, "idx %u is too large\n", idx); |
| exit(1); |
| } |
| |
	/* MSB <= PLAT_BITS, the sample was small enough to be used
	 * directly as the index without rounding */
| if (idx < (PLAT_VAL << 1)) |
| return idx; |
| |
| /* Find the group and compute the minimum value of that group */ |
| error_bits = (idx >> PLAT_BITS) - 1; |
| base = 1 << (error_bits + PLAT_BITS); |
| |
	/* Find the bucket number within the group */
| k = idx % PLAT_VAL; |
| |
| /* Return the mean of the range of the bucket */ |
| return base + ((k + 0.5) * (1 << error_bits)); |
| } |
| |
| |
| static unsigned int calc_percentiles(unsigned int *io_u_plat, unsigned long nr, |
| unsigned int **output, |
| unsigned long **output_counts) |
| { |
| unsigned long sum = 0; |
| unsigned int len, i, j = 0; |
| unsigned int oval_len = 0; |
| unsigned int *ovals = NULL; |
| unsigned long *ocounts = NULL; |
| unsigned long last = 0; |
| int is_last; |
| |
| len = 0; |
| while (len < PLAT_LIST_MAX && plist[len] != 0.0) |
| len++; |
| |
| if (!len) |
| return 0; |
| |
	/*
	 * walk the buckets, recording the value and cumulative count at
	 * each requested percentile
	 */
| is_last = 0; |
| for (i = 0; i < PLAT_NR && !is_last; i++) { |
| sum += io_u_plat[i]; |
| while (sum >= (plist[j] / 100.0 * nr)) { |
| if (j == oval_len) { |
| oval_len += 100; |
| ovals = realloc(ovals, oval_len * sizeof(unsigned int)); |
| ocounts = realloc(ocounts, oval_len * sizeof(unsigned long)); |
| } |
| |
| ovals[j] = plat_idx_to_val(i); |
| ocounts[j] = sum; |
| is_last = (j == len - 1); |
| if (is_last) |
| break; |
| j++; |
| } |
| } |
| |
| for (i = 1; i < len; i++) { |
| last += ocounts[i - 1]; |
| ocounts[i] -= last; |
| } |
| *output = ovals; |
| *output_counts = ocounts; |
| return len; |
| } |
| |
| static void show_latencies(struct stats *s, char *label, char *units, |
| unsigned long long runtime, unsigned long mask, |
| unsigned long star) |
| { |
| unsigned int *ovals = NULL; |
| unsigned long *ocounts = NULL; |
| unsigned int len, i; |
| |
| len = calc_percentiles(s->plat, s->nr_samples, &ovals, &ocounts); |
| if (len) { |
| fprintf(stderr, "%s percentiles (%s) runtime %llu (s) (%lu total samples)\n", |
| label, units, runtime, s->nr_samples); |
| for (i = 0; i < len; i++) { |
| unsigned long bit = 1 << i; |
| if (!(mask & bit)) |
| continue; |
| fprintf(stderr, "\t%s%2.1fth: %-10u (%lu samples)\n", |
| bit == star ? "* " : " ", |
| plist[i], ovals[i], ocounts[i]); |
| } |
| } |
| |
| if (ovals) |
| free(ovals); |
| if (ocounts) |
| free(ocounts); |
| |
| fprintf(stderr, "\t min=%u, max=%u\n", s->min, s->max); |
| } |
| |
| /* fold latency info from s into d */ |
| void combine_stats(struct stats *d, struct stats *s) |
| { |
| int i; |
| for (i = 0; i < PLAT_NR; i++) |
| d->plat[i] += s->plat[i]; |
| d->nr_samples += s->nr_samples; |
| if (s->max > d->max) |
| d->max = s->max; |
| if (d->min == 0 || s->min < d->min) |
| d->min = s->min; |
| } |
| |
| /* record a latency result into the histogram */ |
| static void add_lat(struct stats *s, unsigned int us) |
| { |
| int lat_index = 0; |
| |
| if (us > s->max) |
| s->max = us; |
| if (s->min == 0 || us < s->min) |
| s->min = us; |
| |
| lat_index = plat_val_to_idx(us); |
| __sync_fetch_and_add(&s->plat[lat_index], 1); |
| __sync_fetch_and_add(&s->nr_samples, 1); |
| } |
| |
| struct request { |
| struct timeval start_time; |
| struct request *next; |
| }; |
| |
/*
 * every thread has one of these. The two stats structs are about 19KB
 * each, and the 1MB pipe_page pushes the whole thing past a megabyte
 */
| struct thread_data { |
| pthread_t tid; |
| /* ->next is for placing us on the msg_thread's list for waking */ |
| struct thread_data *next; |
| |
	/* ->request is the list of all our pending requests */
| struct request *request; |
| |
| /* our parent thread and messaging partner */ |
| struct thread_data *msg_thread; |
| |
| /* |
| * the msg thread stuffs gtod in here before waking us, so we can |
| * measure scheduler latency |
| */ |
| struct timeval wake_time; |
| |
| /* keep the futex and the wake_time in the same cacheline */ |
| int futex; |
| |
| /* mr axboe's magic latency histogram */ |
| struct stats wakeup_stats; |
| struct stats request_stats; |
| unsigned long long loop_count; |
| unsigned long long runtime; |
| unsigned long pending; |
| |
| char pipe_page[PIPE_TRANSFER_BUFFER]; |
| |
| /* matrices to multiply */ |
| unsigned long *data; |
| }; |
| |
| /* we're so fancy we make our own futex wrappers */ |
| #define FUTEX_BLOCKED 0 |
| #define FUTEX_RUNNING 1 |
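
/*
 * the futex is a two state flag: fpost() flips it from FUTEX_BLOCKED to
 * FUTEX_RUNNING and only makes the FUTEX_WAKE syscall when it wins that
 * race; fwait() loops until it can flip FUTEX_RUNNING back to
 * FUTEX_BLOCKED, sleeping in FUTEX_WAIT while no wakeup is pending.
 */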
| |
| static int futex(int *uaddr, int futex_op, int val, |
| const struct timespec *timeout, int *uaddr2, int val3) |
| { |
| return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3); |
| } |
| |
| /* |
| * wakeup a process waiting on a futex, making sure they are really waiting |
| * first |
| */ |
| static void fpost(int *futexp) |
| { |
| int s; |
| |
| if (__sync_bool_compare_and_swap(futexp, FUTEX_BLOCKED, |
| FUTEX_RUNNING)) { |
| s = futex(futexp, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0); |
| if (s == -1) { |
| perror("FUTEX_WAKE"); |
| exit(1); |
| } |
| } |
| } |
| |
| /* |
| * wait on a futex, with an optional timeout. Make sure to set |
| * the futex to FUTEX_BLOCKED beforehand. |
| * |
| * This will return zero if all went well, or return -ETIMEDOUT if you |
| * hit the timeout without getting posted |
| */ |
| static int fwait(int *futexp, struct timespec *timeout) |
| { |
| int s; |
| while (1) { |
| /* Is the futex available? */ |
| if (__sync_bool_compare_and_swap(futexp, FUTEX_RUNNING, |
| FUTEX_BLOCKED)) { |
| break; /* Yes */ |
| } |
| /* Futex is not available; wait */ |
| s = futex(futexp, FUTEX_WAIT_PRIVATE, FUTEX_BLOCKED, timeout, NULL, 0); |
| if (s == -1 && errno != EAGAIN) { |
| if (errno == ETIMEDOUT) |
| return -ETIMEDOUT; |
| perror("futex-FUTEX_WAIT"); |
| exit(1); |
| } |
| } |
| return 0; |
| } |
| |
| /* |
| * cmpxchg based list prepend |
| */ |
| static void xlist_add(struct thread_data *head, struct thread_data *add) |
| { |
| struct thread_data *old; |
| struct thread_data *ret; |
| |
| while (1) { |
| old = head->next; |
| add->next = old; |
| ret = __sync_val_compare_and_swap(&head->next, old, add); |
| if (ret == old) |
| break; |
| } |
| } |
| |
| /* |
| * xchg based list splicing. This returns the entire list and |
| * replaces the head->next with NULL |
| */ |
| static struct thread_data *xlist_splice(struct thread_data *head) |
| { |
| struct thread_data *old; |
| struct thread_data *ret; |
| |
| while (1) { |
| old = head->next; |
| ret = __sync_val_compare_and_swap(&head->next, old, NULL); |
| if (ret == old) |
| break; |
| } |
| return ret; |
| } |
| |
| /* |
| * cmpxchg based list prepend |
| */ |
| static struct request *request_add(struct thread_data *head, struct request *add) |
| { |
| struct request *old; |
| struct request *ret; |
| |
| while (1) { |
| old = head->request; |
| add->next = old; |
| ret = __sync_val_compare_and_swap(&head->request, old, add); |
| if (ret == old) |
| return old; |
| } |
| } |
| |
| /* |
| * xchg based list splicing. This returns the entire list and |
| * replaces the head->request with NULL. The list is reversed before |
| * returning |
| */ |
| static struct request *request_splice(struct thread_data *head) |
| { |
| struct request *old; |
| struct request *ret; |
| struct request *reverse = NULL; |
| |
| while (1) { |
| old = head->request; |
| ret = __sync_val_compare_and_swap(&head->request, old, NULL); |
| if (ret == old) |
| break; |
| } |
| |
| while(ret) { |
| struct request *tmp = ret; |
| ret = ret->next; |
| tmp->next = reverse; |
| reverse = tmp; |
| } |
| return reverse; |
| } |
| |
| static struct request *allocate_request(void) |
| { |
| struct request *ret = malloc(sizeof(*ret)); |
| |
| if (!ret) { |
| perror("malloc"); |
| exit(1); |
| } |
| |
| gettimeofday(&ret->start_time, NULL); |
| ret->next = NULL; |
| return ret; |
| } |
| |
| |
| /* |
| * Wake everyone currently waiting on the message list, filling in their |
| * thread_data->wake_time with the current time. |
| * |
| * It's not exactly the current time, it's really the time at the start of |
| * the list run. We want to detect when the scheduler is just preempting the |
| * waker and giving away the rest of its timeslice. So we gtod once at |
| * the start of the loop and use that for all the threads we wake. |
| * |
| * Since pipe mode ends up measuring this other ways, we do the gtod |
| * every time in pipe mode |
| */ |
| static void xlist_wake_all(struct thread_data *td) |
| { |
| struct thread_data *list; |
| struct thread_data *next; |
| struct timeval now; |
| |
| list = xlist_splice(td); |
| gettimeofday(&now, NULL); |
| while (list) { |
| next = list->next; |
| list->next = NULL; |
| if (pipe_test) { |
| memset(list->pipe_page, 1, pipe_test); |
| gettimeofday(&list->wake_time, NULL); |
| } else { |
| memcpy(&list->wake_time, &now, sizeof(now)); |
| } |
| fpost(&list->futex); |
| list = next; |
| } |
| } |
| |
| /* |
| * called by worker threads to send a message and wait for the answer. |
| * In reality we're just trading one cacheline with the gtod and futex in |
| * it, but that's good enough. We gtod after waking and use that to |
| * record scheduler latency. |
| */ |
| static struct request *msg_and_wait(struct thread_data *td) |
| { |
| struct request *req; |
| struct timeval now; |
| unsigned long long delta; |
| |
| if (pipe_test) |
| memset(td->pipe_page, 2, pipe_test); |
| |
| /* set ourselves to blocked */ |
| td->futex = FUTEX_BLOCKED; |
| gettimeofday(&td->wake_time, NULL); |
| |
	/* in rps mode grab any pending requests, otherwise queue us for waking */
| if (requests_per_sec) { |
| td->pending = 0; |
| req = request_splice(td); |
| if (req) { |
| td->futex = FUTEX_RUNNING; |
| return req; |
| } |
| } else { |
| xlist_add(td->msg_thread, td); |
| } |
| |
| fpost(&td->msg_thread->futex); |
| |
	/*
	 * don't wait if the main threads are shutting down,
	 * they will never kick us. fpost has a full barrier, so as long
	 * as the message thread walks his list after setting stopping,
	 * we shouldn't miss the wakeup
	 */
| if (!stopping) { |
| /* if he hasn't already woken us up, wait */ |
| fwait(&td->futex, NULL); |
| } |
| gettimeofday(&now, NULL); |
| delta = tvdelta(&td->wake_time, &now); |
| if (delta > 0) |
| add_lat(&td->wakeup_stats, delta); |
| |
| return NULL; |
| } |
| |
| /* |
| * read /proc/stat, return the percentage of non-idle time since |
| * the last read. |
| */ |
| float read_busy(int fd, char *buf, int len, |
| unsigned long long *total_time_ret, |
| unsigned long long *idle_time_ret) |
| { |
| unsigned long long total_time = 0; |
| unsigned long long idle_time = 0; |
| unsigned long long delta; |
| unsigned long long delta_idle; |
| unsigned long long val; |
| int col = 1; |
| int ret; |
| char *c; |
| char *save = NULL; |
| |
| |
| ret = lseek(fd, 0, SEEK_SET); |
| if (ret < 0) { |
| perror("lseek"); |
| exit(1); |
| } |
| ret = read(fd, buf, len-1); |
| if (ret < 0) { |
| perror("failed to read /proc/stat"); |
| exit(1); |
| } |
| buf[ret] = '\0'; |
| |
| /* find the newline */ |
| c = strchr(buf, '\n'); |
| if (c == NULL) { |
| perror("unable to parse /proc/stat"); |
| exit(1); |
| } |
| *c = '\0'; |
| |
| /* cpu 590315893 45841886 375984879 82585100131 166708940 0 5453892 0 0 0 */ |
| c = strtok_r(buf, " ", &save); |
| if (strcmp(c, "cpu") != 0) { |
| perror("unable to parse summary in /proc/stat"); |
| exit(1); |
| } |
| |
| while (c != NULL) { |
| c = strtok_r(NULL, " ", &save); |
| if (!c) |
| break; |
| val = atoll(c); |
| if (col++ == 4) |
| idle_time = val; |
| total_time += val; |
| } |
| |
| if (*total_time_ret == 0) { |
| *total_time_ret = total_time; |
| *idle_time_ret = idle_time; |
| return 0.0; |
| } |
| |
| /* delta is the total time spent doing everything */ |
| delta = total_time - *total_time_ret; |
| delta_idle = idle_time - *idle_time_ret; |
| |
| *total_time_ret = total_time; |
| *idle_time_ret = idle_time; |
| |
| return 100.00 - ((float)delta_idle/(float)delta) * 100.00; |
| } |
| |
| #if defined(__x86_64__) || defined(__i386__) |
| #define nop __asm__ __volatile__("rep;nop": : :"memory") |
| #elif defined(__aarch64__) |
| #define nop __asm__ __volatile__("yield" ::: "memory") |
| #elif defined(__powerpc64__) |
| #define nop __asm__ __volatile__("nop": : :"memory") |
| #else |
| #error Unsupported architecture |
| #endif |
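
/* lock_this_cpu() uses these cpu_relax() style nops while it spins */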
| |
| /* |
| * once the message thread starts all his children, this is where he |
| * loops until our runtime is up. Basically this sits around waiting |
| * for posting by the worker threads, and replying to their messages. |
| */ |
| static void run_msg_thread(struct thread_data *td) |
| { |
| while (1) { |
| td->futex = FUTEX_BLOCKED; |
| xlist_wake_all(td); |
| |
| if (stopping) { |
| xlist_wake_all(td); |
| break; |
| } |
| fwait(&td->futex, NULL); |
| } |
| } |
| |
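/*
 * nudge requests_per_sec toward the -A target cpu busy%. The correction
 * scales with how far off we are, but is heavily damped near the target
 * so we don't oscillate. The first time we land within roughly 5% of
 * the target, the rps stats are zeroed so they only reflect steady state.
 */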
| void auto_scale_rps(int *proc_stat_fd, |
| unsigned long long *total_time, |
| unsigned long long *total_idle) |
| { |
| int fd = *proc_stat_fd; |
| float busy = 0; |
| char proc_stat_buf[512]; |
| int first_run = 0; |
| float delta; |
| float target = 1; |
| |
| if (fd == -1) { |
| fd = open("/proc/stat", O_RDONLY); |
| if (fd < 0) { |
| perror("unable to open /proc/stat"); |
| exit(1); |
| } |
| *proc_stat_fd = fd; |
| first_run = 1; |
| } |
| busy = read_busy(fd, proc_stat_buf, 512, total_time, total_idle); |
| if (first_run) |
| return; |
| if (busy < auto_rps) { |
| delta = (float)auto_rps / busy; |
| /* delta is > 1 */ |
| if (delta > 3) { |
| delta = 3; |
| } else if (delta < 1.2) { |
| delta = 1 + (delta - 1) / 8; |
| if (delta < 1.05 && !auto_rps_target_hit) { |
| auto_rps_target_hit = 1; |
| memset(&rps_stats, 0, sizeof(rps_stats)); |
| } |
| |
| } else if (delta < 1.5) { |
| delta = 1 + (delta - 1) / 4; |
| } |
| target = ceilf((float)requests_per_sec * delta); |
| if (target >= (1ULL << 31)) { |
| /* |
| * sometimes we don't have enough threads to hit the |
| * target load |
| */ |
| target = requests_per_sec; |
| } |
| } else if (busy > auto_rps) { |
| /* delta < 1 */ |
| delta = (float)auto_rps / busy; |
| if (delta < 0.3) { |
| delta = 0.3; |
| } else if (delta > .9) { |
| delta += (1 - delta) / 8; |
| if (delta > .95 && !auto_rps_target_hit) { |
| auto_rps_target_hit = 1; |
| memset(&rps_stats, 0, sizeof(rps_stats)); |
| } |
| } else if (delta > .8) { |
| delta += (1 - delta) / 4; |
| } |
| target = floorf((float)requests_per_sec * delta); |
		/* don't let the rps goal hit zero, run_rps_thread divides by it */
		if (target < 1)
			target = 1;
| } else { |
| target = requests_per_sec; |
| if (!auto_rps_target_hit) { |
| auto_rps_target_hit = 1; |
| memset(&rps_stats, 0, sizeof(rps_stats)); |
| } |
| } |
| requests_per_sec = target; |
| } |
| |
/*
 * in requests-per-second mode, this is where the message thread loops
 * until our runtime is up. Instead of waiting to be posted, he hands
 * requests to the workers at the configured rate, in batches, sleeping
 * between batches.
 */
| static void run_rps_thread(struct thread_data *worker_threads_mem) |
| { |
| /* start and end of the thread run */ |
| struct timeval start; |
| struct timeval now; |
| struct request *request; |
| unsigned long long delta; |
| |
	/* how long do we sleep between each batch of wakeups */
| unsigned long sleep_time; |
| int batch = 8; |
| int cur_tid = 0; |
| int i; |
| |
| while (1) { |
| gettimeofday(&start, NULL); |
| sleep_time = (USEC_PER_SEC / requests_per_sec) * batch; |
| for (i = 1; i < requests_per_sec + 1; i++) { |
| struct thread_data *worker; |
| |
| gettimeofday(&now, NULL); |
| |
| worker = worker_threads_mem + cur_tid % worker_threads; |
| cur_tid++; |
| |
| /* at some point, there's just too much, don't queue more */ |
| if (worker->pending > 8) { |
| continue; |
| } |
| worker->pending++; |
| request = allocate_request(); |
| request_add(worker, request); |
| memcpy(&worker->wake_time, &now, sizeof(now)); |
| fpost(&worker->futex); |
| if ((i % batch) == 0) |
| usleep(sleep_time); |
| } |
| gettimeofday(&now, NULL); |
| |
| delta = tvdelta(&start, &now); |
| while (delta < USEC_PER_SEC) { |
| delta = USEC_PER_SEC - delta; |
| usleep(delta); |
| |
| gettimeofday(&now, NULL); |
| delta = tvdelta(&start, &now); |
| } |
| |
| if (stopping) { |
| for (i = 0; i < worker_threads; i++) |
| fpost(&worker_threads_mem[i].futex); |
| break; |
| } |
| } |
| |
| if (auto_rps) |
| fprintf(stderr, "final rps goal was %d\n", requests_per_sec); |
| } |
| |
| /* |
| * multiply two matrices in a naive way to emulate some cache footprint |
| */ |
| static void do_some_math(struct thread_data *thread_data) |
| { |
| unsigned long i, j, k; |
| unsigned long *m1, *m2, *m3; |
| |
| m1 = &thread_data->data[0]; |
| m2 = &thread_data->data[matrix_size * matrix_size]; |
| m3 = &thread_data->data[2 * matrix_size * matrix_size]; |
| |
| for (i = 0; i < matrix_size; i++) { |
| for (j = 0; j < matrix_size; j++) { |
| m3[i * matrix_size + j] = 0; |
| |
| for (k = 0; k < matrix_size; k++) |
| m3[i * matrix_size + j] += |
| m1[i * matrix_size + k] * |
| m2[k * matrix_size + j]; |
| } |
| } |
| } |
| |
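/*
 * take the per-cpu mutex for whatever CPU we're currently on. If we
 * migrated between sched_getcpu() and acquiring the lock, unlock and
 * try again on the new CPU.
 */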
| static pthread_mutex_t *lock_this_cpu(void) |
| { |
| int cpu; |
| int cur_cpu; |
| pthread_mutex_t *lock; |
| |
| again: |
| cpu = sched_getcpu(); |
| if (cpu < 0) { |
| perror("sched_getcpu failed\n"); |
| exit(1); |
| } |
| lock = &per_cpu_locks[cpu].lock; |
| while (pthread_mutex_trylock(lock) != 0) |
| nop; |
| |
| cur_cpu = sched_getcpu(); |
| if (cur_cpu < 0) { |
| perror("sched_getcpu failed\n"); |
| exit(1); |
| } |
| |
| if (cur_cpu != cpu) { |
| /* we got the lock but we migrated */ |
| pthread_mutex_unlock(lock); |
| goto again; |
| } |
| return lock; |
| |
| } |
| |
| /* |
| * spin or do some matrix arithmetic |
| */ |
| static void do_work(struct thread_data *td) |
| { |
| pthread_mutex_t *lock = NULL; |
| unsigned long i; |
| |
| /* using --calibrate or --no-locking skips the locks */ |
| if (!skip_locking) |
| lock = lock_this_cpu(); |
| for (i = 0; i < operations; i++) |
| do_some_math(td); |
| if (!skip_locking) |
| pthread_mutex_unlock(lock); |
| } |
| |
/*
 * the worker thread is pretty simple, it just does one unit of work
 * (a short usleep plus the matrix math) and then waits on a message
 * from the message thread
 */
| void *worker_thread(void *arg) |
| { |
| struct thread_data *td = arg; |
| struct timeval now; |
| struct timeval work_start; |
| struct timeval start; |
| unsigned long long delta; |
| struct request *req = NULL; |
| |
| gettimeofday(&start, NULL); |
| while(1) { |
| if (stopping) |
| break; |
| |
| req = msg_and_wait(td); |
| if (requests_per_sec && !req) |
| continue; |
| |
| do { |
| struct request *tmp; |
| |
| if (pipe_test) { |
| gettimeofday(&work_start, NULL); |
| } else { |
| if (calibrate_only) { |
| /* |
| * in calibration mode, don't include the |
| * usleep in the timing |
| */ |
| usleep(100); |
| gettimeofday(&work_start, NULL); |
| } else { |
| /* |
| * lets start off with some simulated networking, |
| * and also make sure we get a fresh clean timeslice |
| */ |
| gettimeofday(&work_start, NULL); |
| usleep(100); |
| } |
| do_work(td); |
| } |
| |
| gettimeofday(&now, NULL); |
| |
| td->runtime = tvdelta(&start, &now); |
| if (req) { |
| tmp = req->next; |
| free(req); |
| req = tmp; |
| } |
| td->loop_count++; |
| |
| delta = tvdelta(&work_start, &now); |
| if (delta > 0) |
| add_lat(&td->request_stats, delta); |
| } while (req); |
| } |
| gettimeofday(&now, NULL); |
| td->runtime = tvdelta(&start, &now); |
| |
| return NULL; |
| } |
| |
| /* |
| * the message thread starts his own gaggle of workers and then sits around |
| * replying when they post him. He collects latency stats as all the threads |
| * exit |
| */ |
| void *message_thread(void *arg) |
| { |
| struct thread_data *td = arg; |
| struct thread_data *worker_threads_mem = NULL; |
| int i; |
| int ret; |
| |
	/* main() allocated our workers contiguously, right after our own td */
	worker_threads_mem = td + 1;
| |
| for (i = 0; i < worker_threads; i++) { |
| pthread_t tid; |
| worker_threads_mem[i].data = malloc(3 * sizeof(unsigned long) * matrix_size * matrix_size); |
| if (!worker_threads_mem[i].data) { |
| perror("unable to allocate ram"); |
| pthread_exit((void *)-ENOMEM); |
| } |
| |
| worker_threads_mem[i].msg_thread = td; |
| ret = pthread_create(&tid, NULL, worker_thread, |
| worker_threads_mem + i); |
| if (ret) { |
| fprintf(stderr, "error %d from pthread_create\n", ret); |
| exit(1); |
| } |
| worker_threads_mem[i].tid = tid; |
| } |
| |
| if (requests_per_sec) |
| run_rps_thread(worker_threads_mem); |
| else |
| run_msg_thread(td); |
| |
| for (i = 0; i < worker_threads; i++) { |
| fpost(&worker_threads_mem[i].futex); |
| pthread_join(worker_threads_mem[i].tid, NULL); |
| } |
| return NULL; |
| } |
| |
| static char *units[] = { "B", "KB", "MB", "GB", "TB", "PB", "EB", NULL}; |
| |
| static double pretty_size(double number, char **str) |
| { |
| int divs = 0; |
| |
| while(number >= 1024) { |
| if (units[divs + 1] == NULL) |
| break; |
| divs++; |
| number /= 1024; |
| } |
| *str = units[divs]; |
| return number; |
| } |
| |
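/*
 * the thread_data array interleaves each message thread with its
 * workers, so index steps past a message thread and then walks the
 * workers that follow it
 */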
| static void combine_message_thread_stats(struct stats *wakeup_stats, |
| struct stats *request_stats, |
| struct thread_data *thread_data, |
| unsigned long long *loop_count, |
| unsigned long long *loop_runtime) |
| { |
| struct thread_data *worker; |
| int i; |
| int msg_i; |
| int index = 0; |
| |
| *loop_count = 0; |
| *loop_runtime = 0; |
| for (msg_i = 0; msg_i < message_threads; msg_i++) { |
| index++; |
| for (i = 0; i < worker_threads; i++) { |
| worker = thread_data + index++; |
| combine_stats(wakeup_stats, &worker->wakeup_stats); |
| combine_stats(request_stats, &worker->request_stats); |
| *loop_count += worker->loop_count; |
| *loop_runtime += worker->runtime; |
| } |
| } |
| } |
| |
| static void reset_thread_stats(struct thread_data *thread_data) |
| { |
| struct thread_data *worker; |
| int i; |
| int msg_i; |
| int index = 0; |
| |
| memset(&rps_stats, 0, sizeof(rps_stats)); |
| for (msg_i = 0; msg_i < message_threads; msg_i++) { |
| index++; |
| for (i = 0; i < worker_threads; i++) { |
| worker = thread_data + index++; |
| memset(&worker->wakeup_stats, 0, sizeof(worker->wakeup_stats)); |
| memset(&worker->request_stats, 0, sizeof(worker->request_stats)); |
| } |
| } |
| } |
| |
/* runtime from the command line is in seconds. Sleep until it's up */
| static void sleep_for_runtime(struct thread_data *message_threads_mem) |
| { |
| struct timeval now; |
| struct timeval zero_time; |
| struct timeval last_calc; |
| struct timeval start; |
| struct stats wakeup_stats; |
| struct stats request_stats; |
| unsigned long long last_loop_count = 0; |
| unsigned long long loop_count; |
| unsigned long long loop_runtime; |
| unsigned long long delta; |
| unsigned long long runtime_delta; |
| unsigned long long runtime_usec = runtime * USEC_PER_SEC; |
| unsigned long long warmup_usec = warmuptime * USEC_PER_SEC; |
| unsigned long long interval_usec = intervaltime * USEC_PER_SEC; |
| unsigned long long zero_usec = zerotime * USEC_PER_SEC; |
| int warmup_done = 0; |
| int total_intervals = 0; |
| |
| /* if we're autoscaling RPS */ |
| int proc_stat_fd = -1; |
| unsigned long long total_time = 0; |
| unsigned long long total_idle = 0; |
| int done = 0; |
| |
| memset(&wakeup_stats, 0, sizeof(wakeup_stats)); |
| gettimeofday(&start, NULL); |
| last_calc = start; |
| zero_time = start; |
| |
| while(!done) { |
| gettimeofday(&now, NULL); |
| runtime_delta = tvdelta(&start, &now); |
| |
| if (runtime_usec && runtime_delta >= runtime_usec) |
| done = 1; |
| |
| if (!requests_per_sec && !pipe_test && |
| runtime_delta > warmup_usec && |
| !warmup_done && warmuptime) { |
| warmup_done = 1; |
| fprintf(stderr, "warmup done, zeroing stats\n"); |
| zero_time = now; |
| reset_thread_stats(message_threads_mem); |
| } else if (!pipe_test) { |
| delta = tvdelta(&last_calc, &now); |
| if (delta >= interval_usec) { |
| double rps; |
| |
| memset(&wakeup_stats, 0, sizeof(wakeup_stats)); |
| memset(&request_stats, 0, sizeof(request_stats)); |
| combine_message_thread_stats(&wakeup_stats, |
| &request_stats, message_threads_mem, |
| &loop_count, &loop_runtime); |
| last_calc = now; |
| |
| rps = (double)((loop_count - last_loop_count) * USEC_PER_SEC) / delta; |
| last_loop_count = loop_count; |
| |
| if (!auto_rps || auto_rps_target_hit) |
| add_lat(&rps_stats, rps); |
| |
| show_latencies(&wakeup_stats, "Wakeup Latencies", |
| "usec", runtime_delta / USEC_PER_SEC, |
| PLIST_FOR_LAT, PLIST_99); |
| show_latencies(&request_stats, "Request Latencies", |
| "usec", runtime_delta / USEC_PER_SEC, |
| PLIST_FOR_LAT, PLIST_99); |
| if (total_intervals > 10) { |
| show_latencies(&rps_stats, "RPS", |
| "requests", runtime_delta / USEC_PER_SEC, |
| PLIST_FOR_RPS, PLIST_50); |
| } |
| fprintf(stderr, "current rps: %.2f\n", rps); |
| total_intervals++; |
| } |
| } |
| if (zero_usec) { |
| unsigned long long zero_delta; |
| zero_delta = tvdelta(&zero_time, &now); |
| if (zero_delta > zero_usec) { |
| zero_time = now; |
| reset_thread_stats(message_threads_mem); |
| } |
| } |
| if (auto_rps) |
| auto_scale_rps(&proc_stat_fd, &total_time, &total_idle); |
| if (!done) |
| sleep(1); |
| } |
| if (proc_stat_fd >= 0) |
| close(proc_stat_fd); |
| __sync_synchronize(); |
| stopping = 1; |
| } |
| |
| |
| int main(int ac, char **av) |
| { |
| int i; |
| int ret; |
| struct thread_data *message_threads_mem = NULL; |
| struct stats wakeup_stats; |
| struct stats request_stats; |
| double loops_per_sec; |
| unsigned long long loop_count; |
| unsigned long long loop_runtime; |
| |
| parse_options(ac, av); |
| |
| if (worker_threads == 0) { |
| worker_threads = get_nprocs(); |
| fprintf(stderr, "setting worker threads to %d\n", worker_threads); |
| } |
| |
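	/*
	 * pick a matrix size so the three matrices used in do_some_math()
	 * together fill roughly cache_footprint_kb of memory
	 */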
| matrix_size = sqrt(cache_footprint_kb * 1024 / 3 / sizeof(unsigned long)); |
| |
| num_cpu_locks = get_nprocs(); |
| per_cpu_locks = calloc(num_cpu_locks, sizeof(struct per_cpu_lock)); |
| if (!per_cpu_locks) { |
| perror("unable to allocate memory for per cpu locks\n"); |
| exit(1); |
| } |
| |
| for (i = 0; i < num_cpu_locks; i++) { |
| pthread_mutex_t *lock = &per_cpu_locks[i].lock; |
| ret = pthread_mutex_init(lock, NULL); |
| if (ret) { |
| perror("mutex init failed\n"); |
| exit(1); |
| } |
| } |
| |
| requests_per_sec /= message_threads; |
| loops_per_sec = 0; |
| stopping = 0; |
| memset(&wakeup_stats, 0, sizeof(wakeup_stats)); |
| memset(&request_stats, 0, sizeof(request_stats)); |
| memset(&rps_stats, 0, sizeof(rps_stats)); |
| |
| message_threads_mem = calloc(message_threads * worker_threads + message_threads, |
| sizeof(struct thread_data)); |
| |
| |
| if (!message_threads_mem) { |
| perror("unable to allocate message threads"); |
| exit(1); |
| } |
| |
| /* start our message threads, each one starts its own workers */ |
| for (i = 0; i < message_threads; i++) { |
| pthread_t tid; |
| int index = i * worker_threads + i; |
| ret = pthread_create(&tid, NULL, message_thread, |
| message_threads_mem + index); |
| if (ret) { |
| fprintf(stderr, "error %d from pthread_create\n", ret); |
| exit(1); |
| } |
| message_threads_mem[index].tid = tid; |
| } |
| |
| sleep_for_runtime(message_threads_mem); |
| |
| for (i = 0; i < message_threads; i++) { |
| int index = i * worker_threads + i; |
| fpost(&message_threads_mem[index].futex); |
| pthread_join(message_threads_mem[index].tid, NULL); |
| } |
| memset(&wakeup_stats, 0, sizeof(wakeup_stats)); |
| memset(&request_stats, 0, sizeof(request_stats)); |
| combine_message_thread_stats(&wakeup_stats, &request_stats, |
| message_threads_mem, |
| &loop_count, &loop_runtime); |
| |
| loops_per_sec = loop_count * USEC_PER_SEC; |
| loops_per_sec /= loop_runtime; |
| |
| free(message_threads_mem); |
| |
| if (pipe_test) { |
| char *pretty; |
| double mb_per_sec; |
| |
| show_latencies(&wakeup_stats, "Wakeup Latencies", "usec", runtime, |
| PLIST_20 | PLIST_FOR_LAT, PLIST_99); |
| |
| mb_per_sec = (loop_count * pipe_test * USEC_PER_SEC) / loop_runtime; |
| mb_per_sec = pretty_size(mb_per_sec, &pretty); |
| fprintf(stderr, "avg worker transfer: %.2f ops/sec %.2f%s/s\n", |
| loops_per_sec, mb_per_sec, pretty); |
| } else { |
| show_latencies(&wakeup_stats, "Wakeup Latencies", "usec", runtime, |
| PLIST_FOR_LAT, PLIST_99); |
| show_latencies(&request_stats, "Request Latencies", "usec", runtime, |
| PLIST_FOR_LAT, PLIST_99); |
| show_latencies(&rps_stats, "RPS", "requests", runtime, |
| PLIST_FOR_RPS, PLIST_50); |
| if (!auto_rps) |
| fprintf(stderr, "average rps: %.2f\n", |
| (double)(loop_count) / runtime); |
| } |
| |
| return 0; |
| } |