diff mbox

[net-next,05/13] RDS: IB: Re-organise ibmr code

Message ID 1455967804-27644-6-git-send-email-santosh.shilimkar@oracle.com
State Changes Requested, archived
Delegated to: David Miller
Headers show

Commit Message

Santosh Shilimkar Feb. 20, 2016, 11:29 a.m. UTC
No functional changes. This is in preperation towards adding
fastreg memory resgitration support.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
---
 net/rds/Makefile  |   2 +-
 net/rds/ib.c      |  37 +++---
 net/rds/ib.h      |  25 +---
 net/rds/ib_fmr.c  | 217 +++++++++++++++++++++++++++++++
 net/rds/ib_mr.h   | 109 ++++++++++++++++
 net/rds/ib_rdma.c | 379 +++++++++++-------------------------------------------
 6 files changed, 422 insertions(+), 347 deletions(-)
 create mode 100644 net/rds/ib_fmr.c
 create mode 100644 net/rds/ib_mr.h
diff mbox

Patch

diff --git a/net/rds/Makefile b/net/rds/Makefile
index 19e5485..bcf5591 100644
--- a/net/rds/Makefile
+++ b/net/rds/Makefile
@@ -6,7 +6,7 @@  rds-y :=	af_rds.o bind.o cong.o connection.o info.o message.o   \
 obj-$(CONFIG_RDS_RDMA) += rds_rdma.o
 rds_rdma-y :=	rdma_transport.o \
 			ib.o ib_cm.o ib_recv.o ib_ring.o ib_send.o ib_stats.o \
-			ib_sysctl.o ib_rdma.o
+			ib_sysctl.o ib_rdma.o ib_fmr.o
 
 
 obj-$(CONFIG_RDS_TCP) += rds_tcp.o
diff --git a/net/rds/ib.c b/net/rds/ib.c
index 9481d55..bb32cb9 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -42,15 +42,16 @@ 
 
 #include "rds.h"
 #include "ib.h"
+#include "ib_mr.h"
 
-unsigned int rds_ib_fmr_1m_pool_size = RDS_FMR_1M_POOL_SIZE;
-unsigned int rds_ib_fmr_8k_pool_size = RDS_FMR_8K_POOL_SIZE;
+unsigned int rds_ib_mr_1m_pool_size = RDS_MR_1M_POOL_SIZE;
+unsigned int rds_ib_mr_8k_pool_size = RDS_MR_8K_POOL_SIZE;
 unsigned int rds_ib_retry_count = RDS_IB_DEFAULT_RETRY_COUNT;
 
-module_param(rds_ib_fmr_1m_pool_size, int, 0444);
-MODULE_PARM_DESC(rds_ib_fmr_1m_pool_size, " Max number of 1M fmr per HCA");
-module_param(rds_ib_fmr_8k_pool_size, int, 0444);
-MODULE_PARM_DESC(rds_ib_fmr_8k_pool_size, " Max number of 8K fmr per HCA");
+module_param(rds_ib_mr_1m_pool_size, int, 0444);
+MODULE_PARM_DESC(rds_ib_mr_1m_pool_size, " Max number of 1M mr per HCA");
+module_param(rds_ib_mr_8k_pool_size, int, 0444);
+MODULE_PARM_DESC(rds_ib_mr_8k_pool_size, " Max number of 8K mr per HCA");
 module_param(rds_ib_retry_count, int, 0444);
 MODULE_PARM_DESC(rds_ib_retry_count, " Number of hw retries before reporting an error");
 
@@ -140,13 +141,13 @@  static void rds_ib_add_one(struct ib_device *device)
 	rds_ibdev->max_sge = min(device->attrs.max_sge, RDS_IB_MAX_SGE);
 
 	rds_ibdev->fmr_max_remaps = device->attrs.max_map_per_fmr?: 32;
-	rds_ibdev->max_1m_fmrs = device->attrs.max_mr ?
+	rds_ibdev->max_1m_mrs = device->attrs.max_mr ?
 		min_t(unsigned int, (device->attrs.max_mr / 2),
-		      rds_ib_fmr_1m_pool_size) : rds_ib_fmr_1m_pool_size;
+		      rds_ib_mr_1m_pool_size) : rds_ib_mr_1m_pool_size;
 
