Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph updates from Sage Weil:
 "This has a mix of bug fixes and cleanups.

  Alex's patch fixes a rare race in RBD.  Ilya's patches fix an ENOENT
  check when a second rbd image is mapped and a couple of memory leaks.
  Zheng fixes several issues with fragmented directories and multiple
  MDSs.  Josh fixes a spin/sleep issue, and Josh and Guangliang's
  patches fix setting and unsetting RBD images read-only.

  Naturally there are several other cleanups mixed in for good measure"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (23 commits)
  rbd: only set disk to read-only once
  rbd: move calls that may sleep out of spin lock range
  rbd: add ioctl for rbd
  ceph: use truncate_pagecache() instead of truncate_inode_pages()
  ceph: include time stamp in every MDS request
  rbd: fix ida/idr memory leak
  rbd: use reference counts for image requests
  rbd: fix osd_request memory leak in __rbd_dev_header_watch_sync()
  rbd: make sure we have latest osdmap on 'rbd map'
  libceph: add ceph_monc_wait_osdmap()
  libceph: mon_get_version request infrastructure
  libceph: recognize poolop requests in debugfs
  ceph: refactor readpage_nounlock() to make the logic clearer
  mds: check cap ID when handling cap export message
  ceph: remember subtree root dirfrag's auth MDS
  ceph: introduce ceph_fill_fragtree()
  ceph: handle cap import atomically
  ceph: pre-allocate ceph_cap struct for ceph_add_cap()
  ceph: update inode fields according to issued caps
  rbd: replace IS_ERR and PTR_ERR with PTR_ERR_OR_ZERO
  ...
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4c95b50..bbeb404 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -541,7 +541,6 @@
 		return -ENOENT;
 
 	(void) get_device(&rbd_dev->dev);
-	set_device_ro(bdev, rbd_dev->mapping.read_only);
 
 	return 0;
 }
@@ -559,10 +558,76 @@
 	put_device(&rbd_dev->dev);
 }
 
+static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
+{
+	int ret = 0;
+	int val;
+	bool ro;
+	bool ro_changed = false;
+
+	/* get_user() may sleep, so call it before taking rbd_dev->lock */
+	if (get_user(val, (int __user *)(arg)))
+		return -EFAULT;
+
+	ro = val ? true : false;
+	/* Snapshots are read-only; refuse a request to make one writable */
+	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
+		return -EROFS;
+
+	spin_lock_irq(&rbd_dev->lock);
+	/* refuse to change the mode if the device is open more than once */
+	if (rbd_dev->open_count > 1) {
+		ret = -EBUSY;
+		goto out;
+	}
+
+	if (rbd_dev->mapping.read_only != ro) {
+		rbd_dev->mapping.read_only = ro;
+		ro_changed = true;
+	}
+
+out:
+	spin_unlock_irq(&rbd_dev->lock);
+	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
+	if (ret == 0 && ro_changed)
+		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
+
+	return ret;
+}
+
+static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
+			unsigned int cmd, unsigned long arg)
+{
+	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+	int ret = 0;
+
+	switch (cmd) {
+	case BLKROSET:
+		ret = rbd_ioctl_set_ro(rbd_dev, arg);
+		break;
+	default:
+		ret = -ENOTTY;
+	}
+
+	return ret;
+}
+
+#ifdef CONFIG_COMPAT
+static int rbd_compat_ioctl(struct block_device *bdev, fmode_t mode,
+				unsigned int cmd, unsigned long arg)
+{
+	return rbd_ioctl(bdev, mode, cmd, arg);
+}
+#endif /* CONFIG_COMPAT */
+
 static const struct block_device_operations rbd_bd_ops = {
 	.owner			= THIS_MODULE,
 	.open			= rbd_open,
 	.release		= rbd_release,
+	.ioctl			= rbd_ioctl,
+#ifdef CONFIG_COMPAT
+	.compat_ioctl		= rbd_compat_ioctl,
+#endif
 };
 
 /*
@@ -1382,6 +1447,13 @@
 	kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
+static void rbd_img_request_get(struct rbd_img_request *img_request)
+{
+	dout("%s: img %p (was %d)\n", __func__, img_request,
+	     atomic_read(&img_request->kref.refcount));
+	kref_get(&img_request->kref);
+}
+
 static bool img_request_child_test(struct rbd_img_request *img_request);
 static void rbd_parent_request_destroy(struct kref *kref);
 static void rbd_img_request_destroy(struct kref *kref);
@@ -2142,6 +2214,7 @@
 	img_request->next_completion = which;
 out:
 	spin_unlock_irq(&img_request->completion_lock);
+	rbd_img_request_put(img_request);
 
 	if (!more)
 		rbd_img_request_complete(img_request);
@@ -2242,6 +2315,7 @@
 			goto out_unwind;
 		obj_request->osd_req = osd_req;
 		obj_request->callback = rbd_img_obj_callback;
+		rbd_img_request_get(img_request);
 
 		if (write_request) {
 			osd_req_op_alloc_hint_init(osd_req, which,
@@ -2872,56 +2946,55 @@
 }
 
 /*
- * Request sync osd watch/unwatch.  The value of "start" determines
- * whether a watch request is being initiated or torn down.
+ * Initiate a watch request, synchronously.
  */
-static int __rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct rbd_obj_request *obj_request;
 	int ret;
 
-	rbd_assert(start ^ !!rbd_dev->watch_event);
-	rbd_assert(start ^ !!rbd_dev->watch_request);
+	rbd_assert(!rbd_dev->watch_event);
+	rbd_assert(!rbd_dev->watch_request);
 
-	if (start) {
-		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-						&rbd_dev->watch_event);
-		if (ret < 0)
-			return ret;
-		rbd_assert(rbd_dev->watch_event != NULL);
-	}
+	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+				     &rbd_dev->watch_event);
+	if (ret < 0)
+		return ret;
 
-	ret = -ENOMEM;
+	rbd_assert(rbd_dev->watch_event);
+
 	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
-							OBJ_REQUEST_NODATA);
-	if (!obj_request)
+					     OBJ_REQUEST_NODATA);
+	if (!obj_request) {
+		ret = -ENOMEM;
 		goto out_cancel;
+	}
 
 	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
 						  obj_request);
-	if (!obj_request->osd_req)
-		goto out_cancel;
+	if (!obj_request->osd_req) {
+		ret = -ENOMEM;
+		goto out_put;
+	}
 
-	if (start)
-		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-	else
-		ceph_osdc_unregister_linger_request(osdc,
-					rbd_dev->watch_request->osd_req);
+	ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
 
 	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-				rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
+			      rbd_dev->watch_event->cookie, 0, 1);
 	rbd_osd_req_format_write(obj_request);
 
 	ret = rbd_obj_request_submit(osdc, obj_request);
 	if (ret)
-		goto out_cancel;
+		goto out_linger;
+
 	ret = rbd_obj_request_wait(obj_request);
 	if (ret)
-		goto out_cancel;
+		goto out_linger;
+
 	ret = obj_request->result;
 	if (ret)
-		goto out_cancel;
+		goto out_linger;
 
 	/*
 	 * A watch request is set to linger, so the underlying osd
@@ -2931,36 +3004,84 @@
 	 * it.  We'll drop that reference (below) after we've
 	 * unregistered it.
 	 */
-	if (start) {
-		rbd_dev->watch_request = obj_request;
+	rbd_dev->watch_request = obj_request;
 
-		return 0;
-	}
+	return 0;
 
