Merge branch 'for-linus' of git://ceph.newdream.net/git/ceph-client

* 'for-linus' of git://ceph.newdream.net/git/ceph-client:
  libceph: fix double-free of page vector
  ceph: fix 32-bit ino numbers
  libceph: force resend of osd requests if we skip an osdmap
  ceph: use kernel DNS resolver
  ceph: fix ceph_monc_init memory leak
  ceph: let the set_layout ioctl set single traits
  Revert "ceph: don't truncate dirty pages in invalidate work thread"
  ceph: replace leading spaces with tabs
  libceph: warn on msg allocation failures
  libceph: don't complain on msgpool alloc failures
  libceph: always preallocate mon connection
  libceph: create messenger with client
  ceph: document ioctls
  ceph: implement (optional) max read size
  ceph: rename rsize -> rasize
  ceph: make readpages fully async
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index fe3c324..65cc424 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -260,7 +260,7 @@
 	kref_init(&rbdc->kref);
 	INIT_LIST_HEAD(&rbdc->node);
 
-	rbdc->client = ceph_create_client(opt, rbdc);
+	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
 	if (IS_ERR(rbdc->client))
 		goto out_rbdc;
 	opt = NULL; /* Now rbdc->client is responsible for opt */
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 5a3953d..4144caf 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -228,36 +228,129 @@
 }
 
 /*
- * Build a vector of contiguous pages from the provided page list.
+ * Finish an async read(ahead) op.
  */
-static struct page **page_vector_from_list(struct list_head *page_list,
-					   unsigned *nr_pages)
+static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 {
+	struct inode *inode = req->r_inode;
+	struct ceph_osd_reply_head *replyhead;
+	int rc, bytes;
+	int i;
+
+	/* parse reply */
+	replyhead = msg->front.iov_base;
+	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
+	rc = le32_to_cpu(replyhead->result);
+	bytes = le32_to_cpu(msg->hdr.data_len);
+
+	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
+
+	/* unlock all pages, zeroing any data we didn't read */
+	for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
+		struct page *page = req->r_pages[i];
+
+		if (bytes < (int)PAGE_CACHE_SIZE) {
+			/* zero (remainder of) page */
+			int s = bytes < 0 ? 0 : bytes;
+			zero_user_segment(page, s, PAGE_CACHE_SIZE);
+		}
+ 		dout("finish_read %p uptodate %p idx %lu\n", inode, page,
+		     page->index);
+		flush_dcache_page(page);
+		SetPageUptodate(page);
+		unlock_page(page);
+		page_cache_release(page);
+	}
+	kfree(req->r_pages);
+}
+
+/*
+ * start an async read(ahead) operation.  return nr_pages we submitted
+ * a read for on success, or negative error code.
+ */
+static int start_read(struct inode *inode, struct list_head *page_list, int max)
+{
+	struct ceph_osd_client *osdc =
+		&ceph_inode_to_client(inode)->client->osdc;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct page *page = list_entry(page_list->prev, struct page, lru);
+	struct ceph_osd_request *req;
+	u64 off;
+	u64 len;
+	int i;
 	struct page **pages;
-	struct page *page;
-	int next_index, contig_pages = 0;
+	pgoff_t next_index;
+	int nr_pages = 0;
+	int ret;
+
+	off = page->index << PAGE_CACHE_SHIFT;
+
+	/* count pages */
+	next_index = page->index;
+	list_for_each_entry_reverse(page, page_list, lru) {
+		if (page->index != next_index)
+			break;
+		nr_pages++;
+		next_index++;
+		if (max && nr_pages == max)
+			break;
+	}
+	len = nr_pages << PAGE_CACHE_SHIFT;
+	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
+	     off, len);
+
+	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
+				    off, &len,
+				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
+				    NULL, 0,
+				    ci->i_truncate_seq, ci->i_truncate_size,
+				    NULL, false, 1, 0);
+	if (!req)
+		return -ENOMEM;
 
 	/* build page vector */
-	pages = kmalloc(sizeof(*pages) * *nr_pages, GFP_NOFS);
+	nr_pages = len >> PAGE_CACHE_SHIFT;
+	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
+	ret = -ENOMEM;
 	if (!pages)
-		return ERR_PTR(-ENOMEM);
-
-	BUG_ON(list_empty(page_list));
-	next_index = list_entry(page_list->prev, struct page, lru)->index;
-	list_for_each_entry_reverse(page, page_list, lru) {
-		if (page->index == next_index) {
-			dout("readpages page %d %p\n", contig_pages, page);
-			pages[contig_pages] = page;
-			contig_pages++;
-			next_index++;
-		} else {
-			break;
+		goto out;
+	for (i = 0; i < nr_pages; ++i) {
+		page = list_entry(page_list->prev, struct page, lru);
+		BUG_ON(PageLocked(page));
+		list_del(&page->lru);
+		
+ 		dout("start_read %p adding %p idx %lu\n", inode, page,
+		     page->index);
+		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
+					  GFP_NOFS)) {
+			page_cache_release(page);
+			dout("start_read %p add_to_page_cache failed %p\n",
+			     inode, page);
+			nr_pages = i;
+			goto out_pages;
 		}
+		pages[i] = page;
 	}
-	*nr_pages = contig_pages;
-	return pages;
+	req->r_pages = pages;
+	req->r_num_pages = nr_pages;
+	req->r_callback = finish_read;
+	req->r_inode = inode;
+
+	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
+	ret = ceph_osdc_start_request(osdc, req, false);
+	if (ret < 0)
+		goto out_pages;
+	ceph_osdc_put_request(req);
+	return nr_pages;
+
+out_pages:
+	ceph_release_page_vector(pages, nr_pages);
+out:
+	ceph_osdc_put_request(req);
+	return ret;
 }
 