-	rds_ibdev->max_8k_fmrs = device->attrs.max_mr ?
+	rds_ibdev->max_8k_mrs = device->attrs.max_mr ?
 		min_t(unsigned int, ((device->attrs.max_mr / 2) * RDS_MR_8K_SCALE),
-		      rds_ib_fmr_8k_pool_size) : rds_ib_fmr_8k_pool_size;
+		      rds_ib_mr_8k_pool_size) : rds_ib_mr_8k_pool_size;
 
 	rds_ibdev->max_initiator_depth = device->attrs.max_qp_init_rd_atom;
 	rds_ibdev->max_responder_resources = device->attrs.max_qp_rd_atom;
@@ -172,10 +173,10 @@  static void rds_ib_add_one(struct ib_device *device)
 		goto put_dev;
 	}
 
-	rdsdebug("RDS/IB: max_mr = %d, max_wrs = %d, max_sge = %d, fmr_max_remaps = %d, max_1m_fmrs = %d, max_8k_fmrs = %d\n",
+	rdsdebug("RDS/IB: max_mr = %d, max_wrs = %d, max_sge = %d, fmr_max_remaps = %d, max_1m_mrs = %d, max_8k_mrs = %d\n",
 		 device->attrs.max_fmr, rds_ibdev->max_wrs, rds_ibdev->max_sge,
-		 rds_ibdev->fmr_max_remaps, rds_ibdev->max_1m_fmrs,
-		 rds_ibdev->max_8k_fmrs);
+		 rds_ibdev->fmr_max_remaps, rds_ibdev->max_1m_mrs,
+		 rds_ibdev->max_8k_mrs);
 
 	INIT_LIST_HEAD(&rds_ibdev->ipaddr_list);
 	INIT_LIST_HEAD(&rds_ibdev->conn_list);
@@ -364,7 +365,7 @@  void rds_ib_exit(void)
 	rds_ib_sysctl_exit();
 	rds_ib_recv_exit();
 	rds_trans_unregister(&rds_ib_transport);
-	rds_ib_fmr_exit();
+	rds_ib_mr_exit();
 }
 
 struct rds_transport rds_ib_transport = {
@@ -400,13 +401,13 @@  int rds_ib_init(void)
 
 	INIT_LIST_HEAD(&rds_ib_devices);
 
-	ret = rds_ib_fmr_init();
+	ret = rds_ib_mr_init();
 	if (ret)
 		goto out;
 
 	ret = ib_register_client(&rds_ib_client);
 	if (ret)
-		goto out_fmr_exit;
+		goto out_mr_exit;
 
 	ret = rds_ib_sysctl_init();
 	if (ret)
@@ -430,8 +431,8 @@  out_sysctl:
 	rds_ib_sysctl_exit();
 out_ibreg:
 	rds_ib_unregister_client();
-out_fmr_exit:
-	rds_ib_fmr_exit();
+out_mr_exit:
+	rds_ib_mr_exit();
 out:
 	return ret;
 }
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 09cd8e3..c88cb22 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -9,12 +9,6 @@ 
 #include "rds.h"
 #include "rdma_transport.h"
 
-#define RDS_FMR_1M_POOL_SIZE		(8192 / 2)
-#define RDS_FMR_1M_MSG_SIZE		256
-#define RDS_FMR_8K_MSG_SIZE		2
-#define RDS_MR_8K_SCALE			(256 / (RDS_FMR_8K_MSG_SIZE + 1))
-#define RDS_FMR_8K_POOL_SIZE		(RDS_MR_8K_SCALE * (8192 / 2))
-
 #define RDS_IB_MAX_SGE			8
 #define RDS_IB_RECV_SGE 		2
 
@@ -206,12 +200,12 @@  struct rds_ib_device {
 	struct list_head	conn_list;
 	struct ib_device	*dev;
 	struct ib_pd		*pd;
-	unsigned int		max_fmrs;
+	unsigned int		max_mrs;
 	struct rds_ib_mr_pool	*mr_1m_pool;
 	struct rds_ib_mr_pool   *mr_8k_pool;
 	unsigned int		fmr_max_remaps;
-	unsigned int		max_8k_fmrs;
-	unsigned int		max_1m_fmrs;
+	unsigned int		max_8k_mrs;
+	unsigned int		max_1m_mrs;
 	int			max_sge;
 	unsigned int		max_wrs;
 	unsigned int		max_initiator_depth;
@@ -316,8 +310,6 @@  struct rds_ib_device *rds_ib_get_client_data(struct ib_device *device);
 void rds_ib_dev_put(struct rds_ib_device *rds_ibdev);
 extern struct ib_client rds_ib_client;
 
-extern unsigned int rds_ib_fmr_1m_pool_size;
-extern unsigned int rds_ib_fmr_8k_pool_size;
 extern unsigned int rds_ib_retry_count;
 
 extern spinlock_t ib_nodev_conns_lock;
@@ -347,17 +339,6 @@  int rds_ib_update_ipaddr(struct rds_ib_device *rds_ibdev, __be32 ipaddr);
 void rds_ib_add_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
 void rds_ib_remove_conn(struct rds_ib_device *rds_ibdev, struct rds_connection *conn);
 void rds_ib_destroy_nodev_conns(void);
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev,
-					     int npages);
-void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo);
-void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
-void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
-		    struct rds_sock *rs, u32 *key_ret);
-void rds_ib_sync_mr(void *trans_private, int dir);
-void rds_ib_free_mr(void *trans_private, int invalidate);
-void rds_ib_flush_mrs(void);
-int rds_ib_fmr_init(void);
-void rds_ib_fmr_exit(void);
 
 /* ib_recv.c */
 int rds_ib_recv_init(void);