-	/* We have successfully torn down the watch request */
-
-	rbd_obj_request_put(rbd_dev->watch_request);
-	rbd_dev->watch_request = NULL;
+out_linger:
+	ceph_osdc_unregister_linger_request(osdc, obj_request->osd_req);
+out_put:
+	rbd_obj_request_put(obj_request);
 out_cancel:
-	/* Cancel the event if we're tearing down, or on error */
 	ceph_osdc_cancel_event(rbd_dev->watch_event);
 	rbd_dev->watch_event = NULL;
-	if (obj_request)
-		rbd_obj_request_put(obj_request);
 
 	return ret;
 }
 
-static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
+/*
+ * Tear down a watch request, synchronously.
+ */
+static int __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 {
-	return __rbd_dev_header_watch_sync(rbd_dev, true);
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_obj_request *obj_request;
+	int ret;
+
+	rbd_assert(rbd_dev->watch_event);
+	rbd_assert(rbd_dev->watch_request);
+
+	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+					     OBJ_REQUEST_NODATA);
+	if (!obj_request) {
+		ret = -ENOMEM;
+		goto out_cancel;
+	}
+
+	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, 1,
+						  obj_request);
+	if (!obj_request->osd_req) {
+		ret = -ENOMEM;
+		goto out_put;
+	}
+
+	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
+			      rbd_dev->watch_event->cookie, 0, 0);
+	rbd_osd_req_format_write(obj_request);
+
+	ret = rbd_obj_request_submit(osdc, obj_request);
+	if (ret)
+		goto out_put;
+
+	ret = rbd_obj_request_wait(obj_request);
+	if (ret)
+		goto out_put;
+
+	ret = obj_request->result;
+	if (ret)
+		goto out_put;
+
+	/* We have successfully torn down the watch request */
+
+	ceph_osdc_unregister_linger_request(osdc,
+					    rbd_dev->watch_request->osd_req);
+	rbd_obj_request_put(rbd_dev->watch_request);
+	rbd_dev->watch_request = NULL;
+
+out_put:
+	rbd_obj_request_put(obj_request);
+out_cancel:
+	ceph_osdc_cancel_event(rbd_dev->watch_event);
+	rbd_dev->watch_event = NULL;
+
+	return ret;
 }
 
 static void rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 {
 	int ret;
 
-	ret = __rbd_dev_header_watch_sync(rbd_dev, false);
+	ret = __rbd_dev_header_unwatch_sync(rbd_dev);
 	if (ret) {
 		rbd_warn(rbd_dev, "unable to tear down watch request: %d\n",
 			 ret);
@@ -3058,7 +3179,6 @@
 		__releases(q->queue_lock) __acquires(q->queue_lock)
 {
 	struct rbd_device *rbd_dev = q->queuedata;
-	bool read_only = rbd_dev->mapping.read_only;
 	struct request *rq;
 	int result;
 
@@ -3094,7 +3214,7 @@
 
 		if (write_request) {
 			result = -EROFS;
-			if (read_only)
+			if (rbd_dev->mapping.read_only)
 				goto end_request;
 			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
 		}
@@ -4683,6 +4803,38 @@
 }
 
 /*
+ * Return pool id (>= 0) or a negative error code.
+ */
+static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name)
+{
+	u64 newest_epoch;
+	unsigned long timeout = rbdc->client->options->mount_timeout * HZ;
+	int tries = 0;
+	int ret;
+
+again:
+	ret = ceph_pg_poolid_by_name(rbdc->client->osdc.osdmap, pool_name);
+	if (ret == -ENOENT && tries++ < 1) {
+		ret = ceph_monc_do_get_version(&rbdc->client->monc, "osdmap",
+					       &newest_epoch);
+		if (ret < 0)
+			return ret;
+
+		if (rbdc->client->osdc.osdmap->epoch < newest_epoch) {
+			ceph_monc_request_next_osdmap(&rbdc->client->monc);
+			(void) ceph_monc_wait_osdmap(&rbdc->client->monc,
+						     newest_epoch, timeout);
+			goto again;
+		} else {
+			/* the osdmap we have is new enough */
+			return -ENOENT;
+		}
+	}
+
+	return ret;
+}
+
+/*
  * An rbd format 2 image has a unique identifier, distinct from the
  * name given to it by the user.  Internally, that identifier is
  * what's used to specify the names of objects related to the image.
@@ -4752,7 +4904,7 @@
 
 		image_id = ceph_extract_encoded_string(&p, p + ret,
 						NULL, GFP_NOIO);
-		ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
+		ret = PTR_ERR_OR_ZERO(image_id);
 		if (!ret)
 			rbd_dev->image_format = 2;
 	} else {
@@ -4907,6 +5059,7 @@
 	if (ret)
 		goto err_out_disk;
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
+	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
 
 	ret = rbd_bus_add_dev(rbd_dev);
 	if (ret)
@@ -5053,7 +5206,6 @@
 	struct rbd_options *rbd_opts = NULL;
 	struct rbd_spec *spec = NULL;
 	struct rbd_client *rbdc;
-	struct ceph_osd_client *osdc;
 	bool read_only;
 	int rc = -ENOMEM;
 
@@ -5075,8 +5227,7 @@
 	}
 
 	/* pick the pool */
-	osdc = &rbdc->client->osdc;
-	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
+	rc = rbd_add_get_pool_id(rbdc, spec->pool_name);
 	if (rc < 0)
 		goto err_out_client;
 	spec->pool_id = (u64)rc;
@@ -5387,6 +5538,7 @@
 
 static void __exit rbd_exit(void)
 {
+	ida_destroy(&rbd_dev_id_ida);
 	rbd_sysfs_cleanup();
 	if (single_major)
 		unregister_blkdev(rbd_major, RBD_DRV_NAME);
diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c
index 21887d6..469f2e8 100644
--- a/fs/ceph/acl.c
+++ b/fs/ceph/acl.c
@@ -104,12 +104,6 @@
 	umode_t new_mode = inode->i_mode, old_mode = inode->i_mode;
 	struct dentry *dentry;
 
-	if (acl) {
-		ret = posix_acl_valid(acl);
-		if (ret < 0)
-			goto out;
-	}
-
 	switch (type) {
 	case ACL_TYPE_ACCESS:
 		name = POSIX_ACL_XATTR_ACCESS;
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 4f3f690..90b3954 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -211,18 +211,15 @@
 		SetPageError(page);
 		ceph_fscache_readpage_cancel(inode, page);
 		goto out;
-	} else {
-		if (err < PAGE_CACHE_SIZE) {
-		/* zero fill remainder of page */
-			zero_user_segment(page, err, PAGE_CACHE_SIZE);
-		} else {
-			flush_dcache_page(page);
-		}
 	}
-	SetPageUptodate(page);
+	if (err < PAGE_CACHE_SIZE)
+		/* zero fill remainder of page */
+		zero_user_segment(page, err, PAGE_CACHE_SIZE);
+	else
+		flush_dcache_page(page);
 
-	if (err >= 0)
-		ceph_readpage_to_fscache(inode, page);
+	SetPageUptodate(page);
+	ceph_readpage_to_fscache(inode, page);
 
 out:
 	return err < 0 ? err : 0;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index c561b62..1fde164 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -221,8 +221,8 @@
 	return 0;
 }
 
-static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
-				struct ceph_cap_reservation *ctx)
+struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
+			      struct ceph_cap_reservation *ctx)
 {
 	struct ceph_cap *cap = NULL;
 
@@ -508,15 +508,14 @@
  * it is < 0.  (This is so we can atomically add the cap and add an
  * open file reference to it.)
  */
-int ceph_add_cap(struct inode *inode,
-		 struct ceph_mds_session *session, u64 cap_id,
-		 int fmode, unsigned issued, unsigned wanted,
-		 unsigned seq, unsigned mseq, u64 realmino, int flags,
-		 struct ceph_cap_reservation *caps_reservation)
+void ceph_add_cap(struct inode *inode,
+		  struct ceph_mds_session *session, u64 cap_id,
+		  int fmode, unsigned issued, unsigned wanted,
+		  unsigned seq, unsigned mseq, u64 realmino, int flags,
+		  struct ceph_cap **new_cap)
 {
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_cap *new_cap = NULL;
 	struct ceph_cap *cap;
 	int mds = session->s_mds;
 	int actual_wanted;
@@ -531,20 +530,10 @@
 	if (fmode >= 0)
 		wanted |= ceph_caps_for_mode(fmode);
 
-retry:
-	spin_lock(&ci->i_ceph_lock);
 	cap = __get_cap_for_mds(ci, mds);
 	if (!cap) {
-		if (new_cap) {
-			cap = new_cap;
-			new_cap = NULL;
-		} else {
-			spin_unlock(&ci->i_ceph_lock);
-			new_cap = get_cap(mdsc, caps_reservation);
-			if (new_cap == NULL)
-				return -ENOMEM;
-			goto retry;
-		}
+		cap = *new_cap;
+		*new_cap = NULL;
 
 		cap->issued = 0;
 		cap->implemented = 0;
@@ -562,9 +551,6 @@
 		session->s_nr_caps++;
 		spin_unlock(&session->s_cap_lock);
 	} else {
-		if (new_cap)
-			ceph_put_cap(mdsc, new_cap);
-
 		/*
 		 * auth mds of the inode changed. we received the cap export
 		 * message, but still haven't received the cap import message.
@@ -626,7 +612,6 @@
 			ci->i_auth_cap = cap;
 			cap->mds_wanted = wanted;
 		}
-		ci->i_cap_exporting_issued = 0;
 	} else {
 		WARN_ON(ci->i_auth_cap == cap);
 	}
@@ -648,9 +633,6 @@
 
 	if (fmode >= 0)
 		__ceph_get_fmode(ci, fmode);
-	spin_unlock(&ci->i_ceph_lock);
-	wake_up_all(&ci->i_cap_wq);
-	return 0;
 }
 
 /*
@@ -685,7 +667,7 @@
  */
 int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
 {
-	int have = ci->i_snap_caps | ci->i_cap_exporting_issued;
+	int have = ci->i_snap_caps;
 	struct ceph_cap *cap;
 	struct rb_node *p;
 
@@ -900,7 +882,7 @@
  */
 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 {
-	return !RB_EMPTY_ROOT(&ci->i_caps) || ci->i_cap_exporting_issued;
+	return !RB_EMPTY_ROOT(&ci->i_caps);
 }
 
 int ceph_is_any_caps(struct inode *inode)
@@ -2397,32 +2379,30 @@
  * actually be a revocation if it specifies a smaller cap set.)
  *
  * caller holds s_mutex and i_ceph_lock, we drop both.
- *
- * return value:
- *  0 - ok
- *  1 - check_caps on auth cap only (writeback)
- *  2 - check_caps (ack revoke)
  */
-static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+static void handle_cap_grant(struct ceph_mds_client *mdsc,
+			     struct inode *inode, struct ceph_mds_caps *grant,
+			     void *snaptrace, int snaptrace_len,
+			     struct ceph_buffer *xattr_buf,
 			     struct ceph_mds_session *session,
-			     struct ceph_cap *cap,
-			     struct ceph_buffer *xattr_buf)
-		__releases(ci->i_ceph_lock)
+			     struct ceph_cap *cap, int issued)
+	__releases(ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int mds = session->s_mds;
 	int seq = le32_to_cpu(grant->seq);
 	int newcaps = le32_to_cpu(grant->caps);
-	int issued, implemented, used, wanted, dirty;
+	int used, wanted, dirty;
 	u64 size = le64_to_cpu(grant->size);
 	u64 max_size = le64_to_cpu(grant->max_size);
 	struct timespec mtime, atime, ctime;
 	int check_caps = 0;
-	int wake = 0;
-	int writeback = 0;
-	int queue_invalidate = 0;
-	int deleted_inode = 0;
-	int queue_revalidate = 0;
+	bool wake = 0;
+	bool writeback = 0;
+	bool queue_trunc = 0;
+	bool queue_invalidate = 0;
+	bool queue_revalidate = 0;
+	bool deleted_inode = 0;
 
 	dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
 	     inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2466,16 +2446,13 @@
 	}
 
 	/* side effects now are allowed */
-
-	issued = __ceph_caps_issued(ci, &implemented);
-	issued |= implemented | __ceph_caps_dirty(ci);
-
 	cap->cap_gen = session->s_cap_gen;
 	cap->seq = seq;
 
 	__check_cap_issue(ci, cap, newcaps);
 
-	if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
+	    (issued & CEPH_CAP_AUTH_EXCL) == 0) {
 		inode->i_mode = le32_to_cpu(grant->mode);
 		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
 		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
@@ -2484,7 +2461,8 @@
 		     from_kgid(&init_user_ns, inode->i_gid));
 	}
 