+
 /*
  * Read multiple pages.  Leave pages we don't read + unlock in page_list;
  * the caller (VM) cleans them up.
@@ -266,64 +359,24 @@
 			  struct list_head *page_list, unsigned nr_pages)
 {
 	struct inode *inode = file->f_dentry->d_inode;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_osd_client *osdc =
-		&ceph_inode_to_client(inode)->client->osdc;
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	int rc = 0;
-	struct page **pages;
-	loff_t offset;
-	u64 len;
+	int max = 0;
 
-	dout("readpages %p file %p nr_pages %d\n",
-	     inode, file, nr_pages);
+	if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
+		max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
+			>> PAGE_SHIFT;
 
-	pages = page_vector_from_list(page_list, &nr_pages);
-	if (IS_ERR(pages))
-		return PTR_ERR(pages);
-
-	/* guess read extent */
-	offset = pages[0]->index << PAGE_CACHE_SHIFT;
-	len = nr_pages << PAGE_CACHE_SHIFT;
-	rc = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
-				 offset, &len,
-				 ci->i_truncate_seq, ci->i_truncate_size,
-				 pages, nr_pages, 0);
-	if (rc == -ENOENT)
-		rc = 0;
-	if (rc < 0)
-		goto out;
-
-	for (; !list_empty(page_list) && len > 0;
-	     rc -= PAGE_CACHE_SIZE, len -= PAGE_CACHE_SIZE) {
-		struct page *page =
-			list_entry(page_list->prev, struct page, lru);
-
-		list_del(&page->lru);
-
-		if (rc < (int)PAGE_CACHE_SIZE) {
-			/* zero (remainder of) page */
-			int s = rc < 0 ? 0 : rc;
-			zero_user_segment(page, s, PAGE_CACHE_SIZE);
-		}
-
-		if (add_to_page_cache_lru(page, mapping, page->index,
-					  GFP_NOFS)) {
-			page_cache_release(page);
-			dout("readpages %p add_to_page_cache failed %p\n",
-			     inode, page);
-			continue;
-		}
-		dout("readpages %p adding %p idx %lu\n", inode, page,
-		     page->index);
-		flush_dcache_page(page);
-		SetPageUptodate(page);
-		unlock_page(page);
-		page_cache_release(page);
+	dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages,
+	     max);
+	while (!list_empty(page_list)) {
+		rc = start_read(inode, page_list, max);
+		if (rc < 0)
+			goto out;
+		BUG_ON(rc == 0);
 	}
-	rc = 0;
-
 out:
-	kfree(pages);
+	dout("readpages %p file %p ret %d\n", inode, file, rc);
 	return rc;
 }
 
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 8d74ad7..b8731bf 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -945,7 +945,7 @@
 	     seq, issue_seq, mseq, follows, size, max_size,
 	     xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS);
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
 	if (!msg)
 		return -ENOMEM;
 
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 095799b..5dde7d51 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -9,7 +9,6 @@
 #include <linux/namei.h>
 #include <linux/writeback.h>
 #include <linux/vmalloc.h>
-#include <linux/pagevec.h>
 
 #include "super.h"
 #include "mds_client.h"
@@ -1364,49 +1363,6 @@
 }
 
 /*
- * invalidate any pages that are not dirty or under writeback.  this
- * includes pages that are clean and mapped.
- */
-static void ceph_invalidate_nondirty_pages(struct address_space *mapping)
-{
-	struct pagevec pvec;
-	pgoff_t next = 0;
-	int i;
-
-	pagevec_init(&pvec, 0);
-	while (pagevec_lookup(&pvec, mapping, next, PAGEVEC_SIZE)) {
-		for (i = 0; i < pagevec_count(&pvec); i++) {
-			struct page *page = pvec.pages[i];
-			pgoff_t index;
-			int skip_page =
-				(PageDirty(page) || PageWriteback(page));
-
-			if (!skip_page)
-				skip_page = !trylock_page(page);
-
-			/*
-			 * We really shouldn't be looking at the ->index of an
-			 * unlocked page.  But we're not allowed to lock these
-			 * pages.  So we rely upon nobody altering the ->index
-			 * of this (pinned-by-us) page.
-			 */
-			index = page->index;
-			if (index > next)
-				next = index;
-			next++;
-
-			if (skip_page)
-				continue;
-
-			generic_error_remove_page(mapping, page);
-			unlock_page(page);
-		}
-		pagevec_release(&pvec);
-		cond_resched();
-	}
-}
-
-/*
  * Invalidate inode pages in a worker thread.  (This can't be done
  * in the message handler context.)
  */
@@ -1429,7 +1385,7 @@
 	orig_gen = ci->i_rdcache_gen;
 	spin_unlock(&inode->i_lock);
 
