Merge tag 'ceph-for-4.15-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "We have a set of file locking improvements from Zheng, rbd rw/ro state
  handling code cleanup from me and some assorted CephFS fixes from
  Jeff.

  rbd now defaults to single-major=Y, lifting the limit of ~240 rbd
  images per host for everyone"

* tag 'ceph-for-4.15-rc1' of git://github.com/ceph/ceph-client:
  rbd: default to single-major device number scheme
  libceph: don't WARN() if user tries to add invalid key
  rbd: set discard_alignment to zero
  ceph: silence sparse endianness warning in encode_caps_cb
  ceph: remove the bump of i_version
  ceph: present consistent fsid, regardless of arch endianness
  ceph: clean up spinlocking and list handling around cleanup_cap_releases()
  rbd: get rid of rbd_mapping::read_only
  rbd: fix and simplify rbd_ioctl_set_ro()
  ceph: remove unused and redundant variable dropping
  ceph: mark expected switch fall-throughs
  ceph: -EINVAL on decoding failure in ceph_mdsc_handle_fsmap()
  ceph: disable cached readdir after dropping positive dentry
  ceph: fix bool initialization/comparison
  ceph: handle 'session get evicted while there are file locks'
  ceph: optimize flock encoding during reconnect
  ceph: make lock_to_ceph_filelock() static
  ceph: keep auth cap when inode has flocks or posix locks
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index adc877d..38fc5f3 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -348,7 +348,6 @@
 struct rbd_mapping {
 	u64                     size;
 	u64                     features;
-	bool			read_only;
 };
 
 /*
@@ -450,12 +449,11 @@
 static struct workqueue_struct *rbd_wq;
 
 /*
- * Default to false for now, as single-major requires >= 0.75 version of
- * userspace rbd utility.
+ * single-major requires version >= 0.75 of the userspace rbd utility.
  */
-static bool single_major = false;
+static bool single_major = true;
 module_param(single_major, bool, S_IRUGO);
-MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: false)");
+MODULE_PARM_DESC(single_major, "Use a single major number for all rbd devices (default: true)");
 
 static int rbd_img_request_submit(struct rbd_img_request *img_request);
 
@@ -608,9 +606,6 @@
 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 	bool removing = false;
 
-	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
-		return -EROFS;
-
 	spin_lock_irq(&rbd_dev->lock);
 	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
 		removing = true;
@@ -640,46 +635,24 @@
 
 static int rbd_ioctl_set_ro(struct rbd_device *rbd_dev, unsigned long arg)
 {
-	int ret = 0;
-	int val;
-	bool ro;
-	bool ro_changed = false;
+	int ro;
 
-	/* get_user() may sleep, so call it before taking rbd_dev->lock */
-	if (get_user(val, (int __user *)(arg)))
+	if (get_user(ro, (int __user *)arg))
 		return -EFAULT;
 
-	ro = val ? true : false;
-	/* Snapshot doesn't allow to write*/
+	/* Snapshots can't be marked read-write */
 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP && !ro)
 		return -EROFS;
 