-	if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
+	if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
+	    (issued & CEPH_CAP_LINK_EXCL) == 0) {
 		set_nlink(inode, le32_to_cpu(grant->nlink));
 		if (inode->i_nlink == 0 &&
 		    (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
@@ -2511,30 +2489,35 @@
 	if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
 		queue_revalidate = 1;
 
-	/* size/ctime/mtime/atime? */
-	ceph_fill_file_size(inode, issued,
-			    le32_to_cpu(grant->truncate_seq),
-			    le64_to_cpu(grant->truncate_size), size);
-	ceph_decode_timespec(&mtime, &grant->mtime);
-	ceph_decode_timespec(&atime, &grant->atime);
-	ceph_decode_timespec(&ctime, &grant->ctime);
-	ceph_fill_file_time(inode, issued,
-			    le32_to_cpu(grant->time_warp_seq), &ctime, &mtime,
-			    &atime);
+	if (newcaps & CEPH_CAP_ANY_RD) {
+		/* ctime/mtime/atime? */
+		ceph_decode_timespec(&mtime, &grant->mtime);
+		ceph_decode_timespec(&atime, &grant->atime);
+		ceph_decode_timespec(&ctime, &grant->ctime);
+		ceph_fill_file_time(inode, issued,
+				    le32_to_cpu(grant->time_warp_seq),
+				    &ctime, &mtime, &atime);
+	}
 
-
-	/* file layout may have changed */
-	ci->i_layout = grant->layout;
-
-	/* max size increase? */
-	if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
-		dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
-		ci->i_max_size = max_size;
-		if (max_size >= ci->i_wanted_max_size) {
-			ci->i_wanted_max_size = 0;  /* reset */
-			ci->i_requested_max_size = 0;
+	if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
+		/* file layout may have changed */
+		ci->i_layout = grant->layout;
+		/* size/truncate_seq? */
+		queue_trunc = ceph_fill_file_size(inode, issued,
+					le32_to_cpu(grant->truncate_seq),
+					le64_to_cpu(grant->truncate_size),
+					size);
+		/* max size increase? */
+		if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
+			dout("max_size %lld -> %llu\n",
+			     ci->i_max_size, max_size);
+			ci->i_max_size = max_size;
+			if (max_size >= ci->i_wanted_max_size) {
+				ci->i_wanted_max_size = 0;  /* reset */
+				ci->i_requested_max_size = 0;
+			}
+			wake = 1;
 		}
-		wake = 1;
 	}
 
 	/* check cap bits */
@@ -2595,6 +2578,23 @@
 
 	spin_unlock(&ci->i_ceph_lock);
 
+	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
+		down_write(&mdsc->snap_rwsem);
+		ceph_update_snap_trace(mdsc, snaptrace,
+				       snaptrace + snaptrace_len, false);
+		downgrade_write(&mdsc->snap_rwsem);
+		kick_flushing_inode_caps(mdsc, session, inode);
+		up_read(&mdsc->snap_rwsem);
+		if (newcaps & ~issued)
+			wake = 1;
+	}
+
+	if (queue_trunc) {
+		ceph_queue_vmtruncate(inode);
+		ceph_queue_revalidate(inode);
+	} else if (queue_revalidate)
+		ceph_queue_revalidate(inode);
+
 	if (writeback)
 		/*
 		 * queue inode for writeback: we can't actually call
@@ -2606,8 +2606,6 @@
 		ceph_queue_invalidate(inode);
 	if (deleted_inode)
 		invalidate_aliases(inode);
-	if (queue_revalidate)
-		ceph_queue_revalidate(inode);
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
 
@@ -2784,7 +2782,7 @@
 {
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	struct ceph_mds_session *tsession = NULL;
-	struct ceph_cap *cap, *tcap;
+	struct ceph_cap *cap, *tcap, *new_cap = NULL;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	u64 t_cap_id;
 	unsigned mseq = le32_to_cpu(ex->migrate_seq);
@@ -2807,7 +2805,7 @@
 retry:
 	spin_lock(&ci->i_ceph_lock);
 	cap = __get_cap_for_mds(ci, mds);
-	if (!cap)
+	if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id))
 		goto out_unlock;
 
 	if (target < 0) {
@@ -2846,15 +2844,14 @@
 		}
 		__ceph_remove_cap(cap, false);
 		goto out_unlock;
-	}
-
-	if (tsession) {
-		int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
-		spin_unlock(&ci->i_ceph_lock);
+	} else if (tsession) {
 		/* add placeholder for the export tagert */
+		int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
 		ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
-			     t_seq - 1, t_mseq, (u64)-1, flag, NULL);
-		goto retry;
+			     t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
+
+		__ceph_remove_cap(cap, false);
+		goto out_unlock;
 	}
 
 	spin_unlock(&ci->i_ceph_lock);