-	ceph_invalidate_nondirty_pages(inode->i_mapping);
+	truncate_inode_pages(&inode->i_data, 0);
 
 	spin_lock(&inode->i_lock);
 	if (orig_gen == ci->i_rdcache_gen &&
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 3b256b5..5a14c29 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -42,17 +42,39 @@
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	struct ceph_mds_request *req;
 	struct ceph_ioctl_layout l;
+	struct ceph_inode_info *ci = ceph_inode(file->f_dentry->d_inode);
+	struct ceph_ioctl_layout nl;
 	int err, i;
 
-	/* copy and validate */
 	if (copy_from_user(&l, arg, sizeof(l)))
 		return -EFAULT;
 
-	if ((l.object_size & ~PAGE_MASK) ||
-	    (l.stripe_unit & ~PAGE_MASK) ||
-	    !l.stripe_unit ||
-	    (l.object_size &&
-	     (unsigned)l.object_size % (unsigned)l.stripe_unit))
+	/* validate changed params against current layout */
+	err = ceph_do_getattr(file->f_dentry->d_inode, CEPH_STAT_CAP_LAYOUT);
+	if (!err) {
+		nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
+		nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
+		nl.object_size = ceph_file_layout_object_size(ci->i_layout);
+		nl.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
+		nl.preferred_osd =
+				(s32)le32_to_cpu(ci->i_layout.fl_pg_preferred);
+	} else
+		return err;
+
+	if (l.stripe_count)
+		nl.stripe_count = l.stripe_count;
+	if (l.stripe_unit)
+		nl.stripe_unit = l.stripe_unit;
+	if (l.object_size)
+		nl.object_size = l.object_size;
+	if (l.data_pool)
+		nl.data_pool = l.data_pool;
+	if (l.preferred_osd)
+		nl.preferred_osd = l.preferred_osd;
+
+	if ((nl.object_size & ~PAGE_MASK) ||
+	    (nl.stripe_unit & ~PAGE_MASK) ||
+	    ((unsigned)nl.object_size % (unsigned)nl.stripe_unit))
 		return -EINVAL;
 
 	/* make sure it's a valid data pool */
diff --git a/fs/ceph/ioctl.h b/fs/ceph/ioctl.h
index 0c5167e..be4a604 100644
--- a/fs/ceph/ioctl.h
+++ b/fs/ceph/ioctl.h
@@ -6,7 +6,31 @@
 
 #define CEPH_IOCTL_MAGIC 0x97
 
-/* just use u64 to align sanely on all archs */
+/*
+ * CEPH_IOC_GET_LAYOUT - get file layout or dir layout policy
+ * CEPH_IOC_SET_LAYOUT - set file layout
+ * CEPH_IOC_SET_LAYOUT_POLICY - set dir layout policy
+ *
+ * The file layout specifies how file data is striped over objects in
+ * the distributed object store, which object pool they belong to (if
+ * it differs from the default), and an optional 'preferred osd' to
+ * store them on.
+ *
+ * Files get a new layout based on the policy set on the containing
+ * directory or one of its ancestors.  The GET_LAYOUT ioctl will let
+ * you examine the layout for a file or the policy on a directory.
+ *
+ * SET_LAYOUT will let you set a layout on a newly created file.  This
+ * only works immediately after the file is created and before any
+ * data is written to it.
+ *
+ * SET_LAYOUT_POLICY will let you set a layout policy (default layout)
+ * on a directory that will apply to any new files created in that
+ * directory (or any child directory that doesn't specify a layout of
+ * its own).
+ */
+
+/* use u64 to align sanely on all archs */
 struct ceph_ioctl_layout {
 	__u64 stripe_unit, stripe_count, object_size;
 	__u64 data_pool;
@@ -21,6 +45,8 @@
 				   struct ceph_ioctl_layout)
 
 /*
+ * CEPH_IOC_GET_DATALOC - get location of file data in the cluster
+ *
  * Extract identity, address of the OSD and object storing a given
  * file offset.
  */
@@ -39,7 +65,34 @@
 #define CEPH_IOC_GET_DATALOC _IOWR(CEPH_IOCTL_MAGIC, 3,	\
 				   struct ceph_ioctl_dataloc)
 
+/*
+ * CEPH_IOC_LAZYIO - relax consistency
+ *
+ * Normally Ceph switches to synchronous IO when multiple clients have
+ * the file open (and or more for write).  Reads and writes bypass the
+ * page cache and go directly to the OSD.  Setting this flag on a file
+ * descriptor will allow buffered IO for this file in cases where the
+ * application knows it won't interfere with other nodes (or doesn't
+ * care).
+ */
 #define CEPH_IOC_LAZYIO _IO(CEPH_IOCTL_MAGIC, 4)
+
+/*
+ * CEPH_IOC_SYNCIO - force synchronous IO
+ *
+ * This ioctl sets a file flag that forces the synchronous IO that
+ * bypasses the page cache, even if it is not necessary.  This is
+ * essentially the opposite behavior of IOC_LAZYIO.  This forces the
+ * same read/write path as a file opened by multiple clients when one
+ * or more of those clients is opened for write.
+ *
+ * Note that this type of sync IO takes a different path than a file
+ * opened with O_SYNC/D_SYNC (writes hit the page cache and are
+ * immediately flushed on page boundaries).  It is very similar to
+ * O_DIRECT (writes bypass the page cache) excep that O_DIRECT writes
+ * are not copied (user page must remain stable) and O_DIRECT writes
+ * have alignment restrictions (on the buffer and file offset).
+ */
 #define CEPH_IOC_SYNCIO _IO(CEPH_IOCTL_MAGIC, 5)
 
 #endif
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 86c59e1..1d72f15 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -764,7 +764,8 @@
 	struct ceph_msg *msg;
 	struct ceph_mds_session_head *h;
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
+			   false);
 	if (!msg) {
 		pr_err("create_session_msg ENOMEM creating msg\n");
 		return NULL;
@@ -1240,7 +1241,7 @@
 	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
 		spin_unlock(&session->s_cap_lock);
 		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
-				   GFP_NOFS);
+				   GFP_NOFS, false);
 		if (!msg)
 			goto out_unlocked;
 		dout("add_cap_releases %p msg %p now %d\n", session, msg,
@@ -1652,7 +1653,7 @@
 	if (req->r_old_dentry_drop)
 		len += req->r_old_dentry->d_name.len;
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS, false);
 	if (!msg) {
 		msg = ERR_PTR(-ENOMEM);
 		goto out_free2;
@@ -2518,7 +2519,7 @@
 		goto fail_nopagelist;
 	ceph_pagelist_init(pagelist);
 
-	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
+	reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS, false);
 	if (!reply)
 		goto fail_nomsg;
 