-	spin_lock_irq(&rbd_dev->lock);
-	/* prevent others open this device */
-	if (rbd_dev->open_count > 1) {
-		ret = -EBUSY;
-		goto out;
-	}
-
-	if (rbd_dev->mapping.read_only != ro) {
-		rbd_dev->mapping.read_only = ro;
-		ro_changed = true;
-	}
-
-out:
-	spin_unlock_irq(&rbd_dev->lock);
-	/* set_disk_ro() may sleep, so call it after releasing rbd_dev->lock */
-	if (ret == 0 && ro_changed)
-		set_disk_ro(rbd_dev->disk, ro ? 1 : 0);
-
-	return ret;
+	/* Let blkdev_roset() handle it */
+	return -ENOTTY;
 }
 
 static int rbd_ioctl(struct block_device *bdev, fmode_t mode,
 			unsigned int cmd, unsigned long arg)
 {
 	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
-	int ret = 0;
+	int ret;
 
 	switch (cmd) {
 	case BLKROSET:
@@ -4050,15 +4023,8 @@
 		goto err_rq;
 	}
 
-	/* Only reads are allowed to a read-only device */
-
-	if (op_type != OBJ_OP_READ) {
-		if (rbd_dev->mapping.read_only) {
-			result = -EROFS;
-			goto err_rq;
-		}
-		rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
-	}
+	rbd_assert(op_type == OBJ_OP_READ ||
+		   rbd_dev->spec->snap_id == CEPH_NOSNAP);
 
 	/*
 	 * Quit early if the mapped snapshot no longer exists.  It's
@@ -4423,7 +4389,6 @@
 	/* enable the discard support */
 	queue_flag_set_unlocked(QUEUE_FLAG_DISCARD, q);
 	q->limits.discard_granularity = segment_size;
-	q->limits.discard_alignment = segment_size;
 	blk_queue_max_discard_sectors(q, segment_size / SECTOR_SIZE);
 	blk_queue_max_write_zeroes_sectors(q, segment_size / SECTOR_SIZE);
 
@@ -5994,7 +5959,7 @@
 		goto err_out_disk;
 
 	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
-	set_disk_ro(rbd_dev->disk, rbd_dev->mapping.read_only);
+	set_disk_ro(rbd_dev->disk, rbd_dev->opts->read_only);
 
 	ret = dev_set_name(&rbd_dev->dev, "%d", rbd_dev->dev_id);
 	if (ret)
@@ -6145,7 +6110,6 @@
 	struct rbd_options *rbd_opts = NULL;
 	struct rbd_spec *spec = NULL;
 	struct rbd_client *rbdc;
-	bool read_only;
 	int rc;
 
 	if (!try_module_get(THIS_MODULE))
@@ -6194,11 +6158,8 @@
 	}
 
 	/* If we are mapping a snapshot it must be marked read-only */
-
-	read_only = rbd_dev->opts->read_only;
 	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
-		read_only = true;
-	rbd_dev->mapping.read_only = read_only;
+		rbd_dev->opts->read_only = true;
 
 	rc = rbd_dev_device_setup(rbd_dev);
 	if (rc)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index ff5d32c..a14b2c9 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1160,7 +1160,7 @@
 	struct ceph_inode_info *ci = cap->ci;
 	struct inode *inode = &ci->vfs_inode;
 	struct cap_msg_args arg;
-	int held, revoking, dropping;
+	int held, revoking;
 	int wake = 0;
 	int delayed = 0;
 	int ret;
@@ -1168,7 +1168,6 @@
 	held = cap->issued | cap->implemented;
 	revoking = cap->implemented & ~cap->issued;
 	retain &= ~revoking;
-	dropping = cap->issued & ~retain;
 
 	dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
 	     inode, cap, cap->session,
@@ -1712,7 +1711,7 @@
 
 	/* if we are unmounting, flush any unused caps immediately. */
 	if (mdsc->stopping)
-		is_delayed = 1;
+		is_delayed = true;
 
 	spin_lock(&ci->i_ceph_lock);
 
@@ -3189,8 +3188,8 @@
 	int dirty = le32_to_cpu(m->dirty);
 	int cleaned = 0;
 	bool drop = false;
-	bool wake_ci = 0;
-	bool wake_mdsc = 0;
+	bool wake_ci = false;
+	bool wake_mdsc = false;
 
 	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
 		if (cf->tid == flush_tid)
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index f2550a0..ab81652 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -493,6 +493,7 @@
 	ci->i_wb_ref = 0;
 	ci->i_wrbuffer_ref = 0;
 	ci->i_wrbuffer_ref_head = 0;
+	atomic_set(&ci->i_filelock_ref, 0);
 	ci->i_shared_gen = 0;
 	ci->i_rdcache_gen = 0;
 	ci->i_rdcache_revoking = 0;
