diff mbox series

[V2,net-next,5/7] rds: zerocopy Tx support.

Message ID 97bec2535a23b8dc976f958dc27256731de9127e.1517843755.git.sowmini.varadhan@oracle.com
State Superseded, archived
Delegated to: David Miller
Headers show
Series RDS: zerocopy support | expand

Commit Message

Sowmini Varadhan Feb. 14, 2018, 10:28 a.m. UTC
If the MSG_ZEROCOPY flag is specified with rds_sendmsg(), and,
if the SO_ZEROCOPY socket option has been set on the PF_RDS socket,
application pages sent down with rds_sendmsg() are pinned.

The pinning uses the accounting infrastructure added by
Commit a91dbff551a6 ("sock: ulimit on MSG_ZEROCOPY pages")

The payload bytes in the message may not be modified for the
duration that the message has been pinned. A multi-threaded
application using this infrastructure may thus need to be notified
about send-completion so that it can free/reuse the buffers
passed to rds_sendmsg(). Notification of send-completion will
identify each message-buffer by a cookie that the application
must specify as ancillary data to rds_sendmsg().
The ancillary data in this case has cmsg_level == SOL_RDS
and cmsg_type == RDS_CMSG_ZCOPY_COOKIE.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
---
v2: 
  - remove unused data_len argument to rds_rm_size;
  - unmap as necessary if we fail in the middle of zerocopy setup

 include/uapi/linux/rds.h |    1 +
 net/rds/message.c        |   51 +++++++++++++++++++++++++++++++++++++++++++++-
 net/rds/rds.h            |    3 +-
 net/rds/send.c           |   44 ++++++++++++++++++++++++++++++++++-----
 4 files changed, 91 insertions(+), 8 deletions(-)

Comments

Santosh Shilimkar Feb. 14, 2018, 7:10 p.m. UTC | #1
On 2/14/2018 2:28 AM, Sowmini Varadhan wrote:
> If the MSG_ZEROCOPY flag is specified with rds_sendmsg(), and,
> if the SO_ZEROCOPY socket option has been set on the PF_RDS socket,
> application pages sent down with rds_sendmsg() are pinned.
> 
> The pinning uses the accounting infrastructure added by
> Commit a91dbff551a6 ("sock: ulimit on MSG_ZEROCOPY pages")
> 
> The payload bytes in the message may not be modified for the
> duration that the message has been pinned. A multi-threaded
> application using this infrastructure may thus need to be notified
> about send-completion so that it can free/reuse the buffers
> passed to rds_sendmsg(). Notification of send-completion will
> identify each message-buffer by a cookie that the application
> must specify as ancillary data to rds_sendmsg().
> The ancillary data in this case has cmsg_level == SOL_RDS
> and cmsg_type == RDS_CMSG_ZCOPY_COOKIE.
> 
> Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
> ---
> v2:
>    - remove unused data_len argument to rds_rm_size;
>    - unmap as necessary if we fail in the middle of zerocopy setup
> 
>   include/uapi/linux/rds.h |    1 +
>   net/rds/message.c        |   51 +++++++++++++++++++++++++++++++++++++++++++++-
>   net/rds/rds.h            |    3 +-
>   net/rds/send.c           |   44 ++++++++++++++++++++++++++++++++++-----
>   4 files changed, 91 insertions(+), 8 deletions(-)
> 
> diff --git a/include/uapi/linux/rds.h b/include/uapi/linux/rds.h
> index e71d449..12e3bca 100644
> --- a/include/uapi/linux/rds.h
> +++ b/include/uapi/linux/rds.h
> @@ -103,6 +103,7 @@
>   #define RDS_CMSG_MASKED_ATOMIC_FADD	8
>   #define RDS_CMSG_MASKED_ATOMIC_CSWP	9
>   #define RDS_CMSG_RXPATH_LATENCY		11
> +#define	RDS_CMSG_ZCOPY_COOKIE		12
>
s/RDS_CMSG_ZCOPY_COOKIE/RDS_CMSG_ZMSGCOPY_COOKIE	

