dlm: Add DLM_LKF_IDLE flag

In certain situations, dlm_lock() and dlm_unlock() can return -EBUSY to
indicate that a lock is busy and that the locking operation should be
retried later.  For example, this regularly happens when a pending
locking request is canceled with dlm_unlock(DLM_LKF_CANCEL) and is
immediately followed by another dlm_lock().

This is problematic because callers of dlm_lock() and dlm_unlock() don't
have a way of determining when the lock will no longer be busy.  Fix
that by adding a DLM_LKF_IDLE flag that makes dlm_lock() and
dlm_unlock() sleep (uninterruptibly) until woken on the lock's new
lkb_idle wait queue and then retry the operation instead of returning
-EBUSY.

Signed-off-by: Alexander Ahring Oder Aring <aahringo@redhat.com>
Signed-off-by: Andreas Gruenbacher <agruenba@redhat.com>
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 74a9590..134a448 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -265,6 +265,8 @@ struct dlm_lkb {
 
 	uint64_t		lkb_recover_seq; /* from ls_recover_seq */
 
+	wait_queue_head_t	lkb_idle;
+
 	char			*lkb_lvbptr;
 	struct dlm_lksb		*lkb_lksb;      /* caller's status block */
 	void			(*lkb_astfn) (void *astparam);
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index bdb51d2..c87e16c 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1199,6 +1199,7 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
 	INIT_LIST_HEAD(&lkb->lkb_cb_list);
 	mutex_init(&lkb->lkb_cb_mutex);
 	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
+	init_waitqueue_head(&lkb->lkb_idle);
 
 	idr_preload(GFP_NOFS);
 	spin_lock(&ls->ls_lkbidr_spin);
@@ -1335,6 +1336,7 @@ static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
 		break;
 	case DLM_LKSTS_GRANTED:
+		wake_up(&lkb->lkb_idle);
 		/* convention says granted locks kept in order of grmode */
 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
 				lkb->lkb_grmode);
@@ -1586,6 +1588,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
 		lkb->lkb_wait_count--;
 		lkb->lkb_wait_type = 0;
+		wake_up(&lkb->lkb_idle);
 	}
 
 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
@@ -1595,6 +1598,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
 	if (!lkb->lkb_wait_count)
 		list_del_init(&lkb->lkb_wait_reply);
 	unhold_lkb(lkb);
+	wake_up(&lkb->lkb_idle);
 	return 0;
 }
 
@@ -2771,6 +2775,7 @@ static void process_lookup_list(struct dlm_rsb *r)
 
 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
 		list_del_init(&lkb->lkb_rsb_lookup);
+		wake_up(&lkb->lkb_idle);
 		_request_lock(r, lkb);
 		schedule();
 	}
@@ -2805,6 +2810,7 @@ static void confirm_master(struct dlm_rsb *r, int error)
 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
 					 lkb_rsb_lookup);
 			list_del_init(&lkb->lkb_rsb_lookup);
+			wake_up(&lkb->lkb_idle);
 			r->res_first_lkid = lkb->lkb_id;
 			_request_lock(r, lkb);
 		}
@@ -2974,6 +2980,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
 			list_del_init(&lkb->lkb_rsb_lookup);
+			wake_up(&lkb->lkb_idle);
 			queue_cast(lkb->lkb_resource, lkb,
 				   args->flags & DLM_LKF_CANCEL ?
 				   -DLM_ECANCEL : -DLM_EUNLOCK);
@@ -3434,11 +3441,14 @@ int dlm_lock(dlm_lockspace_t *lockspace,
 	struct dlm_lkb *lkb;
 	struct dlm_args args;
 	int error, convert = flags & DLM_LKF_CONVERT;
+	struct wait_queue_entry wait;
+	bool wait_queued = false;
 
 	ls = dlm_find_lockspace_local(lockspace);
 	if (!ls)
 		return -EINVAL;
 
+ again:
 	dlm_lock_recovery(ls);
 
 	if (convert)
@@ -3456,6 +3466,13 @@ int dlm_lock(dlm_lockspace_t *lockspace,
 	if (error)
 		goto out_put;
 
+	if (flags & DLM_LKF_IDLE) {
+		hold_lkb(lkb);
+		init_wait_func(&wait, woken_wake_function);
+		add_wait_queue(&lkb->lkb_idle, &wait);
+		wait_queued = true;
+	}
+
 	if (convert)
 		error = convert_lock(ls, lkb, &args);
 	else
@@ -3472,6 +3489,16 @@ int dlm_lock(dlm_lockspace_t *lockspace,
 		error = 0;
  out:
 	dlm_unlock_recovery(ls);
+	if (wait_queued) {
+		if (error == -EBUSY)
+			wait_woken(&wait, TASK_UNINTERRUPTIBLE,
+				   MAX_SCHEDULE_TIMEOUT);
+		remove_wait_queue(&lkb->lkb_idle, &wait);
+		wait_queued = false;
+		__put_lkb(ls, lkb);
+		if (error == -EBUSY)
+			goto again;
+	}
 	dlm_put_lockspace(ls);
 	return error;
 }
@@ -3486,11 +3513,14 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
 	struct dlm_lkb *lkb;
 	struct dlm_args args;
 	int error;
+	struct wait_queue_entry wait;
+	bool wait_queued = false;
 
 	ls = dlm_find_lockspace_local(lockspace);
 	if (!ls)
 		return -EINVAL;
 
+ again:
 	dlm_lock_recovery(ls);
 
 	error = find_lkb(ls, lkid, &lkb);
@@ -3503,6 +3533,13 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
 	if (error)
 		goto out_put;
 
+	if (flags & DLM_LKF_IDLE) {
+		hold_lkb(lkb);
+		init_wait_func(&wait, woken_wake_function);
+		add_wait_queue(&lkb->lkb_idle, &wait);
+		wait_queued = true;
+	}
+
 	if (flags & DLM_LKF_CANCEL)
 		error = cancel_lock(ls, lkb, &args);
 	else
@@ -3518,6 +3555,16 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
 	dlm_put_lkb(lkb);
  out:
 	dlm_unlock_recovery(ls);
+	if (wait_queued) {
+		if (error == -EBUSY)
+			wait_woken(&wait, TASK_UNINTERRUPTIBLE,
+				   MAX_SCHEDULE_TIMEOUT);
+		remove_wait_queue(&lkb->lkb_idle, &wait);
+		wait_queued = false;
+		dlm_put_lkb(lkb);
+		if (error == -EBUSY)
+			goto again;
+	}
 	dlm_put_lockspace(ls);
 	return error;
 }
@@ -4617,6 +4664,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
 	}
+	wake_up(&lkb->lkb_idle);
  out:
 	unlock_rsb(r);
 	put_rsb(r);
@@ -5332,6 +5380,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
 		lkb->lkb_wait_type = 0;
 		lkb->lkb_wait_count = 0;
+		wake_up(&lkb->lkb_idle);
 		mutex_lock(&ls->ls_waiters_mutex);
 		list_del_init(&lkb->lkb_wait_reply);
 		mutex_unlock(&ls->ls_waiters_mutex);
diff --git a/include/uapi/linux/dlmconstants.h b/include/uapi/linux/dlmconstants.h
index a8ae47c..4e113f5 100644
--- a/include/uapi/linux/dlmconstants.h
+++ b/include/uapi/linux/dlmconstants.h
@@ -132,6 +132,10 @@
  * Unlock the lock even if it is converting or waiting or has sublocks.
  * Only really for use by the userland device.c code.
  *
+ * DLM_LKF_IDLE
+ *
+ * When the lock is busy, sleep and retry instead of returning -EBUSY.
+ *
  */
 
 #define DLM_LKF_NOQUEUE		0x00000001
@@ -153,6 +157,7 @@
 #define DLM_LKF_ALTCW		0x00010000
 #define DLM_LKF_FORCEUNLOCK	0x00020000
 #define DLM_LKF_TIMEOUT		0x00040000
+#define DLM_LKF_IDLE		0x00080000
 
 /*
  * Some return codes that are not in errno.h