[v4,14/22] CIFS: SMBD: Implement function to send data via RDMA send

Message ID 20171002023030.3582-15-longli@exchange.microsoft.com
State New
Headers show
Series
  • CIFS: Implement SMBDirect
Related show

Commit Message

Long Li Oct. 2, 2017, 2:30 a.m.
From: Long Li <longli@microsoft.com>

The transport doesn't maintain send buffers or a send queue for transferring
payload via RDMA send. There is no data copy in the transport on send.

Signed-off-by: Long Li <longli@microsoft.com>
---
 fs/cifs/smbdirect.c | 248 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/cifs/smbdirect.h |   4 +
 2 files changed, 252 insertions(+)

Comments

kbuild test robot Oct. 4, 2017, 9:43 a.m. | #1
Hi Long,

[auto build test WARNING on cifs/for-next]
[also build test WARNING on v4.14-rc3]
[if your patch is applied to the wrong git tree, please drop us a note to help improve the system]

url:    https://github.com/0day-ci/linux/commits/Long-Li/CIFS-Implement-SMBDirect/20171004-165915
base:   git://git.samba.org/sfrench/cifs-2.6.git for-next
config: i386-randconfig-x001-201740 (attached as .config)
compiler: gcc-6 (Debian 6.2.0-3) 6.2.0 20160901
reproduce:
        # save the attached .config to linux build tree
        make ARCH=i386 

All warnings (new ones prefixed by >>):

   In file included from include/linux/kernel.h:13:0,
                    from include/linux/list.h:8,
                    from include/linux/module.h:9,
                    from fs/cifs/smbdirect.c:16:
   fs/cifs/smbdirect.c: In function 'smbd_send':
>> fs/cifs/cifs_debug.h:55:24: warning: format '%lu' expects argument of type 'long unsigned int', but argument 8 has type 'size_t {aka unsigned int}' [-Wformat=]
      pr_debug_ratelimited("%s: "    \
                           ^
   include/linux/printk.h:285:21: note: in definition of macro 'pr_fmt'
    #define pr_fmt(fmt) fmt
                        ^~~
>> fs/cifs/cifs_debug.h:55:3: note: in expansion of macro 'pr_debug_ratelimited'
      pr_debug_ratelimited("%s: "    \
      ^~~~~~~~~~~~~~~~~~~~
>> fs/cifs/smbdirect.c:143:3: note: in expansion of macro 'cifs_dbg'
      cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
      ^~~~~~~~
>> fs/cifs/smbdirect.c:151:40: note: in expansion of macro 'log_rdma'
    #define log_write(level, fmt, args...) log_rdma(level, LOG_WRITE, fmt, ##args)
                                           ^~~~~~~~
>> fs/cifs/smbdirect.c:2148:6: note: in expansion of macro 'log_write'
         log_write(INFO,
         ^~~~~~~~~
   In file included from fs/cifs/smbdirect.c:18:0:
>> fs/cifs/smbdirect.c:143:17: warning: format '%lu' expects argument of type 'long unsigned int', but argument 6 has type 'size_t {aka unsigned int}' [-Wformat=]
      cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
                    ^
   fs/cifs/cifs_debug.h:58:16: note: in definition of macro 'cifs_dbg'
      cifs_vfs_err(fmt, ##__VA_ARGS__);   \
                   ^~~
>> fs/cifs/smbdirect.c:151:40: note: in expansion of macro 'log_rdma'
    #define log_write(level, fmt, args...) log_rdma(level, LOG_WRITE, fmt, ##args)
                                           ^~~~~~~~
>> fs/cifs/smbdirect.c:2148:6: note: in expansion of macro 'log_write'
         log_write(INFO,
         ^~~~~~~~~
   In file included from include/linux/kernel.h:13:0,
                    from include/linux/list.h:8,
                    from include/linux/module.h:9,
                    from fs/cifs/smbdirect.c:16:
   fs/cifs/smbdirect.c:143:17: warning: format '%lu' expects argument of type 'long unsigned int', but argument 7 has type 'size_t {aka unsigned int}' [-Wformat=]
      cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
                    ^
   include/linux/printk.h:285:21: note: in definition of macro 'pr_fmt'
    #define pr_fmt(fmt) fmt
                        ^~~
   fs/cifs/cifs_debug.h:60:3: note: in expansion of macro 'pr_debug_ratelimited'
      pr_debug_ratelimited(fmt, ##__VA_ARGS__);  \
      ^~~~~~~~~~~~~~~~~~~~
>> fs/cifs/smbdirect.c:143:3: note: in expansion of macro 'cifs_dbg'
      cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
      ^~~~~~~~
>> fs/cifs/smbdirect.c:151:40: note: in expansion of macro 'log_rdma'
    #define log_write(level, fmt, args...) log_rdma(level, LOG_WRITE, fmt, ##args)
                                           ^~~~~~~~
>> fs/cifs/smbdirect.c:2148:6: note: in expansion of macro 'log_write'
         log_write(INFO,
         ^~~~~~~~~

vim +/cifs_dbg +143 fs/cifs/smbdirect.c

056f27d3 Long Li 2017-10-01  112  
056f27d3 Long Li 2017-10-01  113  /* Transport logging functions
056f27d3 Long Li 2017-10-01  114   * Logging are defined as classes. They can be OR'ed to define the actual
056f27d3 Long Li 2017-10-01  115   * logging level via module parameter smbd_logging_class
056f27d3 Long Li 2017-10-01  116   * e.g. cifs.smbd_logging_class=0x500 will log all log_rdma_recv() and
056f27d3 Long Li 2017-10-01  117   * log_rdma_event()
056f27d3 Long Li 2017-10-01  118   */
056f27d3 Long Li 2017-10-01  119  #define LOG_OUTGOING			0x1
056f27d3 Long Li 2017-10-01  120  #define LOG_INCOMING			0x2
056f27d3 Long Li 2017-10-01  121  #define LOG_READ			0x4
056f27d3 Long Li 2017-10-01  122  #define LOG_WRITE			0x8
056f27d3 Long Li 2017-10-01  123  #define LOG_RDMA_SEND			0x10
056f27d3 Long Li 2017-10-01  124  #define LOG_RDMA_RECV			0x20
056f27d3 Long Li 2017-10-01  125  #define LOG_KEEP_ALIVE			0x40
056f27d3 Long Li 2017-10-01  126  #define LOG_RDMA_EVENT			0x80
056f27d3 Long Li 2017-10-01  127  #define LOG_RDMA_MR			0x100
056f27d3 Long Li 2017-10-01  128  static unsigned int smbd_logging_class = 0;
056f27d3 Long Li 2017-10-01  129  module_param(smbd_logging_class, uint, 0644);
056f27d3 Long Li 2017-10-01  130  MODULE_PARM_DESC(smbd_logging_class,
056f27d3 Long Li 2017-10-01  131  	"Logging class for SMBD transport 0x0 to 0x100");
056f27d3 Long Li 2017-10-01  132  
056f27d3 Long Li 2017-10-01  133  #define ERR		0x0
056f27d3 Long Li 2017-10-01  134  #define INFO		0x1
056f27d3 Long Li 2017-10-01  135  static unsigned int smbd_logging_level = ERR;
056f27d3 Long Li 2017-10-01  136  module_param(smbd_logging_level, uint, 0644);
056f27d3 Long Li 2017-10-01  137  MODULE_PARM_DESC(smbd_logging_level,
056f27d3 Long Li 2017-10-01  138  	"Logging level for SMBD transport, 0 (default): error, 1: info");
056f27d3 Long Li 2017-10-01  139  
056f27d3 Long Li 2017-10-01  140  #define log_rdma(level, class, fmt, args...)				\
056f27d3 Long Li 2017-10-01  141  do {									\
056f27d3 Long Li 2017-10-01  142  	if (level <= smbd_logging_level || class & smbd_logging_class)	\
056f27d3 Long Li 2017-10-01 @143  		cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
056f27d3 Long Li 2017-10-01  144  } while (0)
056f27d3 Long Li 2017-10-01  145  
056f27d3 Long Li 2017-10-01  146  #define log_outgoing(level, fmt, args...) \
056f27d3 Long Li 2017-10-01  147  		log_rdma(level, LOG_OUTGOING, fmt, ##args)
056f27d3 Long Li 2017-10-01  148  #define log_incoming(level, fmt, args...) \
056f27d3 Long Li 2017-10-01  149  		log_rdma(level, LOG_INCOMING, fmt, ##args)
056f27d3 Long Li 2017-10-01  150  #define log_read(level, fmt, args...)	log_rdma(level, LOG_READ, fmt, ##args)
056f27d3 Long Li 2017-10-01 @151  #define log_write(level, fmt, args...)	log_rdma(level, LOG_WRITE, fmt, ##args)
056f27d3 Long Li 2017-10-01  152  #define log_rdma_send(level, fmt, args...) \
056f27d3 Long Li 2017-10-01  153  		log_rdma(level, LOG_RDMA_SEND, fmt, ##args)
056f27d3 Long Li 2017-10-01  154  #define log_rdma_recv(level, fmt, args...) \
056f27d3 Long Li 2017-10-01  155  		log_rdma(level, LOG_RDMA_RECV, fmt, ##args)
056f27d3 Long Li 2017-10-01  156  #define log_keep_alive(level, fmt, args...) \
056f27d3 Long Li 2017-10-01  157  		log_rdma(level, LOG_KEEP_ALIVE, fmt, ##args)
056f27d3 Long Li 2017-10-01  158  #define log_rdma_event(level, fmt, args...) \
056f27d3 Long Li 2017-10-01  159  		log_rdma(level, LOG_RDMA_EVENT, fmt, ##args)
056f27d3 Long Li 2017-10-01  160  #define log_rdma_mr(level, fmt, args...) \
056f27d3 Long Li 2017-10-01  161  		log_rdma(level, LOG_RDMA_MR, fmt, ##args)
056f27d3 Long Li 2017-10-01  162  

:::::: The code at line 143 was first introduced by commit
:::::: 056f27d3b5a46aa10b0e23437c46d1c5a9fab431 CIFS: SMBD: Establish SMBDirect connection

:::::: TO: Long Li <longli@microsoft.com>
:::::: CC: 0day robot <fengguang.wu@intel.com>

---
0-DAY kernel test infrastructure                Open Source Technology Center
https://lists.01.org/pipermail/kbuild-all                   Intel Corporation
kbuild test robot Oct. 4, 2017, 10:05 a.m. | #2
Hi Long,

[auto build test WARNING on cifs/for-next]
[also build test WARNING on v4.14-rc3 next-20170929]
[if your patch is applied to the wrong git tree, please drop us a note to help improve the system]

url:    https://github.com/0day-ci/linux/commits/Long-Li/CIFS-Implement-SMBDirect/20171004-165915
base:   git://git.samba.org/sfrench/cifs-2.6.git for-next
config: i386-randconfig-x005-201740 (attached as .config)
compiler: gcc-6 (Debian 6.2.0-3) 6.2.0 20160901
reproduce:
        # save the attached .config to linux build tree
        make ARCH=i386 

All warnings (new ones prefixed by >>):

   fs/cifs/smbdirect.c: In function 'smbd_recv_page':
   fs/cifs/smbdirect.c:1983:15: error: implicit declaration of function 'kmap_atomic' [-Werror=implicit-function-declaration]
     to_address = kmap_atomic(page);
                  ^~~~~~~~~~~
   fs/cifs/smbdirect.c:1983:13: warning: assignment makes pointer from integer without a cast [-Wint-conversion]
     to_address = kmap_atomic(page);
                ^
   fs/cifs/smbdirect.c:1989:2: error: implicit declaration of function 'kunmap_atomic' [-Werror=implicit-function-declaration]
     kunmap_atomic(to_address);
     ^~~~~~~~~~~~~
   In file included from include/linux/kernel.h:13:0,
                    from include/linux/list.h:8,
                    from include/linux/module.h:9,
                    from fs/cifs/smbdirect.c:16:
   fs/cifs/smbdirect.c: In function 'smbd_send':
>> include/linux/kern_levels.h:4:18: warning: format '%lu' expects argument of type 'long unsigned int', but argument 6 has type 'size_t {aka unsigned int}' [-Wformat=]
    #define KERN_SOH "\001"  /* ASCII Start Of Header */
                     ^
   include/linux/printk.h:136:11: note: in definition of macro 'no_printk'
       printk(fmt, ##__VA_ARGS__); \
              ^~~
   include/linux/kern_levels.h:14:20: note: in expansion of macro 'KERN_SOH'
    #define KERN_DEBUG KERN_SOH "7" /* debug-level messages */
                       ^~~~~~~~
   include/linux/printk.h:339:12: note: in expansion of macro 'KERN_DEBUG'
     no_printk(KERN_DEBUG pr_fmt(fmt), ##__VA_ARGS__)
               ^~~~~~~~~~
>> fs/cifs/cifs_debug.h:72:3: note: in expansion of macro 'pr_debug'
      pr_debug(fmt, ##__VA_ARGS__);    \
      ^~~~~~~~
   fs/cifs/smbdirect.c:143:3: note: in expansion of macro 'cifs_dbg'
      cifs_dbg(VFS, "%s:%d " fmt, __func__, __LINE__, ##args);\
      ^~~~~~~~
   fs/cifs/smbdirect.c:151:40: note: in expansion of macro 'log_rdma'
    #define log_write(level, fmt, args...) log_rdma(level, LOG_WRITE, fmt, ##args)
                                           ^~~~~~~~
   fs/cifs/smbdirect.c:2148:6: note: in expansion of macro 'log_write'
         log_write(INFO,
         ^~~~~~~~~
   cc1: some warnings being treated as errors

vim +4 include/linux/kern_levels.h

314ba352 Joe Perches 2012-07-30  3  
04d2c8c8 Joe Perches 2012-07-30 @4  #define KERN_SOH	"\001"		/* ASCII Start Of Header */
04d2c8c8 Joe Perches 2012-07-30  5  #define KERN_SOH_ASCII	'\001'
04d2c8c8 Joe Perches 2012-07-30  6  

:::::: The code at line 4 was first introduced by commit
:::::: 04d2c8c83d0e3ac5f78aeede51babb3236200112 printk: convert the format for KERN_<LEVEL> to a 2 byte pattern

:::::: TO: Joe Perches <joe@perches.com>
:::::: CC: Linus Torvalds <torvalds@linux-foundation.org>

---
0-DAY kernel test infrastructure                Open Source Technology Center
https://lists.01.org/pipermail/kbuild-all                   Intel Corporation

Patch

diff --git a/fs/cifs/smbdirect.c b/fs/cifs/smbdirect.c
index b9be9d6..90e2c94 100644
--- a/fs/cifs/smbdirect.c
+++ b/fs/cifs/smbdirect.c
@@ -42,6 +42,12 @@  static int smbd_post_recv(
 		struct smbd_response *response);
 
 static int smbd_post_send_empty(struct smbd_connection *info);
+static int smbd_post_send_data(
+		struct smbd_connection *info,
+		struct kvec *iov, int n_vec, int remaining_data_length);
+static int smbd_post_send_page(struct smbd_connection *info,
+		struct page *page, unsigned long offset,
+		size_t size, int remaining_data_length);
 
 /* SMBD version number */
 #define SMBD_V1	0x0100
@@ -198,6 +204,10 @@  static void smbd_destroy_rdma_work(struct work_struct *work)
 	log_rdma_event(INFO, "cancelling send immediate work\n");
 	cancel_delayed_work_sync(&info->send_immediate_work);
 
+	log_rdma_event(INFO, "wait for all send to finish\n");
+	wait_event(info->wait_smbd_send_pending,
+		info->smbd_send_pending == 0);
+
 	log_rdma_event(INFO, "wait for all recv to finish\n");
 	wake_up_interruptible(&info->wait_reassembly_queue);
 	wait_event(info->wait_smbd_recv_pending,
@@ -1103,6 +1113,24 @@  static int smbd_post_send_sgl(struct smbd_connection *info,
 }
 
 /*
+ * Send a page (or part of one) through a single-entry scatterlist
+ * page: the page to send
+ * offset, size: where the data starts in the page and how long it is
+ * remaining_data_length: remaining data to send in this payload
+ * return value: whatever smbd_post_send_sgl() returns
+ */
+static int smbd_post_send_page(struct smbd_connection *info, struct page *page,
+		unsigned long offset, size_t size, int remaining_data_length)
+{
+	struct scatterlist sgl;
+
+	sg_init_table(&sgl, 1);
+	sg_set_page(&sgl, page, size, offset);
+
+	return smbd_post_send_sgl(info, &sgl, size, remaining_data_length);
+}
+
+/*
  * Send an empty message
  * Empty message is used to extend credits to peer to for keep live
  * while there is no upper layer payload to send at the time
@@ -1114,6 +1142,35 @@  static int smbd_post_send_empty(struct smbd_connection *info)
 }
 
 /*
+ * Send data buffers, building one scatterlist entry per iov
+ * iov: the iov array describing the data buffers
+ * n_vec: number of entries in iov, must not exceed SMBDIRECT_MAX_SGE
+ * remaining_data_length: remaining data to send following this packet
+ * in segmented SMBD packet
+ */
+static int smbd_post_send_data(
+	struct smbd_connection *info, struct kvec *iov, int n_vec,
+	int remaining_data_length)
+{
+	int i;
+	u32 data_length = 0;
+	struct scatterlist sgl[SMBDIRECT_MAX_SGE];
+
+	if (n_vec > SMBDIRECT_MAX_SGE) {
+		cifs_dbg(VFS, "Can't fit data to SGL, n_vec=%d\n", n_vec);
+		return -ENOMEM;
+	}
+
+	sg_init_table(sgl, n_vec);
+	for (i = 0; i < n_vec; i++) {
+		data_length += iov[i].iov_len;
+		sg_set_buf(&sgl[i], iov[i].iov_base, iov[i].iov_len);
+	}
+
+	return smbd_post_send_sgl(info, sgl, data_length, remaining_data_length);
+}
+
+/*
  * Post a receive request to the transport
  * The remote peer can only send data when a receive request is posted
  * The interaction is controlled by send/receive credit system
@@ -1680,6 +1737,9 @@  struct smbd_connection *_smbd_get_connection(
 	queue_delayed_work(info->workqueue, &info->idle_timer_work,
 		info->keep_alive_interval*HZ);
 
+	init_waitqueue_head(&info->wait_smbd_send_pending);
+	info->smbd_send_pending = 0;
+
 	init_waitqueue_head(&info->wait_smbd_recv_pending);
 	info->smbd_recv_pending = 0;
 
@@ -1973,3 +2033,191 @@  int smbd_recv(struct smbd_connection *info, struct msghdr *msg)
 		msg->msg_iter.count = 0;
 	return rc;
 }
+
+/*
+ * Send data to transport
+ * Each rqst is one SMBDirect payload, split to fit info->max_send_size
+ * info: the connection to send on, rqst: the data to write
+ * return value: 0 if successfully written, otherwise error code
+ */
+int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst)
+{
+	struct kvec vec;
+	int nvecs;
+	int size;
+	int buflen = 0, remaining_data_length;
+	int start, i, j;
+	int max_iov_size =
+		info->max_send_size - sizeof(struct smbd_data_transfer);
+	struct kvec iov[SMBDIRECT_MAX_SGE];
+	int rc;
+	unsigned long long t1 = rdtsc();
+
+	info->smbd_send_pending++;
+	if (info->transport_status != SMBD_CONNECTED) {
+		rc = -ENODEV;
+		goto done;
+	}
+
+	/*
+	 * This usually means a configuration error
+	 * We use RDMA read/write for packet size > rdma_readwrite_threshold,
+	 * so when properly configured we should never get into this situation
+	 */
+	if (rqst->rq_nvec + rqst->rq_npages > SMBDIRECT_MAX_SGE) {
+		log_write(ERR, "maximum send segment %x exceeding %x\n",
+			 rqst->rq_nvec + rqst->rq_npages, SMBDIRECT_MAX_SGE);
+		rc = -EINVAL;
+		goto done;
+	}
+
+	/*
+	 * Remove the RFC1002 length defined in MS-SMB2 section 2.1
+	 * It is used only for TCP transport
+	 * In future we may want to add a transport layer under protocol
+	 * layer so this will only be issued to TCP transport
+	 */
+	iov[0].iov_base = (char *)rqst->rq_iov[0].iov_base + 4;
+	iov[0].iov_len = rqst->rq_iov[0].iov_len - 4;
+	buflen += iov[0].iov_len;
+
+	/* total up iov array first */
+	for (i = 1; i < rqst->rq_nvec; i++) {
+		iov[i].iov_base = rqst->rq_iov[i].iov_base;
+		iov[i].iov_len = rqst->rq_iov[i].iov_len;
+		buflen += iov[i].iov_len;
+	}
+
+	/* add in the page array if there is one */
+	if (rqst->rq_npages) {
+		buflen += rqst->rq_pagesz * (rqst->rq_npages - 1);
+		buflen += rqst->rq_tailsz;
+	}
+
+	if (buflen + sizeof(struct smbd_data_transfer) >
+		info->max_fragmented_send_size) {
+		log_write(ERR, "payload size %d > max size %d\n",
+			buflen, info->max_fragmented_send_size);
+		rc = -EINVAL;
+		goto done;
+	}
+
+	remaining_data_length = buflen;
+
+	log_write(INFO, "rqst->rq_nvec=%d rqst->rq_npages=%d rq_pagesz=%d "
+		"rq_tailsz=%d buflen=%d\n",
+		rqst->rq_nvec, rqst->rq_npages, rqst->rq_pagesz,
+		rqst->rq_tailsz, buflen);
+
+	start = i = iov[0].iov_len ? 0 : 1;
+	buflen = 0;
+	while (true) {
+		buflen += iov[i].iov_len;
+		if (buflen > max_iov_size) {
+			if (i > start) {
+				remaining_data_length -=
+					(buflen-iov[i].iov_len);
+				log_write(INFO, "sending iov[] from start=%d "
+					"i=%d nvecs=%d "
+					"remaining_data_length=%d\n",
+					start, i, i-start,
+					remaining_data_length);
+				rc = smbd_post_send_data(
+					info, &iov[start], i-start,
+					remaining_data_length);
+				if (rc)
+					goto done;
+			} else {
+				/* iov[start] is too big, break it */
+				nvecs = (buflen+max_iov_size-1)/max_iov_size;
+				log_write(INFO, "iov[%d] iov_base=%p buflen=%d"
+					" break to %d vectors\n",
+					start, iov[start].iov_base,
+					buflen, nvecs);
+				for (j = 0; j < nvecs; j++) {
+					vec.iov_base =
+						(char *)iov[start].iov_base +
+						j*max_iov_size;
+					vec.iov_len = max_iov_size;
+					if (j == nvecs-1)
+						vec.iov_len =
+							buflen -
+							max_iov_size*(nvecs-1);
+					remaining_data_length -= vec.iov_len;
+					log_write(INFO,
+						"sending vec j=%d iov_base=%p"
+						" iov_len=%zu "
+						"remaining_data_length=%d\n",
+						j, vec.iov_base, vec.iov_len,
+						remaining_data_length);
+					rc = smbd_post_send_data(
+						info, &vec, 1,
+						remaining_data_length);
+					if (rc)
+						goto done;
+				}
+				i++;
+				/* last iov sent: don't read past rq_nvec */
+				if (i == rqst->rq_nvec)
+					break;
+			}
+			start = i;
+			buflen = 0;
+		} else {
+			i++;
+			if (i == rqst->rq_nvec) {
+				/* send out all remaining vecs */
+				remaining_data_length -= buflen;
+				log_write(INFO,
+					"sending iov[] from start=%d i=%d "
+					"nvecs=%d remaining_data_length=%d\n",
+					start, i, i-start,
+					remaining_data_length);
+				rc = smbd_post_send_data(info, &iov[start],
+					i-start, remaining_data_length);
+				if (rc)
+					goto done;
+				break;
+			}
+		}
+		log_write(INFO, "looping i=%d buflen=%d\n", i, buflen);
+	}
+
+	/* now sending pages if there are any */
+	for (i = 0; i < rqst->rq_npages; i++) {
+		buflen = (i == rqst->rq_npages-1) ?
+			rqst->rq_tailsz : rqst->rq_pagesz;
+		nvecs = (buflen + max_iov_size - 1) / max_iov_size;
+		log_write(INFO, "sending pages buflen=%d nvecs=%d\n",
+			buflen, nvecs);
+		for (j = 0; j < nvecs; j++) {
+			size = max_iov_size;
+			if (j == nvecs-1)
+				size = buflen - j*max_iov_size;
+			remaining_data_length -= size;
+			log_write(INFO, "sending pages i=%d offset=%d size=%d"
+				" remaining_data_length=%d\n",
+				i, j*max_iov_size, size, remaining_data_length);
+			rc = smbd_post_send_page(
+				info, rqst->rq_pages[i], j*max_iov_size,
+				size, remaining_data_length);
+			if (rc)
+				goto done;
+		}
+	}
+
+done:
+	/*
+	 * As an optimization, we don't wait for individual I/O to finish
+	 * before sending the next one. Send them all and wait for the pending
+	 * send count to reach 0: all I/Os are out and we are good to return
+	 */
+	wait_event(info->wait_send_payload_pending,
+		atomic_read(&info->send_payload_pending) == 0);
+
+	info->smbd_send_pending--;
+	wake_up(&info->wait_smbd_send_pending);
+	profiling_add_histogram(rdtsc()-t1, info->smbd_write_cycles);
+
+	return rc;
+}
diff --git a/fs/cifs/smbdirect.h b/fs/cifs/smbdirect.h
index 26614fa..e9bd938 100644
--- a/fs/cifs/smbdirect.h
+++ b/fs/cifs/smbdirect.h
@@ -89,6 +89,9 @@  struct smbd_connection {
 
 	/* Activity accoutning */
 	/* Pending reqeusts issued from upper layer */
+	int smbd_send_pending;
+	wait_queue_head_t wait_smbd_send_pending;
+
 	int smbd_recv_pending;
 	wait_queue_head_t wait_smbd_recv_pending;
 
@@ -260,6 +263,7 @@  void smbd_destroy(struct smbd_connection *info);
 
 /* Interface for carrying upper layer I/O through send/recv */
 int smbd_recv(struct smbd_connection *info, struct msghdr *msg);
+int smbd_send(struct smbd_connection *info, struct smb_rqst *rqst);
 
 void profiling_display_histogram(
 	struct seq_file *m, unsigned long long array[]);