server: getting closer to a proper setup

Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/backend.c b/backend.c
index 00a23db..60abf10 100644
--- a/backend.c
+++ b/backend.c
@@ -1946,5 +1946,6 @@ fio_mutex_remove(writeout_mutex); fio_mutex_remove(disk_thread_mutex); stat_exit(); + reset_fio_state(); return exit_value; }
diff --git a/server.c b/server.c
index 8324543..282cc0a 100644
--- a/server.c
+++ b/server.c
@@ -30,12 +30,16 @@ int exit_backend = 0; -static int server_fd = -1; +struct server_state { + int server_fd; +}; + static char *fio_server_arg; static char *bind_sock; static struct sockaddr_in saddr_in; static struct sockaddr_in6 saddr_in6; static int use_ipv6; + #ifdef CONFIG_ZLIB static unsigned int has_zlib = 1; #else @@ -49,8 +53,21 @@ int signal; int exited; pid_t pid; + pthread_t thread; }; +struct fio_client_startup { + struct fio_mutex *pid_mutex; + struct fio_mutex *ffi_mutex; + pthread_t thread; + void *data; + struct fio_fork_item *ffi; + pid_t pid; + int sk; +}; + +static pthread_key_t s_key; + static const char *fio_server_ops[FIO_NET_CMD_NR] = { "", "QUIT", @@ -72,6 +89,22 @@ "CMD_IOLOG", }; +static int get_s(void) +{ + return (uintptr_t) pthread_getspecific(s_key); +} + +static void assign_s(int fd) +{ + assert(!pthread_setspecific(s_key, (void *) (uintptr_t) fd)); +} + +static void setup_s(void) +{ + assert(!pthread_key_create(&s_key, NULL)); + assign_s(-1); +} + const char *fio_server_op(unsigned int op) { static char buf[32]; @@ -390,6 +423,12 @@ return ret; } +static int __fio_net_send_cmd(uint16_t opcode, const void *buf, off_t size, + uint64_t *tagptr, struct flist_head *list) +{ + return fio_net_send_cmd(get_s(), opcode, buf, size, tagptr, list); +} + static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag) { struct fio_net_cmd cmd; @@ -453,7 +492,8 @@ return fio_net_send_ack(sk, NULL, error, signal); } -static void fio_server_add_fork_item(pid_t pid, struct flist_head *list) +static void fio_server_add_fork_item(struct fio_client_startup *s, + struct flist_head *list) { struct fio_fork_item *ffi; @@ -461,48 +501,27 @@ ffi->exitval = 0; ffi->signal = 0; ffi->exited = 0; - ffi->pid = pid; + ffi->pid = s->pid; + ffi->thread = s->thread; + s->ffi = ffi; flist_add_tail(&ffi->list, list); } -static void fio_server_add_conn_pid(struct flist_head *conn_list, pid_t pid) +static void fio_server_add_pid(struct fio_client_startup 
*s, + struct flist_head *list) { - dprint(FD_NET, "server: forked off connection job (pid=%u)\n", (int) pid); - fio_server_add_fork_item(pid, conn_list); -} - -static void fio_server_add_job_pid(struct flist_head *job_list, pid_t pid) -{ - dprint(FD_NET, "server: forked off job job (pid=%u)\n", (int) pid); - fio_server_add_fork_item(pid, job_list); + dprint(FD_NET, "server: forked off job (pid=%u)\n", (int) s->pid); + fio_server_add_fork_item(s, list); } static void fio_server_check_fork_item(struct fio_fork_item *ffi) { - int ret, status; - - ret = waitpid(ffi->pid, &status, WNOHANG); - if (ret < 0) { - if (errno == ECHILD) { - log_err("fio: connection pid %u disappeared\n", (int) ffi->pid); - ffi->exited = 1; - } else - log_err("fio: waitpid: %s\n", strerror(errno)); - } else if (ret == ffi->pid) { - if (WIFSIGNALED(status)) { - ffi->signal = WTERMSIG(status); - ffi->exited = 1; - } - if (WIFEXITED(status)) { - if (WEXITSTATUS(status)) - ffi->exitval = WEXITSTATUS(status); - ffi->exited = 1; - } - } } static void fio_server_fork_item_done(struct fio_fork_item *ffi) { + int server_fd = get_s(); + dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval); /* @@ -539,22 +558,86 @@ fio_server_check_fork_items(conn_list); } -static int handle_run_cmd(struct flist_head *job_list, struct fio_net_cmd *cmd) +static struct fio_fork_item *thread_startup_routine(struct fio_client_startup *startup) { - pid_t pid; + struct fio_fork_item *ffi; + + assign_s(startup->sk); + + printf("assign sk=%d, tid=%u\n", startup->sk, gettid()); + + startup->pid = gettid(); + fio_mutex_up(startup->pid_mutex); + + fio_mutex_down(startup->ffi_mutex); + ffi = startup->ffi; + fio_mutex_remove(startup->ffi_mutex); + + fio_mutex_up(startup->pid_mutex); + return ffi; +} + +static void *fio_client_thread(void *data) +{ + struct fio_fork_item *ffi; int ret; - set_genesis_time(); - - pid = fork(); - if (pid) { - fio_server_add_job_pid(job_list, pid); - return 0; - 
} + ffi = thread_startup_routine(data); ret = fio_backend(); - free_threads_shm(); - _exit(ret); + ffi->exitval = ret; + ffi->exited = 1; + pthread_exit((void *) (uintptr_t) ret); + return (void *) (uintptr_t) ret; +} + +static int start_thread_sync(void *(*fn)(void *), void *data, + struct flist_head *list, int sk) +{ + struct fio_client_startup startup; + int ret; + + startup.pid_mutex = fio_mutex_init(FIO_MUTEX_LOCKED); + if (!startup.pid_mutex) { + log_err("fio: failed to allocate client startup mutex\n"); + return 1; + } + + startup.ffi_mutex = fio_mutex_init(FIO_MUTEX_LOCKED); + if (!startup.ffi_mutex) { + fio_mutex_remove(startup.pid_mutex); + log_err("fio: failed to allocate client startup mutex\n"); + return 1; + } + + startup.data = data; + startup.sk = sk; + + ret = pthread_create(&startup.thread, NULL, fn, &startup); + if (ret) { + fio_mutex_remove(startup.pid_mutex); + fio_mutex_remove(startup.ffi_mutex); + log_err("fio: failed to fork off client thread: %s\n", strerror(ret)); + return ret; + } + + fio_mutex_down(startup.pid_mutex); + + fio_server_add_pid(&startup, list); + + fio_mutex_up(startup.ffi_mutex); + + fio_mutex_down(startup.pid_mutex); + fio_mutex_remove(startup.pid_mutex); + return 0; +} + +static int handle_run_cmd(int sk, struct flist_head *list, + struct fio_net_cmd *cmd) +{ + set_genesis_time(); + + return start_thread_sync(fio_client_thread, NULL, list, sk); } static int handle_job_cmd(struct fio_net_cmd *cmd) @@ -562,6 +645,7 @@ struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmd->payload; void *buf = pdu->buf; struct cmd_start_pdu spdu; + int server_fd = get_s(); pdu->buf_len = le32_to_cpu(pdu->buf_len); pdu->client_type = le32_to_cpu(pdu->client_type); @@ -573,7 +657,7 @@ spdu.jobs = cpu_to_le32(thread_number); spdu.stat_outputs = cpu_to_le32(stat_number); - fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL); + __fio_net_send_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL); return 0; } @@ -584,6 
+668,7 @@ struct cmd_line_pdu *clp; unsigned long offset; struct cmd_start_pdu spdu; + int server_fd = get_s(); char **argv; int i; @@ -613,7 +698,7 @@ spdu.jobs = cpu_to_le32(thread_number); spdu.stat_outputs = cpu_to_le32(stat_number); - fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL); + __fio_net_send_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL); return 0; } @@ -648,7 +733,7 @@ use_zlib = 0; } - return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL); + return __fio_net_send_cmd(FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL); } static int handle_send_eta_cmd(struct fio_net_cmd *cmd) @@ -693,7 +778,7 @@ je->is_pow2 = cpu_to_le32(je->is_pow2); je->unit_base = cpu_to_le32(je->unit_base); - fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, &tag, NULL); + __fio_net_send_cmd(FIO_NET_CMD_ETA, je, size, &tag, NULL); free(je); return 0; } @@ -712,23 +797,25 @@ struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload; struct thread_data *td; uint32_t tnumber; + int fd = get_s(); tnumber = le32_to_cpu(pdu->thread_number); dprint(FD_NET, "server: updating options for job %u\n", tnumber); if (!tnumber || tnumber > thread_number) { - send_update_job_reply(server_fd, cmd->tag, ENODEV); + send_update_job_reply(fd, cmd->tag, ENODEV); return 0; } td = &threads[tnumber - 1]; convert_thread_options_to_cpu(&td->o, &pdu->top); - send_update_job_reply(server_fd, cmd->tag, 0); + send_update_job_reply(fd, cmd->tag, 0); return 0; } -static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd) +static int handle_command(int sk, struct flist_head *job_list, + struct fio_net_cmd *cmd) { int ret; @@ -756,7 +843,7 @@ ret = handle_send_eta_cmd(cmd); break; case FIO_NET_CMD_RUN: - ret = handle_run_cmd(job_list, cmd); + ret = handle_run_cmd(sk, job_list, cmd); break; case FIO_NET_CMD_UPDATE_JOB: ret = handle_update_job_cmd(cmd); @@ -776,7 +863,6 @@ int ret = 0; 
reset_fio_state(); - server_fd = sk; /* read forever */ while (!exit_backend) { @@ -822,7 +908,7 @@ break; } - ret = handle_command(&job_list, cmd); + ret = handle_command(sk, &job_list, cmd); if (ret) break; @@ -834,7 +920,29 @@ free(cmd); close(sk); - _exit(ret); + return ret; +} + +static void *fio_connection_thread(void *data) +{ + struct fio_client_startup *startup = data; + int sk = (uintptr_t) startup->data; + struct fio_fork_item *ffi; + int ret; + + ffi = thread_startup_routine(startup); + + ffi->exitval = ret = handle_connection(sk); + ffi->exited = 1; + pthread_exit((void *)(uintptr_t) ret); + return (void *)(uintptr_t) ret; +} + +static int add_connection(int sk, struct flist_head *list) +{ + void *data = (void *) (uintptr_t) sk; + + return start_thread_sync(fio_connection_thread, data, list, sk); } static int accept_loop(int listen_sk) @@ -852,8 +960,6 @@ fcntl(listen_sk, F_SETFL, flags); while (!exit_backend) { - pid_t pid; - pfd.fd = listen_sk; pfd.events = POLLIN; do { @@ -889,16 +995,7 @@ } dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr)); - - pid = fork(); - if (pid) { - close(sk); - fio_server_add_conn_pid(&conn_list, pid); - continue; - } - - /* exits */ - handle_connection(sk); + add_connection(sk, &conn_list); } return exitval; @@ -909,8 +1006,9 @@ struct cmd_text_pdu *pdu; unsigned int tlen; struct timeval tv; + int fd = get_s(); - if (server_fd == -1) + if (fd == -1) return log_local_buf(buf, len); tlen = sizeof(*pdu) + len; @@ -925,7 +1023,7 @@ memcpy(pdu->buf, buf, len); - fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL); + __fio_net_send_cmd(FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL); free(pdu); return len; } @@ -1044,7 +1142,7 @@ convert_gs(&p.rs, rs); - fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), NULL, NULL); + __fio_net_send_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, NULL); } void fio_server_send_gs(struct group_run_stats *rs) @@ -1054,7 +1152,7 @@ dprint(FD_NET, "server sending group 
run stats\n"); convert_gs(&gs, rs); - fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, NULL); + __fio_net_send_cmd(FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, NULL); } static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src) @@ -1108,7 +1206,7 @@ convert_dus(&pdu.dus, &du->dus); convert_agg(&pdu.agg, &du->agg); - fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, NULL); + __fio_net_send_cmd(FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, NULL); } } @@ -1139,6 +1237,7 @@ #ifdef CONFIG_ZLIB z_stream stream; void *out_pdu; + int fd = get_s(); /* * Dirty - since the log is potentially huge, compress it into @@ -1175,8 +1274,8 @@ if (stream.avail_in) flags = FIO_NET_CMD_F_MORE; - ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, - out_pdu, this_len, 0, flags); + ret = fio_send_cmd_ext_pdu(fd, FIO_NET_CMD_IOLOG, out_pdu, + this_len, 0, flags); if (ret) goto err_zlib; } while (stream.avail_in); @@ -1193,6 +1292,7 @@ { struct cmd_iolog_pdu pdu; int i, ret = 0; + int fd = get_s(); pdu.thread_number = cpu_to_le32(td->thread_number); pdu.nr_samples = __cpu_to_le32(log->nr_samples); @@ -1212,8 +1312,8 @@ /* * Send header first, it's not compressed. 
*/ - ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu, - sizeof(pdu), 0, FIO_NET_CMD_F_MORE); + ret = fio_send_cmd_ext_pdu(fd, FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), 0, + FIO_NET_CMD_F_MORE); if (ret) return ret; @@ -1223,7 +1323,7 @@ if (use_zlib) return fio_send_iolog_gz(&pdu, log); - return fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, log->log, + return fio_send_cmd_ext_pdu(fd, FIO_NET_CMD_IOLOG, log->log, log->nr_samples * sizeof(struct io_sample), 0, 0); } @@ -1236,14 +1336,16 @@ pdu.groupid = cpu_to_le32(td->groupid); convert_thread_options_to_net(&pdu.top, &td->o); - fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL); + __fio_net_send_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL); } void fio_server_send_start(struct thread_data *td) { - assert(server_fd != -1); + int fd = get_s(); - fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL); + assert(fd != -1); + + fio_net_send_simple_cmd(fd, FIO_NET_CMD_SERVER_START, 0, NULL); } static int fio_init_server_ip(void) @@ -1528,8 +1630,9 @@ if (!fio_server_arg) goto out; - ret = fio_server_parse_string(fio_server_arg, &bind_sock, &is_sock, - &port, &saddr_in.sin_addr, + ret = fio_server_parse_string(fio_server_arg, &bind_sock, + &is_sock, &port, + &saddr_in.sin_addr, &saddr_in6.sin6_addr, &use_ipv6); if (!is_sock && bind_sock) { @@ -1574,7 +1677,7 @@ void fio_server_got_signal(int signal) { if (signal == SIGPIPE) - server_fd = -1; + assign_s(-1); else { log_info("\nfio: terminating on signal %d\n", signal); exit_backend = 1; @@ -1631,25 +1734,38 @@ pid_t pid; int ret; + setup_s(); + #if defined(WIN32) WSADATA wsd; + + if (pidfile) { + log_err("fio: Windows does not support background servers\n"); + ret = -1; + goto done; + } + WSAStartup(MAKEWORD(2, 2), &wsd); #endif - if (!pidfile) - return fio_server(); + if (!pidfile) { + ret = fio_server(); + goto done; + } if (check_existing_pidfile(pidfile)) { log_err("fio: pidfile %s exists and server 
appears alive\n", pidfile); - return -1; + ret = -1; + goto done; } pid = fork(); if (pid < 0) { log_err("fio: failed server fork: %s", strerror(errno)); free(pidfile); - return -1; + ret = -1; + goto done; } else if (pid) { int ret = write_pid(pid, pidfile); @@ -1670,6 +1786,7 @@ closelog(); unlink(pidfile); free(pidfile); +done: return ret; }