@@ -786,7 +787,6 @@
 
 	/* update inode */
 	ci->i_version = le64_to_cpu(info->version);
-	inode->i_version++;
 	inode->i_rdev = le32_to_cpu(info->rdev);
 	inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 
@@ -1185,6 +1185,7 @@
 				    ceph_snap(d_inode(dn)) != tvino.snap)) {
 				dout(" dn %p points to wrong inode %p\n",
 				     dn, d_inode(dn));
+				ceph_dir_clear_ordered(dir);
 				d_delete(dn);
 				dput(dn);
 				goto retry_lookup;
@@ -1322,6 +1323,7 @@
 			dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
 			     dn, d_inode(dn), ceph_vinop(d_inode(dn)),
 			     ceph_vinop(in));
+			ceph_dir_clear_ordered(dir);
 			d_invalidate(dn);
 			have_lease = false;
 		}
@@ -1573,6 +1575,7 @@
 			    ceph_snap(d_inode(dn)) != tvino.snap)) {
 			dout(" dn %p points to wrong inode %p\n",
 			     dn, d_inode(dn));
+			__ceph_dir_clear_ordered(ci);
 			d_delete(dn);
 			dput(dn);
 			goto retry_lookup;
@@ -1597,7 +1600,9 @@
 				 &req->r_caps_reservation);
 		if (ret < 0) {
 			pr_err("fill_inode badness on %p\n", in);
-			if (d_really_is_negative(dn))
+			if (d_really_is_positive(dn))
+				__ceph_dir_clear_ordered(ci);
+			else
 				iput(in);
 			d_drop(dn);
 			err = ret;
diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c
index e7cce41..9e66f69 100644
--- a/fs/ceph/locks.c
+++ b/fs/ceph/locks.c
@@ -30,19 +30,52 @@
 	get_random_bytes(&lock_secret, sizeof(lock_secret));
 }
 
+static void ceph_fl_copy_lock(struct file_lock *dst, struct file_lock *src)
+{
+	struct inode *inode = file_inode(src->fl_file);
+	atomic_inc(&ceph_inode(inode)->i_filelock_ref);
+}
+
+static void ceph_fl_release_lock(struct file_lock *fl)
+{
+	struct inode *inode = file_inode(fl->fl_file);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	if (atomic_dec_and_test(&ci->i_filelock_ref)) {
+		/* clear error when all locks are released */
+		spin_lock(&ci->i_ceph_lock);
+		ci->i_ceph_flags &= ~CEPH_I_ERROR_FILELOCK;
+		spin_unlock(&ci->i_ceph_lock);
+	}
+}
+
+static const struct file_lock_operations ceph_fl_lock_ops = {
+	.fl_copy_lock = ceph_fl_copy_lock,
+	.fl_release_private = ceph_fl_release_lock,
+};
+
 /**
  * Implement fcntl and flock locking functions.
  */
-static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
+static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode,
 			     int cmd, u8 wait, struct file_lock *fl)
 {
-	struct inode *inode = file_inode(file);
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_mds_request *req;
 	int err;
 	u64 length = 0;
 	u64 owner;
 
+	if (operation == CEPH_MDS_OP_SETFILELOCK) {
+		/*
+		 * increasing i_filelock_ref closes race window between
+		 * handling request reply and adding file_lock struct to
+		 * inode. Otherwise, auth caps may get trimmed in the
+		 * window. Caller function will decrease the counter.
+		 */
+		fl->fl_ops = &ceph_fl_lock_ops;
+		atomic_inc(&ceph_inode(inode)->i_filelock_ref);
+	}
+
 	if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK)
 		wait = 0;
 