@@ -2831,7 +2832,7 @@
 	dnamelen = dentry->d_name.len;
 	len += dnamelen;
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
 	if (!msg)
 		return;
 	lease = msg->front.iov_base;
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 88bacaf..788f5ad 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -114,6 +114,7 @@
 enum {
 	Opt_wsize,
 	Opt_rsize,
+	Opt_rasize,
 	Opt_caps_wanted_delay_min,
 	Opt_caps_wanted_delay_max,
 	Opt_cap_release_safety,
@@ -136,6 +137,7 @@
 static match_table_t fsopt_tokens = {
 	{Opt_wsize, "wsize=%d"},
 	{Opt_rsize, "rsize=%d"},
+	{Opt_rasize, "rasize=%d"},
 	{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
 	{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
 	{Opt_cap_release_safety, "cap_release_safety=%d"},
@@ -196,6 +198,9 @@
 	case Opt_rsize:
 		fsopt->rsize = intval;
 		break;
+	case Opt_rasize:
+		fsopt->rasize = intval;
+		break;
 	case Opt_caps_wanted_delay_min:
 		fsopt->caps_wanted_delay_min = intval;
 		break;
@@ -289,28 +294,29 @@
 
 	dout("parse_mount_options %p, dev_name '%s'\n", fsopt, dev_name);
 
-        fsopt->sb_flags = flags;
-        fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
+	fsopt->sb_flags = flags;
+	fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
 
-        fsopt->rsize = CEPH_RSIZE_DEFAULT;
-        fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
+	fsopt->rsize = CEPH_RSIZE_DEFAULT;
+	fsopt->rasize = CEPH_RASIZE_DEFAULT;
+	fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
 	fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
 	fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
-        fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
-        fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
-        fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
-        fsopt->congestion_kb = default_congestion_kb();
-	
-        /* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
-        err = -EINVAL;
-        if (!dev_name)
-                goto out;
-        *path = strstr(dev_name, ":/");
-        if (*path == NULL) {
-                pr_err("device name is missing path (no :/ in %s)\n",
-                       dev_name);
-                goto out;
-        }
+	fsopt->cap_release_safety = CEPH_CAP_RELEASE_SAFETY_DEFAULT;
+	fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
+	fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
+	fsopt->congestion_kb = default_congestion_kb();
+
+	/* ip1[:port1][,ip2[:port2]...]:/subdir/in/fs */
+	err = -EINVAL;
+	if (!dev_name)
+		goto out;
+	*path = strstr(dev_name, ":/");
+	if (*path == NULL) {
+		pr_err("device name is missing path (no :/ in %s)\n",
+				dev_name);
+		goto out;
+	}
 	dev_name_end = *path;
 	dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name);
 
@@ -376,6 +382,8 @@
 		seq_printf(m, ",wsize=%d", fsopt->wsize);
 	if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
 		seq_printf(m, ",rsize=%d", fsopt->rsize);
+	if (fsopt->rasize != CEPH_RASIZE_DEFAULT)
+		seq_printf(m, ",rasize=%d", fsopt->rsize);
 	if (fsopt->congestion_kb != default_congestion_kb())
 		seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
 	if (fsopt->caps_wanted_delay_min != CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT)
@@ -422,20 +430,23 @@
 					struct ceph_options *opt)
 {
 	struct ceph_fs_client *fsc;
+	const unsigned supported_features =
+		CEPH_FEATURE_FLOCK |
+		CEPH_FEATURE_DIRLAYOUTHASH;
+	const unsigned required_features = 0;
 	int err = -ENOMEM;
 
 	fsc = kzalloc(sizeof(*fsc), GFP_KERNEL);
 	if (!fsc)
 		return ERR_PTR(-ENOMEM);
 
-	fsc->client = ceph_create_client(opt, fsc);
+	fsc->client = ceph_create_client(opt, fsc, supported_features,
+					 required_features);
 	if (IS_ERR(fsc->client)) {
 		err = PTR_ERR(fsc->client);
 		goto fail;
 	}
 	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
-	fsc->client->supported_features |= CEPH_FEATURE_FLOCK |
-		CEPH_FEATURE_DIRLAYOUTHASH;
 	fsc->client->monc.want_mdsmap = 1;
 
 	fsc->mount_options = fsopt;
@@ -774,10 +785,10 @@
 {
 	int err;
 
-	/* set ra_pages based on rsize mount option? */
-	if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
+	/* set ra_pages based on rasize mount option? */
+	if (fsc->mount_options->rasize >= PAGE_CACHE_SIZE)
 		fsc->backing_dev_info.ra_pages =
-			(fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
+			(fsc->mount_options->rasize + PAGE_CACHE_SIZE - 1)
 			>> PAGE_SHIFT;
 	else
 		fsc->backing_dev_info.ra_pages =
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index a23eed5..b01442a 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -36,7 +36,8 @@
 #define ceph_test_mount_opt(fsc, opt) \
 	(!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
 
-#define CEPH_RSIZE_DEFAULT             (512*1024) /* readahead */
+#define CEPH_RSIZE_DEFAULT             0           /* max read size */
+#define CEPH_RASIZE_DEFAULT            (8192*1024) /* readahead */
 #define CEPH_MAX_READDIR_DEFAULT        1024
 #define CEPH_MAX_READDIR_BYTES_DEFAULT  (512*1024)
 #define CEPH_SNAPDIRNAME_DEFAULT        ".snap"
@@ -45,8 +46,9 @@
 	int flags;
 	int sb_flags;
 
-	int wsize;
-	int rsize;            /* max readahead */
+	int wsize;            /* max write size */
+	int rsize;            /* max read size */
+	int rasize;           /* max readahead */
 	int congestion_kb;    /* max writeback in flight */
 	int caps_wanted_delay_min, caps_wanted_delay_max;
 	int cap_release_safety;
@@ -344,9 +346,10 @@
  * x86_64+ino32  64                     32
  * x86_64        64                     64
  */
-static inline u32 ceph_ino_to_ino32(ino_t ino)
+static inline u32 ceph_ino_to_ino32(__u64 vino)
 {
-	ino ^= ino >> (sizeof(ino) * 8 - 32);
+	u32 ino = vino & 0xffffffff;
+	ino ^= vino >> 32;
 	if (!ino)
 		ino = 1;
 	return ino;
@@ -357,11 +360,11 @@
  */
 static inline ino_t ceph_vino_to_ino(struct ceph_vino vino)
 {
-	ino_t ino = (ino_t)vino.ino;  /* ^ (vino.snap << 20); */
 #if BITS_PER_LONG == 32
-	ino = ceph_ino_to_ino32(ino);
+	return ceph_ino_to_ino32(vino.ino);
+#else
+	return (ino_t)vino.ino;
 #endif
-	return ino;
 }
 
 /*
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 5637551..95bd850 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -215,7 +215,9 @@
 extern int ceph_compare_options(struct ceph_options *new_opt,
 				struct ceph_client *client);
 extern struct ceph_client *ceph_create_client(struct ceph_options *opt,
-					      void *private);
+					      void *private,
+					      unsigned supported_features,
+					      unsigned required_features);
 extern u64 ceph_client_id(struct ceph_client *client);
 extern void ceph_destroy_client(struct ceph_client *client);
 extern int __ceph_open_session(struct ceph_client *client,
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index ca768ae..ffbeb2c 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -237,7 +237,8 @@
 extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
 extern void ceph_con_put(struct ceph_connection *con);
 
-extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags);
+extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
+				     bool can_fail);
 extern void ceph_msg_kfree(struct ceph_msg *m);
 
 
diff --git a/net/ceph/Kconfig b/net/ceph/Kconfig
index be683f2..cc04dd6 100644
--- a/net/ceph/Kconfig
+++ b/net/ceph/Kconfig
@@ -27,3 +27,17 @@
 
 	  If unsure, say N.
 
+config CEPH_LIB_USE_DNS_RESOLVER
+	bool "Use in-kernel support for DNS lookup"
+	depends on CEPH_LIB
+	select DNS_RESOLVER
+	default n
+	help
+	  If you say Y here, hostnames (e.g. monitor addresses) will
+	  be resolved using the CONFIG_DNS_RESOLVER facility.
+
+	  For information on how to use CONFIG_DNS_RESOLVER consult
+	  Documentation/networking/dns_resolver.txt
+
+	  If unsure, say N.
+
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 2883ea0..97f70e5 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -432,9 +432,12 @@
 /*
  * create a fresh client instance
  */
-struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private)
+struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
+				       unsigned supported_features,
+				       unsigned required_features)
 {
 	struct ceph_client *client;
+	struct ceph_entity_addr *myaddr = NULL;
 	int err = -ENOMEM;
 
 	client = kzalloc(sizeof(*client), GFP_KERNEL);
@@ -449,15 +452,27 @@
 	client->auth_err = 0;
 
 	client->extra_mon_dispatch = NULL;
-	client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT;
-	client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT;
+	client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT |
+		supported_features;
+	client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT |
+		required_features;
 
-	client->msgr = NULL;
+	/* msgr */
+	if (ceph_test_opt(client, MYIP))
+		myaddr = &client->options->my_addr;
+	client->msgr = ceph_messenger_create(myaddr,
+					     client->supported_features,
+					     client->required_features);
+	if (IS_ERR(client->msgr)) {
+		err = PTR_ERR(client->msgr);
+		goto fail;
+	}
+	client->msgr->nocrc = ceph_test_opt(client, NOCRC);
 
 	/* subsystems */
 	err = ceph_monc_init(&client->monc, client);
 	if (err < 0)
-		goto fail;
+		goto fail_msgr;
 	err = ceph_osdc_init(&client->osdc, client);
 	if (err < 0)
 		goto fail_monc;
@@ -466,6 +481,8 @@
 
 fail_monc:
 	ceph_monc_stop(&client->monc);
+fail_msgr:
+	ceph_messenger_destroy(client->msgr);
 fail:
 	kfree(client);
 	return ERR_PTR(err);
@@ -490,8 +507,7 @@
 
 	ceph_debugfs_client_cleanup(client);
 
-	if (client->msgr)
-		ceph_messenger_destroy(client->msgr);
+	ceph_messenger_destroy(client->msgr);
 
 	ceph_destroy_options(client->options);
 
@@ -514,24 +530,9 @@
  */
 int __ceph_open_session(struct ceph_client *client, unsigned long started)
 {
-	struct ceph_entity_addr *myaddr = NULL;
 	int err;
 	unsigned long timeout = client->options->mount_timeout * HZ;
 
-	/* initialize the messenger */
-	if (client->msgr == NULL) {
-		if (ceph_test_opt(client, MYIP))
-			myaddr = &client->options->my_addr;
-		client->msgr = ceph_messenger_create(myaddr,
-					client->supported_features,
-					client->required_features);
-		if (IS_ERR(client->msgr)) {
-			client->msgr = NULL;
-			return PTR_ERR(client->msgr);
-		}
-		client->msgr->nocrc = ceph_test_opt(client, NOCRC);
-	}
-
 	/* open session, and wait for mon and osd maps */
 	err = ceph_monc_open_session(&client->monc);
 	if (err < 0)
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9918e9e..f466930 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -11,6 +11,7 @@
 #include <linux/string.h>
 #include <linux/bio.h>
 #include <linux/blkdev.h>
+#include <linux/dns_resolver.h>
 #include <net/tcp.h>
 
 #include <linux/ceph/libceph.h>
@@ -1078,6 +1079,101 @@
 }
 
 /*
+ * Unlike other *_pton function semantics, zero indicates success.
+ */
+static int ceph_pton(const char *str, size_t len, struct sockaddr_storage *ss,
+		char delim, const char **ipend)
+{
+	struct sockaddr_in *in4 = (void *)ss;
+	struct sockaddr_in6 *in6 = (void *)ss;
+
+	memset(ss, 0, sizeof(*ss));
+
+	if (in4_pton(str, len, (u8 *)&in4->sin_addr.s_addr, delim, ipend)) {
+		ss->ss_family = AF_INET;
+		return 0;
+	}
+
+	if (in6_pton(str, len, (u8 *)&in6->sin6_addr.s6_addr, delim, ipend)) {
+		ss->ss_family = AF_INET6;
+		return 0;
+	}
+
+	return -EINVAL;
+}
+
+/*
+ * Extract hostname string and resolve using kernel DNS facility.
+ */
+#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
+static int ceph_dns_resolve_name(const char *name, size_t namelen,
+		struct sockaddr_storage *ss, char delim, const char **ipend)
+{
+	const char *end, *delim_p;
+	char *colon_p, *ip_addr = NULL;
+	int ip_len, ret;
+
+	/*
+	 * The end of the hostname occurs immediately preceding the delimiter or
+	 * the port marker (':') where the delimiter takes precedence.
+	 */
+	delim_p = memchr(name, delim, namelen);
+	colon_p = memchr(name, ':', namelen);
+
+	if (delim_p && colon_p)
+		end = delim_p < colon_p ? delim_p : colon_p;
+	else if (!delim_p && colon_p)
+		end = colon_p;
+	else {
+		end = delim_p;
+		if (!end) /* case: hostname:/ */
+			end = name + namelen;
+	}
+
+	if (end <= name)
+		return -EINVAL;
+
+	/* do dns_resolve upcall */
+	ip_len = dns_query(NULL, name, end - name, NULL, &ip_addr, NULL);
+	if (ip_len > 0)
+		ret = ceph_pton(ip_addr, ip_len, ss, -1, NULL);
+	else
+		ret = -ESRCH;
+
+	kfree(ip_addr);
+
+	*ipend = end;
+
+	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
+			ret, ret ? "failed" : ceph_pr_addr(ss));
+
+	return ret;
+}
+#else
+static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
+		struct sockaddr_storage *ss, char delim, const char **ipend)
+{
+	return -EINVAL;
+}
+#endif
+
+/*
+ * Parse a server name (IP or hostname). If a valid IP address is not found
+ * then try to extract a hostname to resolve using userspace DNS upcall.
+ */
+static int ceph_parse_server_name(const char *name, size_t namelen,
+			struct sockaddr_storage *ss, char delim, const char **ipend)
+{
+	int ret;
+
+	ret = ceph_pton(name, namelen, ss, delim, ipend);
+	if (ret)
+		ret = ceph_dns_resolve_name(name, namelen, ss, delim, ipend);
+
+	return ret;
+}
+
+/*
  * Parse an ip[:port] list into an addr array.  Use the default
  * monitor port if a port isn't specified.
  */
@@ -1085,15 +1181,13 @@
 		   struct ceph_entity_addr *addr,
 		   int max_count, int *count)
 {
-	int i;
+	int i, ret = -EINVAL;
 	const char *p = c;
 
 	dout("parse_ips on '%.*s'\n", (int)(end-c), c);
 	for (i = 0; i < max_count; i++) {
 		const char *ipend;
 		struct sockaddr_storage *ss = &addr[i].in_addr;
-		struct sockaddr_in *in4 = (void *)ss;
-		struct sockaddr_in6 *in6 = (void *)ss;
 		int port;
 		char delim = ',';
 
@@ -1102,15 +1196,11 @@
 			p++;
 		}
 
-		memset(ss, 0, sizeof(*ss));
-		if (in4_pton(p, end - p, (u8 *)&in4->sin_addr.s_addr,
-			     delim, &ipend))
-			ss->ss_family = AF_INET;
-		else if (in6_pton(p, end - p, (u8 *)&in6->sin6_addr.s6_addr,
-				  delim, &ipend))
-			ss->ss_family = AF_INET6;
-		else
+		ret = ceph_parse_server_name(p, end - p, ss, delim, &ipend);
+		if (ret)
 			goto bad;
+		ret = -EINVAL;
+
 		p = ipend;
 
 		if (delim == ']') {
@@ -1155,7 +1245,7 @@
 
 bad:
 	pr_err("parse_ips bad ip '%.*s'\n", (int)(end - c), c);
-	return -EINVAL;
+	return ret;
 }
 EXPORT_SYMBOL(ceph_parse_ips);
 
@@ -2281,7 +2371,8 @@
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
  */
-struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
+struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
+			      bool can_fail)
 {
 	struct ceph_msg *m;
 
@@ -2333,7 +2424,7 @@
 			m->front.iov_base = kmalloc(front_len, flags);
 		}
 		if (m->front.iov_base == NULL) {
-			pr_err("msg_new can't allocate %d bytes\n",
+			dout("ceph_msg_new can't allocate %d bytes\n",
 			     front_len);
 			goto out2;
 		}
@@ -2348,7 +2439,14 @@
 out2:
 	ceph_msg_put(m);
 out:
-	pr_err("msg_new can't create type %d front %d\n", type, front_len);
+	if (!can_fail) {
+		pr_err("msg_new can't create type %d front %d\n", type,
+		       front_len);
+		WARN_ON(1);
+	} else {
+		dout("msg_new can't create type %d front %d\n", type,
+		     front_len);
+	}
 	return NULL;
 }
 EXPORT_SYMBOL(ceph_msg_new);
@@ -2398,7 +2496,7 @@
 	}
 	if (!msg) {
 		*skip = 0;
-		msg = ceph_msg_new(type, front_len, GFP_NOFS);
+		msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
 		if (!msg) {
 			pr_err("unable to allocate msg type %d len %d\n",
 			       type, front_len);
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index cbe31fa..0b62dea 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -116,14 +116,12 @@
  */
 static void __close_session(struct ceph_mon_client *monc)
 {
-	if (monc->con) {
-		dout("__close_session closing mon%d\n", monc->cur_mon);
-		ceph_con_revoke(monc->con, monc->m_auth);
-		ceph_con_close(monc->con);
-		monc->cur_mon = -1;
-		monc->pending_auth = 0;
-		ceph_auth_reset(monc->auth);
-	}
+	dout("__close_session closing mon%d\n", monc->cur_mon);
+	ceph_con_revoke(monc->con, monc->m_auth);
+	ceph_con_close(monc->con);
+	monc->cur_mon = -1;
+	monc->pending_auth = 0;
+	ceph_auth_reset(monc->auth);
 }
 
 /*
@@ -302,15 +300,6 @@
  */
 int ceph_monc_open_session(struct ceph_mon_client *monc)
 {
-	if (!monc->con) {
-		monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
-		if (!monc->con)
-			return -ENOMEM;
-		ceph_con_init(monc->client->msgr, monc->con);
-		monc->con->private = monc;
-		monc->con->ops = &mon_con_ops;
-	}
-
 	mutex_lock(&monc->mutex);
 	__open_session(monc);
 	__schedule_delayed(monc);
@@ -528,10 +517,12 @@
 	init_completion(&req->completion);
 
 	err = -ENOMEM;
-	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS);
+	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
+				    true);
 	if (!req->request)
 		goto out;
-	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS);
+	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 1024, GFP_NOFS,
+				  true);
 	if (!req->reply)
 		goto out;
 
@@ -626,10 +617,12 @@
 	init_completion(&req->completion);
 
 	err = -ENOMEM;
-	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
+	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS,
+				    true);
 	if (!req->request)
 		goto out;