>   #define RDS_INFO_FIRST			10000
>   #define RDS_INFO_COUNTERS		10000
> diff --git a/net/rds/message.c b/net/rds/message.c
> index d874b74..e499566 100644
> --- a/net/rds/message.c
> +++ b/net/rds/message.c
> @@ -341,12 +341,14 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
>   	return rm;
>   }
>   
> -int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
> +int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
> +			       bool zcopy)
>   {
>   	unsigned long to_copy, nbytes;
>   	unsigned long sg_off;
>   	struct scatterlist *sg;
>   	int ret = 0;
> +	int length = iov_iter_count(from);
>   
>   	rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
>   
> @@ -356,6 +358,53 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
>   	sg = rm->data.op_sg;
>   	sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
>   
> +	if (zcopy) {
> +		int total_copied = 0;
> +		struct sk_buff *skb;
> +
> +		skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
> +				GFP_KERNEL);
This can sleep, so you might want to check if you want to use the ATOMIC
version here.

> +		if (!skb)
> +			return -ENOMEM;
> +		rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
> +		memset(rm->data.op_mmp_znotifier, 0,
> +		       sizeof(*rm->data.op_mmp_znotifier));
> +		if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
> +					    length)) {
> +			consume_skb(skb);
> +			rm->data.op_mmp_znotifier = NULL;
> +			return -ENOMEM;
> +		}
-ENOMEM is a new application-visible change, but probably the right one for
this particular case. Just need to make sure the application can handle this
error.


> diff --git a/net/rds/rds.h b/net/rds/rds.h
> index 6e8fc4c..dfdc9b3 100644
> --- a/net/rds/rds.h
> +++ b/net/rds/rds.h
> @@ -784,7 +784,8 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
>   /* message.c */
>   struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp);
>   struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents);
> -int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from);
> +int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
> +			       bool zcopy);
>   struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len);
>   void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
>   				 __be16 dport, u64 seq);
> diff --git a/net/rds/send.c b/net/rds/send.c
> index 5ac0925..80171cf 100644
> --- a/net/rds/send.c
> +++ b/net/rds/send.c

[...]

> @@ -1087,8 +1112,15 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
>   		goto out;
>   	}
>   
> +	if (zcopy) {
> +		if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
> +			ret = -EOPNOTSUPP;
> +			goto out;
> +		}
> +		num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
> +	}

Instead of this transport check, let's move this under a transport function
which can then be populated by the TCP transport.

Rest of the changes looks good.

Regards,
Santosh
Sowmini Varadhan Feb. 14, 2018, 7:49 p.m. UTC | #2
On (02/14/18 11:10), Santosh Shilimkar wrote:
> s/RDS_CMSG_ZCOPY_COOKIE/RDS_CMSG_ZMSGCOPY_COOKIE	
> 

Please see https://www.spinics.net/lists/netdev/msg483627.html

> >@@ -356,6 +358,53 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
> >  	sg = rm->data.op_sg;
> >  	sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
> >+	if (zcopy) {
> >+		int total_copied = 0;
> >+		struct sk_buff *skb;
> >+
> >+		skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
> >+				GFP_KERNEL);
> This can sleep so you might want to check if you want to use ATOMIC version
> here.

I think it should be fine: rds_message_copy_from_user() is called
in process context, and if you notice, the calling function rds_sendmsg()
also has this
   1100         rm = rds_message_alloc(ret, GFP_KERNEL);
   1101         if (!rm) {
   1102                 ret = -ENOMEM;
   1103                 goto out;
   1104         }

    :
   1106         /* Attach data to the rm */
    :
   1113                 ret = rds_message_copy_from_user(rm, &msg->msg_iter);

So using GFP_KERNEL is as safe as the call at line 1100.


> >+			return -ENOMEM;
> >+		}
> NOMEM new application visible change but probably the right one for this
> particular case. Just need to make sure application can handle this
> error.

I think the application already handles this correctly (see line 1102 above)

Thanks for taking a look.

--Sowmini
Santosh Shilimkar Feb. 14, 2018, 9:14 p.m. UTC | #3
On 2/14/2018 11:49 AM, Sowmini Varadhan wrote:
> On (02/14/18 11:10), Santosh Shilimkar wrote:
>> s/RDS_CMSG_ZCOPY_COOKIE/RDS_CMSG_ZMSGCOPY_COOKIE	
>>
> 
> Please see https://www.spinics.net/lists/netdev/msg483627.html
>
Just saw it and responded to Dave.


