futex: Add support for private attached futexes

This patch adds support for the new futex ops FUTEX_ATTACH and
FUTEX_DETACH plus the FUTEX_ATTACHED flag, which can not be used
together with FLAGS_SHARED: this is limited to private futexes. The
FUTEX_ATTACHED behaviour can not be made the default because it changes
the API/ABI.
ATTACHED futex usage howto (a complete sequence is sketched after this
list):
- before use, the futex needs to be `attached' to initialize the
  in-kernel state:

  ret = sys_futex(&mutex->__data.__lock,
		  FUTEX_ATTACH | FUTEX_PRIVATE_FLAG,
		  0, 0, 0, 0);

  The return value is <0 for an error and 0 for success.

- any further operation on this futex must carry the FUTEX_ATTACHED
  flag, for example the LOCK_PI operation:

  ret = sys_futex(&mutex->__data.__lock,
		  FUTEX_LOCK_PI | FUTEX_PRIVATE_FLAG | FUTEX_ATTACHED,
		  0, 0, 0, 0);

  The return value is <0 for an error and 0 for success.

- once the lock is no longer needed, FUTEX_DETACH should be invoked in
  order to remove the in-kernel state of the futex. The return value is
  0 for success and <0 for failure. A futex can not be detached while an
  operation is still pending, i.e. a LOCK_PI which did not yet complete;
  FUTEX_DETACH fails with EBUSY in that case.
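
Putting it together, a minimal userspace sequence could look like this
(a sketch only: the constants are the ones introduced by this patch and
error handling is omitted):

  #include <linux/futex.h>
  #include <sys/syscall.h>
  #include <unistd.h>

  #ifndef FUTEX_ATTACH			/* not yet in installed headers */
  #define FUTEX_ATTACH		13
  #define FUTEX_DETACH		14
  #define FUTEX_ATTACHED	512
  #endif

  static long futex(unsigned int *uaddr, int op, unsigned int val)
  {
	return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
  }

  static unsigned int futex_word;

  int main(void)
  {
	/* Set up the in-kernel state once. */
	futex(&futex_word, FUTEX_ATTACH | FUTEX_PRIVATE_FLAG, 0);

	/* Every further operation carries FUTEX_ATTACHED. */
	futex(&futex_word,
	      FUTEX_LOCK_PI | FUTEX_PRIVATE_FLAG | FUTEX_ATTACHED, 0);
	futex(&futex_word,
	      FUTEX_UNLOCK_PI | FUTEX_PRIVATE_FLAG | FUTEX_ATTACHED, 0);

	/* Tear down the in-kernel state; fails with EBUSY while in use. */
	return futex(&futex_word, FUTEX_DETACH | FUTEX_PRIVATE_FLAG, 0);
  }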

The implementation:
struct mm_struct is extended by a struct futex_cache. This struct holds
the following members:
- futex_rb
  a latch tree which holds a node for each attached futex, pointing to
  its in-kernel state (struct futex_state) and keyed by the futex's
  user space address. Lookups are RCU protected and therefore lockless.

- cache_lock
  a lock which is taken in the slow path when a futex_state is inserted
  into or removed from the tree.
On each `FUTEX_ATTACH' operation an in-kernel state for the userland
futex is allocated: struct futex_state. This state contains a dedicated
futex_hash_bucket which is used exclusively for this lock. This avoids
contention on the global futex_hash_bucket, which means two different
locks never share the same futex_hash_bucket. Also, the memory for the
in-kernel state is allocated on the current NUMA node, which should
reduce cross-NUMA memory accesses to the futex_hash_bucket.
The global futex_hash_bucket is used to ensure that a futex is only
attached once. A second FUTEX_ATTACH operation on the same uaddr will
fail because the futex is already enqueued in the global
futex_hash_bucket.

Upon a `FUTEX_ATTACH' operation the futex_state is inserted into the
per-mm tree; all following futex operations, i.e. FUTEX_LOCK_PI, look it
up there by the user space address. In the hot path the tree is searched
within an RCU read section, which avoids taking any locks. The
futex_state has a `users' reference counter. A value of zero means that
the structure only still exists within this RCU read section, is subject
to removal and therefore must not be used. atomic_inc_not_zero() ensures
that the object can still be used after leaving the RCU read section.
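
For illustration, the hot path lookup boils down to this pattern (a
sketch, cf. hash_futex() in the patch below):

  rcu_read_lock();
  fs = futex_tree_find(uaddr, tc);	/* lockless latch tree lookup */
  if (fs && !atomic_inc_not_zero(&fs->users))
	fs = NULL;			/* concurrent detach, don't use it */
  rcu_read_unlock();
  /* A non-NULL fs stays valid until atomic_dec(&fs->users). */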

Mixing operations with and without the `FUTEX_ATTACHED' flag on the same
futex has the same outcome as mixing the `FUTEX_PRIVATE_FLAG' flag: the
kernel won't find the correct futex_hash_bucket and the operation will
block.

It is believed that the `FUTEX_ATTACH' operation can be hidden within
the pthread_mutex_init() function and the `FUTEX_DETACH' operation
within pthread_mutex_destroy(). glibc could turn it on for all private
locks. An automatic in-kernel switch-on does not exist because the
kernel would then have to look up every supplied address in the tree
just to figure out whether the futex is attached, adding overhead to
each operation on non-attached futexes.
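
For illustration, the glibc side could look roughly like this (a purely
hypothetical sketch; the mutex internals and the
mutex_is_process_private() helper are made up):

  int pthread_mutex_init(pthread_mutex_t *mutex,
			 const pthread_mutexattr_t *attr)
  {
	...
	/* Hypothetical: attach private locks, ignore failures. */
	if (mutex_is_process_private(mutex))
		sys_futex(&mutex->__data.__lock,
			  FUTEX_ATTACH | FUTEX_PRIVATE_FLAG, 0, 0, 0, 0);
	return 0;
  }

  int pthread_mutex_destroy(pthread_mutex_t *mutex)
  {
	...
	if (mutex_is_process_private(mutex))
		sys_futex(&mutex->__data.__lock,
			  FUTEX_DETACH | FUTEX_PRIVATE_FLAG, 0, 0, 0, 0);
	return 0;
  }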

Signed-off-by: Thomas Gleixner <tglx@linutronix.de>
diff --git a/include/linux/futex.h b/include/linux/futex.h
index 6435f46..a81c76d 100644
--- a/include/linux/futex.h
+++ b/include/linux/futex.h
@@ -47,6 +47,7 @@
 		unsigned long word;
 		void *ptr;
 		int offset;
+		bool attached;
 	} both;
 };
 
@@ -55,17 +56,17 @@
 #ifdef CONFIG_FUTEX
 extern void exit_robust_list(struct task_struct *curr);
 extern void exit_pi_state_list(struct task_struct *curr);
+extern void futex_mm_init(struct mm_struct *mm);
+extern void exit_futex_mm_cache(struct mm_struct *mm);
 #ifdef CONFIG_HAVE_FUTEX_CMPXCHG
 #define futex_cmpxchg_enabled 1
 #else
 extern int futex_cmpxchg_enabled;
 #endif
 #else
-static inline void exit_robust_list(struct task_struct *curr)
-{
-}
-static inline void exit_pi_state_list(struct task_struct *curr)
-{
-}
+static inline void exit_robust_list(struct task_struct *curr) { }
+static inline void exit_pi_state_list(struct task_struct *curr) { }
+static inline void futex_mm_init(struct mm_struct *mm) { }
+static inline void exit_futex_mm_cache(struct mm_struct *mm) { }
 #endif
 #endif
diff --git a/include/linux/futex_types.h b/include/linux/futex_types.h
new file mode 100644
index 0000000..38242cc
--- /dev/null
+++ b/include/linux/futex_types.h
@@ -0,0 +1,10 @@
+#ifndef _LINUX_FUTEX_TYPES_H
+#define _LINUX_FUTEX_TYPES_H
+#include <linux/rbtree_latch.h>
+
+struct futex_cache {
+	raw_spinlock_t          cache_lock;
+	struct latch_tree_root	futex_rb;
+};
+
+#endif
diff --git a/include/linux/mm_types.h b/include/linux/mm_types.h
index 903200f..99550ec 100644
--- a/include/linux/mm_types.h
+++ b/include/linux/mm_types.h
@@ -13,6 +13,7 @@
 #include <linux/uprobes.h>
 #include <linux/page-flags-layout.h>
 #include <linux/workqueue.h>
+#include <linux/futex_types.h>
 #include <asm/page.h>
 #include <asm/mmu.h>
 
@@ -448,6 +449,9 @@
 
 	struct linux_binfmt *binfmt;
 
+#ifdef CONFIG_FUTEX
+	struct futex_cache futex_cache;
+#endif
 	cpumask_var_t cpu_vm_mask_var;
 
 	/* Architecture-specific MM context */
diff --git a/include/linux/sched.h b/include/linux/sched.h
index 62c68e5..025b3e9 100644
--- a/include/linux/sched.h
+++ b/include/linux/sched.h
@@ -127,6 +127,7 @@
 };
 
 struct futex_pi_state;
+struct futex_cache;
 struct robust_list_head;
 struct bio_list;
 struct fs_struct;
diff --git a/include/uapi/linux/futex.h b/include/uapi/linux/futex.h
index 0b1f716..4affe6b 100644
--- a/include/uapi/linux/futex.h
+++ b/include/uapi/linux/futex.h
@@ -20,10 +20,14 @@
 #define FUTEX_WAKE_BITSET	10
 #define FUTEX_WAIT_REQUEUE_PI	11
 #define FUTEX_CMP_REQUEUE_PI	12
+#define FUTEX_ATTACH		13
+#define FUTEX_DETACH		14
 
 #define FUTEX_PRIVATE_FLAG	128
 #define FUTEX_CLOCK_REALTIME	256
-#define FUTEX_CMD_MASK		~(FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME)
+#define FUTEX_ATTACHED		512
+#define FUTEX_CMD_MASK		~(FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME | \
+				  FUTEX_ATTACHED)
 
 #define FUTEX_WAIT_PRIVATE	(FUTEX_WAIT | FUTEX_PRIVATE_FLAG)
 #define FUTEX_WAKE_PRIVATE	(FUTEX_WAKE | FUTEX_PRIVATE_FLAG)
diff --git a/kernel/fork.c b/kernel/fork.c
index beb3172..780a965 100644
--- a/kernel/fork.c
+++ b/kernel/fork.c
@@ -620,6 +620,9 @@
 	mm_init_owner(mm, p);
 	mmu_notifier_mm_init(mm);
 	clear_tlb_flush_pending(mm);
+#ifdef CONFIG_FUTEX
+	futex_mm_init(mm);
+#endif
 #if defined(CONFIG_TRANSPARENT_HUGEPAGE) && !USE_SPLIT_PMD_PTLOCKS
 	mm->pmd_huge_pte = NULL;
 #endif
@@ -712,6 +715,9 @@
 	khugepaged_exit(mm); /* must run before exit_mmap */
 	exit_mmap(mm);
 	set_mm_exe_file(mm, NULL);
+#ifdef CONFIG_FUTEX
+	exit_futex_mm_cache(mm);
+#endif
 	if (!list_empty(&mm->mmlist)) {
 		spin_lock(&mmlist_lock);
 		list_del(&mm->mmlist);
diff --git a/kernel/futex.c b/kernel/futex.c
index 626ed8c..9b60631 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -190,6 +190,7 @@
 #endif
 #define FLAGS_CLOCKRT		0x02
 #define FLAGS_HAS_TIMEOUT	0x04
+#define FLAGS_ATTACHED		0x08
 
 /*
  * Priority Inheritance state:
@@ -263,6 +264,16 @@
 	struct plist_head chain;
 } ____cacheline_aligned_in_smp;
 
+struct futex_state {
+	struct latch_tree_node		node;
+	u32 __user			*uaddr;
+	struct futex_hash_bucket	hb;
+	struct futex_q			q;
+	struct futex_hash_bucket	*global_hb;
+	atomic_t			users;
+	struct rcu_head			rcu;
+};
+
 /*
  * The base of the bucket array and its size are always used together
  * (after initialization only in hash_futex()), so ensure that they
@@ -411,22 +422,6 @@
 }
 
 /**
- * hash_futex - Return the hash bucket in the global hash
- * @key:	Pointer to the futex key for which the hash is calculated
- *
- * We hash on the keys returned from get_futex_key (see below) and return the
- * corresponding hash bucket in the global hash.
- */
-static struct futex_hash_bucket *hash_futex(union futex_key *key)
-{
-	u32 hash = jhash2((u32*)&key->both.word,
-			  (sizeof(key->both.word)+sizeof(key->both.ptr))/4,
-			  key->both.offset);
-	return &futex_queues[hash & (futex_hashsize - 1)];
-}
-
-
-/**
  * match_futex - Check whether to futex keys are equal
  * @key1:	Pointer to key1
  * @key2:	Pointer to key2
@@ -438,21 +433,240 @@
 	return (key1 && key2
 		&& key1->both.word == key2->both.word
 		&& key1->both.ptr == key2->both.ptr
-		&& key1->both.offset == key2->both.offset);
+		&& key1->both.offset == key2->both.offset
+		&& key1->both.attached == key2->both.attached);
+}
+
+/**
+ * hash_global_futex - Return the hash bucket in the global hash
+ * @key:	Pointer to the futex key for which the hash is calculated
+ *
+ * We hash on the keys returned from get_futex_key (see below) and return the
+ * corresponding hash bucket in the global hash.
+ */
+static struct futex_hash_bucket *hash_global_futex(union futex_key *key)
+{
+	u32 hash = jhash2((u32*)&key->both.word,
+			  (sizeof(key->both.word)+sizeof(key->both.ptr))/4,
+			  key->both.offset);
+	return &futex_queues[hash & (futex_hashsize - 1)];
+}
+
+static __always_inline u32 __user *
+ftree_latch_get_uaddr(struct latch_tree_node *n)
+{
+	struct futex_state *fs = container_of(n, struct futex_state, node);
+
+	return fs->uaddr;
+}
+
+static __always_inline bool ftree_tree_less(struct latch_tree_node *a,
+					    struct latch_tree_node *b)
+{
+	return ftree_latch_get_uaddr(a) < ftree_latch_get_uaddr(b);
+}
+
+static __always_inline int ftree_tree_comp(void *key, struct latch_tree_node *n)
+{
+	u32 __user *uaddr = key;
+	u32 __user *node_uadddr = ftree_latch_get_uaddr(n);
+
+	if (uaddr < node_uadddr)
+		return -1;
+
+	if (uaddr > node_uadddr)
+		return 1;
+
+	return 0;
+}
+
+static const struct latch_tree_ops futex_tree_ops = {
+	.less	= ftree_tree_less,
+	.comp	= ftree_tree_comp,
+};
+
+static struct futex_state *futex_tree_find(u32 __user *uaddr,
+					   struct futex_cache *tc)
+{
+	struct latch_tree_node *node;
+
+	node = latch_tree_find(uaddr, &tc->futex_rb, &futex_tree_ops);
+	if (!node)
+		return NULL;
+	return container_of(node, struct futex_state, node);
+}
+
+/**
+ * hash_futex - Get the hash bucket for a futex
+ * @key:	Pointer to the futex key
+ *
+ * Returns either the local or the global hash bucket which fits the key.
+ *
+ * In case of an attached futex, the state is looked up in the task's futex
+ * cache. Returns NULL if no state is found, i.e. the futex is not (or no
+ * longer) attached.
+ */
+static struct futex_hash_bucket *hash_futex(union futex_key *key)
+{
+	struct futex_cache *tc;
+	struct futex_hash_bucket *hb = NULL;
+	struct futex_state *fs;
+	u32 __user *key_uaddr;
+
+	if (!key->both.attached)
+		return hash_global_futex(key);
+
+	key_uaddr = (u32 __user *)(key->private.address | key->private.offset);
+	tc = &current->mm->futex_cache;
+
+	rcu_read_lock();
+	fs = futex_tree_find(key_uaddr, tc);
+	if (!fs)
+		goto out;
+
+	if (!atomic_inc_not_zero(&fs->users))
+		goto out;
+
+	/*
+	 * It has been a while since we first matched the uaddr to the state.
+	 * Let's see if it changed in the meantime. Technically it wouldn't
+	 * matter if the user did an attach+detach while this operation was in
+	 * progress and the uaddr got reused: it is still a valid futex.
+	 * However it might ease debugging if this did not happen on purpose
+	 * but by mistake.
+	 */
+	if (match_futex(key, &fs->q.key))
+		hb = &fs->hb;
+	else
+		atomic_dec(&fs->users);
+out:
+	rcu_read_unlock();
+	return hb;
+}
+
+static void futex_private_ret_hb(struct futex_hash_bucket *hb,
+				union futex_key *key)
+{
+	struct futex_cache *tc;
+	struct futex_state *fs;
+
+	if (!hb)
+		return;
+	if (!key->both.attached)
+		return;
+
+	tc = &current->mm->futex_cache;
+	fs = container_of(hb, struct futex_state, hb);
+
+	atomic_dec(&fs->users);
+}
+
+static void futex_fs_cleanup(struct rcu_head *head)
+{
+	struct futex_state *fs = container_of(head, struct futex_state, rcu);
+
+	kfree(fs);
+}
+
+/**
+ * futex_detach_task - Detach a futex from the task's futex cache
+ * @tc:		Pointer to the task's futex cache
+ * @uaddr:	User space address of the futex
+ *
+ * If the state refcount drops to zero, the state is removed from the global
+ * hash and destroyed.
+ */
+static int futex_detach_task(struct futex_cache *tc, u32 __user *uaddr)
+{
+	struct futex_state *fs;
+	struct futex_hash_bucket *hb;
+	struct futex_q *q;
+
+	/* Remove it from the task local cache */
+	raw_spin_lock(&tc->cache_lock);
+
+	fs = futex_tree_find(uaddr, tc);
+	if (!fs) {
+		raw_spin_unlock(&tc->cache_lock);
+		return -EINVAL;
+	}
+
+	if (!atomic_dec_and_test(&fs->users)) {
+		atomic_inc(&fs->users);
+		raw_spin_unlock(&tc->cache_lock);
+		return -EBUSY;
+	}
+
+	latch_tree_erase(&fs->node, &tc->futex_rb, &futex_tree_ops);
+
+	raw_spin_unlock(&tc->cache_lock);
+
+	hb = fs->global_hb;
+	q = &fs->q;
+
+	/*
+	 * Once removed from global state, the same uaddr can be attached again.
+	 */
+	spin_lock(&hb->lock);
+	hb_remove_q(q, hb);
+	spin_unlock(&hb->lock);
+	/*
+	 * XXX
+	 * force a grace period if there are too many pending fs cleanups:
+	 * synchronize_rcu();
+	 */
+	call_rcu(&fs->rcu, futex_fs_cleanup);
+	return 0;
+}
+
+/**
+ * futex_attach_task - Attach a futex state to the task's futex cache
+ * @fs:		Pointer to the futex state
+ * @uaddr:	User space address of the futex
+ * @tc:		Pointer to the task's futex cache
+ *
+ * Insert @fs into the cache tree of @tc so subsequent operations can look it
+ * up by @uaddr.
+ *
+ * Must be called with fs->global_hb->lock held
+ */
+static void futex_attach_task(struct futex_state *fs, u32 __user *uaddr,
+			      struct futex_cache *tc)
+{
+	raw_spin_lock(&tc->cache_lock);
+
+	latch_tree_insert(&fs->node, &tc->futex_rb, &futex_tree_ops);
+
+	raw_spin_unlock(&tc->cache_lock);
+}
+
+/**
+ * futex_queue_state - Queue a futex state object in the global hash
+ * @fs:		Pointer to the futex state object
+ * @hb:		Pointer to the hash bucket
+ *
+ * Must be called with hb->lock held
+ */
+static void
+futex_queue_state(struct futex_state *fs, struct futex_hash_bucket *hb)
+{
+	int prio = NICE_TO_PRIO(MIN_NICE);
+
+	fs->global_hb = hb;
+	fs->q.lock_ptr = &hb->lock;
+	hb_insert_q(&fs->q, hb, prio);
 }
 
 /**
  * futex_key_init - Initialize a futex key
  * @key:	Pointer to the key to initialize
  * @uaddr:	User space address of the futex
- * @flags:	Flags to check for futex mode. Not yet used
+ * @flags:	Flags to check for futex mode
  *
  * Returns:	@uaddr
  */
 static u32 __user *futex_key_init(union futex_key *key, u32 __user *uaddr,
 				  unsigned int flags)
 {
 	*key = FUTEX_KEY_INIT;
+	if (flags & FLAGS_ATTACHED)
+		key->both.attached = true;
+
 	return uaddr;
 }
 
@@ -520,29 +734,10 @@
 	}
 }
 
-/**
- * get_futex_key() - Get parameters which are the keys for a futex
- * @uaddr:	virtual address of the futex
- * @fshared:	0 for a PROCESS_PRIVATE futex, 1 for PROCESS_SHARED
- * @key:	address where result is stored.
- * @rw:		mapping needs to be read/write (values: VERIFY_READ,
- *              VERIFY_WRITE)
- *
- * Return: a negative error code or 0
- *
- * The key words are stored in *key on success.
- *
- * For shared mappings, it's (page->index, file_inode(vma->vm_file),
- * offset_within_page).  For private mappings, it's (uaddr, current->mm).
- * We can usually work out the index without swapping in the page.
- *
- * lock_page() might sleep, the caller should not hold a spinlock.
- */
-static int
-get_futex_key(u32 __user *uaddr, int fshared, union futex_key *key, int rw)
+static int __get_futex_key(u32 __user *uaddr, int fshared, union futex_key *key,
+			   int rw, struct mm_struct *mm)
 {
 	unsigned long address = (unsigned long)uaddr;
-	struct mm_struct *mm = current->mm;
 	struct page *page, *tail;
 	struct address_space *mapping;
 	int err, ro = 0;
@@ -745,6 +940,30 @@
 	return err;
 }
 
+/**
+ * get_futex_key() - Get parameters which are the keys for a futex
+ * @uaddr:	virtual address of the futex
+ * @fshared:	0 for a PROCESS_PRIVATE futex, 1 for PROCESS_SHARED
+ * @key:	address where result is stored.
+ * @rw:		mapping needs to be read/write (values: VERIFY_READ,
+ *              VERIFY_WRITE)
+ *
+ * Return: a negative error code or 0
+ *
+ * The key words are stored in *key on success.
+ *
+ * For shared mappings, it's (page->index, file_inode(vma->vm_file),
+ * offset_within_page).  For private mappings, it's (uaddr, current->mm).
+ * We can usually work out the index without swapping in the page.
+ *
+ * lock_page() might sleep, the caller should not hold a spinlock.
+ */
+static int
+get_futex_key(u32 __user *uaddr, int fshared, union futex_key *key, int rw)
+{
+	return __get_futex_key(uaddr, fshared, key, rw, current->mm);
+}
+
 static inline void put_futex_key(union futex_key *key)
 {
 	drop_futex_key_refs(key);
@@ -938,6 +1157,17 @@
 		pi_state = list_entry(next, struct futex_pi_state, list);
 		key = pi_state->key;
 		hb = hash_futex(&key);
+		if (WARN_ON_ONCE(!hb)) {
+			/*
+			 * This should not happen. Remove the entry from the
+			 * list so we don't spin here forever; without the hb
+			 * we can't take the hb lock. No wake ups.
+			 */
+			list_del_init(&pi_state->list);
+			pi_state->owner = NULL;
+			continue;
+		}
 		raw_spin_unlock_irq(&curr->pi_lock);
 
 		spin_lock(&hb->lock);
@@ -949,6 +1179,7 @@
 		 */
 		if (head->next != next) {
 			spin_unlock(&hb->lock);
+			futex_private_ret_hb(hb, &key);
 			continue;
 		}
 
@@ -961,6 +1192,7 @@
 		rt_mutex_unlock(&pi_state->pi_mutex);
 
 		spin_unlock(&hb->lock);
+		futex_private_ret_hb(hb, &key);
 
 		raw_spin_lock_irq(&curr->pi_lock);
 	}
@@ -1473,15 +1705,17 @@
 	if (!bitset)
 		return -EINVAL;
 
-	uaddr = futex_key_init(&key, uaddr, flags);
-	if (!uaddr)
-		return -EINVAL;
+	futex_key_init(&key, uaddr, flags);
 
 	ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, VERIFY_READ);
 	if (unlikely(ret != 0))
 		goto out;
 
 	hb = hash_futex(&key);
+	if (!hb) {
+		ret = -EINVAL;
+		goto out_put_key;
+	}
 
 	/* Make sure we really have tasks to wakeup */
 	if (!hb_waiters_pending(hb))
@@ -1510,6 +1744,7 @@
 	wake_up_q(&wake_q);
 out_put_key:
 	put_futex_key(&key);
+	futex_private_ret_hb(hb, &key);
 out:
 	return ret;
 }
@@ -1528,14 +1763,8 @@
 	int ret, op_ret;
 	WAKE_Q(wake_q);
 
-	uaddr1 = futex_key_init(&key1, uaddr1, flags);
-	if (!uaddr1)
-		return -EINVAL;
-
-	uaddr2 = futex_key_init(&key2, uaddr2, flags);
-	if (!uaddr2)
-		return -EINVAL;
-
+	futex_key_init(&key1, uaddr1, flags);
+	futex_key_init(&key2, uaddr2, flags);
 retry:
 	ret = get_futex_key(uaddr1, flags & FLAGS_SHARED, &key1, VERIFY_READ);
 	if (unlikely(ret != 0))
@@ -1546,6 +1775,8 @@
 
 	hb1 = hash_futex(&key1);
 	hb2 = hash_futex(&key2);
+	if (!hb1 || !hb2) {
+		ret = -EINVAL;
+		goto out_put_keys;
+	}
 
 retry_private:
 	double_lock_hb(hb1, hb2);
@@ -1575,6 +1806,9 @@
 		if (!(flags & FLAGS_SHARED))
 			goto retry_private;
 
+		futex_private_ret_hb(hb1, &key1);
+		futex_private_ret_hb(hb2, &key2);
+
 		put_futex_key(&key2);
 		put_futex_key(&key1);
 		goto retry;
@@ -1612,6 +1846,8 @@
 	double_unlock_hb(hb1, hb2);
 	wake_up_q(&wake_q);
 out_put_keys:
+	futex_private_ret_hb(hb1, &key1);
+	futex_private_ret_hb(hb2, &key2);
 	put_futex_key(&key2);
 out_put_key1:
 	put_futex_key(&key1);
@@ -1775,6 +2011,9 @@
 	union futex_key key1, key2;
 	WAKE_Q(wake_q);
 
+	futex_key_init(&key1, uaddr1, flags);
+	futex_key_init(&key2, uaddr2, flags);
+
 	if (requeue_pi) {
 		/*
 		 * Requeue PI only works on two distinct uaddrs. This
@@ -1803,14 +2042,6 @@
 			return -EINVAL;
 	}
 
-	uaddr1 = futex_key_init(&key1, uaddr1, flags);
-	if (!uaddr1)
-		return -EINVAL;
-
-	uaddr2 = futex_key_init(&key2, uaddr2, flags);
-	if (!uaddr2)
-		return -EINVAL;
-
 retry:
 	ret = get_futex_key(uaddr1, flags & FLAGS_SHARED, &key1, VERIFY_READ);
 	if (unlikely(ret != 0))
@@ -1831,6 +2062,8 @@
 
 	hb1 = hash_futex(&key1);
 	hb2 = hash_futex(&key2);
+	if (!hb1 || !hb2) {
+		ret = -EINVAL;
+		goto out_ret_hb;
+	}
 
 retry_private:
 	hb_waiters_inc(hb2);
@@ -1847,11 +2080,13 @@
 
 			ret = get_user(curval, uaddr1);
 			if (ret)
-				goto out_put_keys;
+				goto out_ret_hb;
 
 			if (!(flags & FLAGS_SHARED))
 				goto retry_private;
 
+			futex_private_ret_hb(hb1, &key1);
+			futex_private_ret_hb(hb2, &key2);
 			put_futex_key(&key2);
 			put_futex_key(&key1);
 			goto retry;
@@ -1909,6 +2144,8 @@
 		case -EFAULT:
 			double_unlock_hb(hb1, hb2);
 			hb_waiters_dec(hb2);
+			futex_private_ret_hb(hb1, &key1);
+			futex_private_ret_hb(hb2, &key2);
 			put_futex_key(&key2);
 			put_futex_key(&key1);
 			ret = fault_in_user_writeable(uaddr2);
@@ -1924,6 +2161,8 @@
 			 */
 			double_unlock_hb(hb1, hb2);
 			hb_waiters_dec(hb2);
+			futex_private_ret_hb(hb1, &key1);
+			futex_private_ret_hb(hb2, &key2);
 			put_futex_key(&key2);
 			put_futex_key(&key1);
 			cond_resched();
@@ -2040,6 +2279,9 @@
 	while (--drop_count >= 0)
 		drop_futex_key_refs(&key1);
 
+out_ret_hb:
+	futex_private_ret_hb(hb1, &key1);
+	futex_private_ret_hb(hb2, &key2);
 out_put_keys:
 	put_futex_key(&key2);
 out_put_key1:
@@ -2055,7 +2297,8 @@
 	struct futex_hash_bucket *hb;
 
 	hb = hash_futex(&q->key);
-
+	if (!hb)
+		return NULL;
 	/*
 	 * Increment the counter before taking the lock so that
 	 * a potential waker won't miss a to-be-slept task that is
@@ -2122,7 +2365,7 @@
  *   1 - if the futex_q was still queued (and we removed unqueued it);
  *   0 - if the futex_q was already removed by the waking thread
  */
-static int unqueue_me(struct futex_q *q)
+static int unqueue_me(struct futex_q *q, struct futex_hash_bucket *hb)
 {
 	spinlock_t *lock_ptr;
 	int ret = 0;
@@ -2161,7 +2404,7 @@
 		spin_unlock(lock_ptr);
 		ret = 1;
 	}
-
+	futex_private_ret_hb(hb, &q->key);
 	drop_futex_key_refs(&q->key);
 	return ret;
 }
@@ -2442,11 +2685,16 @@
 
 retry_private:
 	*hb = queue_lock(q);
+	if (!*hb) {
+		ret = -EINVAL;
+		goto out;
+	}
 
 	ret = get_futex_value_locked(&uval, uaddr);
 
 	if (ret) {
 		queue_unlock(*hb);
+		futex_private_ret_hb(*hb, &q->key);
 
 		ret = get_user(uval, uaddr);
 		if (ret)
@@ -2461,6 +2709,7 @@
 
 	if (uval != val) {
 		queue_unlock(*hb);
+		futex_private_ret_hb(*hb, &q->key);
 		ret = -EWOULDBLOCK;
 	}
 
@@ -2482,9 +2731,7 @@
 	if (!bitset)
 		return -EINVAL;
 
-	uaddr = futex_key_init(&q.key, uaddr, flags);
-	if (!uaddr)
-		return -EINVAL;
+	futex_key_init(&q.key, uaddr, flags);
 
 	q.bitset = bitset;
 
@@ -2514,7 +2761,7 @@
 	/* If we were woken (and unqueued), we succeeded, whatever. */
 	ret = 0;
 	/* unqueue_me() drops q.key ref */
-	if (!unqueue_me(&q))
+	if (!unqueue_me(&q, hb))
 		goto out;
 	ret = -ETIMEDOUT;
 	if (to && !to->task)
@@ -2586,9 +2833,7 @@
 	if (refill_pi_state_cache())
 		return -ENOMEM;
 
-	uaddr = futex_key_init(&q.key, uaddr, flags);
-	if (!uaddr)
-		return -EINVAL;
+	futex_key_init(&q.key, uaddr, flags);
 
 	if (time) {
 		to = &timeout;
@@ -2605,6 +2850,10 @@
 
 retry_private:
 	hb = queue_lock(&q);
+	if (!hb) {
+		ret = -EINVAL;
+		goto out_put_key;
+	}
 
 	ret = futex_lock_pi_atomic(uaddr, hb, &q.key, &q.pi_state, current, 0);
 	if (unlikely(ret)) {
@@ -2627,6 +2876,7 @@
 			 * - The user space value changed.
 			 */
 			queue_unlock(hb);
+			futex_private_ret_hb(hb, &q.key);
 			put_futex_key(&q.key);
 			cond_resched();
 			goto retry;
@@ -2674,11 +2924,13 @@
 
 	/* Unqueue and drop the lock */
 	unqueue_me_pi(&q);
+	futex_private_ret_hb(hb, &q.key);
 
 	goto out_put_key;
 
 out_unlock_put_key:
 	queue_unlock(hb);
+	futex_private_ret_hb(hb, &q.key);
 
 out_put_key:
 	put_futex_key(&q.key);
@@ -2689,6 +2941,7 @@
 
 uaddr_faulted:
 	queue_unlock(hb);
+	futex_private_ret_hb(hb, &q.key);
 
 	ret = fault_in_user_writeable(uaddr);
 	if (ret)
@@ -2714,9 +2967,7 @@
 	union futex_key key;
 	int ret;
 
-	uaddr = futex_key_init(&key, uaddr, flags);
-	if (!uaddr)
-		return -EINVAL;
+	futex_key_init(&key, uaddr, flags);
 retry:
 	if (get_user(uval, uaddr))
 		return -EFAULT;
@@ -2731,6 +2982,10 @@
 		return ret;
 
 	hb = hash_futex(&key);
+	if (!hb) {
+		ret = -EINVAL;
+		goto out_putkey;
+	}
 	spin_lock(&hb->lock);
 
 	/*
@@ -2759,6 +3014,7 @@
 		 */
 		if (ret == -EAGAIN) {
 			spin_unlock(&hb->lock);
+			futex_private_ret_hb(hb, &key);
 			put_futex_key(&key);
 			goto retry;
 		}
@@ -2787,11 +3043,13 @@
 out_unlock:
 	spin_unlock(&hb->lock);
 out_putkey:
+	futex_private_ret_hb(hb, &key);
 	put_futex_key(&key);
 	return ret;
 
 pi_faulted:
 	spin_unlock(&hb->lock);
+	futex_private_ret_hb(hb, &key);
 	put_futex_key(&key);
 
 	ret = fault_in_user_writeable(uaddr);
@@ -2908,13 +3166,8 @@
 	if (!bitset)
 		return -EINVAL;
 
-	uaddr = futex_key_init(&q.key, uaddr, flags);
-	if (!uaddr)
-		return -EINVAL;
-
-	uaddr2 = futex_key_init(&key2, uaddr2, flags);
-	if (!uaddr2)
-		return -EINVAL;
+	futex_key_init(&q.key, uaddr, flags);
+	futex_key_init(&key2, uaddr2, flags);
 
 	if (abs_time) {
 		to = &timeout;
@@ -3042,6 +3295,7 @@
 	}
 
 out_put_keys:
+	futex_private_ret_hb(hb, &q.key);
 	put_futex_key(&q.key);
 out_key2:
 	put_futex_key(&key2);
@@ -3274,6 +3528,166 @@
 				   curr, pip);
 }
 
+/**
+ * futex_create - Create an attached futex object
+ * @uaddr:	The user space address of the futex
+ * @key:	Pointer to an initialized futex key object
+ * @hb:		Pointer to the hash bucket in the global hash corresponding
+ *		to @key
+ * @tc:		Pointer to the task's futex cache
+ *
+ * Returns:
+ *  Success:	0
+ *  Failure:	Proper error code
+ *		ENOMEM: Out of memory
+ *		EEXIST: Global state exists already
+ */
+static int futex_create(u32 __user *uaddr, union futex_key *key,
+			struct futex_hash_bucket *hb, struct futex_cache *tc)
+{
+	struct futex_state *fs;
+	struct futex_q *match;
+
+	fs = kzalloc_node(sizeof(*fs), GFP_KERNEL, numa_node_id());
+	if (!fs)
+		return -ENOMEM;
+
+	atomic_set(&fs->users, 1);
+	atomic_set(&fs->hb.waiters, 0);
+	plist_head_init(&fs->hb.chain);
+	spin_lock_init(&fs->hb.lock);
+
+	fs->q = futex_q_init;
+
+	/*
+	 * This is the global state object. @key already has the attached bit
+	 * set by the caller.
+	 */
+	fs->q.key = *key;
+	fs->uaddr = uaddr;
+
+	/* Verify again whether global state for this futex exists already */
+	spin_lock(&hb->lock);
+	match = futex_top_waiter(hb, &fs->q.key);
+	if (match) {
+		spin_unlock(&hb->lock);
+		kfree(fs);
+		return -EEXIST;
+	}
+	/*
+	 * Queue the new global state in the global hash and add it to the task
+	 * tree.
+	 */
+	futex_queue_state(fs, hb);
+	futex_attach_task(fs, uaddr, tc);
+
+	spin_unlock(&hb->lock);
+	return 0;
+}
+
+/**
+ * futex_attach - Attach the futex at @uaddr
+ * @uaddr:	The user space address of the futex
+ *
+ * Returns:
+ *  Success:	0
+ *  Failure:	Proper error code
+ *		ENOMEM: Out of memory
+ *		EINVAL: Invalid @uaddr
+ *		EFAULT: @uaddr access would fault
+ *		EEXIST: @uaddr is already attached
+ */
+static int futex_attach(u32 __user *uaddr)
+{
+	union futex_key key = FUTEX_KEY_INIT;
+	struct futex_hash_bucket *hb;
+	struct futex_cache *tc;
+	struct futex_q *match;
+	int ret;
+
+	ret = get_futex_key(uaddr, 0, &key, VERIFY_WRITE);
+	if (ret)
+		return ret;
+
+	tc = &current->mm->futex_cache;
+
+	/* Find the global state and attach to it */
+	key.both.attached = true;
+	hb = hash_global_futex(&key);
+
+	spin_lock(&hb->lock);
+	match = futex_top_waiter(hb, &key);
+	spin_unlock(&hb->lock);
+	if (match) {
+		ret = -EEXIST;
+		goto out_put_key;
+	}
+	ret = futex_create(uaddr, &key, hb, tc);
+
+out_put_key:
+	put_futex_key(&key);
+	return ret;
+}
+
+/**
+ * futex_detach - Detach a task from a registered futex
+ * @uaddr:	The user space address of the futex
+ *
+ * Returns:
+ *  Success:	0
+ *  Failure:	Proper error code
+ *		EINVAL: @uaddr is not attached
+ *		EBUSY:  An operation on the futex is still in progress
+ */
+static int futex_detach(u32 __user *uaddr)
+{
+	struct futex_cache *tc = &current->mm->futex_cache;
+
+	return futex_detach_task(tc, uaddr);
+}
+
+void futex_mm_init(struct mm_struct *mm)
+{
+	memset(&mm->futex_cache, 0, sizeof(struct futex_cache));
+	raw_spin_lock_init(&mm->futex_cache.cache_lock);
+}
+
+void exit_futex_mm_cache(struct mm_struct *mm)
+{
+	struct futex_cache *tc = &mm->futex_cache;
+	struct latch_tree_root *ltr = &tc->futex_rb;
+	struct futex_state *fs;
+	struct futex_hash_bucket *hb;
+	struct rb_node *node;
+
+	/*
+	 * Everything left here is a futex the user did not detach.
+	 * RCU isn't used to free the states because there are no users left.
+	 */
+
+	node = rcu_dereference_raw(ltr->tree[0].rb_node);
+	while (node) {
+		struct latch_tree_node *ltn;
+
+		ltn = __lt_from_rb(node, 0);
+		fs = container_of(ltn, struct futex_state, node);
+
+		hb = fs->global_hb;
+
+		spin_lock(&hb->lock);
+		hb_remove_q(&fs->q, hb);
+		spin_unlock(&hb->lock);
+
+		latch_tree_erase(ltn, &tc->futex_rb, &futex_tree_ops);
+		kfree(fs);
+
+		node = rcu_dereference_raw(ltr->tree[0].rb_node);
+	}
+}
+
 long do_futex(u32 __user *uaddr, int op, u32 val, ktime_t *timeout,
 		u32 __user *uaddr2, u32 val2, u32 val3)
 {
@@ -3290,6 +3704,12 @@
 			return -ENOSYS;
 	}
 
+	if (op & FUTEX_ATTACHED) {
+		if (flags & FLAGS_SHARED)
+			return -EINVAL;
+		flags |= FLAGS_ATTACHED;
+	}
+
 	switch (cmd) {
 	case FUTEX_LOCK_PI:
 	case FUTEX_UNLOCK_PI:
@@ -3327,6 +3747,10 @@
 					     uaddr2);
 	case FUTEX_CMP_REQUEUE_PI:
 		return futex_requeue(uaddr, flags, uaddr2, val, val2, &val3, 1);
+	case FUTEX_ATTACH:
+		return futex_attach(uaddr);
+	case FUTEX_DETACH:
+		return futex_detach(uaddr);
 	}
 	return -ENOSYS;
 }