-	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
+	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS,
+				  true);
 	if (!req->reply)
 		goto out;
 
@@ -755,13 +748,21 @@
 	if (err)
 		goto out;
 
-	monc->con = NULL;
+	/* connection */
+	monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
+	if (!monc->con)
+		goto out_monmap;
+	ceph_con_init(monc->client->msgr, monc->con);
+	monc->con->private = monc;
+	monc->con->ops = &mon_con_ops;
 
 	/* authentication */
 	monc->auth = ceph_auth_init(cl->options->name,
 				    cl->options->key);
-	if (IS_ERR(monc->auth))
-		return PTR_ERR(monc->auth);
+	if (IS_ERR(monc->auth)) {
+		err = PTR_ERR(monc->auth);
+		goto out_con;
+	}
 	monc->auth->want_keys =
 		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
 		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;
@@ -770,19 +771,21 @@
 	err = -ENOMEM;
 	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
 				     sizeof(struct ceph_mon_subscribe_ack),
-				     GFP_NOFS);
+				     GFP_NOFS, true);
 	if (!monc->m_subscribe_ack)
-		goto out_monmap;
+		goto out_auth;
 
-	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS);
+	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 96, GFP_NOFS,
+					 true);
 	if (!monc->m_subscribe)
 		goto out_subscribe_ack;
 