@@ -180,10 +213,12 @@
  */
 int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
 {
-	u8 lock_cmd;
-	int err;
-	u8 wait = 0;
+	struct inode *inode = file_inode(file);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int err = 0;
 	u16 op = CEPH_MDS_OP_SETFILELOCK;
+	u8 wait = 0;
+	u8 lock_cmd;
 
 	if (!(fl->fl_flags & FL_POSIX))
 		return -ENOLCK;
@@ -199,6 +234,26 @@
 	else if (IS_SETLKW(cmd))
 		wait = 1;
 
+	spin_lock(&ci->i_ceph_lock);
+	if (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) {
+		err = -EIO;
+	} else if (op == CEPH_MDS_OP_SETFILELOCK) {
+		/*
+		 * increasing i_filelock_ref closes race window between
+		 * handling request reply and adding file_lock struct to
+		 * inode. Otherwise, i_auth_cap may get trimmed in the
+		 * window. Caller function will decrease the counter.
+		 */
+		fl->fl_ops = &ceph_fl_lock_ops;
+		atomic_inc(&ci->i_filelock_ref);
+	}
+	spin_unlock(&ci->i_ceph_lock);
+	if (err < 0) {
+		if (op == CEPH_MDS_OP_SETFILELOCK && F_UNLCK == fl->fl_type)
+			posix_lock_file(file, fl, NULL);
+		return err;
+	}
+
 	if (F_RDLCK == fl->fl_type)
 		lock_cmd = CEPH_LOCK_SHARED;
 	else if (F_WRLCK == fl->fl_type)
@@ -206,16 +261,16 @@
 	else
 		lock_cmd = CEPH_LOCK_UNLOCK;
 
-	err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file, lock_cmd, wait, fl);
+	err = ceph_lock_message(CEPH_LOCK_FCNTL, op, inode, lock_cmd, wait, fl);
 	if (!err) {
-		if (op != CEPH_MDS_OP_GETFILELOCK) {
+		if (op == CEPH_MDS_OP_SETFILELOCK) {
 			dout("mds locked, locking locally");
 			err = posix_lock_file(file, fl, NULL);
-			if (err && (CEPH_MDS_OP_SETFILELOCK == op)) {
+			if (err) {
 				/* undo! This should only happen if
 				 * the kernel detects local
 				 * deadlock. */
-				ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
+				ceph_lock_message(CEPH_LOCK_FCNTL, op, inode,
 						  CEPH_LOCK_UNLOCK, 0, fl);
 				dout("got %d on posix_lock_file, undid lock",
 				     err);
@@ -227,9 +282,11 @@
 
 int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
 {
-	u8 lock_cmd;
-	int err;
+	struct inode *inode = file_inode(file);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int err = 0;
 	u8 wait = 0;
+	u8 lock_cmd;
 
 	if (!(fl->fl_flags & FL_FLOCK))
 		return -ENOLCK;
@@ -239,6 +296,21 @@
 
 	dout("ceph_flock, fl_file: %p", fl->fl_file);
 
+	spin_lock(&ci->i_ceph_lock);
+	if (ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) {
+		err = -EIO;
+	} else {
+		/* see comment in ceph_lock */
+		fl->fl_ops = &ceph_fl_lock_ops;
+		atomic_inc(&ci->i_filelock_ref);
+	}
+	spin_unlock(&ci->i_ceph_lock);
+	if (err < 0) {
+		if (F_UNLCK == fl->fl_type)
+			locks_lock_file_wait(file, fl);
+		return err;
+	}
+
 	if (IS_SETLKW(cmd))
 		wait = 1;
 
@@ -250,13 +322,13 @@
 		lock_cmd = CEPH_LOCK_UNLOCK;
 
 	err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK,
-				file, lock_cmd, wait, fl);
+				inode, lock_cmd, wait, fl);
 	if (!err) {
 		err = locks_lock_file_wait(file, fl);
 		if (err) {
 			ceph_lock_message(CEPH_LOCK_FLOCK,
 					  CEPH_MDS_OP_SETFILELOCK,
-					  file, CEPH_LOCK_UNLOCK, 0, fl);
+					  inode, CEPH_LOCK_UNLOCK, 0, fl);
 			dout("got %d on locks_lock_file_wait, undid lock", err);
 		}
 	}
@@ -288,6 +360,37 @@
 	     *flock_count, *fcntl_count);
 }
 
+/*
+ * Given a pointer to a lock, convert it to a ceph filelock
+ */
+static int lock_to_ceph_filelock(struct file_lock *lock,
+				 struct ceph_filelock *cephlock)
+{
+	int err = 0;
+	cephlock->start = cpu_to_le64(lock->fl_start);
+	cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
+	cephlock->client = cpu_to_le64(0);
+	cephlock->pid = cpu_to_le64((u64)lock->fl_pid);
+	cephlock->owner = cpu_to_le64(secure_addr(lock->fl_owner));
+
+	switch (lock->fl_type) {
+	case F_RDLCK:
+		cephlock->type = CEPH_LOCK_SHARED;
+		break;
+	case F_WRLCK:
+		cephlock->type = CEPH_LOCK_EXCL;
+		break;
+	case F_UNLCK:
+		cephlock->type = CEPH_LOCK_UNLOCK;
+		break;
+	default:
+		dout("Have unknown lock type %d", lock->fl_type);
+		err = -EINVAL;
+	}
+
+	return err;
+}
+
 /**
  * Encode the flock and fcntl locks for the given inode into the ceph_filelock
  * array. Must be called with inode->i_lock already held.
@@ -356,50 +459,22 @@
 	if (err)
 		goto out_fail;
 
-	err = ceph_pagelist_append(pagelist, flocks,
-				   num_fcntl_locks * sizeof(*flocks));
-	if (err)
-		goto out_fail;
+	if (num_fcntl_locks > 0) {
+		err = ceph_pagelist_append(pagelist, flocks,
+					   num_fcntl_locks * sizeof(*flocks));
+		if (err)
+			goto out_fail;
+	}
 
 	nlocks = cpu_to_le32(num_flock_locks);
 	err = ceph_pagelist_append(pagelist, &nlocks, sizeof(nlocks));
 	if (err)
 		goto out_fail;
 
-	err = ceph_pagelist_append(pagelist,
-				   &flocks[num_fcntl_locks],
-				   num_flock_locks * sizeof(*flocks));
-out_fail:
-	return err;
-}
-
-/*
- * Given a pointer to a lock, convert it to a ceph filelock
- */
-int lock_to_ceph_filelock(struct file_lock *lock,
-			  struct ceph_filelock *cephlock)
-{
-	int err = 0;
-	cephlock->start = cpu_to_le64(lock->fl_start);
-	cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
-	cephlock->client = cpu_to_le64(0);
-	cephlock->pid = cpu_to_le64((u64)lock->fl_pid);
-	cephlock->owner = cpu_to_le64(secure_addr(lock->fl_owner));
-
-	switch (lock->fl_type) {
-	case F_RDLCK:
-		cephlock->type = CEPH_LOCK_SHARED;
-		break;
-	case F_WRLCK:
-		cephlock->type = CEPH_LOCK_EXCL;
-		break;
-	case F_UNLCK:
-		cephlock->type = CEPH_LOCK_UNLOCK;
-		break;
-	default:
-		dout("Have unknown lock type %d", lock->fl_type);
-		err = -EINVAL;
+	if (num_flock_locks > 0) {
+		err = ceph_pagelist_append(pagelist, &flocks[num_fcntl_locks],
+					   num_flock_locks * sizeof(*flocks));
 	}
-
+out_fail:
 	return err;
 }
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 0687ab3..ab69dcb 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1039,22 +1039,23 @@
  * session caps
  */
 
-/* caller holds s_cap_lock, we drop it */
-static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
-				 struct ceph_mds_session *session)
-	__releases(session->s_cap_lock)
+static void detach_cap_releases(struct ceph_mds_session *session,
+				struct list_head *target)
 {
-	LIST_HEAD(tmp_list);
-	list_splice_init(&session->s_cap_releases, &tmp_list);
-	session->s_num_cap_releases = 0;
-	spin_unlock(&session->s_cap_lock);
+	lockdep_assert_held(&session->s_cap_lock);
 
-	dout("cleanup_cap_releases mds%d\n", session->s_mds);
-	while (!list_empty(&tmp_list)) {
+	list_splice_init(&session->s_cap_releases, target);
+	session->s_num_cap_releases = 0;
+	dout("dispose_cap_releases mds%d\n", session->s_mds);
+}
+
+static void dispose_cap_releases(struct ceph_mds_client *mdsc,
+				 struct list_head *dispose)
+{
+	while (!list_empty(dispose)) {
 		struct ceph_cap *cap;
 		/* zero out the in-progress message */
-		cap = list_first_entry(&tmp_list,
-					struct ceph_cap, session_caps);
+		cap = list_first_entry(dispose, struct ceph_cap, session_caps);
 		list_del(&cap->session_caps);
 		ceph_put_cap(mdsc, cap);
 	}
@@ -1215,6 +1216,13 @@
 		}
 		spin_unlock(&mdsc->cap_dirty_lock);
 
+		if (atomic_read(&ci->i_filelock_ref) > 0) {
+			/* make further file lock syscall return -EIO */
+			ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
+			pr_warn_ratelimited(" dropping file locks for %p %lld\n",
+					    inode, ceph_ino(inode));
+		}
+
 		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
 			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
 			ci->i_prealloc_cap_flush = NULL;
@@ -1244,6 +1252,8 @@
 {
 	struct ceph_fs_client *fsc = session->s_mdsc->fsc;
 	struct super_block *sb = fsc->sb;
+	LIST_HEAD(dispose);
+
 	dout("remove_session_caps on %p\n", session);
 	iterate_session_caps(session, remove_session_caps_cb, fsc);
 
@@ -1278,10 +1288,12 @@
 	}
 
 	// drop cap expires and unlock s_cap_lock
-	cleanup_cap_releases(session->s_mdsc, session);
+	detach_cap_releases(session, &dispose);
 
 	BUG_ON(session->s_nr_caps > 0);
 	BUG_ON(!list_empty(&session->s_cap_flushing));
+	spin_unlock(&session->s_cap_lock);
+	dispose_cap_releases(session->s_mdsc, &dispose);
 }
 
 /*
@@ -1462,6 +1474,11 @@
 			goto out;
 		if ((used | wanted) & CEPH_CAP_ANY_WR)
 			goto out;
+		/* Note: it's possible that i_filelock_ref becomes non-zero
+		 * after dropping auth caps. It doesn't hurt because reply
+		 * of lock mds request will re-add auth caps. */
+		if (atomic_read(&ci->i_filelock_ref) > 0)
+			goto out;
 	}
 	/* The inode has cached pages, but it's no longer used.
 	 * we can safely drop it */
@@ -2827,7 +2844,7 @@
 		struct ceph_mds_cap_reconnect v2;
 		struct ceph_mds_cap_reconnect_v1 v1;
 	} rec;
-	struct ceph_inode_info *ci;
+	struct ceph_inode_info *ci = cap->ci;
 	struct ceph_reconnect_state *recon_state = arg;
 	struct ceph_pagelist *pagelist = recon_state->pagelist;
 	char *path;
@@ -2836,8 +2853,6 @@
 	u64 snap_follows;
 	struct dentry *dentry;
 
-	ci = cap->ci;
-
 	dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
 	     inode, ceph_vinop(inode), cap, cap->cap_id,
 	     ceph_cap_string(cap->issued));
@@ -2870,7 +2885,8 @@
 		rec.v2.issued = cpu_to_le32(cap->issued);
 		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
 		rec.v2.pathbase = cpu_to_le64(pathbase);
-		rec.v2.flock_len = 0;
+		rec.v2.flock_len = (__force __le32)
+			((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
 	} else {
 		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
 		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
@@ -2894,26 +2910,37 @@
 
 	if (recon_state->msg_version >= 2) {
 		int num_fcntl_locks, num_flock_locks;
-		struct ceph_filelock *flocks;
+		struct ceph_filelock *flocks = NULL;
 		size_t struct_len, total_len = 0;
 		u8 struct_v = 0;
 
 encode_again:
-		ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
-		flocks = kmalloc((num_fcntl_locks+num_flock_locks) *
-				 sizeof(struct ceph_filelock), GFP_NOFS);
-		if (!flocks) {
-			err = -ENOMEM;
-			goto out_free;
+		if (rec.v2.flock_len) {
+			ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
+		} else {
+			num_fcntl_locks = 0;
+			num_flock_locks = 0;
 		}
-		err = ceph_encode_locks_to_buffer(inode, flocks,
-						  num_fcntl_locks,
-						  num_flock_locks);
-		if (err) {
+		if (num_fcntl_locks + num_flock_locks > 0) {
+			flocks = kmalloc((num_fcntl_locks + num_flock_locks) *
+					 sizeof(struct ceph_filelock), GFP_NOFS);
+			if (!flocks) {
+				err = -ENOMEM;
+				goto out_free;
+			}
+			err = ceph_encode_locks_to_buffer(inode, flocks,
+							  num_fcntl_locks,
+							  num_flock_locks);
+			if (err) {
+				kfree(flocks);
+				flocks = NULL;
+				if (err == -ENOSPC)
+					goto encode_again;
+				goto out_free;
+			}
+		} else {
 			kfree(flocks);
-			if (err == -ENOSPC)
-				goto encode_again;
-			goto out_free;
+			flocks = NULL;
 		}
 
 		if (recon_state->msg_version >= 3) {
@@ -2993,6 +3020,7 @@
 	int s_nr_caps;
 	struct ceph_pagelist *pagelist;
 	struct ceph_reconnect_state recon_state;
+	LIST_HEAD(dispose);
 
 	pr_info("mds%d reconnect start\n", mds);
 
@@ -3026,7 +3054,9 @@
 	 */
 	session->s_cap_reconnect = 1;
 	/* drop old cap expires; we're about to reestablish that state */
-	cleanup_cap_releases(mdsc, session);
+	detach_cap_releases(session, &dispose);
+	spin_unlock(&session->s_cap_lock);
+	dispose_cap_releases(mdsc, &dispose);
 
 	/* trim unused caps to reduce MDS's cache rejoin time */
 	if (mdsc->fsc->sb->s_root)
@@ -3857,14 +3887,14 @@
 		goto err_out;
 	}
 	return;
+
 bad:
 	pr_err("error decoding fsmap\n");
 err_out:
 	mutex_lock(&mdsc->mutex);
-	mdsc->mdsmap_err = -ENOENT;
+	mdsc->mdsmap_err = err;
 	__wake_requests(mdsc, &mdsc->waiting_for_map);
 	mutex_unlock(&mdsc->mutex);
-	return;
 }
 
 /*
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index e4082af..fe9fbb3 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -84,8 +84,9 @@
 	buf->f_ffree = -1;
 	buf->f_namelen = NAME_MAX;
 
-	/* leave fsid little-endian, regardless of host endianness */
-	fsid = *(u64 *)(&monmap->fsid) ^ *((u64 *)&monmap->fsid + 1);
+	/* Must convert the fsid, for consistent values across arches */
+	fsid = le64_to_cpu(*(__le64 *)(&monmap->fsid)) ^
+	       le64_to_cpu(*((__le64 *)&monmap->fsid + 1));
 	buf->f_fsid.val[0] = fsid & 0xffffffff;
 	buf->f_fsid.val[1] = fsid >> 32;
 
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 3e27a28..2beeec0 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -352,6 +352,7 @@
 	int i_pin_ref;
 	int i_rd_ref, i_rdcache_ref, i_wr_ref, i_wb_ref;
 	int i_wrbuffer_ref, i_wrbuffer_ref_head;
+	atomic_t i_filelock_ref;
 	u32 i_shared_gen;       /* increment each time we get FILE_SHARED */
 	u32 i_rdcache_gen;      /* incremented each time we get FILE_CACHE. */
 	u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */
@@ -487,6 +488,8 @@
 #define CEPH_I_KICK_FLUSH	(1 << 9)  /* kick flushing caps */
 #define CEPH_I_FLUSH_SNAPS	(1 << 10) /* need flush snapss */
 #define CEPH_I_ERROR_WRITE	(1 << 11) /* have seen write errors */
+#define CEPH_I_ERROR_FILELOCK	(1 << 12) /* have seen file lock errors */
+
 
 /*
  * We set the ERROR_WRITE bit when we start seeing write errors on an inode
@@ -1011,7 +1014,6 @@
 extern int ceph_locks_to_pagelist(struct ceph_filelock *flocks,
 				  struct ceph_pagelist *pagelist,
 				  int num_fcntl_locks, int num_flock_locks);
-extern int lock_to_ceph_filelock(struct file_lock *fl, struct ceph_filelock *c);
 
 /* debugfs.c */
 extern int ceph_fs_debugfs_init(struct ceph_fs_client *client);
diff --git a/net/ceph/ceph_hash.c b/net/ceph/ceph_hash.c
index 67bb1f1..9a5850f2 100644
--- a/net/ceph/ceph_hash.c
+++ b/net/ceph/ceph_hash.c
@@ -47,28 +47,38 @@
 
 	/* handle the last 11 bytes */
 	c = c + length;
-	switch (len) {            /* all the case statements fall through */
+	switch (len) {
 	case 11:
 		c = c + ((__u32)k[10] << 24);
+		/* fall through */
 	case 10:
 		c = c + ((__u32)k[9] << 16);
+		/* fall through */
 	case 9:
 		c = c + ((__u32)k[8] << 8);
 		/* the first byte of c is reserved for the length */
+		/* fall through */
 	case 8:
 		b = b + ((__u32)k[7] << 24);
+		/* fall through */
 	case 7:
 		b = b + ((__u32)k[6] << 16);
+		/* fall through */
 	case 6:
 		b = b + ((__u32)k[5] << 8);
+		/* fall through */
 	case 5:
 		b = b + k[4];
+		/* fall through */
 	case 4:
 		a = a + ((__u32)k[3] << 24);
+		/* fall through */
 	case 3:
 		a = a + ((__u32)k[2] << 16);
+		/* fall through */
 	case 2:
 		a = a + ((__u32)k[1] << 8);
+		/* fall through */
 	case 1:
 		a = a + k[0];
 		/* case 0: nothing left to add */
diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c
index 489610a..bf9d079 100644
--- a/net/ceph/crypto.c
+++ b/net/ceph/crypto.c
@@ -37,7 +37,9 @@
 		return -ENOTSUPP;
 	}
 
-	WARN_ON(!key->len);
+	if (!key->len)
+		return -EINVAL;
+
 	key->key = kmemdup(buf, key->len, GFP_NOIO);
 	if (!key->key) {
 		ret = -ENOMEM;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index ad93342..8a4d375 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -430,6 +430,7 @@
 	switch (sk->sk_state) {
 	case TCP_CLOSE:
 		dout("%s TCP_CLOSE\n", __func__);
+		/* fall through */
 	case TCP_CLOSE_WAIT:
 		dout("%s TCP_CLOSE_WAIT\n", __func__);
 		con_sock_state_closing(con);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 9ae1bab..1547107 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -1279,9 +1279,10 @@
 
 		/*
 		 * Older OSDs don't set reply tid even if the orignal
-		 * request had a non-zero tid.  Workaround this weirdness
-		 * by falling through to the allocate case.
+		 * request had a non-zero tid.  Work around this weirdness
+		 * by allocating a new message.
 		 */
+		/* fall through */
 	case CEPH_MSG_MON_MAP:
 	case CEPH_MSG_MDS_MAP:
 	case CEPH_MSG_OSD_MAP: