Major rewrite of futex_requeue_pi.c

futex_requeue_pi.c still had left-over code from its humble origins
outside of futextest. This patch removes its dependency on pthread
IPC mechanisms and relies on raw futex calls and atomic operations.
The return codes of the various threads are now considered in the final
result reporting. Several race conditions fixed, etc.

Signed-off-by: Darren Hart <dvhltc@us.ibm.com>
diff --git a/functional/futex_requeue_pi.c b/functional/futex_requeue_pi.c
index d452721..4c66d1c 100644
--- a/functional/futex_requeue_pi.c
+++ b/functional/futex_requeue_pi.c
@@ -40,18 +40,20 @@
 #include <stdlib.h>
 #include <signal.h>
 #include <string.h>
+#include "atomic.h"
 #include "futextest.h"
 #include "logging.h"
 
+#define MAX_WAKE_ITERS 1000	/* cap on waker requeue attempts */
 #define THREAD_MAX 10
 #define SIGNAL_PERIOD_US 100
 
-pthread_barrier_t wake_barrier;
-pthread_barrier_t waiter_barrier;
-volatile int waiters_blocked = 0;
-int waiters_woken;
+atomic_t waiters_blocked = ATOMIC_INITIALIZER;	/* bumped before each wait */
+atomic_t waiters_woken = ATOMIC_INITIALIZER;	/* bumped after each wait */
+
 futex_t f1 = FUTEX_INITIALIZER;
 futex_t f2 = FUTEX_INITIALIZER;
+futex_t wake_complete = FUTEX_INITIALIZER;	/* wakers wake this when done */
 
 /* Test option defaults */
 static long timeout_ns = 0;
@@ -59,10 +61,13 @@
 static int owner = 0;
 static int locked = 0;
 
-typedef struct struct_waiter_arg {
+typedef struct {
 	long id;
 	struct timespec *timeout;
-} waiter_arg_t;
+	int lock;	/* wakers: non-zero to hold f2 across the requeue */
+	int ret;	/* thread result, also handed to pthread_exit() */
+} thread_arg_t;
+#define THREAD_ARG_INITIALIZER { 0, NULL, 0, 0 }
 
 void usage(char *prog)
 {
@@ -71,7 +76,7 @@
 	printf("  -c	Use color\n");
 	printf("  -h	Display this help message\n");
 	printf("  -l	Lock the pi futex across requeue\n");
-	printf("  -o	Use a third party pi futex owner during requeue\n");
+	printf("  -o	Use a third party pi futex owner during requeue (cancels -l)\n");
 	printf("  -t N	Timeout in nanoseconds (default: 100,000)\n");
 	printf("  -v L	Verbosity level: %d=QUIET %d=CRITICAL %d=INFO\n",
 	       VQUIET, VCRITICAL, VINFO);
@@ -112,9 +117,8 @@
 
 void *waiterfn(void *arg)
 {
-	waiter_arg_t *args = (waiter_arg_t *)arg;
-	unsigned int old_val;
-	int ret;
+	thread_arg_t *args = (thread_arg_t *)arg;
+	futex_t old_val;
 
 	info("Waiter %ld: running\n", args->id);
 	/* Each thread sleeps for a different amount of time
@@ -122,82 +126,98 @@
 	 * external mutex here */
 	usleep(1000 * (long)args->id);
 
-	/* FIXME: need to hold the mutex prior to waiting right?... sort of... */
-
-	/* cond_wait */
 	old_val = f1;
-	pthread_barrier_wait(&waiter_barrier);
+	atomic_inc(&waiters_blocked);
 	info("Calling futex_wait_requeue_pi: %p (%u) -> %p\n",
 	     &f1, f1, &f2);
-	ret = futex_wait_requeue_pi(&f1, old_val, &f2,
-				    args->timeout, FUTEX_PRIVATE_FLAG);
-	info("waiter %ld woke\n", args->id);
-	if (ret < 0) {
+	args->ret = futex_wait_requeue_pi(&f1, old_val, &f2, args->timeout,
+					  FUTEX_PRIVATE_FLAG);
+
+	info("waiter %ld woke with %d %s\n", args->id, args->ret,
+	     args->ret < 0 ? strerror(errno): "");
+	atomic_inc(&waiters_woken);
+	if (args->ret < 0) {
 		if (args->timeout && errno == ETIMEDOUT)
-			ret = 0;
+			args->ret = 0;
 		else {
-			ret = -errno;
+			args->ret = RET_ERROR;
 			error("futex_wait_requeue_pi\n", errno);
 		}
 		futex_lock_pi(&f2, NULL, 0, FUTEX_PRIVATE_FLAG);
 	}
-	waiters_woken++;
 	futex_unlock_pi(&f2, FUTEX_PRIVATE_FLAG);
 
-	info("Waiter %ld: exiting with %d\n", args->id, ret);
-	return (void*)(long)ret;
+	info("Waiter %ld: exiting with %d\n", args->id, args->ret);
+	pthread_exit((void *)&args->ret);
 }
 
 void *broadcast_wakerfn(void *arg)
 {
-	unsigned int old_val;
-	int nr_wake = 1;
+	thread_arg_t *args = (thread_arg_t *)arg;
 	int nr_requeue = INT_MAX;
-	long lock = (long)arg;
-	int ret = 0;
-	pthread_barrier_wait(&waiter_barrier);
-	usleep(100000); /*icky*/
-	info("Waker: Calling broadcast\n");
+	int task_count = 0;
+	futex_t old_val;
+	int nr_wake = 1;
+	int i = 0;
+
+	info("Waker: waiting for waiters to block\n");
+	while (waiters_blocked.val < THREAD_MAX)	/* sleep-poll, don't spin */
+		usleep(1000);
+	usleep(1000);	/* count is bumped just before the wait call */
 
-	if (lock) {
-		info("Calling FUTEX_LOCK_PI on mutex=%x @ %p\n", 
-		     f2, &f2);
+	info("Waker: Calling broadcast\n");
+	if (args->lock) {
+		info("Calling FUTEX_LOCK_PI on mutex=%x @ %p\n", f2, &f2);
 		futex_lock_pi(&f2, NULL, 0, FUTEX_PRIVATE_FLAG);
 	}
-	/* cond_broadcast */
+ continue_requeue:
 	old_val = f1;
-	ret = futex_cmp_requeue_pi(&f1, old_val, &f2, nr_wake,
-				   nr_requeue, FUTEX_PRIVATE_FLAG);
-	if (ret < 0) {
-		ret = -errno;
+	args->ret = futex_cmp_requeue_pi(&f1, old_val, &f2, nr_wake, nr_requeue,
+				   FUTEX_PRIVATE_FLAG);
+	if (args->ret < 0) {
+		args->ret = RET_ERROR;
 		error("FUTEX_CMP_REQUEUE_PI failed\n", errno);
+	} else if (++i < MAX_WAKE_ITERS) {
+		task_count += args->ret;
+		if (task_count < THREAD_MAX - waiters_woken.val)
+			goto continue_requeue;
+	} else {
+		error("max broadcast iterations (%d) reached with %d/%d "
+		      "tasks woken or requeued\n", 0, MAX_WAKE_ITERS,
+		      task_count, THREAD_MAX);
+		args->ret = RET_ERROR;
 	}
 
-	if (pthread_barrier_wait(&wake_barrier) == -EINVAL)
-		error("broadcast_wakerfn\n", errno);
+	futex_wake(&wake_complete, 1, FUTEX_PRIVATE_FLAG);
 
-	if (lock)
+	if (args->lock)
 		futex_unlock_pi(&f2, FUTEX_PRIVATE_FLAG);
 
-	info("Waker: exiting with %d\n", ret);
-	return (void *)(long)ret;;
+	if (args->ret > 0)
+		args->ret = task_count;
+
+	info("Waker: exiting with %d\n", args->ret);
+	pthread_exit((void *)&args->ret);
 }
 
 void *signal_wakerfn(void *arg)
 {
-	long lock = (long)arg;
+	thread_arg_t *args = (thread_arg_t *)arg;
 	unsigned int old_val;
 	int nr_requeue = 0;
 	int task_count = 0;
 	int nr_wake = 1;
-	int ret = 0;
 	int i = 0;
 
-	pthread_barrier_wait(&waiter_barrier);
-	while (task_count < THREAD_MAX && waiters_woken < THREAD_MAX) {
+	info("Waker: waiting for waiters to block\n");
+	while (waiters_blocked.val < THREAD_MAX)	/* sleep-poll, don't spin */
+		usleep(1000);
+	usleep(1000);	/* count is bumped just before the wait call */
+
+	while (task_count < THREAD_MAX && waiters_woken.val < THREAD_MAX) {
 		info("task_count: %d, waiters_woken: %d\n",
-		     task_count, waiters_woken);
-		if (lock) {
+		     task_count, waiters_woken.val);
+		if (args->lock) {
 			info("Calling FUTEX_LOCK_PI on mutex=%x @ %p\n", 
 			     f2, &f2);
 			futex_lock_pi(&f2, NULL, 0, FUTEX_PRIVATE_FLAG);
@@ -205,61 +225,77 @@
 		info("Waker: Calling signal\n");
 		/* cond_signal */
 		old_val = f1;
-		ret = futex_cmp_requeue_pi(&f1, old_val, &f2,
-					   nr_wake, nr_requeue, FUTEX_PRIVATE_FLAG);
-		if (ret < 0)
-			ret = -errno;
+		args->ret = futex_cmp_requeue_pi(&f1, old_val, &f2,
+						 nr_wake, nr_requeue,
+						 FUTEX_PRIVATE_FLAG);
+		if (args->ret < 0)
+			args->ret = -errno;
 		info("futex: %x\n", f2);
-		if (lock) {
+		if (args->lock) {
 			info("Calling FUTEX_UNLOCK_PI on mutex=%x @ %p\n", 
 			     f2, &f2);
 			futex_unlock_pi(&f2, FUTEX_PRIVATE_FLAG);
 		}
 		info("futex: %x\n", f2);
-		if (ret < 0) {
+		if (args->ret < 0) {
 			error("FUTEX_CMP_REQUEUE_PI failed\n", errno);
+			args->ret = RET_ERROR;
 			break;
 		}
 		
-		if (!i) {
-			info("waker waiting on wake_barrier\n");
-			if (pthread_barrier_wait(&wake_barrier) == -EINVAL)
-				error("signal_wakerfn", errno);
-		}
-
-		task_count += ret;
+		task_count += args->ret;
 		usleep(SIGNAL_PERIOD_US);
 		i++;
-		if (i > 1000) {
-			info("i>1000, giving up on pending waiters...\n");
+		/* we have to loop at least THREAD_MAX times */
+		if (i > MAX_WAKE_ITERS + THREAD_MAX) {
+			error("max signaling iterations (%d) reached, giving "
+			      "up on pending waiters.\n",
+			      0, MAX_WAKE_ITERS + THREAD_MAX);
+			args->ret = RET_ERROR;
 			break;
 		}
 	}
-	if (ret >= 0)
-		ret = task_count;
 
-	info("Waker: exiting with %d\n", ret);
-	info("Waker: waiters_woken: %d\n", waiters_woken);
-	return (void *)(long)ret;
+	futex_wake(&wake_complete, 1, FUTEX_PRIVATE_FLAG);
+
+	if (args->ret >= 0)
+		args->ret = task_count;
+
+	info("Waker: exiting with %d\n", args->ret);
+	info("Waker: waiters_woken: %d\n", waiters_woken.val);
+	pthread_exit((void *)&args->ret);
 }
 
 void *third_party_blocker(void *arg)
 {
-	futex_lock_pi(&f2, NULL, 0, FUTEX_PRIVATE_FLAG);
-	if (pthread_barrier_wait(&wake_barrier) == -EINVAL)
-		error("third_party_blocker\n", errno);
-	futex_unlock_pi(&f2, FUTEX_PRIVATE_FLAG);
-	return NULL;
+	thread_arg_t *args = (thread_arg_t *)arg;
+	int ret2 = 0;	/* futex_unlock_pi() result, kept apart from args->ret */
+
+	if ((args->ret = futex_lock_pi(&f2, NULL, 0, FUTEX_PRIVATE_FLAG)))
+		goto out;
+	args->ret = futex_wait(&wake_complete, wake_complete, NULL,
+			       FUTEX_PRIVATE_FLAG);
+	ret2 = futex_unlock_pi(&f2, FUTEX_PRIVATE_FLAG);
+
+ out:
+	if (args->ret || ret2) {
+		error("third_party_blocker() futex error\n", 0);
+		args->ret = RET_ERROR;
+	}
+
+	pthread_exit((void *)&args->ret);
 }
 
 int unit_test(int broadcast, long lock, int third_party_owner, long timeout_ns)
 {
 	void *(*wakerfn)(void *) = signal_wakerfn;
+	thread_arg_t blocker_arg = THREAD_ARG_INITIALIZER;
+	thread_arg_t waker_arg = THREAD_ARG_INITIALIZER;
 	pthread_t waiter[THREAD_MAX], waker, blocker;
-	waiter_arg_t args[THREAD_MAX];
 	struct timespec ts, *tsp = NULL;
-	int ret;
-	long i;
+	thread_arg_t args[THREAD_MAX];
+	int *waiter_ret;
+	int i, ret = RET_PASS;
 
 	if (timeout_ns) {
 		info("timeout_ns = %ld\n", timeout_ns);
@@ -272,60 +308,59 @@
 		tsp = &ts;
 	}
 
-	if ((ret = pthread_barrier_init(&wake_barrier, NULL,
-					1+third_party_owner))) {
-		error("pthread_barrier_init(wake_barrier) failed\n", errno);
-		return ret;
-	}
-	if ((ret = pthread_barrier_init(&waiter_barrier, NULL,
-					1+THREAD_MAX))) {
-		error("pthread_barrier_init(waiter_barrier) failed\n", errno);
-		return ret;
-	}
-
 	if (broadcast)
 		wakerfn = broadcast_wakerfn;
 
 	if (third_party_owner) {
-		if ((ret = create_rt_thread(&blocker, third_party_blocker, NULL,
-					    SCHED_FIFO, 1))) {
+		if (create_rt_thread(&blocker, third_party_blocker,
+				     (void *)&blocker_arg, SCHED_FIFO, 1)) {
 			error("Creating third party blocker thread failed\n",
 			      errno);
+			ret = RET_ERROR;
 			goto out;
 		}
 	}
 
-	waiters_woken = 0;
+	atomic_set(&waiters_woken, 0);
 	for (i = 0; i < THREAD_MAX; i++) {
 		args[i].id = i;
 		args[i].timeout = tsp;
-		info("Starting thread %ld\n", i);
-		if ((ret = create_rt_thread(&waiter[i], waiterfn, (void *)&args[i],
-					    SCHED_FIFO, 1))) {
+		info("Starting thread %d\n", i);
+		if (create_rt_thread(&waiter[i], waiterfn, (void *)&args[i],
+				     SCHED_FIFO, 1)) {
 			error("Creating waiting thread failed\n", errno);
+			ret = RET_ERROR;
 			goto out;
 		}
 	}
-	if ((ret = create_rt_thread(&waker, wakerfn, (void *)lock,
-				    SCHED_FIFO, 1))) {
+	waker_arg.lock = lock;
+	if (create_rt_thread(&waker, wakerfn, (void *)&waker_arg,
+				    SCHED_FIFO, 1)) {
 		error("Creating waker thread failed\n", errno);
+		ret = RET_ERROR;
 		goto out;
 	}
 
 	/* Wait for threads to finish */
+	/* Store the first error or failure encountered in waiter_ret */
+	waiter_ret = &args[0].ret;
 	for (i=0; i<THREAD_MAX; i++) {
-		pthread_join(waiter[i], NULL);
+		pthread_join(waiter[i], *waiter_ret ? NULL : (void **)&waiter_ret);
 	}
 	if (third_party_owner)
 		pthread_join(blocker, NULL);
 	pthread_join(waker, NULL);
 
 out:
-	if ((ret = pthread_barrier_destroy(&wake_barrier)))
-		error("pthread_barrier_destroy(wake_barrier) failed\n", errno);
-	if ((ret = pthread_barrier_destroy(&waiter_barrier)))
-		error("pthread_barrier_destroy(waiter_barrier) failed\n",
-		      errno);
+	if (!ret) {
+		if (*waiter_ret)
+			ret = *waiter_ret;
+		else if (waker_arg.ret < 0)
+			ret = waker_arg.ret;
+		else if (blocker_arg.ret)
+			ret = blocker_arg.ret;
+	}
+		
 	return ret;
 }
 
@@ -349,6 +384,7 @@
 			break;
 		case 'o':
 			owner = 1;
+			locked = 0;
 			break;
 		case 't':
 			timeout_ns = atoi(optarg);
@@ -373,7 +409,6 @@
 	 */
 	ret = unit_test(broadcast, locked, owner, timeout_ns);
 
-	/* FIXME: need to distinguish between FAIL and ERROR */
-	printf("Result: %s\n", ret ? ERROR : PASS);
+	print_result(ret);
 	return ret;
 }