@@ -2873,6 +2870,7 @@
 					  SINGLE_DEPTH_NESTING);
 		}
 		ceph_add_cap_releases(mdsc, tsession);
+		new_cap = ceph_get_cap(mdsc, NULL);
 	} else {
 		WARN_ON(1);
 		tsession = NULL;
@@ -2887,24 +2885,27 @@
 		mutex_unlock(&tsession->s_mutex);
 		ceph_put_mds_session(tsession);
 	}
+	if (new_cap)
+		ceph_put_cap(mdsc, new_cap);
 }
 
 /*
- * Handle cap IMPORT.  If there are temp bits from an older EXPORT,
- * clean them up.
+ * Handle cap IMPORT.
  *
- * caller holds s_mutex.
+ * Caller holds s_mutex.  Acquires i_ceph_lock and returns with it held.
  */
 static void handle_cap_import(struct ceph_mds_client *mdsc,
 			      struct inode *inode, struct ceph_mds_caps *im,
 			      struct ceph_mds_cap_peer *ph,
 			      struct ceph_mds_session *session,
-			      void *snaptrace, int snaptrace_len)
+			      struct ceph_cap **target_cap, int *old_issued)
+	__acquires(ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_cap *cap;
+	struct ceph_cap *cap, *ocap, *new_cap = NULL;
 	int mds = session->s_mds;
-	unsigned issued = le32_to_cpu(im->caps);
+	int issued;
+	unsigned caps = le32_to_cpu(im->caps);
 	unsigned wanted = le32_to_cpu(im->wanted);
 	unsigned seq = le32_to_cpu(im->seq);
 	unsigned mseq = le32_to_cpu(im->migrate_seq);
@@ -2924,40 +2925,52 @@
 	dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
 	     inode, ci, mds, mseq, peer);
 
+retry:
 	spin_lock(&ci->i_ceph_lock);
-	cap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
-	if (cap && cap->cap_id == p_cap_id) {
+	cap = __get_cap_for_mds(ci, mds);
+	if (!cap) {
+		if (!new_cap) {
+			spin_unlock(&ci->i_ceph_lock);
+			new_cap = ceph_get_cap(mdsc, NULL);
+			goto retry;
+		}
+		cap = new_cap;
+	} else {
+		if (new_cap) {
+			ceph_put_cap(mdsc, new_cap);
+			new_cap = NULL;
+		}
+	}
+
+	__ceph_caps_issued(ci, &issued);
+	issued |= __ceph_caps_dirty(ci);
+
+	ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
+		     realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
+
+	ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
+	if (ocap && ocap->cap_id == p_cap_id) {
 		dout(" remove export cap %p mds%d flags %d\n",
-		     cap, peer, ph->flags);
+		     ocap, peer, ph->flags);
 		if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
-		    (cap->seq != le32_to_cpu(ph->seq) ||
-		     cap->mseq != le32_to_cpu(ph->mseq))) {
+		    (ocap->seq != le32_to_cpu(ph->seq) ||
+		     ocap->mseq != le32_to_cpu(ph->mseq))) {
 			pr_err("handle_cap_import: mismatched seq/mseq: "
 			       "ino (%llx.%llx) mds%d seq %d mseq %d "
 			       "importer mds%d has peer seq %d mseq %d\n",
-			       ceph_vinop(inode), peer, cap->seq,
-			       cap->mseq, mds, le32_to_cpu(ph->seq),
+			       ceph_vinop(inode), peer, ocap->seq,
+			       ocap->mseq, mds, le32_to_cpu(ph->seq),
 			       le32_to_cpu(ph->mseq));
 		}
-		ci->i_cap_exporting_issued = cap->issued;
-		__ceph_remove_cap(cap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
+		__ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
 	}
 
 	/* make sure we re-request max_size, if necessary */
 	ci->i_wanted_max_size = 0;
 	ci->i_requested_max_size = 0;
-	spin_unlock(&ci->i_ceph_lock);
 
-	down_write(&mdsc->snap_rwsem);
-	ceph_update_snap_trace(mdsc, snaptrace, snaptrace+snaptrace_len,
-			       false);
-	downgrade_write(&mdsc->snap_rwsem);
-	ceph_add_cap(inode, session, cap_id, -1,
-		     issued, wanted, seq, mseq, realmino, CEPH_CAP_FLAG_AUTH,
-		     NULL /* no caps context */);
-	kick_flushing_inode_caps(mdsc, session, inode);
-	up_read(&mdsc->snap_rwsem);
-
+	*old_issued = issued;
+	*target_cap = cap;
 }
 
 /*
@@ -2977,7 +2990,7 @@
 	struct ceph_mds_caps *h;
 	struct ceph_mds_cap_peer *peer = NULL;
 	int mds = session->s_mds;
-	int op;
+	int op, issued;
 	u32 seq, mseq;
 	struct ceph_vino vino;
 	u64 cap_id;
@@ -3069,7 +3082,10 @@
 
 	case CEPH_CAP_OP_IMPORT:
 		handle_cap_import(mdsc, inode, h, peer, session,
-				  snaptrace, snaptrace_len);
+				  &cap, &issued);
+		handle_cap_grant(mdsc, inode, h,  snaptrace, snaptrace_len,
+				 msg->middle, session, cap, issued);
+		goto done_unlocked;
 	}
 
 	/* the rest require a cap */
