dlm: Add DLM_LKF_IDLE flag
In certain situations, dlm_lock() and dlm_unlock() can return -EBUSY to
indicate that a lock is busy and that the locking operation should be
retried later. For example, this regularly happens when a pending
locking request is canceled with dlm_unlock(DLM_LKF_CANCEL), immediately
followed by another dlm_lock().
This is problematic because callers of dlm_lock() and dlm_unlock() have no
way of determining when the lock will stop being busy, so they are forced
to poll. Address this by adding a DLM_LKF_IDLE flag that makes dlm_lock()
and dlm_unlock() wait when necessary instead of returning -EBUSY.
Signed-off-by: Alexander Ahring Oder Aring <aahringo@redhat.com>
Signed-off-by: Andreas Gruenbacher <agruenba@redhat.com>
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index 74a9590..134a448 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -265,6 +265,8 @@ struct dlm_lkb {
uint64_t lkb_recover_seq; /* from ls_recover_seq */
+ wait_queue_head_t lkb_idle;
+
char *lkb_lvbptr;
struct dlm_lksb *lkb_lksb; /* caller's status block */
void (*lkb_astfn) (void *astparam);
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index bdb51d2..c87e16c 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -1199,6 +1199,7 @@ static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
INIT_LIST_HEAD(&lkb->lkb_cb_list);
mutex_init(&lkb->lkb_cb_mutex);
INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
+ init_waitqueue_head(&lkb->lkb_idle);
idr_preload(GFP_NOFS);
spin_lock(&ls->ls_lkbidr_spin);
@@ -1335,6 +1336,7 @@ static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
break;
case DLM_LKSTS_GRANTED:
+ wake_up(&lkb->lkb_idle);
/* convention says granted locks kept in order of grmode */
lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
lkb->lkb_grmode);
@@ -1586,6 +1588,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
lkb->lkb_id, mstype, lkb->lkb_wait_type);
lkb->lkb_wait_count--;
lkb->lkb_wait_type = 0;
+ wake_up(&lkb->lkb_idle);
}
DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
@@ -1595,6 +1598,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
if (!lkb->lkb_wait_count)
list_del_init(&lkb->lkb_wait_reply);
unhold_lkb(lkb);
+ wake_up(&lkb->lkb_idle);
return 0;
}
@@ -2771,6 +2775,7 @@ static void process_lookup_list(struct dlm_rsb *r)
list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
list_del_init(&lkb->lkb_rsb_lookup);
+ wake_up(&lkb->lkb_idle);
_request_lock(r, lkb);
schedule();
}
@@ -2805,6 +2810,7 @@ static void confirm_master(struct dlm_rsb *r, int error)
lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
lkb_rsb_lookup);
list_del_init(&lkb->lkb_rsb_lookup);
+ wake_up(&lkb->lkb_idle);
r->res_first_lkid = lkb->lkb_id;
_request_lock(r, lkb);
}
@@ -2974,6 +2980,7 @@ static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
list_del_init(&lkb->lkb_rsb_lookup);
+ wake_up(&lkb->lkb_idle);
queue_cast(lkb->lkb_resource, lkb,
args->flags & DLM_LKF_CANCEL ?
-DLM_ECANCEL : -DLM_EUNLOCK);
@@ -3434,11 +3441,14 @@ int dlm_lock(dlm_lockspace_t *lockspace,
struct dlm_lkb *lkb;
struct dlm_args args;
int error, convert = flags & DLM_LKF_CONVERT;
+ struct wait_queue_entry wait;
+ bool wait_queued = false;
ls = dlm_find_lockspace_local(lockspace);
if (!ls)
return -EINVAL;
+ again:
dlm_lock_recovery(ls);
if (convert)
@@ -3456,6 +3466,13 @@ int dlm_lock(dlm_lockspace_t *lockspace,
if (error)
goto out_put;
+ if (flags & DLM_LKF_IDLE) {
+ hold_lkb(lkb);
+ init_wait_func(&wait, woken_wake_function);
+ add_wait_queue(&lkb->lkb_idle, &wait);
+ wait_queued = true;
+ }
+
if (convert)
error = convert_lock(ls, lkb, &args);
else
@@ -3472,6 +3489,16 @@ int dlm_lock(dlm_lockspace_t *lockspace,
error = 0;
out:
dlm_unlock_recovery(ls);
+ if (wait_queued) {
+ if (error == -EBUSY)
+ wait_woken(&wait, TASK_UNINTERRUPTIBLE,
+ MAX_SCHEDULE_TIMEOUT);
+ remove_wait_queue(&lkb->lkb_idle, &wait);
+ wait_queued = false;
+ __put_lkb(ls, lkb);
+ if (error == -EBUSY)
+ goto again;
+ }
dlm_put_lockspace(ls);
return error;
}
@@ -3486,11 +3513,14 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
struct dlm_lkb *lkb;
struct dlm_args args;
int error;
+ struct wait_queue_entry wait;
+ bool wait_queued = false;
ls = dlm_find_lockspace_local(lockspace);
if (!ls)
return -EINVAL;
+ again:
dlm_lock_recovery(ls);
error = find_lkb(ls, lkid, &lkb);
@@ -3503,6 +3533,13 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
if (error)
goto out_put;
+ if (flags & DLM_LKF_IDLE) {
+ hold_lkb(lkb);
+ init_wait_func(&wait, woken_wake_function);
+ add_wait_queue(&lkb->lkb_idle, &wait);
+ wait_queued = true;
+ }
+
if (flags & DLM_LKF_CANCEL)
error = cancel_lock(ls, lkb, &args);
else
@@ -3518,6 +3555,16 @@ int dlm_unlock(dlm_lockspace_t *lockspace,
dlm_put_lkb(lkb);
out:
dlm_unlock_recovery(ls);
+ if (wait_queued) {
+ if (error == -EBUSY)
+ wait_woken(&wait, TASK_UNINTERRUPTIBLE,
+ MAX_SCHEDULE_TIMEOUT);
+ remove_wait_queue(&lkb->lkb_idle, &wait);
+ wait_queued = false;
+ dlm_put_lkb(lkb);
+ if (error == -EBUSY)
+ goto again;
+ }
dlm_put_lockspace(ls);
return error;
}
@@ -4617,6 +4664,7 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
}
+ wake_up(&lkb->lkb_idle);
out:
unlock_rsb(r);
put_rsb(r);
@@ -5332,6 +5380,7 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
lkb->lkb_wait_type = 0;
lkb->lkb_wait_count = 0;
+ wake_up(&lkb->lkb_idle);
mutex_lock(&ls->ls_waiters_mutex);
list_del_init(&lkb->lkb_wait_reply);
mutex_unlock(&ls->ls_waiters_mutex);
diff --git a/include/uapi/linux/dlmconstants.h b/include/uapi/linux/dlmconstants.h
index a8ae47c..4e113f5 100644
--- a/include/uapi/linux/dlmconstants.h
+++ b/include/uapi/linux/dlmconstants.h
@@ -132,6 +132,10 @@
* Unlock the lock even if it is converting or waiting or has sublocks.
* Only really for use by the userland device.c code.
*
+ * DLM_LKF_IDLE
+ *
+ * When an operation would otherwise fail with -EBUSY because the lock is
+ * busy, wait for the lock to become idle instead of returning the error.
*/
#define DLM_LKF_NOQUEUE 0x00000001
@@ -153,6 +157,7 @@
#define DLM_LKF_ALTCW 0x00010000
#define DLM_LKF_FORCEUNLOCK 0x00020000
#define DLM_LKF_TIMEOUT 0x00040000
+#define DLM_LKF_IDLE 0x00080000
/*
* Some return codes that are not in errno.h