>>> @@ -356,6 +358,53 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
>>>   	sg = rm->data.op_sg;
>>>   	sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
>>> +	if (zcopy) {
>>> +		int total_copied = 0;
>>> +		struct sk_buff *skb;
>>> +
>>> +		skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
>>> +				GFP_KERNEL);
>> This can sleep so you might want to check if you want to use ATOMIC version
>> here.
> 
> I think it should be fine: rds_message_copy_from_user() is called
> in process context, and if you notice, the calling function rds_sendmsg()
> also has this
>     1100         rm = rds_message_alloc(ret, GFP_KERNEL);
>     1101         if (!rm) {
>     1102                 ret = -ENOMEM;
>     1103                 goto out;
>     1104         }
> 
>      :
>     1106         /* Attach data to the rm */
>      :
>     1113                 ret = rds_message_copy_from_user(rm, &msg->msg_iter);
> 
> So using GFP_KERNEL is as safe as the call at line 1100.
>
Was just asking you to check if it is safe. The path already
does that so we are good.

> 
>>> +			return -ENOMEM;
>>> +		}
>> NOMEM new application visible change but probably the right one for this
>> particular case. Just need to make sure application can handle this
>> error.
> 
> I think the application already handles this correctly (see line 1102 above)
> 
Indeed. Thanks for checking.

Regards,
Santosh
Willem de Bruijn Feb. 14, 2018, 11:48 p.m. UTC | #4
On Wed, Feb 14, 2018 at 5:28 AM, Sowmini Varadhan
<sowmini.varadhan@oracle.com> wrote:
> If the MSG_ZEROCOPY flag is specified with rds_sendmsg(), and,
> if the SO_ZEROCOPY socket option has been set on the PF_RDS socket,
> application pages sent down with rds_sendmsg() are pinned.
>
> The pinning uses the accounting infrastructure added by
> Commit a91dbff551a6 ("sock: ulimit on MSG_ZEROCOPY pages")
>
> The payload bytes in the message may not be modified for the
> duration that the message has been pinned. A multi-threaded
> application using this infrastructure may thus need to be notified
> about send-completion so that it can free/reuse the buffers
> passed to rds_sendmsg(). Notification of send-completion will
> identify each message-buffer by a cookie that the application
> must specify as ancillary data to rds_sendmsg().
> The ancillary data in this case has cmsg_level == SOL_RDS
> and cmsg_type == RDS_CMSG_ZCOPY_COOKIE.
>
> Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
> ---

If the missing break is intentional, no need to respin just for the other
minor comments.

> @@ -341,12 +341,14 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
>         return rm;
>  }
>
> -int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
> +int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
> +                              bool zcopy)
>  {
>         unsigned long to_copy, nbytes;
>         unsigned long sg_off;
>         struct scatterlist *sg;
>         int ret = 0;
> +       int length = iov_iter_count(from);
>
>         rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
>
> @@ -356,6 +358,53 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
>         sg = rm->data.op_sg;
>         sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
>
> +       if (zcopy) {
> +               int total_copied = 0;
> +               struct sk_buff *skb;
> +
> +               skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
> +                               GFP_KERNEL);
> +               if (!skb)
> +                       return -ENOMEM;
> +               rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
> +               memset(rm->data.op_mmp_znotifier, 0,
> +                      sizeof(*rm->data.op_mmp_znotifier));

Not strictly needed, as alloc_skb clears skb->cb[]

> +               if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
> +                                           length)) {
> +                       consume_skb(skb);
> +                       rm->data.op_mmp_znotifier = NULL;
> +                       return -ENOMEM;
> +               }

One less action to revert if moving the mm_account_pinned_pages check
before assigning op_mmp_znotifier.

Conversely, move to an err: label at the end to be able to deduplicate
with the error branch introduced below.