diff --git a/net/rds/ib_fmr.c b/net/rds/ib_fmr.c
new file mode 100644
index 0000000..d4f200d
--- /dev/null
+++ b/net/rds/ib_fmr.c
@@ -0,0 +1,217 @@ 
+/*
+ * Copyright (c) 2016 Oracle.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include "ib_mr.h"
+
+struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev, int npages)
+{
+	struct rds_ib_mr_pool *pool;
+	struct rds_ib_mr *ibmr = NULL;
+	int err = 0, iter = 0;
+
+	if (npages <= RDS_MR_8K_MSG_SIZE)
+		pool = rds_ibdev->mr_8k_pool;
+	else
+		pool = rds_ibdev->mr_1m_pool;
+
+	if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
+		queue_delayed_work(rds_ib_mr_wq, &pool->flush_worker, 10);
+
+	/* Switch pools if one of the pool is reaching upper limit */
+	if (atomic_read(&pool->dirty_count) >=  pool->max_items * 9 / 10) {
+		if (pool->pool_type == RDS_IB_MR_8K_POOL)
+			pool = rds_ibdev->mr_1m_pool;
+		else
+			pool = rds_ibdev->mr_8k_pool;
+	}
+
+	while (1) {
+		ibmr = rds_ib_reuse_mr(pool);
+		if (ibmr)
+			return ibmr;
+
+		/* No clean MRs - now we have the choice of either
+		 * allocating a fresh MR up to the limit imposed by the
+		 * driver, or flush any dirty unused MRs.
+		 * We try to avoid stalling in the send path if possible,
+		 * so we allocate as long as we're allowed to.
+		 *
+		 * We're fussy with enforcing the FMR limit, though. If the
+		 * driver tells us we can't use more than N fmrs, we shouldn't
+		 * start arguing with it
+		 */
+		if (atomic_inc_return(&pool->item_count) <= pool->max_items)
+			break;
+
+		atomic_dec(&pool->item_count);
+
+		if (++iter > 2) {
+			if (pool->pool_type == RDS_IB_MR_8K_POOL)
+				rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted);
+			else
+				rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted);
+			return ERR_PTR(-EAGAIN);
+		}
+
+		/* We do have some empty MRs. Flush them out. */
+		if (pool->pool_type == RDS_IB_MR_8K_POOL)
+			rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait);
+		else
+			rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait);
+		rds_ib_flush_mr_pool(pool, 0, &ibmr);
+		if (ibmr)
+			return ibmr;
+	}
+
+	ibmr = kzalloc_node(sizeof(*ibmr), GFP_KERNEL,
+			    rdsibdev_to_node(rds_ibdev));
+	if (!ibmr) {
+		err = -ENOMEM;
+		goto out_no_cigar;
+	}
+
+	ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
+			(IB_ACCESS_LOCAL_WRITE |
+			 IB_ACCESS_REMOTE_READ |
+			 IB_ACCESS_REMOTE_WRITE |
+			 IB_ACCESS_REMOTE_ATOMIC),
+			&pool->fmr_attr);
+	if (IS_ERR(ibmr->fmr)) {
+		err = PTR_ERR(ibmr->fmr);
+		ibmr->fmr = NULL;
+		pr_warn("RDS/IB: %s failed (err=%d)\n", __func__, err);
+		goto out_no_cigar;
+	}
+
+	ibmr->pool = pool;
+	if (pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_alloc);
+
+	return ibmr;
+
+out_no_cigar:
+	if (ibmr) {
+		if (ibmr->fmr)
+			ib_dealloc_fmr(ibmr->fmr);
+		kfree(ibmr);
+	}
+	atomic_dec(&pool->item_count);
+	return ERR_PTR(err);
+}
+
+int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
+		   struct scatterlist *sg, unsigned int nents)
+{
+	struct ib_device *dev = rds_ibdev->dev;
+	struct scatterlist *scat = sg;
+	u64 io_addr = 0;
+	u64 *dma_pages;
+	u32 len;
+	int page_cnt, sg_dma_len;
+	int i, j;
+	int ret;
+
+	sg_dma_len = ib_dma_map_sg(dev, sg, nents, DMA_BIDIRECTIONAL);
+	if (unlikely(!sg_dma_len)) {
+		pr_warn("RDS/IB: %s failed!\n", __func__);
+		return -EBUSY;
+	}
+
+	len = 0;
+	page_cnt = 0;
+
+	for (i = 0; i < sg_dma_len; ++i) {
+		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
+		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
+
+		if (dma_addr & ~PAGE_MASK) {
+			if (i > 0)
+				return -EINVAL;
+			else
+				++page_cnt;
+		}
+		if ((dma_addr + dma_len) & ~PAGE_MASK) {
+			if (i < sg_dma_len - 1)
+				return -EINVAL;
+			else
+				++page_cnt;
+		}
+
+		len += dma_len;
+	}
+
+	page_cnt += len >> PAGE_SHIFT;
+	if (page_cnt > ibmr->pool->fmr_attr.max_pages)
+		return -EINVAL;
+
+	dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
+				 rdsibdev_to_node(rds_ibdev));
+	if (!dma_pages)
+		return -ENOMEM;
+
+	page_cnt = 0;
+	for (i = 0; i < sg_dma_len; ++i) {
+		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
+		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
+
+		for (j = 0; j < dma_len; j += PAGE_SIZE)
+			dma_pages[page_cnt++] =
+				(dma_addr & PAGE_MASK) + j;
+	}
+
+	ret = ib_map_phys_fmr(ibmr->fmr, dma_pages, page_cnt, io_addr);
+	if (ret)
+		goto out;
+
+	/* Success - we successfully remapped the MR, so we can
+	 * safely tear down the old mapping.
+	 */
+	rds_ib_teardown_mr(ibmr);
+
+	ibmr->sg = scat;
+	ibmr->sg_len = nents;
+	ibmr->sg_dma_len = sg_dma_len;
+	ibmr->remap_count++;
+
+	if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_used);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_used);
+	ret = 0;
+
+out:
+	kfree(dma_pages);
+
+	return ret;
+}
diff --git a/net/rds/ib_mr.h b/net/rds/ib_mr.h
new file mode 100644
index 0000000..d88724f
--- /dev/null
+++ b/net/rds/ib_mr.h
@@ -0,0 +1,109 @@ 
+/*
+ * Copyright (c) 2016 Oracle.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#ifndef _RDS_IB_MR_H
+#define _RDS_IB_MR_H
+
+#include <linux/kernel.h>
+
+#include "rds.h"
+#include "ib.h"
+
+#define RDS_MR_1M_POOL_SIZE		(8192 / 2)
+#define RDS_MR_1M_MSG_SIZE		256
+#define RDS_MR_8K_MSG_SIZE		2
+#define RDS_MR_8K_SCALE			(256 / (RDS_MR_8K_MSG_SIZE + 1))
+#define RDS_MR_8K_POOL_SIZE		(RDS_MR_8K_SCALE * (8192 / 2))
+
+/* This is stored as mr->r_trans_private. */
+struct rds_ib_mr {
+	struct rds_ib_device	*device;
+	struct rds_ib_mr_pool	*pool;
+	struct ib_fmr		*fmr;
+
+	struct llist_node	llnode;
+
+	/* unmap_list is for freeing */
+	struct list_head	unmap_list;
+	unsigned int		remap_count;
+
+	struct scatterlist	*sg;
+	unsigned int		sg_len;
+	u64			*dma;
+	int			sg_dma_len;
+};
+
+/* Our own little MR pool */
+struct rds_ib_mr_pool {
+	unsigned int            pool_type;
+	struct mutex		flush_lock;	/* serialize fmr invalidate */
+	struct delayed_work	flush_worker;	/* flush worker */
+
+	atomic_t		item_count;	/* total # of MRs */
+	atomic_t		dirty_count;	/* # dirty of MRs */
+
+	struct llist_head	drop_list;	/* MRs not reached max_maps */
+	struct llist_head	free_list;	/* unused MRs */
+	struct llist_head	clean_list;	/* unused & unmapped MRs */
+	wait_queue_head_t	flush_wait;
+
+	atomic_t		free_pinned;	/* memory pinned by free MRs */
+	unsigned long		max_items;
+	unsigned long		max_items_soft;
+	unsigned long		max_free_pinned;
+	struct ib_fmr_attr	fmr_attr;
+};
+
+extern struct workqueue_struct *rds_ib_mr_wq;
+extern unsigned int rds_ib_mr_1m_pool_size;
+extern unsigned int rds_ib_mr_8k_pool_size;
+
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_dev,
+					     int npages);
+void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev,
+			struct rds_info_rdma_connection *iinfo);
+void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
+void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
+		    struct rds_sock *rs, u32 *key_ret);
+void rds_ib_sync_mr(void *trans_private, int dir);
+void rds_ib_free_mr(void *trans_private, int invalidate);
+void rds_ib_flush_mrs(void);
+int rds_ib_mr_init(void);
+void rds_ib_mr_exit(void);
+
+void __rds_ib_teardown_mr(struct rds_ib_mr *);
+void rds_ib_teardown_mr(struct rds_ib_mr *);
+struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *, int);
+int rds_ib_map_fmr(struct rds_ib_device *, struct rds_ib_mr *,
+		   struct scatterlist *, unsigned int);
+struct rds_ib_mr *rds_ib_reuse_mr(struct rds_ib_mr_pool *);
+int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *, int, struct rds_ib_mr **);
+#endif
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index a234074..c594519 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -35,78 +35,13 @@ 
 #include <linux/rculist.h>
 #include <linux/llist.h>
 
