Merge tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client

Pull Ceph updates from Ilya Dryomov:
 "The big ticket item here is support for rbd exclusive-lock feature,
  with maintenance operations offloaded to userspace (Douglas Fuller,
  Mike Christie and myself). Another block device bullet is a series
  fixing up layering error paths (myself).

  On the filesystem side, we've got patches that improve our handling of
  buffered vs dio write races (Neil Brown) and a few assorted fixes from
  Zheng. Also included are a couple of random cleanups and a minor CRUSH
  update"

* tag 'ceph-for-4.9-rc1' of git://github.com/ceph/ceph-client: (39 commits)
  crush: remove redundant local variable
  crush: don't normalize input of crush_ln iteratively
  libceph: ceph_build_auth() doesn't need ceph_auth_build_hello()
  libceph: use CEPH_AUTH_UNKNOWN in ceph_auth_build_hello()
  ceph: fix description for rsize and rasize mount options
  rbd: use kmalloc_array() in rbd_header_from_disk()
  ceph: use list_move instead of list_del/list_add
  ceph: handle CEPH_SESSION_REJECT message
  ceph: avoid accessing / when mounting a subpath
  ceph: fix mandatory flock check
  ceph: remove warning when ceph_releasepage() is called on dirty page
  ceph: ignore error from invalidate_inode_pages2_range() in direct write
  ceph: fix error handling of start_read()
  rbd: add rbd_obj_request_error() helper
  rbd: img_data requests don't own their page array
  rbd: don't call rbd_osd_req_format_read() for !img_data requests
  rbd: rework rbd_img_obj_exists_submit() error paths
  rbd: don't crash or leak on errors in rbd_img_obj_parent_read_full_callback()
  rbd: move bumping img_request refcount into rbd_obj_request_submit()
  rbd: mark the original request as done if stat request fails
  ...
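
From the application's point of view the exclusive lock is transparent: the
kernel client acquires it on the first write (or on reads too, with the new
lock_on_read option) and trades it with other clients over watch/notify. A
minimal, purely illustrative userspace sketch; the device node and image
setup are assumptions, not part of this series:

    #include <fcntl.h>
    #include <stdio.h>
    #include <unistd.h>

    int main(void)
    {
            char buf[512] = { 0 };
            /* O_SYNC so the write reaches rbd before pwrite() returns */
            int fd = open("/dev/rbd0", O_WRONLY | O_SYNC);

            if (fd < 0)
                    return 1;
            /* blocks in rbd_wait_state_locked() while another client
             * holds the lock on an exclusive-lock image */
            if (pwrite(fd, buf, sizeof(buf), 0) < 0)
                    perror("pwrite");
            close(fd);
            return 0;
    }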
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 2ddd680..f208ac5 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -6,7 +6,7 @@
 
 Used for adding and removing rbd block devices.
 
-Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
+Usage: <mon ip addr> <options> <pool name> <rbd image name> [<snap name>]
 
  $ echo "192.168.0.1 name=admin rbd foo" > /sys/bus/rbd/add
 
@@ -14,9 +14,13 @@
 will be assigned for any registered block device. If a snapshot is used, it
 will be mapped read-only.
 
-Removal of a device:
+Usage: <dev-id> [force]
 
-  $ echo <dev-id> > /sys/bus/rbd/remove
+ $ echo 2 > /sys/bus/rbd/remove
+
+Optional "force" argument which when passed will wait for running requests and
+then unmap the image. Requests sent to the driver after initiating the removal
+will be failed.  (August 2016, since 4.9.)
 
 What:		/sys/bus/rbd/add_single_major
 Date:		December 2013
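
An illustrative C equivalent of the forced removal described above, assuming
an image is currently mapped as device id 2:

    #include <fcntl.h>
    #include <string.h>
    #include <unistd.h>

    int main(void)
    {
            const char *arg = "2 force";
            int fd = open("/sys/bus/rbd/remove", O_WRONLY);

            if (fd < 0)
                    return 1;
            /* waits for in-flight requests; later requests are failed */
            write(fd, arg, strlen(arg));
            close(fd);
            return 0;
    }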
@@ -43,10 +47,25 @@
 Entries under /sys/bus/rbd/devices/<dev-id>/
 --------------------------------------------
 
+client_addr
+
+	The ceph unique client entity_addr_t (address + nonce).
+	The format is <address>:<port>/<nonce>, e.g. '1.2.3.4:1234/5678' or
+	'[1:2:3:4:5:6:7:8]:1234/5678'.  (August 2016, since 4.9.)
+
 client_id
 
 	The ceph unique client id that was assigned for this specific session.
 
+cluster_fsid
+
+	The ceph cluster UUID.  (August 2016, since 4.9.)
+
+config_info
+
+	The string written into /sys/bus/rbd/add{,_single_major}.  (August
+	2016, since 4.9.)
+
 features
 
 	A hexadecimal encoding of the feature bits for this image.
@@ -92,6 +111,10 @@
 
 	The current snapshot for which the device is mapped.
 
+snap_id
+
+	The current snapshot's id.  (August 2016, since 4.9.)
+
 parent
 
 	Information identifying the chain of parent images in a layered rbd
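
The new attributes above are plain text sysfs files. A hypothetical reader,
assuming device id 0 (output forms follow the descriptions above):

    #include <stdio.h>

    static void show(const char *attr)
    {
            char path[96], line[128];
            FILE *f;

            snprintf(path, sizeof(path), "/sys/bus/rbd/devices/0/%s", attr);
            f = fopen(path, "r");
            if (!f)
                    return;
            /* e.g. "client_addr  1.2.3.4:0/5678" */
            if (fgets(line, sizeof(line), f))
                    printf("%-12s %s", attr, line);
            fclose(f);
    }

    int main(void)
    {
            show("client_addr");
            show("cluster_fsid");
            show("snap_id");
            return 0;
    }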
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index d6030aa..f5306ee 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -98,6 +98,10 @@
 	size.
 
   rsize=X
+	Specify the maximum read size in bytes.  By default there is no
+	maximum.
+
+  rasize=X
 	Specify the maximum readahead.
 
   mount_timeout=X
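
A hedged C sketch of mounting with both options via mount(2); the monitor
address, credentials and sizes below are made-up examples:

    #include <sys/mount.h>

    int main(void)
    {
            /* cap reads at 64 MB and readahead at 8 MB */
            return mount("192.168.0.1:6789:/", "/mnt/ceph", "ceph", 0,
                         "name=admin,rsize=67108864,rasize=8388608");
    }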
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index c1f84df..abb7162 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -31,6 +31,7 @@
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
+#include <linux/ceph/cls_lock_client.h>
 #include <linux/ceph/decode.h>
 #include <linux/parser.h>
 #include <linux/bsearch.h>
@@ -114,12 +115,17 @@
 
 #define RBD_OBJ_PREFIX_LEN_MAX	64
 
+#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
+#define RBD_RETRY_DELAY		msecs_to_jiffies(1000)
+
 /* Feature bits */
 
 #define RBD_FEATURE_LAYERING	(1<<0)
 #define RBD_FEATURE_STRIPINGV2	(1<<1)
-#define RBD_FEATURES_ALL \
-	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
+#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
+#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
+				 RBD_FEATURE_STRIPINGV2 |	\
+				 RBD_FEATURE_EXCLUSIVE_LOCK)
 
 /* Features supported by this (client software) implementation. */
 
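
At image probe time these bits gate the mapping: any feature bit set on the
image but absent from RBD_FEATURES_SUPPORTED fails the map. A sketch of that
check (the actual helper lives outside this hunk):

    u64 unsupported = features & ~RBD_FEATURES_SUPPORTED;

    if (unsupported) {
            rbd_warn(rbd_dev, "image uses unsupported features: 0x%llx",
                     unsupported);
            return -ENXIO;
    }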
@@ -128,11 +134,8 @@
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
- * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
- * enough to hold all possible device names.
  */
 #define DEV_NAME_LEN		32
-#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
 
 /*
  * block device image metadata (in-memory version)
@@ -322,6 +325,24 @@
 #define for_each_obj_request_safe(ireq, oreq, n) \
 	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 
+enum rbd_watch_state {
+	RBD_WATCH_STATE_UNREGISTERED,
+	RBD_WATCH_STATE_REGISTERED,
+	RBD_WATCH_STATE_ERROR,
+};
+
+enum rbd_lock_state {
+	RBD_LOCK_STATE_UNLOCKED,
+	RBD_LOCK_STATE_LOCKED,
+	RBD_LOCK_STATE_RELEASING,
+};
+
+/* WatchNotify::ClientId */
+struct rbd_client_id {
+	u64 gid;
+	u64 handle;
+};
+
 struct rbd_mapping {
 	u64                     size;
 	u64                     features;
@@ -349,13 +370,29 @@
 	unsigned long		flags;		/* possibly lock protected */
 	struct rbd_spec		*spec;
 	struct rbd_options	*opts;
+	char			*config_info;	/* add{,_single_major} string */
 
 	struct ceph_object_id	header_oid;
 	struct ceph_object_locator header_oloc;
 
-	struct ceph_file_layout	layout;
+	struct ceph_file_layout	layout;		/* used for all rbd requests */
 
+	struct mutex		watch_mutex;
+	enum rbd_watch_state	watch_state;
 	struct ceph_osd_linger_request *watch_handle;
+	u64			watch_cookie;
+	struct delayed_work	watch_dwork;
+
+	struct rw_semaphore	lock_rwsem;
+	enum rbd_lock_state	lock_state;
+	struct rbd_client_id	owner_cid;
+	struct work_struct	acquired_lock_work;
+	struct work_struct	released_lock_work;
+	struct delayed_work	lock_dwork;
+	struct work_struct	unlock_work;
+	wait_queue_head_t	lock_waitq;
+
+	struct workqueue_struct	*task_wq;
 
 	struct rbd_spec		*parent_spec;
 	u64			parent_overlap;
@@ -439,6 +476,29 @@
 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 }
 
+static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
+{
+	return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+	       rbd_dev->spec->snap_id == CEPH_NOSNAP &&
+	       !rbd_dev->mapping.read_only;
+}
+
+static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
+{
+	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
+	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
+}
+
+static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
+{
+	bool is_lock_owner;
+
+	down_read(&rbd_dev->lock_rwsem);
+	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
+	up_read(&rbd_dev->lock_rwsem);
+	return is_lock_owner;
+}
+
 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
@@ -735,6 +795,7 @@
 	/* string args above */
 	Opt_read_only,
 	Opt_read_write,
+	Opt_lock_on_read,
 	Opt_err
 };
 
@@ -746,16 +807,19 @@
 	{Opt_read_only, "ro"},		/* Alternate spelling */
 	{Opt_read_write, "read_write"},
 	{Opt_read_write, "rw"},		/* Alternate spelling */
+	{Opt_lock_on_read, "lock_on_read"},
 	{Opt_err, NULL}
 };
 
 struct rbd_options {
 	int	queue_depth;
 	bool	read_only;
+	bool	lock_on_read;
 };
 
 #define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
 #define RBD_READ_ONLY_DEFAULT	false
+#define RBD_LOCK_ON_READ_DEFAULT false
 
 static int parse_rbd_opts_token(char *c, void *private)
 {
@@ -791,6 +855,9 @@
 	case Opt_read_write:
 		rbd_opts->read_only = false;
 		break;
+	case Opt_lock_on_read:
+		rbd_opts->lock_on_read = true;
+		break;
 	default:
 		/* libceph prints "bad option" msg */
 		return -EINVAL;
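
A hypothetical map string exercising the new token; everything but the
lock_on_read option itself is invented:

    /* written to /sys/bus/rbd/add; reads then take the exclusive lock too */
    const char *spec = "192.168.0.1 name=admin,lock_on_read rbd foo";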
@@ -919,7 +986,6 @@
 	char *snap_names = NULL;
 	u64 *snap_sizes = NULL;
 	u32 snap_count;
-	size_t size;
 	int ret = -ENOMEM;
 	u32 i;
 
@@ -957,9 +1023,9 @@
 			goto out_err;
 
 		/* ...as well as the array of their sizes. */
-
-		size = snap_count * sizeof (*header->snap_sizes);
-		snap_sizes = kmalloc(size, GFP_KERNEL);
+		snap_sizes = kmalloc_array(snap_count,
+					   sizeof(*header->snap_sizes),
+					   GFP_KERNEL);
 		if (!snap_sizes)
 			goto out_err;
 
@@ -1551,11 +1617,18 @@
 	}
 }
 
-static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
-				struct rbd_obj_request *obj_request)
+static void rbd_img_obj_callback(struct rbd_obj_request *obj_request);
+
+static void rbd_obj_request_submit(struct rbd_obj_request *obj_request)
 {
-	dout("%s %p\n", __func__, obj_request);
-	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
+	struct ceph_osd_request *osd_req = obj_request->osd_req;
+
+	dout("%s %p osd_req %p\n", __func__, obj_request, osd_req);
+	if (obj_request_img_data_test(obj_request)) {
+		WARN_ON(obj_request->callback != rbd_img_obj_callback);
+		rbd_img_request_get(obj_request->img_request);
+	}
+	ceph_osdc_start_request(osd_req->r_osdc, osd_req, false);
 }
 
 static void rbd_obj_request_end(struct rbd_obj_request *obj_request)
@@ -1745,6 +1818,22 @@
 		complete_all(&obj_request->completion);
 }
 
+static void rbd_obj_request_error(struct rbd_obj_request *obj_request, int err)
+{
+	obj_request->result = err;
+	obj_request->xferred = 0;
+	/*
+	 * kludge - mirror rbd_obj_request_submit() to match a put in
+	 * rbd_img_obj_callback()
+	 */
+	if (obj_request_img_data_test(obj_request)) {
+		WARN_ON(obj_request->callback != rbd_img_obj_callback);
+		rbd_img_request_get(obj_request->img_request);
+	}
+	obj_request_done_set(obj_request);
+	rbd_obj_request_complete(obj_request);
+}
+
 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 {
 	struct rbd_img_request *img_request = NULL;
@@ -1877,11 +1966,10 @@
 
 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
 {
-	struct rbd_img_request *img_request = obj_request->img_request;
 	struct ceph_osd_request *osd_req = obj_request->osd_req;
 
-	if (img_request)
-		osd_req->r_snapid = img_request->snap_id;
+	rbd_assert(obj_request_img_data_test(obj_request));
+	osd_req->r_snapid = obj_request->img_request->snap_id;
 }
 
 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
@@ -2074,7 +2162,9 @@
 			bio_chain_put(obj_request->bio_list);
 		break;
 	case OBJ_REQUEST_PAGES:
-		if (obj_request->pages)
+		/* img_data requests don't own their page array */
+		if (obj_request->pages &&
+		    !obj_request_img_data_test(obj_request))
 			ceph_release_page_vector(obj_request->pages,
 						obj_request->page_count);
 		break;
@@ -2295,13 +2385,6 @@
 		xferred = obj_request->length;
 	}
 
-	/* Image object requests don't own their page array */
-
-	if (obj_request->type == OBJ_REQUEST_PAGES) {
-		obj_request->pages = NULL;
-		obj_request->page_count = 0;
-	}
-
 	if (img_request_child_test(img_request)) {
 		rbd_assert(img_request->obj_request != NULL);
 		more = obj_request->which < img_request->obj_request_count - 1;
@@ -2520,8 +2603,6 @@
 
 		rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
 
-		rbd_img_request_get(img_request);
-
 		img_offset += length;
 		resid -= length;
 	}
@@ -2579,7 +2660,6 @@
 {
 	struct rbd_obj_request *orig_request;
 	struct ceph_osd_request *osd_req;
-	struct ceph_osd_client *osdc;
 	struct rbd_device *rbd_dev;
 	struct page **pages;
 	enum obj_operation_type op_type;
@@ -2603,7 +2683,7 @@
 	rbd_assert(obj_request_type_valid(orig_request->type));
 	img_result = img_request->result;
 	parent_length = img_request->length;
-	rbd_assert(parent_length == img_request->xferred);
+	rbd_assert(img_result || parent_length == img_request->xferred);
 	rbd_img_request_put(img_request);
 
 	rbd_assert(orig_request->img_request);
@@ -2616,13 +2696,9 @@
 	 * and re-submit the original write request.
 	 */
 	if (!rbd_dev->parent_overlap) {
-		struct ceph_osd_client *osdc;
-
 		ceph_release_page_vector(pages, page_count);
-		osdc = &rbd_dev->rbd_client->client->osdc;
-		img_result = rbd_obj_request_submit(osdc, orig_request);
-		if (!img_result)
-			return;
+		rbd_obj_request_submit(orig_request);
+		return;
 	}
 
 	if (img_result)
@@ -2656,17 +2732,12 @@
 
 	/* All set, send it off. */
 
-	osdc = &rbd_dev->rbd_client->client->osdc;
-	img_result = rbd_obj_request_submit(osdc, orig_request);
-	if (!img_result)
-		return;
-out_err:
-	/* Record the error code and complete the request */
+	rbd_obj_request_submit(orig_request);
+	return;
 
-	orig_request->result = img_result;
-	orig_request->xferred = 0;
-	obj_request_done_set(orig_request);
-	rbd_obj_request_complete(orig_request);
+out_err:
+	ceph_release_page_vector(pages, page_count);
+	rbd_obj_request_error(orig_request, img_result);
 }
 
 /*
@@ -2680,26 +2751,19 @@
  * When the read completes, this page array will be transferred to
  * the original object request for the copyup operation.
  *
- * If an error occurs, record it as the result of the original
- * object request and mark it done so it gets completed.
+ * If an error occurs, it is recorded as the result of the original
+ * object request in rbd_img_obj_exists_callback().
  */
 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
 {
-	struct rbd_img_request *img_request = NULL;
+	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
 	struct rbd_img_request *parent_request = NULL;
-	struct rbd_device *rbd_dev;
 	u64 img_offset;
 	u64 length;
 	struct page **pages = NULL;
 	u32 page_count;
 	int result;
 
-	rbd_assert(obj_request_img_data_test(obj_request));
-	rbd_assert(obj_request_type_valid(obj_request->type));
-
-	img_request = obj_request->img_request;
-	rbd_assert(img_request != NULL);
-	rbd_dev = img_request->rbd_dev;
 	rbd_assert(rbd_dev->parent != NULL);
 
 	/*
@@ -2740,10 +2804,11 @@
 	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
 	if (result)
 		goto out_err;
+
 	parent_request->copyup_pages = pages;
 	parent_request->copyup_page_count = page_count;
-
 	parent_request->callback = rbd_img_obj_parent_read_full_callback;
+
 	result = rbd_img_request_submit(parent_request);
 	if (!result)
 		return 0;
@@ -2757,10 +2822,6 @@
 		ceph_release_page_vector(pages, page_count);
 	if (parent_request)
 		rbd_img_request_put(parent_request);
-	obj_request->result = result;
-	obj_request->xferred = 0;
-	obj_request_done_set(obj_request);
-
 	return result;
 }
 
@@ -2793,17 +2854,13 @@
 
 	/*
 	 * If the overlap has become 0 (most likely because the
-	 * image has been flattened) we need to free the pages
-	 * and re-submit the original write request.
+	 * image has been flattened) we need to re-submit the
+	 * original request.
 	 */
 	rbd_dev = orig_request->img_request->rbd_dev;
 	if (!rbd_dev->parent_overlap) {
-		struct ceph_osd_client *osdc;
-
-		osdc = &rbd_dev->rbd_client->client->osdc;
-		result = rbd_obj_request_submit(osdc, orig_request);
-		if (!result)
-			return;
+		rbd_obj_request_submit(orig_request);
+		return;
 	}
 
 	/*
@@ -2816,31 +2873,45 @@
 		obj_request_existence_set(orig_request, true);
 	} else if (result == -ENOENT) {
 		obj_request_existence_set(orig_request, false);
-	} else if (result) {
-		orig_request->result = result;
-		goto out;
+	} else {
+		goto fail_orig_request;
 	}
 
 	/*
 	 * Resubmit the original request now that we have recorded
 	 * whether the target object exists.
 	 */
-	orig_request->result = rbd_img_obj_request_submit(orig_request);
-out:
-	if (orig_request->result)
-		rbd_obj_request_complete(orig_request);
+	result = rbd_img_obj_request_submit(orig_request);
+	if (result)
+		goto fail_orig_request;
+
+	return;
+
+fail_orig_request:
+	rbd_obj_request_error(orig_request, result);
 }
 
 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
 {
+	struct rbd_device *rbd_dev = obj_request->img_request->rbd_dev;
 	struct rbd_obj_request *stat_request;
-	struct rbd_device *rbd_dev;
-	struct ceph_osd_client *osdc;
-	struct page **pages = NULL;
+	struct page **pages;
 	u32 page_count;
 	size_t size;
 	int ret;
 
+	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
+					      OBJ_REQUEST_PAGES);
+	if (!stat_request)
+		return -ENOMEM;
+
+	stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
+						   stat_request);
+	if (!stat_request->osd_req) {
+		ret = -ENOMEM;
+		goto fail_stat_request;
+	}
+
 	/*
 	 * The response data for a STAT call consists of:
 	 *     le64 length;
@@ -2852,52 +2923,33 @@
 	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
 	page_count = (u32)calc_pages_for(0, size);
 	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
-	if (IS_ERR(pages))
-		return PTR_ERR(pages);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		goto fail_stat_request;
+	}
 
-	ret = -ENOMEM;
-	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
-							OBJ_REQUEST_PAGES);
-	if (!stat_request)
-		goto out;
+	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
+	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
+				     false, false);
 
 	rbd_obj_request_get(obj_request);
 	stat_request->obj_request = obj_request;
 	stat_request->pages = pages;
 	stat_request->page_count = page_count;
-
-	rbd_assert(obj_request->img_request);
-	rbd_dev = obj_request->img_request->rbd_dev;
-	stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
-						   stat_request);
-	if (!stat_request->osd_req)
-		goto out;
 	stat_request->callback = rbd_img_obj_exists_callback;
 
-	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT, 0);
-	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
-					false, false);
-	rbd_osd_req_format_read(stat_request);
+	rbd_obj_request_submit(stat_request);
+	return 0;
 
-	osdc = &rbd_dev->rbd_client->client->osdc;
-	ret = rbd_obj_request_submit(osdc, stat_request);
-out:
-	if (ret)
-		rbd_obj_request_put(obj_request);
-
+fail_stat_request:
+	rbd_obj_request_put(stat_request);
 	return ret;
 }
 
 static bool img_obj_request_simple(struct rbd_obj_request *obj_request)
 {
-	struct rbd_img_request *img_request;
-	struct rbd_device *rbd_dev;
-
-	rbd_assert(obj_request_img_data_test(obj_request));
-
-	img_request = obj_request->img_request;
-	rbd_assert(img_request);
-	rbd_dev = img_request->rbd_dev;
+	struct rbd_img_request *img_request = obj_request->img_request;
+	struct rbd_device *rbd_dev = img_request->rbd_dev;
 
 	/* Reads */
 	if (!img_request_write_test(img_request) &&
@@ -2936,14 +2988,13 @@
 
 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
 {
+	rbd_assert(obj_request_img_data_test(obj_request));
+	rbd_assert(obj_request_type_valid(obj_request->type));
+	rbd_assert(obj_request->img_request);
+
 	if (img_obj_request_simple(obj_request)) {
-		struct rbd_device *rbd_dev;
-		struct ceph_osd_client *osdc;
-
-		rbd_dev = obj_request->img_request->rbd_dev;
-		osdc = &rbd_dev->rbd_client->client->osdc;
-
-		return rbd_obj_request_submit(osdc, obj_request);
+		rbd_obj_request_submit(obj_request);
+		return 0;
 	}
 
 	/*
@@ -3006,12 +3057,8 @@
 	rbd_assert(obj_request->img_request);
 	rbd_dev = obj_request->img_request->rbd_dev;
 	if (!rbd_dev->parent_overlap) {
-		struct ceph_osd_client *osdc;
-
-		osdc = &rbd_dev->rbd_client->client->osdc;
-		img_result = rbd_obj_request_submit(osdc, obj_request);
-		if (!img_result)
-			return;
+		rbd_obj_request_submit(obj_request);
+		return;
 	}
 
 	obj_request->result = img_result;
@@ -3084,65 +3131,724 @@
 	obj_request_done_set(obj_request);
 }
 
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
-static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
+static const struct rbd_client_id rbd_empty_cid;
+
+static bool rbd_cid_equal(const struct rbd_client_id *lhs,
+			  const struct rbd_client_id *rhs)
+{
+	return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
+}
+
+static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
+{
+	struct rbd_client_id cid;
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
+	cid.handle = rbd_dev->watch_cookie;
+	mutex_unlock(&rbd_dev->watch_mutex);
+	return cid;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
+			      const struct rbd_client_id *cid)
+{
+	dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
+	     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
+	     cid->gid, cid->handle);
+	rbd_dev->owner_cid = *cid; /* struct */
+}
+
+static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
+{
+	mutex_lock(&rbd_dev->watch_mutex);
+	sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
+	mutex_unlock(&rbd_dev->watch_mutex);
+}
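/* Illustrative only: assuming RBD_LOCK_COOKIE_PREFIX is "auto" (defined
 * outside this hunk), a watch cookie of 5678 produces the lock cookie
 * "auto 5678"; find_watcher() below parses it back with sscanf().
 */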
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_lock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+	char cookie[32];
+	int ret;
+
+	WARN_ON(__rbd_is_lock_owner(rbd_dev));
+
+	format_lock_cookie(rbd_dev, cookie);
+	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
+			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
+			    RBD_LOCK_TAG, "", 0);
+	if (ret)
+		return ret;
+
+	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+	rbd_set_owner_cid(rbd_dev, &cid);
+	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
+	return 0;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_unlock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	char cookie[32];
+	int ret;
+
+	WARN_ON(!__rbd_is_lock_owner(rbd_dev));
+
+	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+
+	format_lock_cookie(rbd_dev, cookie);
+	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
+			      RBD_LOCK_NAME, cookie);
+	if (ret && ret != -ENOENT) {
+		rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
+		return ret;
+	}
+
+	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
+	return 0;
+}
+
+static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
+				enum rbd_notify_op notify_op,
+				struct page ***preply_pages,
+				size_t *preply_len)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+	int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
+	char buf[buf_size];
+	void *p = buf;
+
+	dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
+
+	/* encode *LockPayload NotifyMessage (op + ClientId) */
+	ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_32(&p, notify_op);
+	ceph_encode_64(&p, cid.gid);
+	ceph_encode_64(&p, cid.handle);
+
+	return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
+				&rbd_dev->header_oloc, buf, buf_size,
+				RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
+}
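/* A sketch of the buffer built above, assuming the usual 6-byte
 * CEPH_ENCODING_START_BLK_LEN (u8 struct_v, u8 struct_compat, le32 len):
 *
 *	u8   struct_v = 2
 *	u8   struct_compat = 1
 *	le32 struct_len = 20
 *	le32 notify_op
 *	le64 gid
 *	le64 handle
 *
 * for a total of 4 + 8 + 8 + 6 = 26 bytes.
 */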
+
+static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
+			       enum rbd_notify_op notify_op)
+{
+	struct page **reply_pages;
+	size_t reply_len;
+
+	__rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
+	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+}
+
+static void rbd_notify_acquired_lock(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+						  acquired_lock_work);
+
+	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
+}
+
+static void rbd_notify_released_lock(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+						  released_lock_work);
+
+	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
+}
+
+static int rbd_request_lock(struct rbd_device *rbd_dev)
+{
+	struct page **reply_pages;
+	size_t reply_len;
+	bool lock_owner_responded = false;
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
+				   &reply_pages, &reply_len);
+	if (ret && ret != -ETIMEDOUT) {
+		rbd_warn(rbd_dev, "failed to request lock: %d", ret);
+		goto out;
+	}
+
+	if (reply_len > 0 && reply_len <= PAGE_SIZE) {
+		void *p = page_address(reply_pages[0]);
+		void *const end = p + reply_len;
+		u32 n;
+
+		ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
+		while (n--) {
+			u8 struct_v;
+			u32 len;
+
+			ceph_decode_need(&p, end, 8 + 8, e_inval);
+			p += 8 + 8; /* skip gid and cookie */
+
+			ceph_decode_32_safe(&p, end, len, e_inval);
+			if (!len)
+				continue;
+
+			if (lock_owner_responded) {
+				rbd_warn(rbd_dev,
+					 "duplicate lock owners detected");
+				ret = -EIO;
+				goto out;
+			}
+
+			lock_owner_responded = true;
+			ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
+						  &struct_v, &len);
+			if (ret) {
+				rbd_warn(rbd_dev,
+					 "failed to decode ResponseMessage: %d",
+					 ret);
+				goto e_inval;
+			}
+
+			ret = ceph_decode_32(&p);
+		}
+	}
+
+	if (!lock_owner_responded) {
+		rbd_warn(rbd_dev, "no lock owners detected");
+		ret = -ETIMEDOUT;
+	}
+
+out:
+	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+	return ret;
+
+e_inval:
+	ret = -EINVAL;
+	goto out;
+}
+
+static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+{
+	dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+
+	cancel_delayed_work(&rbd_dev->lock_dwork);
+	if (wake_all)
+		wake_up_all(&rbd_dev->lock_waitq);
+	else
+		wake_up(&rbd_dev->lock_waitq);
+}
+
+static int get_lock_owner_info(struct rbd_device *rbd_dev,
+			       struct ceph_locker **lockers, u32 *num_lockers)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	u8 lock_type;
+	char *lock_tag;
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
+				 &rbd_dev->header_oloc, RBD_LOCK_NAME,
+				 &lock_type, &lock_tag, lockers, num_lockers);
+	if (ret)
+		return ret;
+
+	if (*num_lockers == 0) {
+		dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
+		goto out;
+	}
+
+	if (strcmp(lock_tag, RBD_LOCK_TAG)) {
+		rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
+			 lock_tag);
+		ret = -EBUSY;
+		goto out;
+	}
+
+	if (lock_type == CEPH_CLS_LOCK_SHARED) {
+		rbd_warn(rbd_dev, "shared lock type detected");
+		ret = -EBUSY;
+		goto out;
+	}
+
+	if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
+		    strlen(RBD_LOCK_COOKIE_PREFIX))) {
+		rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
+			 (*lockers)[0].id.cookie);
+		ret = -EBUSY;
+		goto out;
+	}
+
+out:
+	kfree(lock_tag);
+	return ret;
+}
+
+static int find_watcher(struct rbd_device *rbd_dev,
+			const struct ceph_locker *locker)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct ceph_watch_item *watchers;
+	u32 num_watchers;
+	u64 cookie;
+	int i;
+	int ret;
+
+	ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
+				      &rbd_dev->header_oloc, &watchers,
+				      &num_watchers);
+	if (ret)
+		return ret;
+
+	sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
+	for (i = 0; i < num_watchers; i++) {
+		if (!memcmp(&watchers[i].addr, &locker->info.addr,
+			    sizeof(locker->info.addr)) &&
+		    watchers[i].cookie == cookie) {
+			struct rbd_client_id cid = {
+				.gid = le64_to_cpu(watchers[i].name.num),
+				.handle = cookie,
+			};
+
+			dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
+			     rbd_dev, cid.gid, cid.handle);
+			rbd_set_owner_cid(rbd_dev, &cid);
+			ret = 1;
+			goto out;
+		}
+	}
+
+	dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
+	ret = 0;
+out:
+	kfree(watchers);
+	return ret;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_try_lock(struct rbd_device *rbd_dev)
+{
+	struct ceph_client *client = rbd_dev->rbd_client->client;
+	struct ceph_locker *lockers;
+	u32 num_lockers;
+	int ret;
+
+	for (;;) {
+		ret = rbd_lock(rbd_dev);
+		if (ret != -EBUSY)
+			return ret;
+
+		/* determine if the current lock holder is still alive */
+		ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
+		if (ret)
+			return ret;
+
+		if (num_lockers == 0)
+			goto again;
+
+		ret = find_watcher(rbd_dev, lockers);
+		if (ret) {
+			if (ret > 0)
+				ret = 0; /* have to request lock */
+			goto out;
+		}
+
+		rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
+			 ENTITY_NAME(lockers[0].id.name));
+
+		ret = ceph_monc_blacklist_add(&client->monc,
+					      &lockers[0].info.addr);
+		if (ret) {
+			rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
+				 ENTITY_NAME(lockers[0].id.name), ret);
+			goto out;
+		}
+
+		ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
+					  &rbd_dev->header_oloc, RBD_LOCK_NAME,
+					  lockers[0].id.cookie,
+					  &lockers[0].id.name);
+		if (ret && ret != -ENOENT)
+			goto out;
+
+again:
+		ceph_free_lockers(lockers, num_lockers);
+	}
+
+out:
+	ceph_free_lockers(lockers, num_lockers);
+	return ret;
+}
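/* To summarize the loop above: try to take the lock; on -EBUSY look at the
 * current lockers; a holder with no watch established is presumed dead, so
 * it is blacklisted and its lock broken before retrying.  A live holder
 * (find_watcher() returning 1) means the lock must be requested via
 * RBD_NOTIFY_OP_REQUEST_LOCK instead, hence the 0 return.
 */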
+
+/*
+ * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
+ */
+static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
+						int *pret)
+{
+	enum rbd_lock_state lock_state;
+
+	down_read(&rbd_dev->lock_rwsem);
+	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (__rbd_is_lock_owner(rbd_dev)) {
+		lock_state = rbd_dev->lock_state;
+		up_read(&rbd_dev->lock_rwsem);
+		return lock_state;
+	}
+
+	up_read(&rbd_dev->lock_rwsem);
+	down_write(&rbd_dev->lock_rwsem);
+	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (!__rbd_is_lock_owner(rbd_dev)) {
+		*pret = rbd_try_lock(rbd_dev);
+		if (*pret)
+			rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
+	}
+
+	lock_state = rbd_dev->lock_state;
+	up_write(&rbd_dev->lock_rwsem);
+	return lock_state;
+}
+
+static void rbd_acquire_lock(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+					    struct rbd_device, lock_dwork);
+	enum rbd_lock_state lock_state;
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+again:
+	lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
+	if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
+		if (lock_state == RBD_LOCK_STATE_LOCKED)
+			wake_requests(rbd_dev, true);
+		dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
+		     rbd_dev, lock_state, ret);
+		return;
+	}
+
+	ret = rbd_request_lock(rbd_dev);
+	if (ret == -ETIMEDOUT) {
+		goto again; /* treat this as a dead client */
+	} else if (ret < 0) {
+		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
+		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+				 RBD_RETRY_DELAY);
+	} else {
+		/*
+		 * lock owner acked, but resend if we don't see them
+		 * release the lock
+		 */
+		dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
+		     rbd_dev);
+		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+		    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
+	}
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static bool rbd_release_lock(struct rbd_device *rbd_dev)
+{
+	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+		return false;
+
+	rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
+	downgrade_write(&rbd_dev->lock_rwsem);
+	/*
+	 * Ensure that all in-flight IO is flushed.
+	 *
+	 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
+	 * may be shared with other devices.
+	 */
+	ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
+	up_read(&rbd_dev->lock_rwsem);
+
+	down_write(&rbd_dev->lock_rwsem);
+	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
+		return false;
+
+	if (!rbd_unlock(rbd_dev))
+		/*
+		 * Give others a chance to grab the lock - we would re-acquire
+		 * almost immediately if we got new IO during ceph_osdc_sync()
+		 * otherwise.  We need to ack our own notifications, so this
+		 * lock_dwork will be requeued from rbd_wait_state_locked()
+		 * after wake_requests() in rbd_handle_released_lock().
+		 */
+		cancel_delayed_work(&rbd_dev->lock_dwork);
+
+	return true;
+}
+
+static void rbd_release_lock_work(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+						  unlock_work);
+
+	down_write(&rbd_dev->lock_rwsem);
+	rbd_release_lock(rbd_dev);
+	up_write(&rbd_dev->lock_rwsem);
+}
+
+static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				     void **p)
+{
+	struct rbd_client_id cid = { 0 };
+
+	if (struct_v >= 2) {
+		cid.gid = ceph_decode_64(p);
+		cid.handle = ceph_decode_64(p);
+	}
+
+	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+	     cid.handle);
+	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
+		down_write(&rbd_dev->lock_rwsem);
+		if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
+			/*
+			 * we already know that the remote client is
+			 * the owner
+			 */
+			up_write(&rbd_dev->lock_rwsem);
+			return;
+		}
+
+		rbd_set_owner_cid(rbd_dev, &cid);
+		downgrade_write(&rbd_dev->lock_rwsem);
+	} else {
+		down_read(&rbd_dev->lock_rwsem);
+	}
+
+	if (!__rbd_is_lock_owner(rbd_dev))
+		wake_requests(rbd_dev, false);
+	up_read(&rbd_dev->lock_rwsem);
+}
+
+static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				     void **p)
+{
+	struct rbd_client_id cid = { 0 };
+
+	if (struct_v >= 2) {
+		cid.gid = ceph_decode_64(p);
+		cid.handle = ceph_decode_64(p);
+	}
+
+	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+	     cid.handle);
+	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
+		down_write(&rbd_dev->lock_rwsem);
+		if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
+			dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
+			     __func__, rbd_dev, cid.gid, cid.handle,
+			     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
+			up_write(&rbd_dev->lock_rwsem);
+			return;
+		}
+
+		rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+		downgrade_write(&rbd_dev->lock_rwsem);
+	} else {
+		down_read(&rbd_dev->lock_rwsem);
+	}
+
+	if (!__rbd_is_lock_owner(rbd_dev))
+		wake_requests(rbd_dev, false);
+	up_read(&rbd_dev->lock_rwsem);
+}
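/* Both notification handlers above follow the same pattern: take lock_rwsem
 * for write just long enough to update owner_cid, then downgrade_write() so
 * that waking the waiters only needs the read side.
 */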
+
+static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				    void **p)
+{
+	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
+	struct rbd_client_id cid = { 0 };
+	bool need_to_send;
+
+	if (struct_v >= 2) {
+		cid.gid = ceph_decode_64(p);
+		cid.handle = ceph_decode_64(p);
+	}
+
+	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+	     cid.handle);
+	if (rbd_cid_equal(&cid, &my_cid))
+		return false;
+
+	down_read(&rbd_dev->lock_rwsem);
+	need_to_send = __rbd_is_lock_owner(rbd_dev);
+	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+		if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
+			dout("%s rbd_dev %p queueing unlock_work\n", __func__,
+			     rbd_dev);
+			queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+		}
+	}
+	up_read(&rbd_dev->lock_rwsem);
+	return need_to_send;
+}
+
+static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
+				     u64 notify_id, u64 cookie, s32 *result)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
+	char buf[buf_size];
+	int ret;
+
+	if (result) {
+		void *p = buf;
+
+		/* encode ResponseMessage */
+		ceph_start_encoding(&p, 1, 1,
+				    buf_size - CEPH_ENCODING_START_BLK_LEN);
+		ceph_encode_32(&p, *result);
+	} else {
+		buf_size = 0;
+	}
+
+	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
+				   &rbd_dev->header_oloc, notify_id, cookie,
+				   buf, buf_size);
+	if (ret)
+		rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
+}
+
+static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
+				   u64 cookie)
+{
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
+}
+
+static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
+					  u64 notify_id, u64 cookie, s32 result)
+{
+	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
+	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
+}
 
 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
 			 u64 notifier_id, void *data, size_t data_len)
 {
 	struct rbd_device *rbd_dev = arg;
-	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	void *p = data;
+	void *const end = p + data_len;
+	u8 struct_v;
+	u32 len;
+	u32 notify_op;
 	int ret;
 
-	dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev,
-	     cookie, notify_id);
+	dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
+	     __func__, rbd_dev, cookie, notify_id, data_len);
+	if (data_len) {
+		ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
+					  &struct_v, &len);
+		if (ret) {
+			rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
+				 ret);
+			return;
+		}
 
-	/*
-	 * Until adequate refresh error handling is in place, there is
-	 * not much we can do here, except warn.
-	 *
-	 * See http://tracker.ceph.com/issues/5040
-	 */
-	ret = rbd_dev_refresh(rbd_dev);
-	if (ret)
-		rbd_warn(rbd_dev, "refresh failed: %d", ret);
+		notify_op = ceph_decode_32(&p);
+	} else {
+		/* legacy notification for header updates */
+		notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
+		len = 0;
+	}
 
-	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
-				   &rbd_dev->header_oloc, notify_id, cookie,
-				   NULL, 0);
-	if (ret)
-		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
+	dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
+	switch (notify_op) {
+	case RBD_NOTIFY_OP_ACQUIRED_LOCK:
+		rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
+		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	case RBD_NOTIFY_OP_RELEASED_LOCK:
+		rbd_handle_released_lock(rbd_dev, struct_v, &p);
+		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	case RBD_NOTIFY_OP_REQUEST_LOCK:
+		if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
+			/*
+			 * send ResponseMessage(0) back so the client
+			 * can detect a missing owner
+			 */
+			rbd_acknowledge_notify_result(rbd_dev, notify_id,
+						      cookie, 0);
+		else
+			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	case RBD_NOTIFY_OP_HEADER_UPDATE:
+		ret = rbd_dev_refresh(rbd_dev);
+		if (ret)
+			rbd_warn(rbd_dev, "refresh failed: %d", ret);
+
+		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	default:
+		if (rbd_is_lock_owner(rbd_dev))
+			rbd_acknowledge_notify_result(rbd_dev, notify_id,
+						      cookie, -EOPNOTSUPP);
+		else
+			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	}
 }
 
+static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
+
 static void rbd_watch_errcb(void *arg, u64 cookie, int err)
 {
 	struct rbd_device *rbd_dev = arg;
-	int ret;
 
 	rbd_warn(rbd_dev, "encountered watch error: %d", err);
 
-	__rbd_dev_header_unwatch_sync(rbd_dev);
+	down_write(&rbd_dev->lock_rwsem);
+	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+	up_write(&rbd_dev->lock_rwsem);
 
-	ret = rbd_dev_header_watch_sync(rbd_dev);
-	if (ret) {
-		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
-		return;
+	mutex_lock(&rbd_dev->watch_mutex);
+	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
+		__rbd_unregister_watch(rbd_dev);
+		rbd_dev->watch_state = RBD_WATCH_STATE_ERROR;
+
+		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->watch_dwork, 0);
 	}
-
-	ret = rbd_dev_refresh(rbd_dev);
-	if (ret)
-		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
+	mutex_unlock(&rbd_dev->watch_mutex);
 }
 
 /*
- * Initiate a watch request, synchronously.
+ * watch_mutex must be locked
  */
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+static int __rbd_register_watch(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_osd_linger_request *handle;
 
 	rbd_assert(!rbd_dev->watch_handle);
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
 	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
 				 &rbd_dev->header_oloc, rbd_watch_cb,
@@ -3154,13 +3860,16 @@
 	return 0;
 }
 
-static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+/*
+ * watch_mutex must be locked
+ */
+static void __rbd_unregister_watch(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	int ret;
 
-	if (!rbd_dev->watch_handle)
-		return;
+	rbd_assert(rbd_dev->watch_handle);
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
 	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
 	if (ret)
@@ -3169,17 +3878,100 @@
 	rbd_dev->watch_handle = NULL;
 }
 
-/*
- * Tear down a watch request, synchronously.
- */
-static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
+static int rbd_register_watch(struct rbd_device *rbd_dev)
 {
-	__rbd_dev_header_unwatch_sync(rbd_dev);
+	int ret;
 
-	dout("%s flushing notifies\n", __func__);
+	mutex_lock(&rbd_dev->watch_mutex);
+	rbd_assert(rbd_dev->watch_state == RBD_WATCH_STATE_UNREGISTERED);
+	ret = __rbd_register_watch(rbd_dev);
+	if (ret)
+		goto out;
+
+	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
+	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
+
+out:
+	mutex_unlock(&rbd_dev->watch_mutex);
+	return ret;
+}
+
+static void cancel_tasks_sync(struct rbd_device *rbd_dev)
+{
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
+	cancel_work_sync(&rbd_dev->acquired_lock_work);
+	cancel_work_sync(&rbd_dev->released_lock_work);
+	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
+	cancel_work_sync(&rbd_dev->unlock_work);
+}
+
+static void rbd_unregister_watch(struct rbd_device *rbd_dev)
+{
+	WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
+	cancel_tasks_sync(rbd_dev);
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED)
+		__rbd_unregister_watch(rbd_dev);
+	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
+	mutex_unlock(&rbd_dev->watch_mutex);
+
 	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
 }
 
+static void rbd_reregister_watch(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+					    struct rbd_device, watch_dwork);
+	bool was_lock_owner = false;
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	down_write(&rbd_dev->lock_rwsem);
+	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+		was_lock_owner = rbd_release_lock(rbd_dev);
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
+		goto fail_unlock;
+
+	ret = __rbd_register_watch(rbd_dev);
+	if (ret) {
+		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
+		if (ret != -EBLACKLISTED)
+			queue_delayed_work(rbd_dev->task_wq,
+					   &rbd_dev->watch_dwork,
+					   RBD_RETRY_DELAY);
+		goto fail_unlock;
+	}
+
+	rbd_dev->watch_state = RBD_WATCH_STATE_REGISTERED;
+	rbd_dev->watch_cookie = rbd_dev->watch_handle->linger_id;
+	mutex_unlock(&rbd_dev->watch_mutex);
+
+	ret = rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, "reregistration refresh failed: %d", ret);
+
+	if (was_lock_owner) {
+		ret = rbd_try_lock(rbd_dev);
+		if (ret)
+			rbd_warn(rbd_dev, "reregistration lock failed: %d",
+				 ret);
+	}
+
+	up_write(&rbd_dev->lock_rwsem);
+	wake_requests(rbd_dev, true);
+	return;
+
+fail_unlock:
+	mutex_unlock(&rbd_dev->watch_mutex);
+	up_write(&rbd_dev->lock_rwsem);
+}
+
 /*
  * Synchronous osd object method call.  Returns the number of bytes
  * returned in the outbound buffer, or a negative error code.
@@ -3193,7 +3985,6 @@
 			     void *inbound,
 			     size_t inbound_size)
 {
-	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
 	struct page **pages;
 	u32 page_count;
@@ -3242,11 +4033,8 @@
 	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
 					obj_request->pages, inbound_size,
 					0, false, false);
-	rbd_osd_req_format_read(obj_request);
 
-	ret = rbd_obj_request_submit(osdc, obj_request);
-	if (ret)
-		goto out;
+	rbd_obj_request_submit(obj_request);
 	ret = rbd_obj_request_wait(obj_request);
 	if (ret)
 		goto out;
@@ -3267,6 +4055,29 @@
 	return ret;
 }
 
+/*
+ * lock_rwsem must be held for read
+ */
+static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
+{
+	DEFINE_WAIT(wait);
+
+	do {
+		/*
+		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
+		 * and cancel_delayed_work() in wake_requests().
+		 */
+		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
+		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
+					  TASK_UNINTERRUPTIBLE);
+		up_read(&rbd_dev->lock_rwsem);
+		schedule();
+		down_read(&rbd_dev->lock_rwsem);
+	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+	finish_wait(&rbd_dev->lock_waitq, &wait);
+}
+
 static void rbd_queue_workfn(struct work_struct *work)
 {
 	struct request *rq = blk_mq_rq_from_pdu(work);
@@ -3277,6 +4088,7 @@
 	u64 length = blk_rq_bytes(rq);
 	enum obj_operation_type op_type;
 	u64 mapping_size;
+	bool must_be_locked;
 	int result;
 
 	if (rq->cmd_type != REQ_TYPE_FS) {
@@ -3338,6 +4150,10 @@
 	if (op_type != OBJ_OP_READ) {
 		snapc = rbd_dev->header.snapc;
 		ceph_get_snap_context(snapc);
+		must_be_locked = rbd_is_lock_supported(rbd_dev);
+	} else {
+		must_be_locked = rbd_dev->opts->lock_on_read &&
+					rbd_is_lock_supported(rbd_dev);
 	}
 	up_read(&rbd_dev->header_rwsem);
 
@@ -3348,11 +4164,17 @@
 		goto err_rq;
 	}
 
+	if (must_be_locked) {
+		down_read(&rbd_dev->lock_rwsem);
+		if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+			rbd_wait_state_locked(rbd_dev);
+	}
+
 	img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
 					     snapc);
 	if (!img_request) {
 		result = -ENOMEM;
-		goto err_rq;
+		goto err_unlock;
 	}
 	img_request->rq = rq;
 	snapc = NULL; /* img_request consumes a ref */
@@ -3370,10 +4192,15 @@
 	if (result)
 		goto err_img_request;
 
+	if (must_be_locked)
+		up_read(&rbd_dev->lock_rwsem);
 	return;
 
 err_img_request:
 	rbd_img_request_put(img_request);
+err_unlock:
+	if (must_be_locked)
+		up_read(&rbd_dev->lock_rwsem);
 err_rq:
 	if (result)
 		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
@@ -3415,7 +4242,6 @@
 				u64 offset, u64 length, void *buf)
 
 {
-	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
 	struct page **pages = NULL;
 	u32 page_count;
@@ -3448,11 +4274,8 @@
 					obj_request->length,
 					obj_request->offset & ~PAGE_MASK,
 					false, false);
-	rbd_osd_req_format_read(obj_request);
 
-	ret = rbd_obj_request_submit(osdc, obj_request);
-	if (ret)
-		goto out;
+	rbd_obj_request_submit(obj_request);
 	ret = rbd_obj_request_wait(obj_request);
 	if (ret)
 		goto out;
@@ -3751,13 +4574,40 @@
 	return sprintf(buf, "%d\n", rbd_dev->minor);
 }
 
+static ssize_t rbd_client_addr_show(struct device *dev,
+				    struct device_attribute *attr, char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	struct ceph_entity_addr *client_addr =
+	    ceph_client_addr(rbd_dev->rbd_client->client);
+
+	return sprintf(buf, "%pISpc/%u\n", &client_addr->in_addr,
+		       le32_to_cpu(client_addr->nonce));
+}
+
 static ssize_t rbd_client_id_show(struct device *dev,
 				  struct device_attribute *attr, char *buf)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
 	return sprintf(buf, "client%lld\n",
-			ceph_client_id(rbd_dev->rbd_client->client));
+		       ceph_client_gid(rbd_dev->rbd_client->client));
+}
+
+static ssize_t rbd_cluster_fsid_show(struct device *dev,
+				     struct device_attribute *attr, char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+	return sprintf(buf, "%pU\n", &rbd_dev->rbd_client->client->fsid);
+}
+
+static ssize_t rbd_config_info_show(struct device *dev,
+				    struct device_attribute *attr, char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+	return sprintf(buf, "%s\n", rbd_dev->config_info);
 }
 
 static ssize_t rbd_pool_show(struct device *dev,
@@ -3809,6 +4659,14 @@
 	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
 }
 
+static ssize_t rbd_snap_id_show(struct device *dev,
+				struct device_attribute *attr, char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+	return sprintf(buf, "%llu\n", rbd_dev->spec->snap_id);
+}
+
 /*
  * For a v2 image, shows the chain of parent images, separated by empty
  * lines.  For v1 images or if there is no parent, shows "(no parent
@@ -3861,13 +4719,17 @@
 static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
 static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
+static DEVICE_ATTR(client_addr, S_IRUGO, rbd_client_addr_show, NULL);
 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
+static DEVICE_ATTR(cluster_fsid, S_IRUGO, rbd_cluster_fsid_show, NULL);
+static DEVICE_ATTR(config_info, S_IRUSR, rbd_config_info_show, NULL);
 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
+static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
 static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
 
 static struct attribute *rbd_attrs[] = {
@@ -3875,12 +4737,16 @@
 	&dev_attr_features.attr,
 	&dev_attr_major.attr,
 	&dev_attr_minor.attr,
+	&dev_attr_client_addr.attr,
 	&dev_attr_client_id.attr,
+	&dev_attr_cluster_fsid.attr,
+	&dev_attr_config_info.attr,
 	&dev_attr_pool.attr,
 	&dev_attr_pool_id.attr,
 	&dev_attr_name.attr,
 	&dev_attr_image_id.attr,
 	&dev_attr_current_snap.attr,
+	&dev_attr_snap_id.attr,
 	&dev_attr_parent.attr,
 	&dev_attr_refresh.attr,
 	NULL
@@ -3943,18 +4809,32 @@
 	kfree(spec);
 }
 
-static void rbd_dev_release(struct device *dev)
+static void rbd_dev_free(struct rbd_device *rbd_dev)
 {
-	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-	bool need_put = !!rbd_dev->opts;
+	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
+	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
 
 	ceph_oid_destroy(&rbd_dev->header_oid);
 	ceph_oloc_destroy(&rbd_dev->header_oloc);
+	kfree(rbd_dev->config_info);
 
 	rbd_put_client(rbd_dev->rbd_client);
 	rbd_spec_put(rbd_dev->spec);
 	kfree(rbd_dev->opts);
 	kfree(rbd_dev);
+}
+
+static void rbd_dev_release(struct device *dev)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	bool need_put = !!rbd_dev->opts;
+
+	if (need_put) {
+		destroy_workqueue(rbd_dev->task_wq);
+		ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
+	}
+
+	rbd_dev_free(rbd_dev);
 
 	/*
 	 * This is racy, but way better than putting module outside of
@@ -3965,25 +4845,34 @@
 		module_put(THIS_MODULE);
 }
 
-static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
-					 struct rbd_spec *spec,
-					 struct rbd_options *opts)
+static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc,
+					   struct rbd_spec *spec)
 {
 	struct rbd_device *rbd_dev;
 
-	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
+	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
 	if (!rbd_dev)
 		return NULL;
 
 	spin_lock_init(&rbd_dev->lock);
-	rbd_dev->flags = 0;
-	atomic_set(&rbd_dev->parent_ref, 0);
 	INIT_LIST_HEAD(&rbd_dev->node);
 	init_rwsem(&rbd_dev->header_rwsem);
 
 	ceph_oid_init(&rbd_dev->header_oid);
 	ceph_oloc_init(&rbd_dev->header_oloc);
 
+	mutex_init(&rbd_dev->watch_mutex);
+	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
+	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
+
+	init_rwsem(&rbd_dev->lock_rwsem);
+	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+	INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
+	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
+	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
+	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
+	init_waitqueue_head(&rbd_dev->lock_waitq);
+
 	rbd_dev->dev.bus = &rbd_bus_type;
 	rbd_dev->dev.type = &rbd_device_type;
 	rbd_dev->dev.parent = &rbd_root_dev;
@@ -3991,9 +4880,6 @@
 
 	rbd_dev->rbd_client = rbdc;
 	rbd_dev->spec = spec;
-	rbd_dev->opts = opts;
-
-	/* Initialize the layout used for all rbd requests */
 
 	rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
 	rbd_dev->layout.stripe_count = 1;
@@ -4001,17 +4887,50 @@
 	rbd_dev->layout.pool_id = spec->pool_id;
 	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
 
-	/*
-	 * If this is a mapping rbd_dev (as opposed to a parent one),
-	 * pin our module.  We have a ref from do_rbd_add(), so use
-	 * __module_get().
-	 */
-	if (rbd_dev->opts)
-		__module_get(THIS_MODULE);
-
 	return rbd_dev;
 }
 
+/*
+ * Create a mapping rbd_dev.
+ */
+static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+					 struct rbd_spec *spec,
+					 struct rbd_options *opts)
+{
+	struct rbd_device *rbd_dev;
+
+	rbd_dev = __rbd_dev_create(rbdc, spec);
+	if (!rbd_dev)
+		return NULL;
+
+	rbd_dev->opts = opts;
+
+	/* get an id and fill in device name */
+	rbd_dev->dev_id = ida_simple_get(&rbd_dev_id_ida, 0,
+					 minor_to_rbd_dev_id(1 << MINORBITS),
+					 GFP_KERNEL);
+	if (rbd_dev->dev_id < 0)
+		goto fail_rbd_dev;
+
+	sprintf(rbd_dev->name, RBD_DRV_NAME "%d", rbd_dev->dev_id);
+	rbd_dev->task_wq = alloc_ordered_workqueue("%s-tasks", WQ_MEM_RECLAIM,
+						   rbd_dev->name);
+	if (!rbd_dev->task_wq)
+		goto fail_dev_id;
+
+	/* we have a ref from do_rbd_add() */
+	__module_get(THIS_MODULE);
+
+	dout("%s rbd_dev %p dev_id %d\n", __func__, rbd_dev, rbd_dev->dev_id);
+	return rbd_dev;
+
+fail_dev_id:
+	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
+fail_rbd_dev:
+	rbd_dev_free(rbd_dev);
+	return NULL;
+}
+
 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
 {
 	if (rbd_dev)
@@ -4645,46 +5564,6 @@
 }
 
 /*
- * Get a unique rbd identifier for the given new rbd_dev, and add
- * the rbd_dev to the global list.
- */
-static int rbd_dev_id_get(struct rbd_device *rbd_dev)
-{
-	int new_dev_id;
-
-	new_dev_id = ida_simple_get(&rbd_dev_id_ida,
-				    0, minor_to_rbd_dev_id(1 << MINORBITS),
-				    GFP_KERNEL);
-	if (new_dev_id < 0)
-		return new_dev_id;
-
-	rbd_dev->dev_id = new_dev_id;
-
-	spin_lock(&rbd_dev_list_lock);
-	list_add_tail(&rbd_dev->node, &rbd_dev_list);
-	spin_unlock(&rbd_dev_list_lock);
-
-	dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
-
-	return 0;
-}
-
-/*
- * Remove an rbd_dev from the global list, and record that its
- * identifier is no longer in use.
- */
-static void rbd_dev_id_put(struct rbd_device *rbd_dev)
-{
-	spin_lock(&rbd_dev_list_lock);
-	list_del_init(&rbd_dev->node);
-	spin_unlock(&rbd_dev_list_lock);
-
-	ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
-
-	dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
-}
-
-/*
  * Skips over white space at *buf, and updates *buf to point to the
  * first found non-space character (if any). Returns the length of
  * the token (string of non-white space characters) found.  Note
@@ -4859,6 +5738,7 @@
 
 	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
 	rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
+	rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
 
 	copts = ceph_parse_options(options, mon_addrs,
 					mon_addrs + mon_addrs_size - 1,
@@ -5076,8 +5956,7 @@
 		goto out_err;
 	}
 
-	parent = rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec,
-				NULL);
+	parent = __rbd_dev_create(rbd_dev->rbd_client, rbd_dev->parent_spec);
 	if (!parent) {
 		ret = -ENOMEM;
 		goto out_err;
@@ -5112,22 +5991,12 @@
 {
 	int ret;
 
-	/* Get an id and fill in device name. */
-
-	ret = rbd_dev_id_get(rbd_dev);
-	if (ret)
-		goto err_out_unlock;
-
-	BUILD_BUG_ON(DEV_NAME_LEN
-			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
-	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
-
 	/* Record our major and minor device numbers. */
 
 	if (!single_major) {
 		ret = register_blkdev(0, rbd_dev->name);
 		if (ret < 0)
-			goto err_out_id;
+			goto err_out_unlock;
 
 		rbd_dev->major = ret;
 		rbd_dev->minor = 0;
@@ -5159,9 +6028,14 @@
 	set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	up_write(&rbd_dev->header_rwsem);
 
+	spin_lock(&rbd_dev_list_lock);
+	list_add_tail(&rbd_dev->node, &rbd_dev_list);
+	spin_unlock(&rbd_dev_list_lock);
+
 	add_disk(rbd_dev->disk);
-	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
-		(unsigned long long) rbd_dev->mapping.size);
+	pr_info("%s: capacity %llu features 0x%llx\n", rbd_dev->disk->disk_name,
+		(unsigned long long)get_capacity(rbd_dev->disk) << SECTOR_SHIFT,
+		rbd_dev->header.features);
 
 	return ret;
 
@@ -5172,8 +6046,6 @@
 err_out_blkdev:
 	if (!single_major)
 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
-err_out_id:
-	rbd_dev_id_put(rbd_dev);
 err_out_unlock:
 	up_write(&rbd_dev->header_rwsem);
 	return ret;
@@ -5234,7 +6106,7 @@
 		goto err_out_format;
 
 	if (!depth) {
-		ret = rbd_dev_header_watch_sync(rbd_dev);
+		ret = rbd_register_watch(rbd_dev);
 		if (ret) {
 			if (ret == -ENOENT)
 				pr_info("image %s/%s does not exist\n",
@@ -5293,7 +6165,7 @@
 	rbd_dev_unprobe(rbd_dev);
 err_out_watch:
 	if (!depth)
-		rbd_dev_header_unwatch_sync(rbd_dev);
+		rbd_unregister_watch(rbd_dev);
 err_out_format:
 	rbd_dev->image_format = 0;
 	kfree(rbd_dev->spec->image_id);
@@ -5345,10 +6217,18 @@
 	spec = NULL;		/* rbd_dev now owns this */
 	rbd_opts = NULL;	/* rbd_dev now owns this */
 
+	rbd_dev->config_info = kstrdup(buf, GFP_KERNEL);
+	if (!rbd_dev->config_info) {
+		rc = -ENOMEM;
+		goto err_out_rbd_dev;
+	}
+
 	down_write(&rbd_dev->header_rwsem);
 	rc = rbd_dev_image_probe(rbd_dev, 0);
-	if (rc < 0)
+	if (rc < 0) {
+		up_write(&rbd_dev->header_rwsem);
 		goto err_out_rbd_dev;
+	}
 
 	/* If we are mapping a snapshot it must be marked read-only */
 
@@ -5360,11 +6240,11 @@
 	rc = rbd_dev_device_setup(rbd_dev);
 	if (rc) {
 		/*
-		 * rbd_dev_header_unwatch_sync() can't be moved into
+		 * rbd_unregister_watch() can't be moved into
 		 * rbd_dev_image_release() without refactoring, see
 		 * commit 1f3ef78861ac.
 		 */
-		rbd_dev_header_unwatch_sync(rbd_dev);
+		rbd_unregister_watch(rbd_dev);
 		rbd_dev_image_release(rbd_dev);
 		goto out;
 	}
@@ -5375,7 +6255,6 @@
 	return rc;
 
 err_out_rbd_dev:
-	up_write(&rbd_dev->header_rwsem);
 	rbd_dev_destroy(rbd_dev);
 err_out_client:
 	rbd_put_client(rbdc);
@@ -5405,12 +6284,16 @@
 static void rbd_dev_device_release(struct rbd_device *rbd_dev)
 {
 	rbd_free_disk(rbd_dev);
+
+	spin_lock(&rbd_dev_list_lock);
+	list_del_init(&rbd_dev->node);
+	spin_unlock(&rbd_dev_list_lock);
+
 	clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
 	device_del(&rbd_dev->dev);
 	rbd_dev_mapping_clear(rbd_dev);
 	if (!single_major)
 		unregister_blkdev(rbd_dev->major, rbd_dev->name);
-	rbd_dev_id_put(rbd_dev);
 }
 
 static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
@@ -5446,18 +6329,26 @@
 	struct rbd_device *rbd_dev = NULL;
 	struct list_head *tmp;
 	int dev_id;
-	unsigned long ul;
+	char opt_buf[6];
 	bool already = false;
+	bool force = false;
 	int ret;
 
-	ret = kstrtoul(buf, 10, &ul);
-	if (ret)
-		return ret;
-
-	/* convert to int; abort if we lost anything in the conversion */
-	dev_id = (int)ul;
-	if (dev_id != ul)
+	dev_id = -1;
+	opt_buf[0] = '\0';
+	sscanf(buf, "%d %5s", &dev_id, opt_buf);
+	if (dev_id < 0) {
+		pr_err("dev_id out of range\n");
 		return -EINVAL;
+	}
+	if (opt_buf[0] != '\0') {
+		if (!strcmp(opt_buf, "force")) {
+			force = true;
+		} else {
+			pr_err("bad remove option at '%s'\n", opt_buf);
+			return -EINVAL;
+		}
+	}
 
 	ret = -ENOENT;
 	spin_lock(&rbd_dev_list_lock);
@@ -5470,7 +6361,7 @@
 	}
 	if (!ret) {
 		spin_lock_irq(&rbd_dev->lock);
-		if (rbd_dev->open_count)
+		if (rbd_dev->open_count && !force)
 			ret = -EBUSY;
 		else
 			already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
@@ -5481,7 +6372,20 @@
 	if (ret < 0 || already)
 		return ret;
 
-	rbd_dev_header_unwatch_sync(rbd_dev);
+	if (force) {
+		/*
+		 * Prevent new IO from being queued and wait for existing
+		 * IO to complete/fail.
+		 */
+		blk_mq_freeze_queue(rbd_dev->disk->queue);
+		blk_set_queue_dying(rbd_dev->disk->queue);
+	}
+
+	down_write(&rbd_dev->lock_rwsem);
+	if (__rbd_is_lock_owner(rbd_dev))
+		rbd_unlock(rbd_dev);
+	up_write(&rbd_dev->lock_rwsem);
+	rbd_unregister_watch(rbd_dev);
 
 	/*
 	 * Don't free anything from rbd_dev->disk until after all
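
For illustration, a minimal userspace sketch (not part of this series) of how
the new "%d %5s" parsing behaves; opt_buf is six bytes, just enough for
"force" plus the terminating NUL, so a longer option cannot overflow it:

	#include <stdio.h>
	#include <string.h>

	/* demo of the sscanf() format used by the new remove parsing */
	static void parse(const char *buf)
	{
		int dev_id = -1;
		char opt_buf[6];

		opt_buf[0] = '\0';
		sscanf(buf, "%d %5s", &dev_id, opt_buf);
		if (dev_id < 0)
			printf("'%s' -> -EINVAL (bad dev_id)\n", buf);
		else if (opt_buf[0] != '\0' && strcmp(opt_buf, "force"))
			printf("'%s' -> -EINVAL (bad option)\n", buf);
		else
			printf("'%s' -> dev_id %d force %d\n", buf, dev_id,
			       !strcmp(opt_buf, "force"));
	}

	int main(void)
	{
		parse("2");		/* dev_id 2, no force */
		parse("2 force");	/* dev_id 2, force */
		parse("2 bogus");	/* rejected */
		parse("abc");		/* rejected */
		return 0;
	}
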
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index 49d77cb..94f367db2 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -28,6 +28,17 @@
 #define RBD_DATA_PREFIX        "rbd_data."
 #define RBD_ID_PREFIX          "rbd_id."
 
+#define RBD_LOCK_NAME          "rbd_lock"
+#define RBD_LOCK_TAG           "internal"
+#define RBD_LOCK_COOKIE_PREFIX "auto"
+
+enum rbd_notify_op {
+	RBD_NOTIFY_OP_ACQUIRED_LOCK      = 0,
+	RBD_NOTIFY_OP_RELEASED_LOCK      = 1,
+	RBD_NOTIFY_OP_REQUEST_LOCK       = 2,
+	RBD_NOTIFY_OP_HEADER_UPDATE      = 3,
+};
+
 /*
  * For format version 1, rbd image 'foo' consists of objects
  *   foo.rbd		- image metadata
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index d5b6f95..ef3ebd7 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -175,9 +175,8 @@
 
 static int ceph_releasepage(struct page *page, gfp_t g)
 {
-	dout("%p releasepage %p idx %lu\n", page->mapping->host,
-	     page, page->index);
-	WARN_ON(PageDirty(page));
+	dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
+	     page, page->index, PageDirty(page) ? "" : "not ");
 
 	/* Can we release the page from the cache? */
 	if (!ceph_release_fscache_page(page, g))
@@ -298,14 +297,6 @@
 	kfree(osd_data->pages);
 }
 
-static void ceph_unlock_page_vector(struct page **pages, int num_pages)
-{
-	int i;
-
-	for (i = 0; i < num_pages; i++)
-		unlock_page(pages[i]);
-}
-
 /*
  * start an async read(ahead) operation.  return nr_pages we submitted
  * a read for on success, or negative error code.
@@ -370,6 +361,10 @@
 			dout("start_read %p add_to_page_cache failed %p\n",
 			     inode, page);
 			nr_pages = i;
+			if (nr_pages > 0) {
+				len = nr_pages << PAGE_SHIFT;
+				/* trim the OSD read to the pages we added */
+				osd_req_op_extent_update(req, 0, len);
+				break;
+			}
 			goto out_pages;
 		}
 		pages[i] = page;
@@ -386,8 +381,11 @@
 	return nr_pages;
 
 out_pages:
-	ceph_unlock_page_vector(pages, nr_pages);
-	ceph_release_page_vector(pages, nr_pages);
+	for (i = 0; i < nr_pages; ++i) {
+		ceph_fscache_readpage_cancel(inode, pages[i]);
+		unlock_page(pages[i]);
+	}
+	ceph_put_page_vector(pages, nr_pages, false);
 out:
 	ceph_osdc_put_request(req);
 	return ret;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 0f5375d..395c7fc 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -902,10 +902,10 @@
 		return ret;
 
 	if (write) {
-		ret = invalidate_inode_pages2_range(inode->i_mapping,
+		int ret2 = invalidate_inode_pages2_range(inode->i_mapping,
 					pos >> PAGE_SHIFT,
 					(pos + count) >> PAGE_SHIFT);
-		if (ret < 0)
+		if (ret2 < 0)
-			dout("invalidate_inode_pages2_range returned %d\n", ret);
+			dout("invalidate_inode_pages2_range returned %d\n", ret2);
 
 		flags = CEPH_OSD_FLAG_ORDERSNAP |
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index a2cb0c2..6806dbe 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -210,8 +210,8 @@
 	if (!(fl->fl_flags & FL_FLOCK))
 		return -ENOLCK;
 	/* No mandatory locks */
-	if (__mandatory_lock(file->f_mapping->host) && fl->fl_type != F_UNLCK)
-		return -ENOLCK;
+	if (fl->fl_type & LOCK_MAND)
+		return -EOPNOTSUPP;
 
 	dout("ceph_flock, fl_file: %p", fl->fl_file);
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index f72d4ae..815acd1 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -370,6 +370,7 @@
 	case CEPH_MDS_SESSION_CLOSING: return "closing";
 	case CEPH_MDS_SESSION_RESTARTING: return "restarting";
 	case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
+	case CEPH_MDS_SESSION_REJECTED: return "rejected";
 	default: return "???";
 	}
 }
@@ -1150,8 +1151,7 @@
 		while (!list_empty(&ci->i_cap_flush_list)) {
 			cf = list_first_entry(&ci->i_cap_flush_list,
 					      struct ceph_cap_flush, i_list);
-			list_del(&cf->i_list);
-			list_add(&cf->i_list, &to_remove);
+			list_move(&cf->i_list, &to_remove);
 		}
 
 		spin_lock(&mdsc->cap_dirty_lock);
@@ -1378,7 +1378,7 @@
 	if (!msg)
 		return -ENOMEM;
 	ceph_con_send(&session->s_con, msg);
-	return 0;
+	return 1;
 }
 
 /*
@@ -2131,6 +2131,10 @@
 	     ceph_session_state_name(session->s_state));
 	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
 	    session->s_state != CEPH_MDS_SESSION_HUNG) {
+		if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
+			err = -EACCES;
+			goto out_session;
+		}
 		if (session->s_state == CEPH_MDS_SESSION_NEW ||
 		    session->s_state == CEPH_MDS_SESSION_CLOSING)
 			__open_session(mdsc, session);
@@ -2652,6 +2656,15 @@
 		wake_up_session_caps(session, 0);
 		break;
 
+	case CEPH_SESSION_REJECT:
+		WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
+		pr_info("mds%d rejected session\n", session->s_mds);
+		session->s_state = CEPH_MDS_SESSION_REJECTED;
+		cleanup_session_requests(mdsc, session);
+		remove_session_caps(session);
+		wake = 2; /* for good measure */
+		break;
+
 	default:
 		pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
 		WARN_ON(1);
@@ -3557,11 +3570,11 @@
 /*
  * true if all sessions are closed, or we force unmount
  */
-static bool done_closing_sessions(struct ceph_mds_client *mdsc)
+static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
 {
 	if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
 		return true;
-	return atomic_read(&mdsc->num_sessions) == 0;
+	return atomic_read(&mdsc->num_sessions) <= skipped;
 }
 
 /*
@@ -3572,6 +3585,7 @@
 	struct ceph_options *opts = mdsc->fsc->client->options;
 	struct ceph_mds_session *session;
 	int i;
+	int skipped = 0;
 
 	dout("close_sessions\n");
 
@@ -3583,7 +3597,8 @@
 			continue;
 		mutex_unlock(&mdsc->mutex);
 		mutex_lock(&session->s_mutex);
-		__close_session(mdsc, session);
+		if (__close_session(mdsc, session) <= 0)
+			skipped++;
 		mutex_unlock(&session->s_mutex);
 		ceph_put_mds_session(session);
 		mutex_lock(&mdsc->mutex);
@@ -3591,7 +3606,8 @@
 	mutex_unlock(&mdsc->mutex);
 
 	dout("waiting for sessions to close\n");
-	wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
+	wait_event_timeout(mdsc->session_close_wq,
+			   done_closing_sessions(mdsc, skipped),
 			   ceph_timeout_jiffies(opts->mount_timeout));
 
 	/* tear down remaining sessions */
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 6b36797..3c6f77b 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -121,6 +121,7 @@
 	CEPH_MDS_SESSION_CLOSING = 5,
 	CEPH_MDS_SESSION_RESTARTING = 6,
 	CEPH_MDS_SESSION_RECONNECTING = 7,
+	CEPH_MDS_SESSION_REJECTED = 8,
 };
 
 struct ceph_mds_session {
diff --git a/fs/ceph/strings.c b/fs/ceph/strings.c
index 89e6bc3..913dea1 100644
--- a/fs/ceph/strings.c
+++ b/fs/ceph/strings.c
@@ -43,6 +43,8 @@
 	case CEPH_SESSION_RECALL_STATE: return "recall_state";
 	case CEPH_SESSION_FLUSHMSG: return "flushmsg";
 	case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack";
+	case CEPH_SESSION_FORCE_RO: return "force_ro";
+	case CEPH_SESSION_REJECT: return "reject";
 	}
 	return "???";
 }
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index e247f6f..a29ffce 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -396,10 +396,12 @@
 	 */
 	dev_name_end = strchr(dev_name, '/');
 	if (dev_name_end) {
-		fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
-		if (!fsopt->server_path) {
-			err = -ENOMEM;
-			goto out;
+		if (strlen(dev_name_end) > 1) {
+			fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL);
+			if (!fsopt->server_path) {
+				err = -ENOMEM;
+				goto out;
+			}
 		}
 	} else {
 		dev_name_end = dev_name + strlen(dev_name);
@@ -788,15 +790,10 @@
 		struct inode *inode = req->r_target_inode;
 		req->r_target_inode = NULL;
 		dout("open_root_inode success\n");
-		if (ceph_ino(inode) == CEPH_INO_ROOT &&
-		    fsc->sb->s_root == NULL) {
-			root = d_make_root(inode);
-			if (!root) {
-				root = ERR_PTR(-ENOMEM);
-				goto out;
-			}
-		} else {
-			root = d_obtain_root(inode);
+		root = d_make_root(inode);
+		if (!root) {
+			root = ERR_PTR(-ENOMEM);
+			goto out;
 		}
 		ceph_init_dentry(root);
 		dout("open_root_inode success, root dentry is %p\n", root);
@@ -825,17 +822,24 @@
 	mutex_lock(&fsc->client->mount_mutex);
 
 	if (!fsc->sb->s_root) {
+		const char *path;
 		err = __ceph_open_session(fsc->client, started);
 		if (err < 0)
 			goto out;
 
-		dout("mount opening root\n");
-		root = open_root_dentry(fsc, "", started);
+		if (!fsc->mount_options->server_path) {
+			path = "";
+			dout("mount opening path \\t\n");
+		} else {
+			path = fsc->mount_options->server_path + 1;
+			dout("mount opening path %s\n", path);
+		}
+		root = open_root_dentry(fsc, path, started);
 		if (IS_ERR(root)) {
 			err = PTR_ERR(root);
 			goto out;
 		}
-		fsc->sb->s_root = root;
+		fsc->sb->s_root = dget(root);
 		first = 1;
 
 		err = ceph_fs_debugfs_init(fsc);
@@ -843,19 +847,6 @@
 			goto fail;
 	}
 
-	if (!fsc->mount_options->server_path) {
-		root = fsc->sb->s_root;
-		dget(root);
-	} else {
-		const char *path = fsc->mount_options->server_path + 1;
-		dout("mount opening path %s\n", path);
-		root = open_root_dentry(fsc, path, started);
-		if (IS_ERR(root)) {
-			err = PTR_ERR(root);
-			goto fail;
-		}
-	}
-
 	fsc->mount_state = CEPH_MOUNT_MOUNTED;
 	dout("mount success\n");
 	mutex_unlock(&fsc->client->mount_mutex);
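
A userspace sketch (names assumed, illustrative only) of the dev_name split
above; everything from the first '/' becomes server_path, a bare "/" now
counts as no path at all, and mount then opens server_path + 1 directly
instead of first instantiating the cluster root:

	#include <stdio.h>
	#include <string.h>

	static void split(const char *dev_name)
	{
		const char *end = strchr(dev_name, '/');

		if (end && strlen(end) > 1)
			printf("'%s': server_path '%s', open '%s'\n",
			       dev_name, end, end + 1);
		else
			printf("'%s': no server_path, open the root\n",
			       dev_name);
	}

	int main(void)
	{
		split("192.168.0.1:6789:/");	/* no server_path */
		split("192.168.0.1:6789:/a/b");	/* server_path "/a/b" */
		return 0;
	}
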
diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h
index 1563265..374bb1c 100644
--- a/include/linux/ceph/auth.h
+++ b/include/linux/ceph/auth.h
@@ -104,7 +104,7 @@
 extern int ceph_handle_auth_reply(struct ceph_auth_client *ac,
 				  void *buf, size_t len,
 				  void *reply_buf, size_t reply_len);
-extern int ceph_entity_name_encode(const char *name, void **p, void *end);
+int ceph_auth_entity_name_encode(const char *name, void **p, void *end);
 
 extern int ceph_build_auth(struct ceph_auth_client *ac,
 		    void *msg_buf, size_t msg_len);
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 7868d602..f96de8d 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -138,6 +138,9 @@
 #define CEPH_MSG_POOLOP_REPLY           48
 #define CEPH_MSG_POOLOP                 49
 
+/* mon commands */
+#define CEPH_MSG_MON_COMMAND            50
+#define CEPH_MSG_MON_COMMAND_ACK        51
 
 /* osd */
 #define CEPH_MSG_OSD_MAP                41
@@ -176,6 +179,14 @@
 	struct ceph_statfs st;
 } __attribute__ ((packed));
 
+struct ceph_mon_command {
+	struct ceph_mon_request_header monhdr;
+	struct ceph_fsid fsid;
+	__le32 num_strs;         /* always 1 */
+	__le32 str_len;
+	char str[];
+} __attribute__ ((packed));
+
 struct ceph_osd_getmap {
 	struct ceph_mon_request_header monhdr;
 	struct ceph_fsid fsid;
@@ -270,6 +281,7 @@
 	CEPH_SESSION_FLUSHMSG,
 	CEPH_SESSION_FLUSHMSG_ACK,
 	CEPH_SESSION_FORCE_RO,
+	CEPH_SESSION_REJECT,
 };
 
 extern const char *ceph_session_op_name(int op);
diff --git a/include/linux/ceph/cls_lock_client.h b/include/linux/ceph/cls_lock_client.h
new file mode 100644
index 0000000..84884d8
--- /dev/null
+++ b/include/linux/ceph/cls_lock_client.h
@@ -0,0 +1,49 @@
+#ifndef _LINUX_CEPH_CLS_LOCK_CLIENT_H
+#define _LINUX_CEPH_CLS_LOCK_CLIENT_H
+
+#include <linux/ceph/osd_client.h>
+
+enum ceph_cls_lock_type {
+	CEPH_CLS_LOCK_NONE = 0,
+	CEPH_CLS_LOCK_EXCLUSIVE = 1,
+	CEPH_CLS_LOCK_SHARED = 2,
+};
+
+struct ceph_locker_id {
+	struct ceph_entity_name name;	/* locker's client name */
+	char *cookie;			/* locker's cookie */
+};
+
+struct ceph_locker_info {
+	struct ceph_entity_addr addr;	/* locker's address */
+};
+
+struct ceph_locker {
+	struct ceph_locker_id id;
+	struct ceph_locker_info info;
+};
+
+int ceph_cls_lock(struct ceph_osd_client *osdc,
+		  struct ceph_object_id *oid,
+		  struct ceph_object_locator *oloc,
+		  char *lock_name, u8 type, char *cookie,
+		  char *tag, char *desc, u8 flags);
+int ceph_cls_unlock(struct ceph_osd_client *osdc,
+		    struct ceph_object_id *oid,
+		    struct ceph_object_locator *oloc,
+		    char *lock_name, char *cookie);
+int ceph_cls_break_lock(struct ceph_osd_client *osdc,
+			struct ceph_object_id *oid,
+			struct ceph_object_locator *oloc,
+			char *lock_name, char *cookie,
+			struct ceph_entity_name *locker);
+
+void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers);
+
+int ceph_cls_lock_info(struct ceph_osd_client *osdc,
+		       struct ceph_object_id *oid,
+		       struct ceph_object_locator *oloc,
+		       char *lock_name, u8 *type, char **tag,
+		       struct ceph_locker **lockers, u32 *num_lockers);
+
+#endif
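
A usage sketch for the new API (not from this series; the osdc/oid/oloc
setup and the cookie value are assumptions). The cookie names this instance
of the lock and must match on unlock, and per the kernel-doc in
cls_lock_client.c all operations on the same lock should use the same tag:

	int ret;

	ret = ceph_cls_lock(osdc, oid, oloc, "rbd_lock",
			    CEPH_CLS_LOCK_EXCLUSIVE, "auto 123",
			    "internal", "", 0);
	if (ret)
		return ret;

	/* ... exclusive access to the object ... */

	ret = ceph_cls_unlock(osdc, oid, oloc, "rbd_lock", "auto 123");
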
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 83fc1ff..1816c5e 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -264,7 +264,8 @@
 					      void *private,
 					      u64 supported_features,
 					      u64 required_features);
-extern u64 ceph_client_id(struct ceph_client *client);
+struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client);
+u64 ceph_client_gid(struct ceph_client *client);
 extern void ceph_destroy_client(struct ceph_client *client);
 extern int __ceph_open_session(struct ceph_client *client,
 			       unsigned long started);
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 24d704d..d5a3ece 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -141,6 +141,9 @@
 int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
 				ceph_monc_callback_t cb, u64 private_data);
 
+int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
+			    struct ceph_entity_addr *client_addr);
+
 extern int ceph_monc_open_session(struct ceph_mon_client *monc);
 
 extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 8589323..96337b1 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -121,6 +121,9 @@
 			struct ceph_osd_data response_data;
 		} notify;
 		struct {
+			struct ceph_osd_data response_data;
+		} list_watchers;
+		struct {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
@@ -249,6 +252,12 @@
 	size_t *preply_len;
 };
 
+struct ceph_watch_item {
+	struct ceph_entity_name name;
+	u64 cookie;
+	struct ceph_entity_addr addr;
+};
+
 struct ceph_osd_client {
 	struct ceph_client     *client;
 
@@ -346,7 +355,6 @@
 					struct page **pages, u64 length,
 					u32 alignment, bool pages_from_pool,
 					bool own_pages);
-
 extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
 					unsigned int which, u16 opcode,
 					const char *class, const char *method);
@@ -389,6 +397,14 @@
 extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc);
 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc);
 
+int ceph_osdc_call(struct ceph_osd_client *osdc,
+		   struct ceph_object_id *oid,
+		   struct ceph_object_locator *oloc,
+		   const char *class, const char *method,
+		   unsigned int flags,
+		   struct page *req_page, size_t req_len,
+		   struct page *resp_page, size_t *resp_len);
+
 extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 			       struct ceph_vino vino,
 			       struct ceph_file_layout *layout,
@@ -434,5 +450,10 @@
 		     size_t *preply_len);
 int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
 			  struct ceph_osd_linger_request *lreq);
+int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
+			    struct ceph_object_id *oid,
+			    struct ceph_object_locator *oloc,
+			    struct ceph_watch_item **watchers,
+			    u32 *num_watchers);
 #endif
 
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 84cbed6..6a51809 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -5,6 +5,7 @@
 
 libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
 	mon_client.o \
+	cls_lock_client.o \
 	osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
 	debugfs.o \
 	auth.o auth_none.o \
diff --git a/net/ceph/auth.c b/net/ceph/auth.c
index 2bc5965..c822b3a 100644
--- a/net/ceph/auth.c
+++ b/net/ceph/auth.c
@@ -82,7 +82,10 @@
 	mutex_unlock(&ac->mutex);
 }
 
-int ceph_entity_name_encode(const char *name, void **p, void *end)
+/*
+ * EntityName, not to be confused with entity_name_t
+ */
+int ceph_auth_entity_name_encode(const char *name, void **p, void *end)
 {
 	int len = strlen(name);
 
@@ -111,7 +114,7 @@
 	monhdr->session_mon = cpu_to_le16(-1);
 	monhdr->session_mon_tid = 0;
 
-	ceph_encode_32(&p, 0);  /* no protocol, yet */
+	ceph_encode_32(&p, CEPH_AUTH_UNKNOWN);  /* no protocol, yet */
 
 	lenp = p;
 	p += sizeof(u32);
@@ -124,7 +127,7 @@
 	for (i = 0; i < num; i++)
 		ceph_encode_32(&p, supported_protocols[i]);
 
-	ret = ceph_entity_name_encode(ac->name, &p, end);
+	ret = ceph_auth_entity_name_encode(ac->name, &p, end);
 	if (ret < 0)
 		goto out;
 	ceph_decode_need(&p, end, sizeof(u64), bad);
@@ -259,9 +262,7 @@
 	int ret = 0;
 
 	mutex_lock(&ac->mutex);
-	if (!ac->protocol)
-		ret = ceph_auth_build_hello(ac, msg_buf, msg_len);
-	else if (ac->ops->should_authenticate(ac))
+	if (ac->ops->should_authenticate(ac))
 		ret = ceph_build_auth_request(ac, msg_buf, msg_len);
 	mutex_unlock(&ac->mutex);
 	return ret;
diff --git a/net/ceph/auth_none.c b/net/ceph/auth_none.c
index 5f836f0..df45e46 100644
--- a/net/ceph/auth_none.c
+++ b/net/ceph/auth_none.c
@@ -46,7 +46,7 @@
 	int ret;
 
 	ceph_encode_8_safe(&p, end, 1, e_range);
-	ret = ceph_entity_name_encode(ac->name, &p, end);
+	ret = ceph_auth_entity_name_encode(ac->name, &p, end);
 	if (ret < 0)
 		return ret;
 
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index bddfcf6..464e885 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -566,11 +566,17 @@
 }
 EXPORT_SYMBOL(ceph_print_client_options);
 
-u64 ceph_client_id(struct ceph_client *client)
+struct ceph_entity_addr *ceph_client_addr(struct ceph_client *client)
+{
+	return &client->msgr.inst.addr;
+}
+EXPORT_SYMBOL(ceph_client_addr);
+
+u64 ceph_client_gid(struct ceph_client *client)
 {
 	return client->monc.auth->global_id;
 }
-EXPORT_SYMBOL(ceph_client_id);
+EXPORT_SYMBOL(ceph_client_gid);
 
 /*
  * create a fresh client instance
@@ -685,7 +691,8 @@
 			return client->auth_err;
 	}
 
-	pr_info("client%llu fsid %pU\n", ceph_client_id(client), &client->fsid);
+	pr_info("client%llu fsid %pU\n", ceph_client_gid(client),
+		&client->fsid);
 	ceph_debugfs_client_init(client);
 
 	return 0;
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 3773a4f..19b7d8a 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -15,6 +15,7 @@
 	default: return "unknown";
 	}
 }
+EXPORT_SYMBOL(ceph_entity_type_name);
 
 const char *ceph_osd_op_name(int op)
 {
diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c
new file mode 100644
index 0000000..50f040f
--- /dev/null
+++ b/net/ceph/cls_lock_client.c
@@ -0,0 +1,325 @@
+#include <linux/ceph/ceph_debug.h>
+
+#include <linux/types.h>
+#include <linux/slab.h>
+
+#include <linux/ceph/cls_lock_client.h>
+#include <linux/ceph/decode.h>
+
+/**
+ * ceph_cls_lock - grab rados lock for object
+ * @oid, @oloc: object to lock
+ * @lock_name: the name of the lock
+ * @type: lock type (CEPH_CLS_LOCK_EXCLUSIVE or CEPH_CLS_LOCK_SHARED)
+ * @cookie: user-defined identifier for this instance of the lock
+ * @tag: user-defined tag
+ * @desc: user-defined lock description
+ * @flags: lock flags
+ *
+ * All operations on the same lock should use the same tag.
+ */
+int ceph_cls_lock(struct ceph_osd_client *osdc,
+		  struct ceph_object_id *oid,
+		  struct ceph_object_locator *oloc,
+		  char *lock_name, u8 type, char *cookie,
+		  char *tag, char *desc, u8 flags)
+{
+	int lock_op_buf_size;
+	int name_len = strlen(lock_name);
+	int cookie_len = strlen(cookie);
+	int tag_len = strlen(tag);
+	int desc_len = strlen(desc);
+	void *p, *end;
+	struct page *lock_op_page;
+	struct timespec mtime;
+	int ret;
+
+	lock_op_buf_size = name_len + sizeof(__le32) +
+			   cookie_len + sizeof(__le32) +
+			   tag_len + sizeof(__le32) +
+			   desc_len + sizeof(__le32) +
+			   sizeof(struct ceph_timespec) +
+			   /* flag and type */
+			   sizeof(u8) + sizeof(u8) +
+			   CEPH_ENCODING_START_BLK_LEN;
+	if (lock_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	lock_op_page = alloc_page(GFP_NOIO);
+	if (!lock_op_page)
+		return -ENOMEM;
+
+	p = page_address(lock_op_page);
+	end = p + lock_op_buf_size;
+
+	/* encode cls_lock_lock_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    lock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_8(&p, type);
+	ceph_encode_string(&p, end, cookie, cookie_len);
+	ceph_encode_string(&p, end, tag, tag_len);
+	ceph_encode_string(&p, end, desc, desc_len);
+	/* only support infinite duration */
+	memset(&mtime, 0, sizeof(mtime));
+	ceph_encode_timespec(p, &mtime);
+	p += sizeof(struct ceph_timespec);
+	ceph_encode_8(&p, flags);
+
+	dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n",
+	     __func__, lock_name, type, cookie, tag, desc, flags);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock",
+			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			     lock_op_page, lock_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(lock_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_lock);
+
+/**
+ * ceph_cls_unlock - release rados lock for object
+ * @oid, @oloc: object to lock
+ * @lock_name: the name of the lock
+ * @cookie: user-defined identifier for this instance of the lock
+ */
+int ceph_cls_unlock(struct ceph_osd_client *osdc,
+		    struct ceph_object_id *oid,
+		    struct ceph_object_locator *oloc,
+		    char *lock_name, char *cookie)
+{
+	int unlock_op_buf_size;
+	int name_len = strlen(lock_name);
+	int cookie_len = strlen(cookie);
+	void *p, *end;
+	struct page *unlock_op_page;
+	int ret;
+
+	unlock_op_buf_size = name_len + sizeof(__le32) +
+			     cookie_len + sizeof(__le32) +
+			     CEPH_ENCODING_START_BLK_LEN;
+	if (unlock_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	unlock_op_page = alloc_page(GFP_NOIO);
+	if (!unlock_op_page)
+		return -ENOMEM;
+
+	p = page_address(unlock_op_page);
+	end = p + unlock_op_buf_size;
+
+	/* encode cls_lock_unlock_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    unlock_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_string(&p, end, cookie, cookie_len);
+
+	dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock",
+			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			     unlock_op_page, unlock_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(unlock_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_unlock);
+
+/**
+ * ceph_cls_break_lock - release rados lock for object for specified client
+ * @oid, @oloc: object to lock
+ * @lock_name: the name of the lock
+ * @cookie: user-defined identifier for this instance of the lock
+ * @locker: current lock owner
+ */
+int ceph_cls_break_lock(struct ceph_osd_client *osdc,
+			struct ceph_object_id *oid,
+			struct ceph_object_locator *oloc,
+			char *lock_name, char *cookie,
+			struct ceph_entity_name *locker)
+{
+	int break_op_buf_size;
+	int name_len = strlen(lock_name);
+	int cookie_len = strlen(cookie);
+	struct page *break_op_page;
+	void *p, *end;
+	int ret;
+
+	break_op_buf_size = name_len + sizeof(__le32) +
+			    cookie_len + sizeof(__le32) +
+			    sizeof(u8) + sizeof(__le64) +
+			    CEPH_ENCODING_START_BLK_LEN;
+	if (break_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	break_op_page = alloc_page(GFP_NOIO);
+	if (!break_op_page)
+		return -ENOMEM;
+
+	p = page_address(break_op_page);
+	end = p + break_op_buf_size;
+
+	/* encode cls_lock_break_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    break_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+	ceph_encode_copy(&p, locker, sizeof(*locker));
+	ceph_encode_string(&p, end, cookie, cookie_len);
+
+	dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name,
+	     cookie, ENTITY_NAME(*locker));
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock",
+			     CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			     break_op_page, break_op_buf_size, NULL, NULL);
+
+	dout("%s: status %d\n", __func__, ret);
+	__free_page(break_op_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_break_lock);
+
+void ceph_free_lockers(struct ceph_locker *lockers, u32 num_lockers)
+{
+	int i;
+
+	for (i = 0; i < num_lockers; i++)
+		kfree(lockers[i].id.cookie);
+	kfree(lockers);
+}
+EXPORT_SYMBOL(ceph_free_lockers);
+
+static int decode_locker(void **p, void *end, struct ceph_locker *locker)
+{
+	u8 struct_v;
+	u32 len;
+	char *s;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 1, "locker_id_t", &struct_v, &len);
+	if (ret)
+		return ret;
+
+	ceph_decode_copy(p, &locker->id.name, sizeof(locker->id.name));
+	s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
+	if (IS_ERR(s))
+		return PTR_ERR(s);
+
+	locker->id.cookie = s;
+
+	ret = ceph_start_decoding(p, end, 1, "locker_info_t", &struct_v, &len);
+	if (ret)
+		return ret;
+
+	*p += sizeof(struct ceph_timespec); /* skip expiration */
+	ceph_decode_copy(p, &locker->info.addr, sizeof(locker->info.addr));
+	ceph_decode_addr(&locker->info.addr);
+	len = ceph_decode_32(p);
+	*p += len; /* skip description */
+
+	dout("%s %s%llu cookie %s addr %s\n", __func__,
+	     ENTITY_NAME(locker->id.name), locker->id.cookie,
+	     ceph_pr_addr(&locker->info.addr.in_addr));
+	return 0;
+}
+
+static int decode_lockers(void **p, void *end, u8 *type, char **tag,
+			  struct ceph_locker **lockers, u32 *num_lockers)
+{
+	u8 struct_v;
+	u32 struct_len;
+	char *s;
+	int i;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 1, "cls_lock_get_info_reply",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	*num_lockers = ceph_decode_32(p);
+	*lockers = kcalloc(*num_lockers, sizeof(**lockers), GFP_NOIO);
+	if (!*lockers)
+		return -ENOMEM;
+
+	for (i = 0; i < *num_lockers; i++) {
+		ret = decode_locker(p, end, *lockers + i);
+		if (ret)
+			goto err_free_lockers;
+	}
+
+	*type = ceph_decode_8(p);
+	s = ceph_extract_encoded_string(p, end, NULL, GFP_NOIO);
+	if (IS_ERR(s)) {
+		ret = PTR_ERR(s);
+		goto err_free_lockers;
+	}
+
+	*tag = s;
+	return 0;
+
+err_free_lockers:
+	ceph_free_lockers(*lockers, *num_lockers);
+	return ret;
+}
+
+/*
+ * On success, the caller is responsible for:
+ *
+ *     kfree(tag);
+ *     ceph_free_lockers(lockers, num_lockers);
+ */
+int ceph_cls_lock_info(struct ceph_osd_client *osdc,
+		       struct ceph_object_id *oid,
+		       struct ceph_object_locator *oloc,
+		       char *lock_name, u8 *type, char **tag,
+		       struct ceph_locker **lockers, u32 *num_lockers)
+{
+	int get_info_op_buf_size;
+	int name_len = strlen(lock_name);
+	struct page *get_info_op_page, *reply_page;
+	size_t reply_len;
+	void *p, *end;
+	int ret;
+
+	get_info_op_buf_size = name_len + sizeof(__le32) +
+			       CEPH_ENCODING_START_BLK_LEN;
+	if (get_info_op_buf_size > PAGE_SIZE)
+		return -E2BIG;
+
+	get_info_op_page = alloc_page(GFP_NOIO);
+	if (!get_info_op_page)
+		return -ENOMEM;
+
+	reply_page = alloc_page(GFP_NOIO);
+	if (!reply_page) {
+		__free_page(get_info_op_page);
+		return -ENOMEM;
+	}
+
+	p = page_address(get_info_op_page);
+	end = p + get_info_op_buf_size;
+
+	/* encode cls_lock_get_info_op struct */
+	ceph_start_encoding(&p, 1, 1,
+			    get_info_op_buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_string(&p, end, lock_name, name_len);
+
+	dout("%s lock_name %s\n", __func__, lock_name);
+	ret = ceph_osdc_call(osdc, oid, oloc, "lock", "get_info",
+			     CEPH_OSD_FLAG_READ, get_info_op_page,
+			     get_info_op_buf_size, reply_page, &reply_len);
+
+	dout("%s: status %d\n", __func__, ret);
+	if (ret >= 0) {
+		p = page_address(reply_page);
+		end = p + reply_len;
+
+		ret = decode_lockers(&p, end, type, tag, lockers, num_lockers);
+	}
+
+	__free_page(get_info_op_page);
+	__free_page(reply_page);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_cls_lock_info);
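
The query side, sketched with the two frees the comment above makes the
caller responsible for (osdc/oid/oloc setup assumed):

	struct ceph_locker *lockers;
	u32 num_lockers;
	char *tag;
	u8 type;
	int ret;

	ret = ceph_cls_lock_info(osdc, oid, oloc, "rbd_lock", &type, &tag,
				 &lockers, &num_lockers);
	if (ret)
		return ret;

	if (type == CEPH_CLS_LOCK_EXCLUSIVE && num_lockers > 0)
		dout("held by %s%llu\n", ENTITY_NAME(lockers[0].id.name));

	kfree(tag);
	ceph_free_lockers(lockers, num_lockers);
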
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 5fcfb98..a421e90 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -245,7 +245,7 @@
 /* compute 2^44*log2(input+1) */
 static __u64 crush_ln(unsigned int xin)
 {
-	unsigned int x = xin, x1;
+	unsigned int x = xin;
 	int iexpon, index1, index2;
 	__u64 RH, LH, LL, xl64, result;
 
@@ -253,9 +253,15 @@
 
 	/* normalize input */
 	iexpon = 15;
-	while (!(x & 0x18000)) {
-		x <<= 1;
-		iexpon--;
+
+	/*
+	 * figure out number of bits we need to shift and
+	 * do it in one step instead of iteratively
+	 */
+	if (!(x & 0x18000)) {
+		int bits = __builtin_clz(x & 0x1FFFF) - 16;
+		x <<= bits;
+		iexpon = 15 - bits;
 	}
 
 	index1 = (x >> 8) << 1;
@@ -267,12 +273,11 @@
 	/* RH*x ~ 2^48 * (2^15 + xf), xf<2^8 */
 	xl64 = (__s64)x * RH;
 	xl64 >>= 48;
-	x1 = xl64;
 
 	result = iexpon;
 	result <<= (12 + 32);
 
-	index2 = x1 & 0xff;
+	index2 = xl64 & 0xff;
 	/* LL ~ 2^48*log2(1.0+index2/2^15) */
 	LL = __LL_tbl[index2];
 
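
A standalone check (illustrative, userspace) that the one-step shift agrees
with the old loop for every input below 0x18000; inputs start at 1, so
__builtin_clz() never sees zero:

	#include <assert.h>
	#include <stdio.h>

	int main(void)
	{
		unsigned int xin;

		for (xin = 1; xin < 0x18000; xin++) {
			unsigned int x = xin, y = xin;
			int iexpon = 15, jexpon = 15;

			while (!(x & 0x18000)) {	/* old way */
				x <<= 1;
				iexpon--;
			}

			if (!(y & 0x18000)) {		/* new way */
				int bits = __builtin_clz(y & 0x1FFFF) - 16;

				y <<= bits;
				jexpon = 15 - bits;
			}

			assert(x == y && iexpon == jexpon);
		}
		printf("old and new normalization agree\n");
		return 0;
	}
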
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index ef34a02..a8effc8 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -835,6 +835,83 @@
 }
 EXPORT_SYMBOL(ceph_monc_get_version_async);
 
+static void handle_command_ack(struct ceph_mon_client *monc,
+			       struct ceph_msg *msg)
+{
+	struct ceph_mon_generic_request *req;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front_alloc_len;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+
+	dout("%s msg %p tid %llu\n", __func__, msg, tid);
+
+	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
+							    sizeof(u32), bad);
+	p += sizeof(struct ceph_mon_request_header);
+
+	mutex_lock(&monc->mutex);
+	req = lookup_generic_request(&monc->generic_request_tree, tid);
+	if (!req) {
+		mutex_unlock(&monc->mutex);
+		return;
+	}
+
+	req->result = ceph_decode_32(&p);
+	__finish_generic_request(req);
+	mutex_unlock(&monc->mutex);
+
+	complete_generic_request(req);
+	return;
+
+bad:
+	pr_err("corrupt mon_command ack, tid %llu\n", tid);
+	ceph_msg_dump(msg);
+}
+
+int ceph_monc_blacklist_add(struct ceph_mon_client *monc,
+			    struct ceph_entity_addr *client_addr)
+{
+	struct ceph_mon_generic_request *req;
+	struct ceph_mon_command *h;
+	int ret = -ENOMEM;
+	int len;
+
+	req = alloc_generic_request(monc, GFP_NOIO);
+	if (!req)
+		goto out;
+
+	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
+	if (!req->request)
+		goto out;
+
+	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
+				  true);
+	if (!req->reply)
+		goto out;
+
+	mutex_lock(&monc->mutex);
+	register_generic_request(req);
+	h = req->request->front.iov_base;
+	h->monhdr.have_version = 0;
+	h->monhdr.session_mon = cpu_to_le16(-1);
+	h->monhdr.session_mon_tid = 0;
+	h->fsid = monc->monmap->fsid;
+	h->num_strs = cpu_to_le32(1);
+	len = sprintf(h->str, "{ \"prefix\": \"osd blacklist\", \
+		                 \"blacklistop\": \"add\", \
+				 \"addr\": \"%pISpc/%u\" }",
+		      &client_addr->in_addr, le32_to_cpu(client_addr->nonce));
+	h->str_len = cpu_to_le32(len);
+	send_generic_request(monc, req);
+	mutex_unlock(&monc->mutex);
+
+	ret = wait_generic_request(req);
+out:
+	put_generic_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_monc_blacklist_add);
+
 /*
  * Resend pending generic requests.
  */
@@ -1139,6 +1216,10 @@
 		handle_get_version_reply(monc, msg);
 		break;
 
+	case CEPH_MSG_MON_COMMAND_ACK:
+		handle_command_ack(monc, msg);
+		break;
+
 	case CEPH_MSG_MON_MAP:
 		ceph_monc_handle_map(monc, msg);
 		break;
@@ -1178,6 +1259,7 @@
 		m = ceph_msg_get(monc->m_subscribe_ack);
 		break;
 	case CEPH_MSG_STATFS_REPLY:
+	case CEPH_MSG_MON_COMMAND_ACK:
 		return get_generic_reply(con, hdr, skip);
 	case CEPH_MSG_AUTH_REPLY:
 		m = ceph_msg_get(monc->m_auth_reply);
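
For a client at 1.2.3.4:0 with nonce 123456, the command body built by
ceph_monc_blacklist_add() works out to the JSON below (the string
continuations above also embed some whitespace, which the monitor's JSON
parser ignores); it is effectively what "ceph osd blacklist add" issues:

	{ "prefix": "osd blacklist", "blacklistop": "add", "addr": "1.2.3.4:0/123456" }
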
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index a97e7b5..d9bf7a1 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -338,6 +338,9 @@
 		ceph_osd_data_release(&op->notify.request_data);
 		ceph_osd_data_release(&op->notify.response_data);
 		break;
+	case CEPH_OSD_OP_LIST_WATCHERS:
+		ceph_osd_data_release(&op->list_watchers.response_data);
+		break;
 	default:
 		break;
 	}
@@ -863,6 +866,8 @@
 	case CEPH_OSD_OP_NOTIFY:
 		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
 		break;
+	case CEPH_OSD_OP_LIST_WATCHERS:
+		break;
 	case CEPH_OSD_OP_SETALLOCHINT:
 		dst->alloc_hint.expected_object_size =
 		    cpu_to_le64(src->alloc_hint.expected_object_size);
@@ -1445,6 +1450,10 @@
 			ceph_osdc_msg_data_add(req->r_reply,
 					       &op->extent.osd_data);
 			break;
+		case CEPH_OSD_OP_LIST_WATCHERS:
+			ceph_osdc_msg_data_add(req->r_reply,
+					       &op->list_watchers.response_data);
+			break;
 
 		/* both */
 		case CEPH_OSD_OP_CALL:
@@ -3891,12 +3900,121 @@
 	return ret;
 }
 
+static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
+{
+	u8 struct_v;
+	u32 struct_len;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 2, "watch_item_t",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	ceph_decode_copy(p, &item->name, sizeof(item->name));
+	item->cookie = ceph_decode_64(p);
+	*p += 4; /* skip timeout_seconds */
+	if (struct_v >= 2) {
+		ceph_decode_copy(p, &item->addr, sizeof(item->addr));
+		ceph_decode_addr(&item->addr);
+	}
+
+	dout("%s %s%llu cookie %llu addr %s\n", __func__,
+	     ENTITY_NAME(item->name), item->cookie,
+	     ceph_pr_addr(&item->addr.in_addr));
+	return 0;
+}
+
+static int decode_watchers(void **p, void *end,
+			   struct ceph_watch_item **watchers,
+			   u32 *num_watchers)
+{
+	u8 struct_v;
+	u32 struct_len;
+	int i;
+	int ret;
+
+	ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
+				  &struct_v, &struct_len);
+	if (ret)
+		return ret;
+
+	*num_watchers = ceph_decode_32(p);
+	*watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
+	if (!*watchers)
+		return -ENOMEM;
+
+	for (i = 0; i < *num_watchers; i++) {
+		ret = decode_watcher(p, end, *watchers + i);
+		if (ret) {
+			kfree(*watchers);
+			return ret;
+		}
+	}
+
+	return 0;
+}
+
+/*
+ * On success, the caller is responsible for:
+ *
+ *     kfree(watchers);
+ */
+int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
+			    struct ceph_object_id *oid,
+			    struct ceph_object_locator *oloc,
+			    struct ceph_watch_item **watchers,
+			    u32 *num_watchers)
+{
+	struct ceph_osd_request *req;
+	struct page **pages;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_READ;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
+	pages = ceph_alloc_page_vector(1, GFP_NOIO);
+	if (IS_ERR(pages)) {
+		ret = PTR_ERR(pages);
+		goto out_put_req;
+	}
+
+	osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
+	ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
+						 response_data),
+				 pages, PAGE_SIZE, 0, false, true);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+	if (ret >= 0) {
+		void *p = page_address(pages[0]);
+		void *const end = p + req->r_ops[0].outdata_len;
+
+		ret = decode_watchers(&p, end, watchers, num_watchers);
+	}
+
+out_put_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_list_watchers);
+
 /*
  * Call all pending notify callbacks - for use after a watch is
  * unregistered, to make sure no more callbacks for it will be invoked
  */
 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
 {
+	dout("%s osdc %p\n", __func__, osdc);
 	flush_workqueue(osdc->notify_wq);
 }
 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
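
Caller-side sketch for ceph_osdc_list_watchers() (setup assumed); on
success the watchers array must be freed with kfree(), as the comment
above the function notes:

	struct ceph_watch_item *watchers;
	u32 num_watchers, i;
	int ret;

	ret = ceph_osdc_list_watchers(osdc, oid, oloc, &watchers,
				      &num_watchers);
	if (ret)
		return ret;

	for (i = 0; i < num_watchers; i++)
		dout("watcher %s%llu cookie %llu\n",
		     ENTITY_NAME(watchers[i].name), watchers[i].cookie);

	kfree(watchers);
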
@@ -3910,6 +4028,57 @@
 EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
 
 /*
+ * Execute an OSD class method on an object.
+ *
+ * @flags: CEPH_OSD_FLAG_*
+ * @resp_len: out param for reply length
+ */
+int ceph_osdc_call(struct ceph_osd_client *osdc,
+		   struct ceph_object_id *oid,
+		   struct ceph_object_locator *oloc,
+		   const char *class, const char *method,
+		   unsigned int flags,
+		   struct page *req_page, size_t req_len,
+		   struct page *resp_page, size_t *resp_len)
+{
+	struct ceph_osd_request *req;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = flags;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
+	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
+	if (req_page)
+		osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
+						  0, false, false);
+	if (resp_page)
+		osd_req_op_cls_response_data_pages(req, 0, &resp_page,
+						   PAGE_SIZE, 0, false, false);
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+	if (ret >= 0) {
+		ret = req->r_ops[0].rval;
+		if (resp_page)
+			*resp_len = req->r_ops[0].outdata_len;
+	}
+
+out_put_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_call);
+
+/*
  * init, shutdown
  */
 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
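
A minimal calling sketch for ceph_osdc_call(); "foo" and "bar" are
hypothetical class and method names, real users first encode the method's
input into req_page (see cls_lock_client.c earlier in this series), and
request and reply are each limited to a single page:

	ret = ceph_osdc_call(osdc, oid, oloc, "foo", "bar",
			     CEPH_OSD_FLAG_READ, req_page, req_len,
			     resp_page, &resp_len);
	if (ret >= 0) {
		/* ret is r_ops[0].rval; reply is resp_len bytes long */
		void *p = page_address(resp_page);
		void *end = p + resp_len;

		/* decode [p, end) here */
	}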