-	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS);
+	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096, GFP_NOFS,
+					  true);
 	if (!monc->m_auth_reply)
 		goto out_subscribe;
 
-	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS);
+	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_NOFS, true);
 	monc->pending_auth = 0;
 	if (!monc->m_auth)
 		goto out_auth_reply;
@@ -808,6 +811,10 @@
 	ceph_msg_put(monc->m_subscribe);
 out_subscribe_ack:
 	ceph_msg_put(monc->m_subscribe_ack);
+out_auth:
+	ceph_auth_destroy(monc->auth);
+out_con:
+	monc->con->ops->put(monc->con);
 out_monmap:
 	kfree(monc->monmap);
 out:
@@ -822,11 +829,11 @@
 
 	mutex_lock(&monc->mutex);
 	__close_session(monc);
-	if (monc->con) {
-		monc->con->private = NULL;
-		monc->con->ops->put(monc->con);
-		monc->con = NULL;
-	}
+
+	monc->con->private = NULL;
+	monc->con->ops->put(monc->con);
+	monc->con = NULL;
+
 	mutex_unlock(&monc->mutex);
 
 	ceph_auth_destroy(monc->auth);
@@ -973,7 +980,7 @@
 	case CEPH_MSG_MON_MAP:
 	case CEPH_MSG_MDS_MAP:
 	case CEPH_MSG_OSD_MAP:
