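server: getting closer to a proper setup

Run each client connection and each job run in a pthread instead of a
fork()ed child. The connection socket moves from the file-scope server_fd
into a thread-specific key (get_s()/assign_s()), so each thread sends on
the socket it was started with. The connection handler now returns its
status instead of calling _exit(), fio_server_check_fork_item() becomes
an empty stub now that waitpid() is gone, and fio_backend() calls
reset_fio_state() before returning.
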
Signed-off-by: Jens Axboe <axboe@kernel.dk>
diff --git a/backend.c b/backend.c
index 00a23db..60abf10 100644
--- a/backend.c
+++ b/backend.c
@@ -1946,5 +1946,6 @@
 	fio_mutex_remove(writeout_mutex);
 	fio_mutex_remove(disk_thread_mutex);
 	stat_exit();
+	reset_fio_state();
 	return exit_value;
 }
diff --git a/server.c b/server.c
index 8324543..282cc0a 100644
--- a/server.c
+++ b/server.c
@@ -30,12 +30,16 @@
 
 int exit_backend = 0;
 
-static int server_fd = -1;
+struct server_state {
+	int server_fd;	/* NOTE(review): server_state is not referenced anywhere in this patch */
+};
+
 static char *fio_server_arg;
 static char *bind_sock;
 static struct sockaddr_in saddr_in;
 static struct sockaddr_in6 saddr_in6;
 static int use_ipv6;
+
 #ifdef CONFIG_ZLIB
 static unsigned int has_zlib = 1;
 #else
@@ -49,8 +53,21 @@
 	int signal;
 	int exited;
 	pid_t pid;
+	pthread_t thread;
 };
 
+struct fio_client_startup {
+	struct fio_mutex *pid_mutex;	/* posted by the new thread: once pid is set, again once ffi was read */
+	struct fio_mutex *ffi_mutex;	/* posted by the creator once ffi is set; removed by the new thread */
+	pthread_t thread;		/* filled in by pthread_create() */
+	void *data;			/* opaque; connection threads get the socket fd cast to a pointer */
+	struct fio_fork_item *ffi;	/* set by fio_server_add_fork_item() */
+	pid_t pid;			/* gettid() of the new thread */
+	int sk;				/* socket the new thread passes to assign_s() */
+};
+
+static pthread_key_t s_key;
+
 static const char *fio_server_ops[FIO_NET_CMD_NR] = {
 	"",
 	"QUIT",
@@ -72,6 +89,41 @@
 	"CMD_IOLOG",
 };
 
+/*
+ * Return the client socket stored in the calling thread's s_key slot.
+ * NOTE(review): a thread that never called assign_s() reads NULL here,
+ * i.e. fd 0 rather than -1 - confirm every calling thread was set up.
+ */
+static int get_s(void)
+{
+	return (uintptr_t) pthread_getspecific(s_key);
+}
+
+/*
+ * Bind 'fd' to the calling thread. The pthread call is kept outside of
+ * assert() so that it still runs when asserts are compiled out (NDEBUG).
+ */
+static void assign_s(int fd)
+{
+	int ret = pthread_setspecific(s_key, (void *) (uintptr_t) fd);
+
+	assert(!ret);
+	(void) ret;
+}
+
+/*
+ * Create the per-thread socket key and mark the calling thread as having
+ * no client socket (-1).
+ */
+static void setup_s(void)
+{
+	int ret = pthread_key_create(&s_key, NULL);
+
+	assert(!ret);
+	(void) ret;
+	assign_s(-1);
+}
+
 const char *fio_server_op(unsigned int op)
 {
 	static char buf[32];
@@ -390,6 +423,12 @@
 	return ret;
 }
 
+static int __fio_net_send_cmd(uint16_t opcode, const void *buf, off_t size,
+			      uint64_t *tagptr, struct flist_head *list)
+{
+	return fio_net_send_cmd(get_s(), opcode, buf, size, tagptr, list);	/* socket comes from get_s() */
+}
+
 static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag)
 {
 	struct fio_net_cmd cmd;
@@ -453,7 +492,8 @@
 	return fio_net_send_ack(sk, NULL, error, signal);
 }
 
-static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
+static void fio_server_add_fork_item(struct fio_client_startup *s,
+				     struct flist_head *list)
 {
 	struct fio_fork_item *ffi;
 
@@ -461,48 +501,27 @@
 	ffi->exitval = 0;
 	ffi->signal = 0;
 	ffi->exited = 0;
-	ffi->pid = pid;
+	ffi->pid = s->pid;
+	ffi->thread = s->thread;
+	s->ffi = ffi;
 	flist_add_tail(&ffi->list, list);
 }
 
-static void fio_server_add_conn_pid(struct flist_head *conn_list, pid_t pid)
+static void fio_server_add_pid(struct fio_client_startup *s,
+			       struct flist_head *list)
 {
-	dprint(FD_NET, "server: forked off connection job (pid=%u)\n", (int) pid);
-	fio_server_add_fork_item(pid, conn_list);
-}
-
-static void fio_server_add_job_pid(struct flist_head *job_list, pid_t pid)
-{
-	dprint(FD_NET, "server: forked off job job (pid=%u)\n", (int) pid);
-	fio_server_add_fork_item(pid, job_list);
+	dprint(FD_NET, "server: forked off job (pid=%u)\n", (int) s->pid);
+	fio_server_add_fork_item(s, list);	/* also stores the new item in s->ffi */
 }
 
 static void fio_server_check_fork_item(struct fio_fork_item *ffi)
 {
-	int ret, status;
-
-	ret = waitpid(ffi->pid, &status, WNOHANG);
-	if (ret < 0) {
-		if (errno == ECHILD) {
-			log_err("fio: connection pid %u disappeared\n", (int) ffi->pid);
-			ffi->exited = 1;
-		} else
-			log_err("fio: waitpid: %s\n", strerror(errno));
-	} else if (ret == ffi->pid) {
-		if (WIFSIGNALED(status)) {
-			ffi->signal = WTERMSIG(status);
-			ffi->exited = 1;
-		}
-		if (WIFEXITED(status)) {
-			if (WEXITSTATUS(status))
-				ffi->exitval = WEXITSTATUS(status);
-			ffi->exited = 1;
-		}
-	}
 }
 
 static void fio_server_fork_item_done(struct fio_fork_item *ffi)
 {
+	int server_fd = get_s();
+
 	dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval);
 
 	/*
@@ -539,22 +558,96 @@
 	fio_server_check_fork_items(conn_list);
 }
 
-static int handle_run_cmd(struct flist_head *job_list, struct fio_net_cmd *cmd)
+/*
+ * First thing a new server thread runs: bind the socket to this thread,
+ * publish the thread id, then wait for the creator to register the fork
+ * item and hand it back. 'startup' lives on the creator's stack in
+ * start_thread_sync() and must not be touched after the final
+ * fio_mutex_up() below.
+ */
+static struct fio_fork_item *thread_startup_routine(struct fio_client_startup *startup)
 {
-	pid_t pid;
+	struct fio_fork_item *ffi;
+
+	assign_s(startup->sk);
+
+	dprint(FD_NET, "server: assign sk=%d, tid=%d\n", startup->sk, (int) gettid());
+
+	/* tell the creator that startup->pid is valid */
+	startup->pid = gettid();
+	fio_mutex_up(startup->pid_mutex);
+
+	/* the creator posts ffi_mutex once startup->ffi has been filled in */
+	fio_mutex_down(startup->ffi_mutex);
+	ffi = startup->ffi;
+	fio_mutex_remove(startup->ffi_mutex);
+
+	/* second post: lets start_thread_sync() return */
+	fio_mutex_up(startup->pid_mutex);
+	return ffi;
+}
+
+static void *fio_client_thread(void *data)	/* job-run thread body; data is the creator's struct fio_client_startup */
+{
+	struct fio_fork_item *ffi;
 	int ret;
 
-	set_genesis_time();
-
-	pid = fork();
-	if (pid) {
-		fio_server_add_job_pid(job_list, pid);
-		return 0;
-	}
+	ffi = thread_startup_routine(data);	/* startup handshake; yields this thread's fork item */
 
 	ret = fio_backend();
-	free_threads_shm();
-	_exit(ret);
+	ffi->exitval = ret;	/* publish the result through the fork item */
+	ffi->exited = 1;
+	pthread_exit((void *) (uintptr_t) ret);	/* pthread_exit() does not return */
+	return (void *) (uintptr_t) ret;	/* not reached */
+}
+
+static int start_thread_sync(void *(*fn)(void *), void *data,	/* start fn in a thread, register it on 'list', return after the handshake */
+			     struct flist_head *list, int sk)
+{
+	struct fio_client_startup startup;	/* on this stack: only valid for the thread during the handshake */
+	int ret;
+
+	startup.pid_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
+	if (!startup.pid_mutex) {
+		log_err("fio: failed to allocate client startup mutex\n");
+		return 1;	/* note: 1 here, but the pthread_create() error code further down */
+	}
+
+	startup.ffi_mutex = fio_mutex_init(FIO_MUTEX_LOCKED);
+	if (!startup.ffi_mutex) {
+		fio_mutex_remove(startup.pid_mutex);
+		log_err("fio: failed to allocate client startup mutex\n");
+		return 1;
+	}
+
+	startup.data = data;
+	startup.sk = sk;
+
+	ret = pthread_create(&startup.thread, NULL, fn, &startup);
+	if (ret) {
+		fio_mutex_remove(startup.pid_mutex);
+		fio_mutex_remove(startup.ffi_mutex);
+		log_err("fio: failed to fork off client thread: %s\n", strerror(ret));
+		return ret;
+	}
+
+	fio_mutex_down(startup.pid_mutex);	/* wait for the thread to set startup.pid */
+
+	fio_server_add_pid(&startup, list);	/* fills in startup.ffi */
+
+	fio_mutex_up(startup.ffi_mutex);	/* let the thread read startup.ffi */
+
+	fio_mutex_down(startup.pid_mutex);	/* wait until the thread is done with 'startup' */
+	fio_mutex_remove(startup.pid_mutex);	/* ffi_mutex was already removed by the thread */
+	return 0;
+}
+
+static int handle_run_cmd(int sk, struct flist_head *list,
+			  struct fio_net_cmd *cmd)
+{
+	set_genesis_time();
+
+	return start_thread_sync(fio_client_thread, NULL, list, sk);	/* the job thread is bound to socket sk; cmd is unused */
 }
 
 static int handle_job_cmd(struct fio_net_cmd *cmd)
@@ -562,6 +645,7 @@
 	struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmd->payload;
 	void *buf = pdu->buf;
 	struct cmd_start_pdu spdu;
+	int server_fd = get_s();
 
 	pdu->buf_len = le32_to_cpu(pdu->buf_len);
 	pdu->client_type = le32_to_cpu(pdu->client_type);
@@ -573,7 +657,7 @@
 
 	spdu.jobs = cpu_to_le32(thread_number);
 	spdu.stat_outputs = cpu_to_le32(stat_number);
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
+	__fio_net_send_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
 	return 0;
 }
 
@@ -584,6 +668,7 @@
 	struct cmd_line_pdu *clp;
 	unsigned long offset;
 	struct cmd_start_pdu spdu;
+	int server_fd = get_s();
 	char **argv;
 	int i;
 
@@ -613,7 +698,7 @@
 
 	spdu.jobs = cpu_to_le32(thread_number);
 	spdu.stat_outputs = cpu_to_le32(stat_number);
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
+	__fio_net_send_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, NULL);
 	return 0;
 }
 
@@ -648,7 +733,7 @@
 		use_zlib = 0;
 	}
 
-	return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL);
+	return __fio_net_send_cmd(FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, NULL);
 }
 
 static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
@@ -693,7 +778,7 @@
 	je->is_pow2		= cpu_to_le32(je->is_pow2);
 	je->unit_base		= cpu_to_le32(je->unit_base);
 
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, &tag, NULL);
+	__fio_net_send_cmd(FIO_NET_CMD_ETA, je, size, &tag, NULL);
 	free(je);
 	return 0;
 }
@@ -712,23 +797,25 @@
 	struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
 	struct thread_data *td;
 	uint32_t tnumber;
+	int fd = get_s();
 
 	tnumber = le32_to_cpu(pdu->thread_number);
 
 	dprint(FD_NET, "server: updating options for job %u\n", tnumber);
 
 	if (!tnumber || tnumber > thread_number) {
-		send_update_job_reply(server_fd, cmd->tag, ENODEV);
+		send_update_job_reply(fd, cmd->tag, ENODEV);
 		return 0;
 	}
 
 	td = &threads[tnumber - 1];
 	convert_thread_options_to_cpu(&td->o, &pdu->top);
-	send_update_job_reply(server_fd, cmd->tag, 0);
+	send_update_job_reply(fd, cmd->tag, 0);
 	return 0;
 }
 
-static int handle_command(struct flist_head *job_list, struct fio_net_cmd *cmd)
+static int handle_command(int sk, struct flist_head *job_list,
+			  struct fio_net_cmd *cmd)
 {
 	int ret;
 
@@ -756,7 +843,7 @@
 		ret = handle_send_eta_cmd(cmd);
 		break;
 	case FIO_NET_CMD_RUN:
-		ret = handle_run_cmd(job_list, cmd);
+		ret = handle_run_cmd(sk, job_list, cmd);
 		break;
 	case FIO_NET_CMD_UPDATE_JOB:
 		ret = handle_update_job_cmd(cmd);
@@ -776,7 +863,6 @@
 	int ret = 0;
 
 	reset_fio_state();
-	server_fd = sk;
 
 	/* read forever */
 	while (!exit_backend) {
@@ -822,7 +908,7 @@
 			break;
 		}
 
-		ret = handle_command(&job_list, cmd);
+		ret = handle_command(sk, &job_list, cmd);
 		if (ret)
 			break;
 
@@ -834,7 +920,44 @@
 		free(cmd);
 
 	close(sk);
-	_exit(ret);
+	return ret;
+}
+
+/*
+ * Thread body for one client connection. 'data' is the creator's
+ * struct fio_client_startup; the socket is copied out of it before the
+ * startup handshake, since it lives on the creator's stack.
+ */
+static void *fio_connection_thread(void *data)
+{
+	struct fio_client_startup *startup = data;
+	int sk = (uintptr_t) startup->data;
+	struct fio_fork_item *ffi;
+	int ret;
+
+	ffi = thread_startup_routine(startup);
+
+	ffi->exitval = ret = handle_connection(sk);
+	ffi->exited = 1;
+	pthread_exit((void *)(uintptr_t) ret);
+	return (void *)(uintptr_t) ret;
+}
+
+/*
+ * Hand an accepted socket to a new connection thread. Returns 0 on
+ * success. If the thread could not be started, the connection handler
+ * that would close the socket never runs, so close it here.
+ */
+static int add_connection(int sk, struct flist_head *list)
+{
+	void *data = (void *) (uintptr_t) sk;
+	int ret;
+
+	ret = start_thread_sync(fio_connection_thread, data, list, sk);
+	if (ret)
+		close(sk);
+
+	return ret;
 }
 
 static int accept_loop(int listen_sk)
@@ -852,8 +960,6 @@
 	fcntl(listen_sk, F_SETFL, flags);
 
 	while (!exit_backend) {
-		pid_t pid;
-
 		pfd.fd = listen_sk;
 		pfd.events = POLLIN;
 		do {
@@ -889,16 +995,7 @@
 		}
 
 		dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr));
-
-		pid = fork();
-		if (pid) {
-			close(sk);
-			fio_server_add_conn_pid(&conn_list, pid);
-			continue;
-		}
-
-		/* exits */
-		handle_connection(sk);
+		add_connection(sk, &conn_list);
 	}
 
 	return exitval;
@@ -909,8 +1006,9 @@
 	struct cmd_text_pdu *pdu;
 	unsigned int tlen;
 	struct timeval tv;
+	int fd = get_s();
 
-	if (server_fd == -1)
+	if (fd == -1)
 		return log_local_buf(buf, len);
 
 	tlen = sizeof(*pdu) + len;
@@ -925,7 +1023,7 @@
 
 	memcpy(pdu->buf, buf, len);
 
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL);
+	__fio_net_send_cmd(FIO_NET_CMD_TEXT, pdu, tlen, NULL, NULL);
 	free(pdu);
 	return len;
 }
@@ -1044,7 +1142,7 @@
 
 	convert_gs(&p.rs, rs);
 
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), NULL, NULL);
+	__fio_net_send_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, NULL);
 }
 
 void fio_server_send_gs(struct group_run_stats *rs)
@@ -1054,7 +1152,7 @@
 	dprint(FD_NET, "server sending group run stats\n");
 
 	convert_gs(&gs, rs);
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, NULL);
+	__fio_net_send_cmd(FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, NULL);
 }
 
 static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
@@ -1108,7 +1206,7 @@
 		convert_dus(&pdu.dus, &du->dus);
 		convert_agg(&pdu.agg, &du->agg);
 
-		fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, NULL);
+		__fio_net_send_cmd(FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, NULL);
 	}
 }
 
@@ -1139,6 +1237,7 @@
 #ifdef CONFIG_ZLIB
 	z_stream stream;
 	void *out_pdu;
+	int fd = get_s();
 
 	/*
 	 * Dirty - since the log is potentially huge, compress it into
@@ -1175,8 +1274,8 @@
 		if (stream.avail_in)
 			flags = FIO_NET_CMD_F_MORE;
 
-		ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG,
-					   out_pdu, this_len, 0, flags);
+		ret = fio_send_cmd_ext_pdu(fd, FIO_NET_CMD_IOLOG, out_pdu,
+						this_len, 0, flags);
 		if (ret)
 			goto err_zlib;
 	} while (stream.avail_in);
@@ -1193,6 +1292,7 @@
 {
 	struct cmd_iolog_pdu pdu;
 	int i, ret = 0;
+	int fd = get_s();
 
 	pdu.thread_number = cpu_to_le32(td->thread_number);
 	pdu.nr_samples = __cpu_to_le32(log->nr_samples);
@@ -1212,8 +1312,8 @@
 	/*
 	 * Send header first, it's not compressed.
 	 */
-	ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu,
-					sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
+	ret = fio_send_cmd_ext_pdu(fd, FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), 0,
+					FIO_NET_CMD_F_MORE);
 	if (ret)
 		return ret;
 
@@ -1223,7 +1323,7 @@
 	if (use_zlib)
 		return fio_send_iolog_gz(&pdu, log);
 
-	return fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, log->log,
+	return fio_send_cmd_ext_pdu(fd, FIO_NET_CMD_IOLOG, log->log,
 			log->nr_samples * sizeof(struct io_sample), 0, 0);
 }
 
@@ -1236,14 +1336,16 @@
 	pdu.groupid = cpu_to_le32(td->groupid);
 	convert_thread_options_to_net(&pdu.top, &td->o);
 
-	fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL);
+	__fio_net_send_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, NULL);
 }
 
 void fio_server_send_start(struct thread_data *td)
 {
-	assert(server_fd != -1);
+	int fd = get_s();	/* socket bound to the calling thread */
 
-	fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
+	assert(fd != -1);
+
+	fio_net_send_simple_cmd(fd, FIO_NET_CMD_SERVER_START, 0, NULL);
 }
 
 static int fio_init_server_ip(void)
@@ -1528,8 +1630,9 @@
 	if (!fio_server_arg)
 		goto out;
 
-	ret = fio_server_parse_string(fio_server_arg, &bind_sock, &is_sock,
-					&port, &saddr_in.sin_addr,
+	ret = fio_server_parse_string(fio_server_arg, &bind_sock,
+					&is_sock, &port,
+					&saddr_in.sin_addr,
 					&saddr_in6.sin6_addr, &use_ipv6);
 
 	if (!is_sock && bind_sock) {
@@ -1574,7 +1677,7 @@
 void fio_server_got_signal(int signal)
 {
 	if (signal == SIGPIPE)
-		server_fd = -1;
+		assign_s(-1);
 	else {
 		log_info("\nfio: terminating on signal %d\n", signal);
 		exit_backend = 1;
@@ -1631,25 +1734,38 @@
 	pid_t pid;
 	int ret;
 
+	setup_s();
+
 #if defined(WIN32)
 	WSADATA wsd;
+
+	if (pidfile) {
+		log_err("fio: Windows does not support background servers\n");
+		ret = -1;
+		goto done;
+	}
+
 	WSAStartup(MAKEWORD(2, 2), &wsd);
 #endif
 
-	if (!pidfile)
-		return fio_server();
+	if (!pidfile) {
+		ret = fio_server();
+		goto done;
+	}
 
 	if (check_existing_pidfile(pidfile)) {
 		log_err("fio: pidfile %s exists and server appears alive\n",
 								pidfile);
-		return -1;
+		ret = -1;
+		goto done;
 	}
 
 	pid = fork();
 	if (pid < 0) {
 		log_err("fio: failed server fork: %s", strerror(errno));
 		free(pidfile);
-		return -1;
+		ret = -1;
+		goto done;
 	} else if (pid) {
 		int ret = write_pid(pid, pidfile);
 
@@ -1670,6 +1786,7 @@
 	closelog();
 	unlink(pidfile);
 	free(pidfile);
+done:
 	return ret;
 }