@@ -3086,8 +3102,10 @@
 	switch (op) {
 	case CEPH_CAP_OP_REVOKE:
 	case CEPH_CAP_OP_GRANT:
-	case CEPH_CAP_OP_IMPORT:
-		handle_cap_grant(inode, h, session, cap, msg->middle);
+		__ceph_caps_issued(ci, &issued);
+		issued |= __ceph_caps_dirty(ci);
+		handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle,
+				 session, cap, issued);
 		goto done_unlocked;
 
 	case CEPH_CAP_OP_FLUSH_ACK:
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 00d6af6..8d7d782 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -169,7 +169,7 @@
 	return dentry;
 }
 
-struct dentry *ceph_get_parent(struct dentry *child)
+static struct dentry *ceph_get_parent(struct dentry *child)
 {
 	/* don't re-export snaps */
 	if (ceph_snap(child->d_inode) != CEPH_NOSNAP)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index e4fff9f..04c89c2 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -10,6 +10,7 @@
 #include <linux/writeback.h>
 #include <linux/vmalloc.h>
 #include <linux/posix_acl.h>
+#include <linux/random.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -179,9 +180,8 @@
  * specified, copy the frag delegation info to the caller if
  * it is present.
  */
-u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
-		     struct ceph_inode_frag *pfrag,
-		     int *found)
+static u32 __ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+			      struct ceph_inode_frag *pfrag, int *found)
 {
 	u32 t = ceph_frag_make(0, 0);
 	struct ceph_inode_frag *frag;
@@ -191,7 +191,6 @@
 	if (found)
 		*found = 0;
 
-	mutex_lock(&ci->i_fragtree_mutex);
 	while (1) {
 		WARN_ON(!ceph_frag_contains_value(t, v));
 		frag = __ceph_find_frag(ci, t);
@@ -220,10 +219,19 @@
 	}
 	dout("choose_frag(%x) = %x\n", v, t);
 
-	mutex_unlock(&ci->i_fragtree_mutex);
 	return t;
 }
 
+u32 ceph_choose_frag(struct ceph_inode_info *ci, u32 v,
+		     struct ceph_inode_frag *pfrag, int *found)
+{
+	u32 ret;
+	mutex_lock(&ci->i_fragtree_mutex);
+	ret = __ceph_choose_frag(ci, v, pfrag, found);
+	mutex_unlock(&ci->i_fragtree_mutex);
+	return ret;
+}
+
 /*
  * Process dirfrag (delegation) info from the mds.  Include leaf
  * fragment in tree ONLY if ndist > 0.  Otherwise, only
@@ -237,11 +245,17 @@
 	u32 id = le32_to_cpu(dirinfo->frag);
 	int mds = le32_to_cpu(dirinfo->auth);
 	int ndist = le32_to_cpu(dirinfo->ndist);
+	int diri_auth = -1;
 	int i;
 	int err = 0;
 
+	spin_lock(&ci->i_ceph_lock);
+	if (ci->i_auth_cap)
+		diri_auth = ci->i_auth_cap->mds;
+	spin_unlock(&ci->i_ceph_lock);
+
 	mutex_lock(&ci->i_fragtree_mutex);
-	if (ndist == 0) {
+	if (ndist == 0 && mds == diri_auth) {
 		/* no delegation info needed. */
 		frag = __ceph_find_frag(ci, id);
 		if (!frag)
@@ -286,6 +300,75 @@
 	return err;
 }
 
+static int ceph_fill_fragtree(struct inode *inode,
+			      struct ceph_frag_tree_head *fragtree,
+			      struct ceph_mds_reply_dirfrag *dirinfo)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_inode_frag *frag;
+	struct rb_node *rb_node;
+	int i;
+	u32 id, nsplits;
+	bool update = false;
+
+	mutex_lock(&ci->i_fragtree_mutex);
+	nsplits = le32_to_cpu(fragtree->nsplits);
+	if (nsplits) {
+		i = prandom_u32() % nsplits;
+		id = le32_to_cpu(fragtree->splits[i].frag);
+		if (!__ceph_find_frag(ci, id))
+			update = true;
+	} else if (!RB_EMPTY_ROOT(&ci->i_fragtree)) {
+		rb_node = rb_first(&ci->i_fragtree);
+		frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+		if (frag->frag != ceph_frag_make(0, 0) || rb_next(rb_node))
+			update = true;
+	}
+	if (!update && dirinfo) {
+		id = le32_to_cpu(dirinfo->frag);
+		if (id != __ceph_choose_frag(ci, id, NULL, NULL))
+			update = true;
+	}
+	if (!update)
+		goto out_unlock;
+
+	dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));
+	rb_node = rb_first(&ci->i_fragtree);
+	for (i = 0; i < nsplits; i++) {
+		id = le32_to_cpu(fragtree->splits[i].frag);
+		frag = NULL;
+		while (rb_node) {
+			frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+			if (ceph_frag_compare(frag->frag, id) >= 0) {
+				if (frag->frag != id)
+					frag = NULL;
+				else
+					rb_node = rb_next(rb_node);
+				break;
+			}
+			rb_node = rb_next(rb_node);
+			rb_erase(&frag->node, &ci->i_fragtree);
+			kfree(frag);
+			frag = NULL;
+		}
+		if (!frag) {
+			frag = __get_or_create_frag(ci, id);
+			if (IS_ERR(frag))
+				continue;
+		}
+		frag->split_by = le32_to_cpu(fragtree->splits[i].by);
+		dout(" frag %x split by %d\n", frag->frag, frag->split_by);
+	}
+	while (rb_node) {
+		frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+		rb_node = rb_next(rb_node);
+		rb_erase(&frag->node, &ci->i_fragtree);
+		kfree(frag);
+	}
+out_unlock:
+	mutex_unlock(&ci->i_fragtree_mutex);
+	return 0;
+}
 
 /*
  * initialize a newly allocated inode.
@@ -341,7 +424,6 @@
 	INIT_LIST_HEAD(&ci->i_cap_snaps);
 	ci->i_head_snapc = NULL;
 	ci->i_snap_caps = 0;
-	ci->i_cap_exporting_issued = 0;
 
 	for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
 		ci->i_nr_by_mode[i] = 0;
@@ -407,7 +489,7 @@
 
 	/*
 	 * we may still have a snap_realm reference if there are stray
-	 * caps in i_cap_exporting_issued or i_snap_caps.
+	 * caps in i_snap_caps.
 	 */
 	if (ci->i_snap_realm) {
 		struct ceph_mds_client *mdsc =
@@ -582,22 +664,26 @@
 		      unsigned long ttl_from, int cap_fmode,
 		      struct ceph_cap_reservation *caps_reservation)
 {
+	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	struct ceph_mds_reply_inode *info = iinfo->in;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	int i;
-	int issued = 0, implemented;
+	int issued = 0, implemented, new_issued;
 	struct timespec mtime, atime, ctime;
-	u32 nsplits;
-	struct ceph_inode_frag *frag;
-	struct rb_node *rb_node;
 	struct ceph_buffer *xattr_blob = NULL;
+	struct ceph_cap *new_cap = NULL;
 	int err = 0;
-	int queue_trunc = 0;
+	bool wake = false;
+	bool queue_trunc = false;
+	bool new_version = false;
 
 	dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
 	     inode, ceph_vinop(inode), le64_to_cpu(info->version),
 	     ci->i_version);
 
+	/* prealloc new cap struct */
+	if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP)
+		new_cap = ceph_get_cap(mdsc, caps_reservation);
+
 	/*
 	 * prealloc xattr data, if it looks like we'll need it.  only
 	 * if len > 4 (meaning there are actually xattrs; the first 4
@@ -623,19 +709,23 @@
 	 *   3    2     skip
 	 *   3    3     update
 	 */
-	if (le64_to_cpu(info->version) > 0 &&
-	    (ci->i_version & ~1) >= le64_to_cpu(info->version))
-		goto no_change;
-	
+	if (ci->i_version == 0 ||
+	    ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+	     le64_to_cpu(info->version) > (ci->i_version & ~1)))
+		new_version = true;
+
 	issued = __ceph_caps_issued(ci, &implemented);
 	issued |= implemented | __ceph_caps_dirty(ci);
+	new_issued = ~issued & le32_to_cpu(info->cap.caps);
 
 	/* update inode */
 	ci->i_version = le64_to_cpu(info->version);
 	inode->i_version++;
 	inode->i_rdev = le32_to_cpu(info->rdev);
+	inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 
-	if ((issued & CEPH_CAP_AUTH_EXCL) == 0) {
+	if ((new_version || (new_issued & CEPH_CAP_AUTH_SHARED)) &&
+	    (issued & CEPH_CAP_AUTH_EXCL) == 0) {
 		inode->i_mode = le32_to_cpu(info->mode);
 		inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(info->uid));
 		inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(info->gid));
@@ -644,23 +734,35 @@
 		     from_kgid(&init_user_ns, inode->i_gid));
 	}
 
-	if ((issued & CEPH_CAP_LINK_EXCL) == 0)
+	if ((new_version || (new_issued & CEPH_CAP_LINK_SHARED)) &&
+	    (issued & CEPH_CAP_LINK_EXCL) == 0)
 		set_nlink(inode, le32_to_cpu(info->nlink));
 
-	/* be careful with mtime, atime, size */
-	ceph_decode_timespec(&atime, &info->atime);
-	ceph_decode_timespec(&mtime, &info->mtime);
-	ceph_decode_timespec(&ctime, &info->ctime);
-	queue_trunc = ceph_fill_file_size(inode, issued,
-					  le32_to_cpu(info->truncate_seq),
-					  le64_to_cpu(info->truncate_size),
-					  le64_to_cpu(info->size));
-	ceph_fill_file_time(inode, issued,
-			    le32_to_cpu(info->time_warp_seq),
-			    &ctime, &mtime, &atime);
+	if (new_version || (new_issued & CEPH_CAP_ANY_RD)) {
+		/* be careful with mtime, atime, size */
+		ceph_decode_timespec(&atime, &info->atime);
+		ceph_decode_timespec(&mtime, &info->mtime);
+		ceph_decode_timespec(&ctime, &info->ctime);
+		ceph_fill_file_time(inode, issued,
+				le32_to_cpu(info->time_warp_seq),
+				&ctime, &mtime, &atime);
+	}
 
-	ci->i_layout = info->layout;
-	inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
+	if (new_version ||
+	    (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
+		ci->i_layout = info->layout;
+		queue_trunc = ceph_fill_file_size(inode, issued,
+					le32_to_cpu(info->truncate_seq),
+					le64_to_cpu(info->truncate_size),
+					le64_to_cpu(info->size));
+		/* only update max_size on auth cap */
+		if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+		    ci->i_max_size != le64_to_cpu(info->max_size)) {
+			dout("max_size %lld -> %llu\n", ci->i_max_size,
+					le64_to_cpu(info->max_size));
+			ci->i_max_size = le64_to_cpu(info->max_size);
+		}
+	}
 
 	/* xattrs */
 	/* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. */
@@ -745,58 +847,6 @@
 		dout(" marking %p complete (empty)\n", inode);
 		__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
 	}
-no_change:
-	/* only update max_size on auth cap */
-	if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
-	    ci->i_max_size != le64_to_cpu(info->max_size)) {
-		dout("max_size %lld -> %llu\n", ci->i_max_size,
-		     le64_to_cpu(info->max_size));
-		ci->i_max_size = le64_to_cpu(info->max_size);
-	}
-
-	spin_unlock(&ci->i_ceph_lock);
-
-	/* queue truncate if we saw i_size decrease */
-	if (queue_trunc)
-		ceph_queue_vmtruncate(inode);
-
-	/* populate frag tree */
-	/* FIXME: move me up, if/when version reflects fragtree changes */
-	nsplits = le32_to_cpu(info->fragtree.nsplits);
-	mutex_lock(&ci->i_fragtree_mutex);
-	rb_node = rb_first(&ci->i_fragtree);
-	for (i = 0; i < nsplits; i++) {
-		u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
-		frag = NULL;
-		while (rb_node) {
-			frag = rb_entry(rb_node, struct ceph_inode_frag, node);
-			if (ceph_frag_compare(frag->frag, id) >= 0) {
-				if (frag->frag != id)
-					frag = NULL;
-				else
-					rb_node = rb_next(rb_node);
-				break;
-			}
-			rb_node = rb_next(rb_node);
-			rb_erase(&frag->node, &ci->i_fragtree);
-			kfree(frag);
-			frag = NULL;
-		}
-		if (!frag) {
-			frag = __get_or_create_frag(ci, id);
-			if (IS_ERR(frag))
-				continue;
-		}
-		frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
-		dout(" frag %x split by %d\n", frag->frag, frag->split_by);
-	}
-	while (rb_node) {
-		frag = rb_entry(rb_node, struct ceph_inode_frag, node);
-		rb_node = rb_next(rb_node);
-		rb_erase(&frag->node, &ci->i_fragtree);
-		kfree(frag);
-	}
-	mutex_unlock(&ci->i_fragtree_mutex);
 
 	/* were we issued a capability? */
 	if (info->cap.caps) {
@@ -809,30 +859,41 @@
 				     le32_to_cpu(info->cap.seq),
 				     le32_to_cpu(info->cap.mseq),
 				     le64_to_cpu(info->cap.realm),
-				     info->cap.flags,
-				     caps_reservation);
+				     info->cap.flags, &new_cap);
+			wake = true;
 		} else {
-			spin_lock(&ci->i_ceph_lock);
 			dout(" %p got snap_caps %s\n", inode,
 			     ceph_cap_string(le32_to_cpu(info->cap.caps)));
 			ci->i_snap_caps |= le32_to_cpu(info->cap.caps);
 			if (cap_fmode >= 0)
 				__ceph_get_fmode(ci, cap_fmode);
-			spin_unlock(&ci->i_ceph_lock);
 		}
 	} else if (cap_fmode >= 0) {
 		pr_warn("mds issued no caps on %llx.%llx\n",
 			   ceph_vinop(inode));
 		__ceph_get_fmode(ci, cap_fmode);
 	}
+	spin_unlock(&ci->i_ceph_lock);
+
+	if (wake)
+		wake_up_all(&ci->i_cap_wq);
+
+	/* queue truncate if we saw i_size decrease */
+	if (queue_trunc)
+		ceph_queue_vmtruncate(inode);
+
+	/* populate frag tree */
+	if (S_ISDIR(inode->i_mode))
+		ceph_fill_fragtree(inode, &info->fragtree, dirinfo);
 
 	/* update delegation info? */
 	if (dirinfo)
 		ceph_fill_dirfrag(inode, dirinfo);
 
 	err = 0;
-
 out:
+	if (new_cap)
+		ceph_put_cap(mdsc, new_cap);
 	if (xattr_blob)
 		ceph_buffer_put(xattr_blob);
 	return err;
@@ -1485,7 +1546,7 @@
 	orig_gen = ci->i_rdcache_gen;
 	spin_unlock(&ci->i_ceph_lock);
 
-	truncate_inode_pages(inode->i_mapping, 0);
+	truncate_pagecache(inode, 0);
 
 	spin_lock(&ci->i_ceph_lock);
 	if (orig_gen == ci->i_rdcache_gen &&
@@ -1588,7 +1649,7 @@
 	     ci->i_truncate_pending, to);
 	spin_unlock(&ci->i_ceph_lock);
 
-	truncate_inode_pages(inode->i_mapping, to);
+	truncate_pagecache(inode, to);
 
 	spin_lock(&ci->i_ceph_lock);
 	if (to == ci->i_truncate_size) {
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 9a33b98..92a2548 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1558,6 +1558,8 @@
 	init_completion(&req->r_safe_completion);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 
+	req->r_stamp = CURRENT_TIME;
+
 	req->r_op = op;
 	req->r_direct_mode = mode;
 	return req;
@@ -1783,7 +1785,8 @@
 	}
 
 	len = sizeof(*head) +
-		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
+		pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
+		sizeof(struct ceph_timespec);
 
 	/* calculate (max) length for cap releases */
 	len += sizeof(struct ceph_mds_request_release) *
@@ -1800,6 +1803,7 @@
 		goto out_free2;
 	}
 
+	msg->hdr.version = cpu_to_le16(2);
 	msg->hdr.tid = cpu_to_le64(req->r_tid);
 
 	head = msg->front.iov_base;
@@ -1836,6 +1840,14 @@
 		      mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
 	head->num_releases = cpu_to_le16(releases);
 
+	/* time stamp, converted to the on-wire struct ceph_timespec */
+	{
+		struct ceph_timespec ts;
+
+		ceph_encode_timespec(&ts, &req->r_stamp);
+		ceph_encode_copy(&p, &ts, sizeof(ts));
+	}
+
 	BUG_ON(p > end);
 	msg->front.iov_len = p - msg->front.iov_base;
 	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index e90cfcc..e00737c 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -194,6 +194,7 @@
 	int r_fmode;        /* file mode, if expecting cap */
 	kuid_t r_uid;
 	kgid_t r_gid;
+	struct timespec r_stamp;
 
 	/* for choosing which mds to send this request to */
 	int r_direct_mode;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index ead05cc..12b2074 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -292,7 +292,6 @@
 	struct ceph_snap_context *i_head_snapc;  /* set if wr_buffer_head > 0 or
 						    dirty|flushing caps */
 	unsigned i_snap_caps;           /* cap bits for snapped files */
-	unsigned i_cap_exporting_issued;
 
 	int i_nr_by_mode[CEPH_FILE_MODE_NUM];  /* open file counts */
 
@@ -775,11 +774,13 @@
 extern const char *ceph_cap_string(int c);
 extern void ceph_handle_caps(struct ceph_mds_session *session,
 			     struct ceph_msg *msg);
-extern int ceph_add_cap(struct inode *inode,
-			struct ceph_mds_session *session, u64 cap_id,
-			int fmode, unsigned issued, unsigned wanted,
-			unsigned cap, unsigned seq, u64 realmino, int flags,
-			struct ceph_cap_reservation *caps_reservation);
+extern struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
+				     struct ceph_cap_reservation *ctx);
+extern void ceph_add_cap(struct inode *inode,
+			 struct ceph_mds_session *session, u64 cap_id,
+			 int fmode, unsigned issued, unsigned wanted,
+			 unsigned cap, unsigned seq, u64 realmino, int flags,
+			 struct ceph_cap **new_cap);
 extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
 			 struct ceph_cap *cap);
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 5f6db18..3c97d5e 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -625,6 +625,8 @@
 			   CEPH_CAP_LINK_EXCL |		\
 			   CEPH_CAP_XATTR_EXCL |	\
 			   CEPH_CAP_FILE_EXCL)
+#define CEPH_CAP_ANY_FILE_RD (CEPH_CAP_FILE_RD | CEPH_CAP_FILE_CACHE | \
+			      CEPH_CAP_FILE_SHARED)
 #define CEPH_CAP_ANY_FILE_WR (CEPH_CAP_FILE_WR | CEPH_CAP_FILE_BUFFER |	\
 			      CEPH_CAP_FILE_EXCL)
 #define CEPH_CAP_ANY_WR   (CEPH_CAP_ANY_EXCL | CEPH_CAP_ANY_FILE_WR)
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index a486f39..deb47e4 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -40,9 +40,9 @@
 };
 
 /*
- * ceph_mon_generic_request is being used for the statfs and poolop requests
- * which are bening done a bit differently because we need to get data back
- * to the caller
+ * ceph_mon_generic_request is being used for the statfs, poolop and
+ * mon_get_version requests which are being done a bit differently
+ * because we need to get data back to the caller
  */
 struct ceph_mon_generic_request {
 	struct kref kref;
@@ -104,10 +104,15 @@
 extern int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 have);
 
 extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
+extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
+				 unsigned long timeout);
 
 extern int ceph_monc_do_statfs(struct ceph_mon_client *monc,
 			       struct ceph_statfs *buf);
 
+extern int ceph_monc_do_get_version(struct ceph_mon_client *monc,
+				    const char *what, u64 *newest);
+
 extern int ceph_monc_open_session(struct ceph_mon_client *monc);
 
 extern int ceph_monc_validate_auth(struct ceph_mon_client *monc);
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 67d7721..1675021 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -72,6 +72,8 @@
 	case CEPH_MSG_MON_SUBSCRIBE_ACK: return "mon_subscribe_ack";
 	case CEPH_MSG_STATFS: return "statfs";
 	case CEPH_MSG_STATFS_REPLY: return "statfs_reply";
+	case CEPH_MSG_MON_GET_VERSION: return "mon_get_version";
+	case CEPH_MSG_MON_GET_VERSION_REPLY: return "mon_get_version_reply";
 	case CEPH_MSG_MDS_MAP: return "mds_map";
 	case CEPH_MSG_CLIENT_SESSION: return "client_session";
 	case CEPH_MSG_CLIENT_RECONNECT: return "client_reconnect";
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 10421a4..d1a62c6 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -126,9 +126,14 @@
 		req = rb_entry(rp, struct ceph_mon_generic_request, node);
 		op = le16_to_cpu(req->request->hdr.type);
+		/* emit exactly one "<tid> <type>" line per pending request */
 		if (op == CEPH_MSG_STATFS)
-			seq_printf(s, "%lld statfs\n", req->tid);
+			seq_printf(s, "%llu statfs\n", req->tid);
+		else if (op == CEPH_MSG_POOLOP)
+			seq_printf(s, "%llu poolop\n", req->tid);
+		else if (op == CEPH_MSG_MON_GET_VERSION)
+			seq_printf(s, "%llu mon_get_version\n", req->tid);
 		else