-#include "rds.h"
-#include "ib.h"
+#include "ib_mr.h"
+
+struct workqueue_struct *rds_ib_mr_wq;
 
 static DEFINE_PER_CPU(unsigned long, clean_list_grace);
 #define CLEAN_LIST_BUSY_BIT 0
 
-/*
- * This is stored as mr->r_trans_private.
- */
-struct rds_ib_mr {
-	struct rds_ib_device	*device;
-	struct rds_ib_mr_pool	*pool;
-	struct ib_fmr		*fmr;
-
-	struct llist_node	llnode;
-
-	/* unmap_list is for freeing */
-	struct list_head	unmap_list;
-	unsigned int		remap_count;
-
-	struct scatterlist	*sg;
-	unsigned int		sg_len;
-	u64			*dma;
-	int			sg_dma_len;
-};
-
-/*
- * Our own little FMR pool
- */
-struct rds_ib_mr_pool {
-	unsigned int            pool_type;
-	struct mutex		flush_lock;		/* serialize fmr invalidate */
-	struct delayed_work	flush_worker;		/* flush worker */
-
-	atomic_t		item_count;		/* total # of MRs */
-	atomic_t		dirty_count;		/* # dirty of MRs */
-
-	struct llist_head	drop_list;		/* MRs that have reached their max_maps limit */
-	struct llist_head	free_list;		/* unused MRs */
-	struct llist_head	clean_list;		/* global unused & unamapped MRs */
-	wait_queue_head_t	flush_wait;
-
-	atomic_t		free_pinned;		/* memory pinned by free MRs */
-	unsigned long		max_items;
-	unsigned long		max_items_soft;
-	unsigned long		max_free_pinned;
-	struct ib_fmr_attr	fmr_attr;
-};
-
-static struct workqueue_struct *rds_ib_fmr_wq;
-
-int rds_ib_fmr_init(void)
-{
-	rds_ib_fmr_wq = create_workqueue("rds_fmr_flushd");
-	if (!rds_ib_fmr_wq)
-		return -ENOMEM;
-	return 0;
-}
-
-/* By the time this is called all the IB devices should have been torn down and
- * had their pools freed.  As each pool is freed its work struct is waited on,
- * so the pool flushing work queue should be idle by the time we get here.
- */
-void rds_ib_fmr_exit(void)
-{
-	destroy_workqueue(rds_ib_fmr_wq);
-}
-
-static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool, int free_all, struct rds_ib_mr **);
-static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr);
-static void rds_ib_mr_pool_flush_worker(struct work_struct *work);
-
 static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
 {
 	struct rds_ib_device *rds_ibdev;
@@ -235,41 +170,6 @@  void rds_ib_destroy_nodev_conns(void)
 		rds_conn_destroy(ic->conn);
 }
 
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
-					     int pool_type)
-{
-	struct rds_ib_mr_pool *pool;
-
-	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
-	if (!pool)
-		return ERR_PTR(-ENOMEM);
-
-	pool->pool_type = pool_type;
-	init_llist_head(&pool->free_list);
-	init_llist_head(&pool->drop_list);
-	init_llist_head(&pool->clean_list);
-	mutex_init(&pool->flush_lock);
-	init_waitqueue_head(&pool->flush_wait);
-	INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
-
-	if (pool_type == RDS_IB_MR_1M_POOL) {
-		/* +1 allows for unaligned MRs */
-		pool->fmr_attr.max_pages = RDS_FMR_1M_MSG_SIZE + 1;
-		pool->max_items = RDS_FMR_1M_POOL_SIZE;
-	} else {
-		/* pool_type == RDS_IB_MR_8K_POOL */
-		pool->fmr_attr.max_pages = RDS_FMR_8K_MSG_SIZE + 1;
-		pool->max_items = RDS_FMR_8K_POOL_SIZE;
-	}
-
-	pool->max_free_pinned = pool->max_items * pool->fmr_attr.max_pages / 4;
-	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
-	pool->fmr_attr.page_shift = PAGE_SHIFT;
-	pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
-
-	return pool;
-}
-
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo)
 {
 	struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool;
@@ -278,16 +178,7 @@  void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_co
 	iinfo->rdma_mr_size = pool_1m->fmr_attr.max_pages;
 }
 