> @@ -875,12 +875,13 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>   * rds_message is getting to be quite complicated, and we'd like to allocate
>   * it all in one go. This figures out how big it needs to be up front.
>   */
> -static int rds_rm_size(struct msghdr *msg, int data_len)
> +static int rds_rm_size(struct msghdr *msg, int num_sgs)
>  {
>         struct cmsghdr *cmsg;
>         int size = 0;
>         int cmsg_groups = 0;
>         int retval;
> +       bool zcopy_cookie = false;
>
>         for_each_cmsghdr(cmsg, msg) {
>                 if (!CMSG_OK(msg, cmsg))
> @@ -899,6 +900,8 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
>
>                         break;
>
> +               case RDS_CMSG_ZCOPY_COOKIE:
> +                       zcopy_cookie = true;

break, or if intended to fall through, please label as such.

>                 case RDS_CMSG_RDMA_DEST:
>                 case RDS_CMSG_RDMA_MAP:
>                         cmsg_groups |= 2;
Sowmini Varadhan Feb. 15, 2018, 12:09 a.m. UTC | #5
On (02/14/18 18:48), Willem de Bruijn wrote:
> 
> If the missing break is intentional, no need to respin just for the other
> minor comments.

Yes, the missing break is intentional: the function returns the
size of the scatterlist needed for RDMA, and RDS_CMSG_ZCOPY_COOKIE
(like RDMA_DEST and RDMA_MAP) is meta-data that does not change
that size.

I expect to be in the neighborhood of this code pretty soon, to get
the additional optimization of passing up the zcopy completion
as part of recvmsg (see the discussion in
https://www.mail-archive.com/netdev@vger.kernel.org/msg212788.html)
 
I can take care of the other code-cleanup comment suggestions in
here at that time.

--Sowmini
Willem de Bruijn Feb. 15, 2018, 12:15 a.m. UTC | #6
On Wed, Feb 14, 2018 at 7:09 PM, Sowmini Varadhan
<sowmini.varadhan@oracle.com> wrote:
> On (02/14/18 18:48), Willem de Bruijn wrote:
>>
>> If the missing break is intentional, no need to respin just for the other
>> minor comments.
>
> yes the missing break is intentional-  the function returns the
> size of the scatterlist needed for RDMA, and RDS_CMSG_ZCOPY_COOKIE
> (like RDMA_DEST and RDMA_MAP) is meta-data that does not change
> that size.
>
> I expect to be in the neighborhood of this code pretty soon, to get
> the additional optimization of passing up the zcopy completion
> as part of recvmsg (see the discussion in
> https://www.mail-archive.com/netdev@vger.kernel.org/msg212788.html)
>
> I can take care of the other code-cleanup comment suggestions in
> here at that time..

Sounds good.
diff mbox series

Patch

diff --git a/include/uapi/linux/rds.h b/include/uapi/linux/rds.h
index e71d449..12e3bca 100644
--- a/include/uapi/linux/rds.h
+++ b/include/uapi/linux/rds.h
@@ -103,6 +103,7 @@ 
 #define RDS_CMSG_MASKED_ATOMIC_FADD	8
 #define RDS_CMSG_MASKED_ATOMIC_CSWP	9
 #define RDS_CMSG_RXPATH_LATENCY		11
+#define	RDS_CMSG_ZCOPY_COOKIE		12
 
 #define RDS_INFO_FIRST			10000
 #define RDS_INFO_COUNTERS		10000
diff --git a/net/rds/message.c b/net/rds/message.c
index d874b74..e499566 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -341,12 +341,14 @@  struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
 	return rm;
 }
 
-int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
+int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
+			       bool zcopy)
 {
 	unsigned long to_copy, nbytes;
 	unsigned long sg_off;
 	struct scatterlist *sg;
 	int ret = 0;
+	int length = iov_iter_count(from);
 
 	rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
 
@@ -356,6 +358,53 @@  int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
 	sg = rm->data.op_sg;
 	sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
 
+	if (zcopy) {
+		int total_copied = 0;
+		struct sk_buff *skb;
+
+		skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
+				GFP_KERNEL);
+		if (!skb)
+			return -ENOMEM;
+		rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
+		memset(rm->data.op_mmp_znotifier, 0,
+		       sizeof(*rm->data.op_mmp_znotifier));
+		if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
+					    length)) {
+			consume_skb(skb);
+			rm->data.op_mmp_znotifier = NULL;
+			return -ENOMEM;
+		}
+		while (iov_iter_count(from)) {
+			struct page *pages;
+			size_t start;
+			ssize_t copied;
+
+			copied = iov_iter_get_pages(from, &pages, PAGE_SIZE,
+						    1, &start);
+			if (copied < 0) {
+				struct mmpin *mmp;
+				int i;
+
+				for (i = 0; i < rm->data.op_nents; i++)
+					put_page(sg_page(&rm->data.op_sg[i]));
+				mmp = &rm->data.op_mmp_znotifier->z_mmp;
+				mm_unaccount_pinned_pages(mmp);
+				consume_skb(skb);
+				rm->data.op_mmp_znotifier = NULL;
+				return -EFAULT;
+			}
+			total_copied += copied;
+			iov_iter_advance(from, copied);
+			length -= copied;
+			sg_set_page(sg, pages, copied, start);
+			rm->data.op_nents++;
+			sg++;
+		}
+		WARN_ON_ONCE(length != 0);
+		return ret;
+	} /* zcopy */
+
 	while (iov_iter_count(from)) {
 		if (!sg_page(sg)) {
 			ret = rds_page_remainder_alloc(sg, iov_iter_count(from),
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 6e8fc4c..dfdc9b3 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -784,7 +784,8 @@  void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 /* message.c */
 struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp);
 struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents);
-int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from);
+int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
+			       bool zcopy);
 struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len);
 void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
 				 __be16 dport, u64 seq);
diff --git a/net/rds/send.c b/net/rds/send.c
index 5ac0925..80171cf 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -875,12 +875,13 @@  static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
  * rds_message is getting to be quite complicated, and we'd like to allocate
  * it all in one go. This figures out how big it needs to be up front.
  */
-static int rds_rm_size(struct msghdr *msg, int data_len)
+static int rds_rm_size(struct msghdr *msg, int num_sgs)
 {
 	struct cmsghdr *cmsg;
 	int size = 0;
 	int cmsg_groups = 0;
 	int retval;
+	bool zcopy_cookie = false;
 
 	for_each_cmsghdr(cmsg, msg) {
 		if (!CMSG_OK(msg, cmsg))
@@ -899,6 +900,8 @@  static int rds_rm_size(struct msghdr *msg, int data_len)
 
 			break;
 
+		case RDS_CMSG_ZCOPY_COOKIE:
+			zcopy_cookie = true;
 		case RDS_CMSG_RDMA_DEST:
 		case RDS_CMSG_RDMA_MAP:
 			cmsg_groups |= 2;
@@ -919,7 +922,10 @@  static int rds_rm_size(struct msghdr *msg, int data_len)
 
 	}
 
-	size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
+	if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
+		return -EINVAL;
+
+	size += num_sgs * sizeof(struct scatterlist);
 
 	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
 	if (cmsg_groups == 3)
@@ -928,6 +934,18 @@  static int rds_rm_size(struct msghdr *msg, int data_len)
 	return size;
 }
 
+static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
+			  struct cmsghdr *cmsg)
+{
+	u32 *cookie;
+
+	if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)))
+		return -EINVAL;
+	cookie = CMSG_DATA(cmsg);
+	rm->data.op_mmp_znotifier->z_cookie = *cookie;
+	return 0;
+}
+
 static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			 struct msghdr *msg, int *allocated_mr)
 {
@@ -970,6 +988,10 @@  static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			ret = rds_cmsg_atomic(rs, rm, cmsg);
 			break;
 
+		case RDS_CMSG_ZCOPY_COOKIE:
+			ret = rds_cmsg_zcopy(rs, rm, cmsg);
+			break;
+
 		default:
 			return -EINVAL;
 		}
@@ -1040,10 +1062,13 @@  int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 	long timeo = sock_sndtimeo(sk, nonblock);
 	struct rds_conn_path *cpath;
 	size_t total_payload_len = payload_len, rdma_payload_len = 0;
+	bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
+		      sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
+	int num_sgs = ceil(payload_len, PAGE_SIZE);
 
 	/* Mirror Linux UDP mirror of BSD error message compatibility */
 	/* XXX: Perhaps MSG_MORE someday */
-	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) {
+	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
 		ret = -EOPNOTSUPP;
 		goto out;
 	}
@@ -1087,8 +1112,15 @@  int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 		goto out;
 	}
 
+	if (zcopy) {
+		if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
+			ret = -EOPNOTSUPP;
+			goto out;
+		}
+		num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
+	}
 	/* size of rm including all sgs */
-	ret = rds_rm_size(msg, payload_len);
+	ret = rds_rm_size(msg, num_sgs);
 	if (ret < 0)
 		goto out;
 
@@ -1100,12 +1132,12 @@  int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 
 	/* Attach data to the rm */
 	if (payload_len) {
-		rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
+		rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
 		if (!rm->data.op_sg) {
 			ret = -ENOMEM;
 			goto out;
 		}
-		ret = rds_message_copy_from_user(rm, &msg->msg_iter);
+		ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
 		if (ret)
 			goto out;
 	}