-			seq_printf(s, "%lld unknown\n", req->tid);
+			seq_printf(s, "%llu unknown\n", req->tid);
 	}
 
 	mutex_unlock(&monc->mutex);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 2ac9ef3..067d3af 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -296,6 +296,48 @@
 		__send_subscribe(monc);
 	mutex_unlock(&monc->mutex);
 }
+EXPORT_SYMBOL(ceph_monc_request_next_osdmap);
+
+/*
+ * Wait until we have an osdmap of at least @epoch.
+ *
+ * @timeout: deadline in jiffies; 0 disables the deadline
+ *
+ * Returns 0 once monc->have_osdmap >= @epoch, -ETIMEDOUT if @timeout
+ * jiffies pass first, or the negative value returned by
+ * wait_event_interruptible_timeout() if the wait is interrupted.
+ */
+int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
+			  unsigned long timeout)
+{
+	unsigned long started = jiffies;
+	long ret;
+
+	mutex_lock(&monc->mutex);
+	while (monc->have_osdmap < epoch) {
+		mutex_unlock(&monc->mutex);
+
+		if (timeout != 0 && time_after_eq(jiffies, started + timeout))
+			return -ETIMEDOUT;
+
+		/*
+		 * A zero timeout makes wait_event_interruptible_timeout()
+		 * return immediately, so translate "no deadline" into
+		 * MAX_SCHEDULE_TIMEOUT instead of spinning on the mutex.
+		 */
+		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
+				monc->have_osdmap >= epoch,
+				timeout ? timeout : MAX_SCHEDULE_TIMEOUT);
+		if (ret < 0)
+			return ret;
+
+		mutex_lock(&monc->mutex);
+	}
+
+	mutex_unlock(&monc->mutex);
+	return 0;
+}
+EXPORT_SYMBOL(ceph_monc_wait_osdmap);
 
 /*
  *
@@ -477,14 +504,13 @@
 	return m;
 }
 
-static int do_generic_request(struct ceph_mon_client *monc,
-			      struct ceph_mon_generic_request *req)
+static int __do_generic_request(struct ceph_mon_client *monc, u64 tid,
+				struct ceph_mon_generic_request *req)
 {
 	int err;
 
 	/* register request */
-	mutex_lock(&monc->mutex);
-	req->tid = ++monc->last_tid;
+	req->tid = tid != 0 ? tid : ++monc->last_tid;
 	req->request->hdr.tid = cpu_to_le64(req->tid);
 	__insert_generic_request(monc, req);
 	monc->num_generic_requests++;
@@ -496,13 +522,24 @@
 	mutex_lock(&monc->mutex);
 	rb_erase(&req->node, &monc->generic_request_tree);
 	monc->num_generic_requests--;
-	mutex_unlock(&monc->mutex);
 
 	if (!err)
 		err = req->result;
 	return err;
 }
 
+static int do_generic_request(struct ceph_mon_client *monc,
+			      struct ceph_mon_generic_request *req)
+{
+	int ret;
+
+	/* tid 0: let __do_generic_request() take the next monc->last_tid */
+	mutex_lock(&monc->mutex);
+	ret = __do_generic_request(monc, 0, req);
+	mutex_unlock(&monc->mutex);
+	return ret;
+}
+
 /*
  * statfs
  */
@@ -579,6 +616,107 @@
 }
 EXPORT_SYMBOL(ceph_monc_do_statfs);
 
+/*
+ * Handle a mon_get_version reply: look up the pending generic request
+ * by the handle carried in the payload, store the decoded version in
+ * its result buffer and complete it.
+ */
+static void handle_get_version_reply(struct ceph_mon_client *monc,
+				     struct ceph_msg *msg)
+{
+	struct ceph_mon_generic_request *req;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+	void *p = msg->front.iov_base;
+	/* bound decoding by the front payload length, not its allocation */
+	void *end = p + msg->front.iov_len;
+	u64 handle;
+
+	dout("%s %p tid %llu\n", __func__, msg, tid);
+
+	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
+	handle = ceph_decode_64(&p);
+	/* a zero header tid is tolerated; otherwise it must match */
+	if (tid != 0 && tid != handle)
+		goto bad;
+
+	mutex_lock(&monc->mutex);
+	req = __lookup_generic_req(monc, handle);
+	if (req) {
+		*(u64 *)req->buf = ceph_decode_64(&p);
+		req->result = 0;
+		get_generic_request(req);
+	}
+	mutex_unlock(&monc->mutex);
+	if (req) {
+		complete_all(&req->completion);
+		put_generic_request(req);
+	}
+
+	return;
+bad:
+	pr_err("corrupt mon_get_version reply\n");
+	ceph_msg_dump(msg);
+}
+
+/*
+ * Send MMonGetVersion and wait for the reply.
+ *
+ * @what: one of "mdsmap", "osdmap" or "monmap"
+ * @newest: receives the version decoded from the reply
+ *
+ * Returns 0 on success, -ENOMEM if the request or reply message
+ * cannot be allocated, otherwise the result of __do_generic_request().
+ */
+int ceph_monc_do_get_version(struct ceph_mon_client *monc, const char *what,
+			     u64 *newest)
+{
+	struct ceph_mon_generic_request *req;
+	void *p, *end;
+	u64 tid;
+	int err;
+
+	req = kzalloc(sizeof(*req), GFP_NOFS);
+	if (!req)
+		return -ENOMEM;
+
+	kref_init(&req->kref);
+	req->buf = newest;
+	req->buf_len = sizeof(*newest);
+	init_completion(&req->completion);
+
+	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
+				    sizeof(u64) + sizeof(u32) + strlen(what),
+				    GFP_NOFS, true);
+	if (!req->request) {
+		err = -ENOMEM;
+		goto out;
+	}
+
+	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 1024,
+				  GFP_NOFS, true);
+	if (!req->reply) {
+		err = -ENOMEM;
+		goto out;
+	}
+
+	p = req->request->front.iov_base;
+	end = p + req->request->front_alloc_len;
+
+	/* fill out request */
+	mutex_lock(&monc->mutex);
+	tid = ++monc->last_tid;
+	ceph_encode_64(&p, tid); /* handle */
+	ceph_encode_string(&p, end, what, strlen(what));
+
+	err = __do_generic_request(monc, tid, req);
+
+	mutex_unlock(&monc->mutex);
+out:
+	kref_put(&req->kref, release_generic_request);
+	return err;
+}
+EXPORT_SYMBOL(ceph_monc_do_get_version);
+
 /*
  * pool ops
  */
@@ -981,6 +1108,10 @@
 		handle_statfs_reply(monc, msg);
 		break;
 
+	case CEPH_MSG_MON_GET_VERSION_REPLY:
+		handle_get_version_reply(monc, msg);
+		break;
+
 	case CEPH_MSG_POOLOP_REPLY:
 		handle_poolop_reply(monc, msg);
 		break;
@@ -1029,6 +1160,15 @@
 	case CEPH_MSG_AUTH_REPLY:
 		m = ceph_msg_get(monc->m_auth_reply);
 		break;
+	case CEPH_MSG_MON_GET_VERSION_REPLY:
+		if (le64_to_cpu(hdr->tid) != 0)
+			return get_generic_reply(con, hdr, skip);
+
+		/*
+		 * Older OSDs don't set reply tid even if the original
+		 * request had a non-zero tid.  Work around this weirdness
+		 * by falling through to the allocate case.
+		 */
 	case CEPH_MSG_MON_MAP:
 	case CEPH_MSG_MDS_MAP:
 	case CEPH_MSG_OSD_MAP: