server: getting closer to a proper setup
Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/backend.c b/backend.c
index 00a23db..60abf10 100644
--- a/backend.c
+++ b/backend.c
@@ -1946,5 +1946,6 @@
fio_mutex_remove(writeout_mutex);
fio_mutex_remove(disk_thread_mutex);
stat_exit();
+ reset_fio_state();
return exit_value;
}
diff --git a/server.c b/server.c
index 8324543..282cc0a 100644
--- a/server.c
+++ b/server.c
@@ -30,12 +30,16 @@
int exit_backend = 0;
-static int server_fd = -1;
+/*
+ * NOTE(review): no code visible in this patch references struct
+ * server_state; the per-thread socket is kept in the s_key slot instead.
+ */
+struct server_state {
+ int server_fd;
+};
+
static char *fio_server_arg;
static char *bind_sock;
static struct sockaddr_in saddr_in;
static struct sockaddr_in6 saddr_in6;
static int use_ipv6;
+
#ifdef CONFIG_ZLIB
static unsigned int has_zlib = 1;
#else
@@ -49,8 +53,21 @@
int signal;
int exited;
pid_t pid;
+ pthread_t thread;
};
+/*
+ * Startup descriptor shared between start_thread_sync() (which keeps it on
+ * its stack) and the thread it creates.
+ */
+struct fio_client_startup {
+ struct fio_mutex *pid_mutex; /* released by the new thread, waited on by the creator */
+ struct fio_mutex *ffi_mutex; /* released by the creator once 'ffi' is set; removed by the thread */
+ pthread_t thread; /* filled in by pthread_create() */
+ void *data; /* argument for the thread function; the connection thread carries its socket here */
+ struct fio_fork_item *ffi; /* set by fio_server_add_fork_item() */
+ pid_t pid; /* gettid() of the new thread */
+ int sk; /* socket the thread binds with assign_s() */
+};
+
+static pthread_key_t s_key;
+
static const char *fio_server_ops[FIO_NET_CMD_NR] = {
"",
"QUIT",
@@ -72,6 +89,22 @@
"CMD_IOLOG",
};
+/*
+ * Return the server socket bound to the calling thread, or -1 if none.
+ *
+ * The thread-specific slot stores fd + 1, so a thread that never called
+ * assign_s() (its slot is still NULL) reads back -1 instead of fd 0.
+ */
+static int get_s(void)
+{
+	intptr_t slot = (intptr_t) pthread_getspecific(s_key);
+
+	return (int) (slot - 1);
+}
+
+/*
+ * Bind 'fd' to the calling thread; pass -1 to mark "no connection".
+ */
+static void assign_s(int fd)
+{
+	int ret;
+
+	/* keep the call outside assert() so it is not compiled out by NDEBUG */
+	ret = pthread_setspecific(s_key, (void *) ((intptr_t) fd + 1));
+	assert(!ret);
+	(void) ret;
+}
+
+/*
+ * Create the thread-specific key and mark the calling thread as having no
+ * connection.
+ */
+static void setup_s(void)
+{
+	int ret;
+
+	/* keep the call outside assert() so it is not compiled out by NDEBUG */
+	ret = pthread_key_create(&s_key, NULL);
+	assert(!ret);
+	(void) ret;
+
+	assign_s(-1);
+}
+
const char *fio_server_op(unsigned int op)
{
static char buf[32];
@@ -390,6 +423,12 @@
return ret;
}
+/*
+ * Send a command on the socket bound to the calling thread (see get_s());
+ * the remaining arguments are handed to fio_net_send_cmd() unchanged.
+ */
+static int __fio_net_send_cmd(uint16_t opcode, const void *buf, off_t size,
+			      uint64_t *tagptr, struct flist_head *list)
+{
+	const int sk = get_s();
+
+	return fio_net_send_cmd(sk, opcode, buf, size, tagptr, list);
+}
+
static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag)
{
struct fio_net_cmd cmd;
@@ -453,7 +492,8 @@
return fio_net_send_ack(sk, NULL, error, signal);
}
-static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
+static void fio_server_add_fork_item(struct fio_client_startup *s,
+ struct flist_head *list)
{
struct fio_fork_item *ffi;
@@ -461,48 +501,27 @@
ffi->exitval = 0;
ffi->signal = 0;
ffi->exited = 0;
- ffi->pid = pid;
+ ffi->pid = s->pid;
+ ffi->thread = s->thread;
+ s->ffi = ffi;
flist_add_tail(&ffi->list, list);
}
-static void fio_server_add_conn_pid(struct flist_head *conn_list, pid_t pid)
+/*
+ * Log the id stored in s->pid and queue a fork item for it on 'list'.
+ */
+static void fio_server_add_pid(struct fio_client_startup *s,
+ struct flist_head *list)
{
- dprint(FD_NET, "server: forked off connection job (pid=%u)\n", (int) pid);
- fio_server_add_fork_item(pid, conn_list);
-}
-
-static void fio_server_add_job_pid(struct flist_head *job_list, pid_t pid)
-{
- dprint(FD_NET, "server: forked off job job (pid=%u)\n", (int) pid);
- fio_server_add_fork_item(pid, job_list);
+ dprint(FD_NET, "server: forked off job (pid=%u)\n", (int) s->pid);
+ fio_server_add_fork_item(s, list);
}
static void fio_server_check_fork_item(struct fio_fork_item *ffi)
{
- int ret, status;
-
- ret = waitpid(ffi->pid, &status, WNOHANG);
- if (ret < 0) {
- if (errno == ECHILD) {
- log_err("fio: connection pid %u disappeared\n", (int) ffi->pid);
- ffi->exited = 1;
- } else
- log_err("fio: waitpid: %s\n", strerror(errno));
- } else if (ret == ffi->pid) {
- if (WIFSIGNALED(status)) {
- ffi->signal = WTERMSIG(status);
- ffi->exited = 1;
- }
- if (WIFEXITED(status)) {
- if (WEXITSTATUS(status))
- ffi->exitval = WEXITSTATUS(status);
- ffi->exited = 1;
- }
- }
+ /*
+ * Intentionally empty after the waitpid() removal: the thread functions
+ * in this patch store ffi->exitval and ffi->exited themselves.
+ * NOTE(review): the threads are created with default attributes and no
+ * pthread_join()/pthread_detach() is visible in this patch - confirm
+ * they are reaped somewhere.
+ */
}
static void fio_server_fork_item_done(struct fio_fork_item *ffi)
{
+ int server_fd = get_s();
+
dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval);
/*
@@ -539,22 +558,86 @@
fio_server_check_fork_items(conn_list);
}
-static int handle_run_cmd(struct flist_head *job_list, struct fio_net_cmd *cmd)
+/*
+ * Thread-side half of the startup handshake with start_thread_sync().
+ *
+ * Binds startup->sk to the calling thread, publishes the thread id in
+ * startup->pid, waits for the creator to attach a fork item and returns
+ * that item. 'startup' must not be touched after this returns: the final
+ * fio_mutex_up() lets the creator return and release the descriptor.
+ */
+static struct fio_fork_item *thread_startup_routine(struct fio_client_startup *startup)
{
- pid_t pid;
+	struct fio_fork_item *ffi;
+
+	assign_s(startup->sk);
+	startup->pid = gettid();
+
+	/* debug output goes through the FD_NET facility, not stdout */
+	dprint(FD_NET, "server: assign sk=%d, tid=%u\n", startup->sk,
+	       (unsigned int) startup->pid);
+
+	/* tid is published; let the creator build the fork item */
+	fio_mutex_up(startup->pid_mutex);
+
+	fio_mutex_down(startup->ffi_mutex);
+	ffi = startup->ffi;
+	fio_mutex_remove(startup->ffi_mutex);
+
+	/* last access to *startup */
+	fio_mutex_up(startup->pid_mutex);
+	return ffi;
+}
+
+/*
+ * Thread body for a job run.
+ *
+ * 'data' is the struct fio_client_startup handed over by
+ * start_thread_sync(). Runs fio_backend(), records its result in the
+ * thread's fork item and returns it as the thread exit value.
+ */
+static void *fio_client_thread(void *data)
+{
+	struct fio_fork_item *ffi;
int ret;
- set_genesis_time();
-
- pid = fork();
- if (pid) {
- fio_server_add_job_pid(job_list, pid);
- return 0;
- }
+	ffi = thread_startup_routine(data);
ret = fio_backend();
- free_threads_shm();
- _exit(ret);
+	ffi->exitval = ret;
+	ffi->exited = 1;
+
+	/* returning from the start routine is an implicit pthread_exit() */
+	return (void *) (uintptr_t) ret;
+}
+
+/*
+ * Start 'fn' in a new thread and synchronously hand it its fork item.
+ *
+ * 'data' and 'sk' reach the thread through a startup descriptor that lives
+ * on this function's stack. The handshake is:
+ *   1) wait on pid_mutex until the thread has stored its tid in startup.pid
+ *   2) add a fork item for the thread to 'list' (this sets startup.ffi)
+ *   3) release ffi_mutex so the thread can pick up startup.ffi
+ *   4) wait on pid_mutex again; the thread releases it as its last access
+ *      to the descriptor
+ *
+ * Returns 0 on success, 1 if a startup mutex could not be allocated, or
+ * the pthread_create() error code if the thread could not be created.
+ *
+ * NOTE(review): assumes fio_mutex_up()/fio_mutex_down() act as semaphore
+ * post/wait on a mutex created with FIO_MUTEX_LOCKED - confirm.
+ */
+static int start_thread_sync(void *(*fn)(void *), void *data,
+ struct flist_head *list, int sk)
+{
+ struct fio_client_startup startup;
+ int ret;
+
+ startup.pid_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
+ if (!startup.pid_mutex) {
+ log_err("fio: failed to allocate client startup mutex\n");
+ return 1;
+ }
+
+ startup.ffi_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
+ if (!startup.ffi_mutex) {
+ fio_mutex_remove(startup.pid_mutex);
+ log_err("fio: failed to allocate client startup mutex\n");
+ return 1;
+ }
+
+ /* startup.pid and startup.ffi are filled in during the handshake */
+ startup.data = data;
+ startup.sk = sk;
+
+ ret = pthread_create(&startup.thread, NULL, fn, &startup);
+ if (ret) {
+ fio_mutex_remove(startup.pid_mutex);
+ fio_mutex_remove(startup.ffi_mutex);
+ log_err("fio: failed to fork off client thread: %s\n", strerror(ret));
+ return ret;
+ }
+
+ /* step 1: the thread has written startup.pid once this returns */
+ fio_mutex_down(startup.pid_mutex);
+
+ /* step 2: creates the fork item and stores it in startup.ffi */
+ fio_server_add_pid(&startup, list);
+
+ /* step 3 */
+ fio_mutex_up(startup.ffi_mutex);
+
+ /* step 4: ffi_mutex itself is removed by the thread in thread_startup_routine() */
+ fio_mutex_down(startup.pid_mutex);
+ fio_mutex_remove(startup.pid_mutex);
+ return 0;
+}
+
+/*
+ * Handle a run command: reset the genesis time and start fio_client_thread
+ * in a new thread bound to socket 'sk', tracked on 'list'. 'cmd' is not
+ * used. Returns the result of start_thread_sync().
+ */
+static int handle_run_cmd(int sk, struct flist_head *list,
+ struct fio_net_cmd *cmd)
+{
+ set_genesis_time();
+
+ return start_thread_sync(fio_client_thread, NULL, list, sk);
}
static int handle_job_cmd(struct fio_net_cmd *cmd)
@@ -562,6 +645,7 @@
struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmd->payload;
void *buf = pdu->buf;
struct cmd_start_pdu spdu;
+ int server_fd = get_s();
pdu->buf_len = le32_to_cpu(pdu->buf_len);
pdu->client_type = le32_to_cpu(pdu->client_type);
@@ -573,7 +657,7 @@
spdu.jobs = cpu_to_le32(thread_number);
spdu.stat_outputs = cpu_to_le32(stat_number);
- fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
+ __fio_net_send_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
return 0;
}
@@ -584,6 +668,7 @@
struct cmd_line_pdu *clp;
unsigned long offset;
struct cmd_start_pdu spdu;
+ int server_fd = get_s();
char **argv;
int i;
@@ -613,7 +698,7 @@
spdu.jobs = cpu_to_le32(thread_number);
spdu.stat_outputs = cpu_to_le32(stat_number);
- fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
+ __fio_net_send_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
return 0;
}
@@ -648,7 +733,7 @@
use_zlib = 0;
}
- return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL);
+ return __fio_net_send_cmd(FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL);
}
static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
@@ -693,7 +778,7 @@
je->is_pow2 = cpu_to_le32(je->is_pow2);
je->unit_base = cpu_to_le32(je->unit_base);
- fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, &tag, NULL);
+ __fio_net_send_cmd(FIO_NET_CMD_ETA, je, size, &tag, NULL);
free(je);
return 0;
}
@@ -712,23 +797,25 @@
struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
struct thread_data *td;
uint32_t tnumber;
+ int fd = get_s();
tnumber = le32_to_cpu(pdu->thread_number);
dprint(FD_NET, "server: updating options for job %u\n", tnumber);
if (!tnumber || tnumber > thread_number) {
- send_update_job_reply(server_fd, cmd->tag, ENODEV);
+ send_update_job_reply(fd, cmd->tag, ENODEV);
return 0;
}
td = &threads[tnumber - 1];
convert_thread_options_to_cpu(&td->o, &pdu->top);
- send_update_job_reply(server_fd, cmd->tag, 0);
+ send_update_job_reply(fd, cmd->tag, 0);
return 0;
}
-static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd)
+static int handle_command(int sk, struct flist_head *job_list,
+ struct fio_net_cmd *cmd)
{
int ret;
@@ -756,7 +843,7 @@
ret = handle_send_eta_cmd(cmd);
break;
case FIO_NET_CMD_RUN:
- ret = handle_run_cmd(job_list, cmd);
+ ret = handle_run_cmd(sk, job_list, cmd);
break;
case FIO_NET_CMD_UPDATE_JOB:
ret = handle_update_job_cmd(cmd);
@@ -776,7 +863,6 @@
int ret = 0;
reset_fio_state();
- server_fd = sk;
/* read forever */
while (!exit_backend) {
@@ -822,7 +908,7 @@
break;
}
- ret = handle_command(&job_list, cmd);
+ ret = handle_command(sk, &job_list, cmd);
if (ret)
break;
@@ -834,7 +920,29 @@
free(cmd);
close(sk);
- _exit(ret);
+ return ret;
+}
+
+/*
+ * Thread body for one client connection.
+ *
+ * 'data' is the struct fio_client_startup handed over by
+ * start_thread_sync(); its 'data' member carries the connection socket.
+ * Runs handle_connection() on that socket, records the result in the
+ * thread's fork item and returns it as the thread exit value.
+ */
+static void *fio_connection_thread(void *data)
+{
+	struct fio_client_startup *startup = data;
+	/* read before the handshake: 'startup' is not valid after it */
+	int sk = (int) (uintptr_t) startup->data;
+	struct fio_fork_item *ffi;
+	int ret;
+
+	ffi = thread_startup_routine(startup);
+
+	ret = handle_connection(sk);
+	ffi->exitval = ret;
+	ffi->exited = 1;
+
+	/* returning from the start routine is an implicit pthread_exit() */
+	return (void *) (uintptr_t) ret;
+}
+
+/*
+ * Hand the accepted socket 'sk' to a new connection thread tracked on
+ * 'list'.
+ *
+ * On success the thread owns 'sk' (handle_connection() closes it). On
+ * failure no thread was started, so 'sk' is closed here; the caller must
+ * not close it again. Returns 0 on success, non-zero on failure.
+ */
+static int add_connection(int sk, struct flist_head *list)
+{
+	void *data = (void *) (uintptr_t) sk;
+	int ret;
+
+	ret = start_thread_sync(fio_connection_thread, data, list, sk);
+	if (ret) {
+		/* nobody took ownership of the socket; don't leak it */
+		close(sk);
+	}
+
+	return ret;
}
static int accept_loop(int listen_sk)
@@ -852,8 +960,6 @@
fcntl(listen_sk, F_SETFL, flags);
while (!exit_backend) {
- pid_t pid;
-
pfd.fd = listen_sk;
pfd.events = POLLIN;
do {
@@ -889,16 +995,7 @@
}
dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr));
-
- pid = fork();
- if (pid) {
- close(sk);
- fio_server_add_conn_pid(&conn_list, pid);
- continue;
- }
-
- /* exits */
- handle_connection(sk);
+ add_connection(sk, &conn_list);
}
return exitval;
@@ -909,8 +1006,9 @@
struct cmd_text_pdu *pdu;
unsigned int tlen;
struct timeval tv;
+ int fd = get_s();
- if (server_fd == -1)
+ if (fd == -1)
return log_local_buf(buf, len);
tlen = sizeof(*pdu) + len;
@@ -925,7 +1023,7 @@
memcpy(pdu->buf, buf, len);
- fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL);
+ __fio_net_send_cmd(FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL);
free(pdu);
return len;
}
@@ -1044,7 +1142,7 @@
convert_gs(&p.rs, rs);
- fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), NULL, NULL);
+ __fio_net_send_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, NULL);
}
void fio_server_send_gs(struct group_run_stats *rs)
@@ -1054,7 +1152,7 @@
dprint(FD_NET, "server sending group run stats\n");
convert_gs(&gs, rs);
- fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, NULL);
+ __fio_net_send_cmd(FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, NULL);
}
static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
@@ -1108,7 +1206,7 @@
convert_dus(&pdu.dus, &du->dus);
convert_agg(&pdu.agg, &du->agg);
- fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, NULL);
+ __fio_net_send_cmd(FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, NULL);
}
}
@@ -1139,6 +1237,7 @@
#ifdef CONFIG_ZLIB
z_stream stream;
void *out_pdu;
+ int fd = get_s();
/*
* Dirty - since the log is potentially huge, compress it into
@@ -1175,8 +1274,8 @@
if (stream.avail_in)
flags = FIO_NET_CMD_F_MORE;
- ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG,
- out_pdu, this_len, 0, flags);
+ ret = fio_send_cmd_ext_pdu(fd, FIO_NET_CMD_IOLOG, out_pdu,
+ this_len, 0, flags);
if (ret)
goto err_zlib;
} while (stream.avail_in);
@@ -1193,6 +1292,7 @@
{
struct cmd_iolog_pdu pdu;
int i, ret = 0;
+ int fd = get_s();
pdu.thread_number = cpu_to_le32(td->thread_number);
pdu.nr_samples = __cpu_to_le32(log->nr_samples);
@@ -1212,8 +1312,8 @@
/*
* Send header first, it's not compressed.
*/
- ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu,
- sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
+ ret = fio_send_cmd_ext_pdu(fd, FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), 0,
+ FIO_NET_CMD_F_MORE);
if (ret)
return ret;
@@ -1223,7 +1323,7 @@
if (use_zlib)
return fio_send_iolog_gz(&pdu, log);
- return fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, log->log,
+ return fio_send_cmd_ext_pdu(fd, FIO_NET_CMD_IOLOG, log->log,
log->nr_samples * sizeof(struct io_sample), 0, 0);
}
@@ -1236,14 +1336,16 @@
pdu.groupid = cpu_to_le32(td->groupid);
convert_thread_options_to_net(&pdu.top, &td->o);
- fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL);
+ __fio_net_send_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL);
}
+/*
+ * Send FIO_NET_CMD_SERVER_START on the socket bound to the calling thread.
+ * Asserts that such a socket is set. 'td' is not used.
+ */
void fio_server_send_start(struct thread_data *td)
{
- assert(server_fd != -1);
+ int fd = get_s(); /* per-thread socket, -1 when none was assigned */
- fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
+ assert(fd != -1);
+
+ fio_net_send_simple_cmd(fd, FIO_NET_CMD_SERVER_START, 0, NULL);
}
static int fio_init_server_ip(void)
@@ -1528,8 +1630,9 @@
if (!fio_server_arg)
goto out;
- ret = fio_server_parse_string(fio_server_arg, &bind_sock, &is_sock,
- &port, &saddr_in.sin_addr,
+ ret = fio_server_parse_string(fio_server_arg, &bind_sock,
+ &is_sock, &port,
+ &saddr_in.sin_addr,
&saddr_in6.sin6_addr, &use_ipv6);
if (!is_sock && bind_sock) {
@@ -1574,7 +1677,7 @@
void fio_server_got_signal(int signal)
{
if (signal == SIGPIPE)
- server_fd = -1;
+ assign_s(-1);
else {
log_info("\nfio: terminating on signal %d\n", signal);
exit_backend = 1;
@@ -1631,25 +1734,38 @@
pid_t pid;
int ret;
+ setup_s();
+
#if defined(WIN32)
WSADATA wsd;
+
+ if (pidfile) {
+ log_err("fio: Windows does not support background servers\n");
+ ret = -1;
+ goto done;
+ }
+
WSAStartup(MAKEWORD(2, 2), &wsd);
#endif
- if (!pidfile)
- return fio_server();
+ if (!pidfile) {
+ ret = fio_server();
+ goto done;
+ }
if (check_existing_pidfile(pidfile)) {
log_err("fio: pidfile %s exists and server appears alive\n",
pidfile);
- return -1;
+ ret = -1;
+ goto done;
}
pid = fork();
if (pid < 0) {
log_err("fio: failed server fork: %s", strerror(errno));
free(pidfile);
- return -1;
+ ret = -1;
+ goto done;
} else if (pid) {
int ret = write_pid(pid, pidfile);
@@ -1670,6 +1786,7 @@
closelog();
unlink(pidfile);
free(pidfile);
+done:
return ret;
}