-void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
-{
-	cancel_delayed_work_sync(&pool->flush_worker);
-	rds_ib_flush_mr_pool(pool, 1, NULL);
-	WARN_ON(atomic_read(&pool->item_count));
-	WARN_ON(atomic_read(&pool->free_pinned));
-	kfree(pool);
-}
-
-static inline struct rds_ib_mr *rds_ib_reuse_fmr(struct rds_ib_mr_pool *pool)
+struct rds_ib_mr *rds_ib_reuse_mr(struct rds_ib_mr_pool *pool)
 {
 	struct rds_ib_mr *ibmr = NULL;
 	struct llist_node *ret;
@@ -317,190 +208,6 @@  static inline void wait_clean_list_grace(void)
 	}
 }
 
-static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev,
-					  int npages)
-{
-	struct rds_ib_mr_pool *pool;
-	struct rds_ib_mr *ibmr = NULL;
-	int err = 0, iter = 0;
-
-	if (npages <= RDS_FMR_8K_MSG_SIZE)
-		pool = rds_ibdev->mr_8k_pool;
-	else
-		pool = rds_ibdev->mr_1m_pool;
-
-	if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
-		queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
-
-	/* Switch pools if one of the pool is reaching upper limit */
-	if (atomic_read(&pool->dirty_count) >=  pool->max_items * 9 / 10) {
-		if (pool->pool_type == RDS_IB_MR_8K_POOL)
-			pool = rds_ibdev->mr_1m_pool;
-		else
-			pool = rds_ibdev->mr_8k_pool;
-	}
-
-	while (1) {
-		ibmr = rds_ib_reuse_fmr(pool);
-		if (ibmr)
-			return ibmr;
-
-		/* No clean MRs - now we have the choice of either
-		 * allocating a fresh MR up to the limit imposed by the
-		 * driver, or flush any dirty unused MRs.
-		 * We try to avoid stalling in the send path if possible,
-		 * so we allocate as long as we're allowed to.
-		 *
-		 * We're fussy with enforcing the FMR limit, though. If the driver
-		 * tells us we can't use more than N fmrs, we shouldn't start
-		 * arguing with it */
-		if (atomic_inc_return(&pool->item_count) <= pool->max_items)
-			break;
-
-		atomic_dec(&pool->item_count);
-
-		if (++iter > 2) {
-			if (pool->pool_type == RDS_IB_MR_8K_POOL)
-				rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted);
-			else
-				rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted);
-			return ERR_PTR(-EAGAIN);
-		}
-
-		/* We do have some empty MRs. Flush them out. */
-		if (pool->pool_type == RDS_IB_MR_8K_POOL)
-			rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait);
-		else
-			rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait);
-		rds_ib_flush_mr_pool(pool, 0, &ibmr);
-		if (ibmr)
-			return ibmr;
-	}
-
-	ibmr = kzalloc_node(sizeof(*ibmr), GFP_KERNEL, rdsibdev_to_node(rds_ibdev));
-	if (!ibmr) {
-		err = -ENOMEM;
-		goto out_no_cigar;
-	}
-
-	ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
-			(IB_ACCESS_LOCAL_WRITE |
-			 IB_ACCESS_REMOTE_READ |
-			 IB_ACCESS_REMOTE_WRITE|
-			 IB_ACCESS_REMOTE_ATOMIC),
-			&pool->fmr_attr);
-	if (IS_ERR(ibmr->fmr)) {
-		err = PTR_ERR(ibmr->fmr);
-		ibmr->fmr = NULL;
-		printk(KERN_WARNING "RDS/IB: ib_alloc_fmr failed (err=%d)\n", err);
-		goto out_no_cigar;
-	}
-
-	ibmr->pool = pool;
-	if (pool->pool_type == RDS_IB_MR_8K_POOL)
-		rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc);
-	else
-		rds_ib_stats_inc(s_ib_rdma_mr_1m_alloc);
-
-	return ibmr;
-
-out_no_cigar:
-	if (ibmr) {
-		if (ibmr->fmr)
-			ib_dealloc_fmr(ibmr->fmr);
-		kfree(ibmr);
-	}
-	atomic_dec(&pool->item_count);
-	return ERR_PTR(err);
-}
-
-static int rds_ib_map_fmr(struct rds_ib_device *rds_ibdev, struct rds_ib_mr *ibmr,
-	       struct scatterlist *sg, unsigned int nents)
-{
-	struct ib_device *dev = rds_ibdev->dev;
-	struct scatterlist *scat = sg;
-	u64 io_addr = 0;
-	u64 *dma_pages;
-	u32 len;
-	int page_cnt, sg_dma_len;
-	int i, j;
-	int ret;
-
-	sg_dma_len = ib_dma_map_sg(dev, sg, nents,
-				 DMA_BIDIRECTIONAL);
-	if (unlikely(!sg_dma_len)) {
-		printk(KERN_WARNING "RDS/IB: dma_map_sg failed!\n");
-		return -EBUSY;
-	}
-
-	len = 0;
-	page_cnt = 0;
-
-	for (i = 0; i < sg_dma_len; ++i) {
-		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
-		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
-
-		if (dma_addr & ~PAGE_MASK) {
-			if (i > 0)
-				return -EINVAL;
-			else
-				++page_cnt;
-		}
-		if ((dma_addr + dma_len) & ~PAGE_MASK) {
-			if (i < sg_dma_len - 1)
-				return -EINVAL;
-			else
-				++page_cnt;
-		}
-
-		len += dma_len;
-	}
-
-	page_cnt += len >> PAGE_SHIFT;
-	if (page_cnt > ibmr->pool->fmr_attr.max_pages)
-		return -EINVAL;
-
-	dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
-				 rdsibdev_to_node(rds_ibdev));
-	if (!dma_pages)
-		return -ENOMEM;
-
-	page_cnt = 0;
-	for (i = 0; i < sg_dma_len; ++i) {
-		unsigned int dma_len = ib_sg_dma_len(dev, &scat[i]);
-		u64 dma_addr = ib_sg_dma_address(dev, &scat[i]);
-
-		for (j = 0; j < dma_len; j += PAGE_SIZE)
-			dma_pages[page_cnt++] =
-				(dma_addr & PAGE_MASK) + j;
-	}
-
-	ret = ib_map_phys_fmr(ibmr->fmr,
-				   dma_pages, page_cnt, io_addr);
-	if (ret)
-		goto out;
-
-	/* Success - we successfully remapped the MR, so we can
-	 * safely tear down the old mapping. */
-	rds_ib_teardown_mr(ibmr);
-
-	ibmr->sg = scat;
-	ibmr->sg_len = nents;
-	ibmr->sg_dma_len = sg_dma_len;
-	ibmr->remap_count++;
-
-	if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
-		rds_ib_stats_inc(s_ib_rdma_mr_8k_used);
-	else
-		rds_ib_stats_inc(s_ib_rdma_mr_1m_used);
-	ret = 0;
-
-out:
-	kfree(dma_pages);
-
-	return ret;
-}
-
 void rds_ib_sync_mr(void *trans_private, int direction)
 {
 	struct rds_ib_mr *ibmr = trans_private;
@@ -518,7 +225,7 @@  void rds_ib_sync_mr(void *trans_private, int direction)
 	}
 }
 
-static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
+void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 {
 	struct rds_ib_device *rds_ibdev = ibmr->device;
 
@@ -549,7 +256,7 @@  static void __rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 	}
 }
 
-static void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
+void rds_ib_teardown_mr(struct rds_ib_mr *ibmr)
 {
 	unsigned int pinned = ibmr->sg_len;
 
@@ -623,8 +330,8 @@  static void list_to_llist_nodes(struct rds_ib_mr_pool *pool,
  * If the number of MRs allocated exceeds the limit, we also try
  * to free as many MRs as needed to get back to this limit.
  */
-static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
-				int free_all, struct rds_ib_mr **ibmr_ret)
+int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
+			 int free_all, struct rds_ib_mr **ibmr_ret)
 {
 	struct rds_ib_mr *ibmr, *next;
 	struct llist_node *clean_nodes;
@@ -643,7 +350,7 @@  static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
 	if (ibmr_ret) {
 		DEFINE_WAIT(wait);
 		while (!mutex_trylock(&pool->flush_lock)) {
-			ibmr = rds_ib_reuse_fmr(pool);
+			ibmr = rds_ib_reuse_mr(pool);
 			if (ibmr) {
 				*ibmr_ret = ibmr;
 				finish_wait(&pool->flush_wait, &wait);
@@ -655,7 +362,7 @@  static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
 			if (llist_empty(&pool->clean_list))
 				schedule();
 
-			ibmr = rds_ib_reuse_fmr(pool);
+			ibmr = rds_ib_reuse_mr(pool);
 			if (ibmr) {
 				*ibmr_ret = ibmr;
 				finish_wait(&pool->flush_wait, &wait);
@@ -667,7 +374,7 @@  static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
 		mutex_lock(&pool->flush_lock);
 
 	if (ibmr_ret) {
-		ibmr = rds_ib_reuse_fmr(pool);
+		ibmr = rds_ib_reuse_mr(pool);
 		if (ibmr) {
 			*ibmr_ret = ibmr;
 			goto out;
@@ -773,7 +480,7 @@  void rds_ib_free_mr(void *trans_private, int invalidate)
 	/* If we've pinned too many pages, request a flush */
 	if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned ||
 	    atomic_read(&pool->dirty_count) >= pool->max_items / 5)
-		queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
+		queue_delayed_work(rds_ib_mr_wq, &pool->flush_worker, 10);
 
 	if (invalidate) {
 		if (likely(!in_interrupt())) {
@@ -782,7 +489,7 @@  void rds_ib_free_mr(void *trans_private, int invalidate)
 			/* We get here if the user created a MR marked
 			 * as use_once and invalidate at the same time.
 			 */
-			queue_delayed_work(rds_ib_fmr_wq,
+			queue_delayed_work(rds_ib_mr_wq,
 					   &pool->flush_worker, 10);
 		}
 	}
@@ -849,3 +556,63 @@  void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 	return ibmr;
 }
 
+void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
+{
+	cancel_delayed_work_sync(&pool->flush_worker);
+	rds_ib_flush_mr_pool(pool, 1, NULL);
+	WARN_ON(atomic_read(&pool->item_count));
+	WARN_ON(atomic_read(&pool->free_pinned));
+	kfree(pool);
+}
+
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+					     int pool_type)
+{
+	struct rds_ib_mr_pool *pool;
+
+	pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+	if (!pool)
+		return ERR_PTR(-ENOMEM);
+
+	pool->pool_type = pool_type;
+	init_llist_head(&pool->free_list);
+	init_llist_head(&pool->drop_list);
+	init_llist_head(&pool->clean_list);
+	mutex_init(&pool->flush_lock);
+	init_waitqueue_head(&pool->flush_wait);
+	INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
+
+	if (pool_type == RDS_IB_MR_1M_POOL) {
+		/* +1 allows for unaligned MRs */
+		pool->fmr_attr.max_pages = RDS_MR_1M_MSG_SIZE + 1;
+		pool->max_items = RDS_MR_1M_POOL_SIZE;
+	} else {
+		/* pool_type == RDS_IB_MR_8K_POOL */
+		pool->fmr_attr.max_pages = RDS_MR_8K_MSG_SIZE + 1;
+		pool->max_items = RDS_MR_8K_POOL_SIZE;
+	}
+
+	pool->max_free_pinned = pool->max_items * pool->fmr_attr.max_pages / 4;
+	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
+	pool->fmr_attr.page_shift = PAGE_SHIFT;
+	pool->max_items_soft = rds_ibdev->max_mrs * 3 / 4;
+
+	return pool;
+}
+
+int rds_ib_mr_init(void)
+{
+	rds_ib_mr_wq = create_workqueue("rds_mr_flushd");
+	if (!rds_ib_mr_wq)
+		return -ENOMEM;
+	return 0;
+}
+
+/* By the time this is called all the IB devices should have been torn down and
+ * had their pools freed.  As each pool is freed its work struct is waited on,
+ * so the pool flushing work queue should be idle by the time we get here.
+ */
+void rds_ib_mr_exit(void)
+{
+	destroy_workqueue(rds_ib_mr_wq);
+}