-		m = ceph_msg_new(type, front_len, GFP_NOFS);
+		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
 		break;
 	}
 
@@ -1000,7 +1007,7 @@
 	if (!con->private)
 		goto out;
 
-	if (monc->con && !monc->hunting)
+	if (!monc->hunting)
 		pr_info("mon%d %s session lost, "
 			"hunting for new mon\n", monc->cur_mon,
 			ceph_pr_addr(&monc->con->peer_addr.in_addr));
diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c
index 1f4cb30..11d5f41 100644
--- a/net/ceph/msgpool.c
+++ b/net/ceph/msgpool.c
@@ -12,7 +12,7 @@
 	struct ceph_msgpool *pool = arg;
 	struct ceph_msg *msg;
 
-	msg = ceph_msg_new(0, pool->front_len, gfp_mask);
+	msg = ceph_msg_new(0, pool->front_len, gfp_mask, true);
 	if (!msg) {
 		dout("msgpool_alloc %s failed\n", pool->name);
 	} else {
@@ -61,7 +61,7 @@
 		WARN_ON(1);
 
 		/* try to alloc a fresh message */
-		return ceph_msg_new(0, front_len, GFP_NOFS);
+		return ceph_msg_new(0, front_len, GFP_NOFS, false);
 	}
 
 	msg = mempool_alloc(pool->pool, GFP_NOFS);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 88ad8a2..733e4600 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -227,7 +227,7 @@
 		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
 	else
 		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
-				   OSD_OPREPLY_FRONT_LEN, gfp_flags);
+				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
 	if (!msg) {
 		ceph_osdc_put_request(req);
 		return NULL;
@@ -250,7 +250,7 @@
 	if (use_mempool)
 		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
 	else
-		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
+		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
 	if (!msg) {
 		ceph_osdc_put_request(req);
 		return NULL;
@@ -943,7 +943,7 @@
  * Caller should hold map_sem for read and request_mutex.
  */
 static int __map_request(struct ceph_osd_client *osdc,
-			 struct ceph_osd_request *req)
+			 struct ceph_osd_request *req, int force_resend)
 {
 	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
 	struct ceph_pg pgid;
@@ -967,7 +967,8 @@
 		num = err;
 	}
 
-	if ((req->r_osd && req->r_osd->o_osd == o &&
+	if ((!force_resend &&
+	     req->r_osd && req->r_osd->o_osd == o &&
 	     req->r_sent >= req->r_osd->o_incarnation &&
 	     req->r_num_pg_osds == num &&
 	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
@@ -1289,18 +1290,18 @@
  *
  * Caller should hold map_sem for read and request_mutex.
  */
-static void kick_requests(struct ceph_osd_client *osdc)
+static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 {
 	struct ceph_osd_request *req, *nreq;
 	struct rb_node *p;
 	int needmap = 0;
 	int err;
 
-	dout("kick_requests\n");
+	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
 	mutex_lock(&osdc->request_mutex);
 	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
 		req = rb_entry(p, struct ceph_osd_request, r_node);
-		err = __map_request(osdc, req);
+		err = __map_request(osdc, req, force_resend);
 		if (err < 0)
 			continue;  /* error */
 		if (req->r_osd == NULL) {
@@ -1318,7 +1319,7 @@
 				 r_linger_item) {
 		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
 
-		err = __map_request(osdc, req);
+		err = __map_request(osdc, req, force_resend);
 		if (err == 0)
 			continue;  /* no change and no osd was specified */
 		if (err < 0)
@@ -1395,7 +1396,7 @@
 				ceph_osdmap_destroy(osdc->osdmap);
 				osdc->osdmap = newmap;
 			}
-			kick_requests(osdc);
+			kick_requests(osdc, 0);
 			reset_changed_osds(osdc);
 		} else {
 			dout("ignoring incremental map %u len %d\n",
@@ -1423,6 +1424,8 @@
 			     "older than our %u\n", epoch, maplen,
 			     osdc->osdmap->epoch);
 		} else {
+			int skipped_map = 0;
+
 			dout("taking full map %u len %d\n", epoch, maplen);
 			newmap = osdmap_decode(&p, p+maplen);
 			if (IS_ERR(newmap)) {
@@ -1432,9 +1435,12 @@
 			BUG_ON(!newmap);
 			oldmap = osdc->osdmap;
 			osdc->osdmap = newmap;
-			if (oldmap)
+			if (oldmap) {
+				if (oldmap->epoch + 1 < newmap->epoch)
+					skipped_map = 1;
 				ceph_osdmap_destroy(oldmap);
-			kick_requests(osdc);
+			}
+			kick_requests(osdc, skipped_map);
 		}
 		p += maplen;
 		nr_maps--;
@@ -1707,7 +1713,7 @@
 	 * the request still han't been touched yet.
 	 */
 	if (req->r_sent == 0) {
-		rc = __map_request(osdc, req);
+		rc = __map_request(osdc, req, 0);
 		if (rc < 0) {
 			if (nofail) {
 				dout("osdc_start_request failed map, "
@@ -2032,7 +2038,7 @@
 	if (front > req->r_reply->front.iov_len) {
 		pr_warning("get_reply front %d > preallocated %d\n",
 			   front, (int)req->r_reply->front.iov_len);
-		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
+		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
 		if (!m)
 			goto out;
 		ceph_msg_put(req->r_reply);
@@ -2080,7 +2086,7 @@
 	switch (type) {
 	case CEPH_MSG_OSD_MAP:
 	case CEPH_MSG_WATCH_NOTIFY:
-		return ceph_msg_new(type, front, GFP_NOFS);
+		return ceph_msg_new(type, front, GFP_NOFS, false);
 	case CEPH_MSG_OSD_OPREPLY:
 		return get_reply(con, hdr, skip);
 	default: