
[net-next,4/7] rds: support for zcopy completion notification

Message ID 1193bb432985523ff75715ce68eb7126dac7f018.1516793252.git.sowmini.varadhan@oracle.com
State Changes Requested, archived
Delegated to: David Miller
Series RDS zerocopy support

Commit Message

Sowmini Varadhan Jan. 24, 2018, 11:45 a.m. UTC
RDS removes a datagram (rds_message) from the retransmit queue when
an ACK is received. The ACK indicates that the receiver has queued
the RDS datagram, so that the sender can safely forget the datagram.
When all references to the rds_message are quiesced, rds_message_purge
is called to release resources used by the rds_message.

If the datagram to be removed had pinned pages set up, add
an entry to the rs->rs_znotify_queue so that the notification
will be sent up via rds_rm_zerocopy_callback() when the
rds_message is eventually freed by rds_message_purge.

rds_rm_zerocopy_callback() attempts to batch the number of cookies
sent with each notification to a max of SO_EE_ORIGIN_MAX_ZCOOKIES.
Each time a cookie is released by rds_message_purge(), the
rs_znotify_queue is checked to see if the MAX_ZCOOKIES batch limit
has been exceeded (in which case we send up a notification). If the
limit has not been exceeded, the cookie is added to the rs_znotify_queue
and a timer is set up, to make sure the cookie notification will
be sent within an upper bound of RDS_REAP_TIMEOUT (should the
traffic rate slow down).
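
For reference, a minimal userspace sketch of how a sender might reap
these cookie notifications off the socket error queue (illustrative
only, not part of this patch; it assumes the cookies arrive as an
array of u32 in the message payload with the count in ee_data, per
rds_rm_zerocopy_callback() below, and are read via MSG_ERRQUEUE as
wired up in rds_recvmsg()):

#include <stdio.h>
#include <stdint.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <linux/errqueue.h>

static void reap_zcookies(int fd)
{
	uint32_t cookies[SO_EE_ORIGIN_MAX_ZCOOKIES];
	char control[CMSG_SPACE(sizeof(struct sock_extended_err))];
	struct iovec iov = { .iov_base = cookies, .iov_len = sizeof(cookies) };
	struct msghdr msg = {
		.msg_iov = &iov, .msg_iovlen = 1,
		.msg_control = control, .msg_controllen = sizeof(control),
	};
	struct sock_extended_err *serr;
	struct cmsghdr *cm;
	uint32_t i;

	if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0)
		return;		/* nothing pending, or a real error */

	for (cm = CMSG_FIRSTHDR(&msg); cm; cm = CMSG_NXTHDR(&msg, cm)) {
		if (cm->cmsg_level != SOL_IP || cm->cmsg_type != IP_RECVERR)
			continue;
		serr = (struct sock_extended_err *)CMSG_DATA(cm);
		if (serr->ee_origin != SO_EE_ORIGIN_ZCOOKIE)
			continue;
		/* ee_data carries the number of cookies in the payload */
		for (i = 0; i < serr->ee_data; i++)
			printf("send completed, cookie %u\n", cookies[i]);
	}
}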

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
---
 include/uapi/linux/errqueue.h |    2 +
 net/rds/af_rds.c              |    7 +++
 net/rds/message.c             |  104 ++++++++++++++++++++++++++++++++++++++---
 net/rds/rds.h                 |   20 ++++++++
 net/rds/recv.c                |    2 +
 5 files changed, 128 insertions(+), 7 deletions(-)

Comments

Willem de Bruijn Jan. 28, 2018, 1:56 p.m. UTC | #1
On Wed, Jan 24, 2018 at 12:45 PM, Sowmini Varadhan
<sowmini.varadhan@oracle.com> wrote:
> RDS removes a datagram (rds_message) from the retransmit queue when
> an ACK is received. The ACK indicates that the receiver has queued
> the RDS datagram, so that the sender can safely forget the datagram.
> When all references to the rds_message are quiesced, rds_message_purge
> is called to release resources used by the rds_message.
>
> If the datagram to be removed had pinned pages set up, add
> an entry to the rs->rs_znotify_queue so that the notification
> will be sent up via rds_rm_zerocopy_callback() when the
> rds_message is eventually freed by rds_message_purge.
>
> rds_rm_zerocopy_callback() attempts to batch the number of cookies
> sent with each notification to a max of SO_EE_ORIGIN_MAX_ZCOOKIES.
> Each time a cookie is released by rds_message_purge(), the
> rs_znotify_queue is checked to see if the MAX_ZCOOKIES batch limit
> has been exceeded (in which case we send up a notification). If the
> limit has not been exceeded, the cookie is added to the rs_znotify_queue
> and a timer is set up

An alternative that does not require a timer is to batch on the sk
error queue itself, like tcp zerocopy. That queues the first notification
skb on the error queue without any notification latency.

Then, if a subsequent notification comes in while another is pending
with < MAX zcookies, it coalesces the new notification onto the pending
skb and consumes the other. For RDS notifications, the implementation
is an extra skb_put + uint32_t assignment.

Optionally, the socket can trigger another sk_error_report on each
new notification.
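
For concreteness, a rough sketch of that coalescing step for RDS
(hypothetical code, not from this series: the function name and the
assumption that each incoming notification skb carries one u32 cookie
in its payload, with tailroom reserved for SO_EE_ORIGIN_MAX_ZCOOKIES
of them, are mine; the locking mirrors the tcp zerocopy callback):

static void rds_zcookie_notify(struct sock *sk, struct sk_buff *skb)
{
	struct sk_buff_head *q = &sk->sk_error_queue;
	struct sock_exterr_skb *serr;
	struct sk_buff *tail;
	unsigned long flags;

	spin_lock_irqsave(&q->lock, flags);
	tail = skb_peek_tail(q);
	serr = tail ? SKB_EXT_ERR(tail) : NULL;
	if (serr && serr->ee.ee_origin == SO_EE_ORIGIN_ZCOOKIE &&
	    serr->ee.ee_data < SO_EE_ORIGIN_MAX_ZCOOKIES) {
		/* coalesce: append this skb's cookie to the pending
		 * notification and consume the new skb
		 */
		skb_put_data(tail, skb->data, sizeof(u32));
		serr->ee.ee_data++;
		spin_unlock_irqrestore(&q->lock, flags);
		consume_skb(skb);
		return;
	}
	/* no pending notification with room: queue this one as-is */
	__skb_queue_tail(q, skb);
	spin_unlock_irqrestore(&q->lock, flags);
	sk->sk_error_report(sk);
}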

> +static void rds_rm_zerocopy_callback(struct rds_sock *rs,
> +                                    struct rds_znotifier *znotifier,
> +                                    bool force)
> +{
> +       struct sock *sk = rds_rs_to_sk(rs);
> +       struct sk_buff *skb;
> +       struct sock_exterr_skb *serr;
> +       unsigned long flags;
> +       u32 *ptr;
> +       int ncookies = 0, i;
> +       struct rds_znotifier *znotif, *ztmp, *first;
> +       LIST_HEAD(tmp_list);
> +
> +       spin_lock_irqsave(&rs->rs_lock, flags);
> +       ncookies = rs->rs_ncookies;
> +       if (ncookies < SO_EE_ORIGIN_MAX_ZCOOKIES && !force) {
> +               if (znotifier) { /* add this cookie to the list and return */

can be checked before taking the lock.

More importantly, when is this ever NULL? This function is a callback
for a zerocopy struct of type znotifier. Is it doing double duty to flush
any outstanding if znotifier == NULL && force == true? If so, the first
condition probably never occurs unless force == true and thus the
second is redundant.

> +                       list_add_tail(&znotifier->z_list,
> +                                     &rs->rs_znotify_queue);
> +                       rs->rs_ncookies++;
> +               }
> +               spin_unlock_irqrestore(&rs->rs_lock, flags);
> +               return;
> +       }
> +       if (!ncookies) { /* timer finds a reaped list */
> +               spin_unlock_irqrestore(&rs->rs_lock, flags);
> +               return;
> +       }
> +       /* reap existing cookie list if we have hit the max, then add
> +        * new cookie to the list for next round of reaping.
> +        */
> +       list_splice(&rs->rs_znotify_queue, &tmp_list); /* reap now */
> +       INIT_LIST_HEAD(&rs->rs_znotify_queue);
> +       rs->rs_ncookies = 0;
> +       if (znotifier) { /* for next round */

This adds unnecessary notification latency to delivery of the current
notification. The latest notification can be appended to tmp_list and
sent up immediately.

> +               list_add_tail(&znotifier->z_list, &rs->rs_znotify_queue);
> +               rs->rs_ncookies++;
> +       }
> +       spin_unlock_irqrestore(&rs->rs_lock, flags);
> +
> +       first = list_first_entry(&tmp_list, struct rds_znotifier, z_list);
> +       znotif = list_next_entry(first, z_list);
> +       list_del(&first->z_list);
> +
> +       skb = rds_skb_from_znotifier(first);
> +       ptr = skb_put(skb, ncookies * sizeof(u32));
> +       i = 0;
> +       ptr[i++] = first->z_cookie;
> +
> +       list_for_each_entry_safe(znotif, ztmp, &tmp_list, z_list) {
> +               list_del(&znotif->z_list);
> +               ptr[i++] = znotif->z_cookie;
> +               mm_unaccount_pinned_pages(&znotif->z_mmp);
> +               consume_skb(rds_skb_from_znotifier(znotif));
> +       }
> +       WARN_ON(!list_empty(&tmp_list));
> +
> +       serr = SKB_EXT_ERR(skb);
> +       serr->ee.ee_errno = 0;
> +       serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
> +       serr->ee.ee_data = ncookies;
> +       serr->ee.ee_info = 0;
> +       serr->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;
> +
> +       if (sock_queue_err_skb(sk, skb))
> +               consume_skb(skb);
> +}
> +
> +void rs_zcopy_notify(struct timer_list *t)
> +{
> +       struct rds_sock *rs = from_timer(rs, t, rs_cookie_timer);
> +
> +       rds_rm_zerocopy_callback(rs, NULL, true);
> +}
> +
>  /*
>   * This relies on dma_map_sg() not touching sg[].page during merging.
>   */
>  static void rds_message_purge(struct rds_message *rm)
>  {
>         unsigned long i, flags;
> +       bool zcopy = false;
>
>         if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
>                 return;
>
> +       spin_lock_irqsave(&rm->m_rs_lock, flags);
> +       if (rm->data.op_mmp_znotifier && rm->m_rs) {
> +               struct rds_sock *rs = rm->m_rs;
> +
> +               zcopy = true;
> +               rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier, false);
> +               rm->data.op_mmp_znotifier = NULL;
> +               (void)mod_timer(&rs->rs_cookie_timer, RDS_REAP_TIMEOUT);

Resetting the timeout on each queued notification causes unbounded
notification latency for previous notifications on the queue.

> +
> +               sock_put(rds_rs_to_sk(rs));
> +               rm->m_rs = NULL;

These two lines are now called only if znotifier is true, but
used to be called whenever rm->m_rs != NULL. Intentional?

> +       }
> +       spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> +
>         for (i = 0; i < rm->data.op_nents; i++) {
>                 rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
>                 /* XXX will have to put_page for page refs */
> -               __free_page(sg_page(&rm->data.op_sg[i]));
> +               if (!zcopy)
> +                       __free_page(sg_page(&rm->data.op_sg[i]));
> +               else
> +                       put_page(sg_page(&rm->data.op_sg[i]));
>         }
>         rm->data.op_nents = 0;
> -       spin_lock_irqsave(&rm->m_rs_lock, flags);
> -       if (rm->m_rs) {
> -               sock_put(rds_rs_to_sk(rm->m_rs));
> -               rm->m_rs = NULL;
> -       }
> -       spin_unlock_irqrestore(&rm->m_rs_lock, flags);
>
>         if (rm->rdma.op_active)
>                 rds_rdma_free_op(&rm->rdma);
Sowmini Varadhan Jan. 28, 2018, 4:15 p.m. UTC | #2
Thanks for taking the time to go through the code!

> 
> An alternative that does not require a timer is to batch on the sk
> error queue itself, like tcp zerocopy. That queues the first notification
> skb on the error queue without any notification latency.
> 
> Then, if a subsequent notification comes in while another is pending
> with < MAX zcookies, it coalesces the new notification onto the pending
> skb and consumes the other. For RDS notifications, the implementation
> is an extra skb_put + uint32_t assignment.

This is an interesting idea, let me give it a try.

> Optionally, the socket can trigger another sk_error_report on each
> new notification.

I was trying to avoid that: an upcall for each message is not a good
idea when you have high traffic and are able to free multiple rds_messages
at a time.

> > +static void rds_rm_zerocopy_callback(struct rds_sock *rs,
> > +                                    struct rds_znotifier *znotifier,
> > +                                    bool force)
> > +{
> > +       struct sock *sk = rds_rs_to_sk(rs);
> > +       struct sk_buff *skb;
> > +       struct sock_exterr_skb *serr;
> > +       unsigned long flags;
> > +       u32 *ptr;
> > +       int ncookies = 0, i;
> > +       struct rds_znotifier *znotif, *ztmp, *first;
> > +       LIST_HEAD(tmp_list);
> > +
> > +       spin_lock_irqsave(&rs->rs_lock, flags);
> > +       ncookies = rs->rs_ncookies;
> > +       if (ncookies < SO_EE_ORIGIN_MAX_ZCOOKIES && !force) {
> > +               if (znotifier) { /* add this cookie to the list and return */
> 
> can be checked before taking lock.
> 
> More importantly, when is this ever NULL? 

It is NULL when invoked from the timer callback (rs_zcopy_notify()
going off because we haven't had any traffic for the expiration interval,
so we want to send out pending notifications, but don't have a znotifier
in this case). But you are right in that:

> This function is a callback
> for a zerocopy struct of type znotifier. Is it doing double duty to flush
> any outstanding if znotifier == NULL && force == true? If so, the first
> condition probably never occurs unless force == true and thus the
> second is redundant.

Yes, force can simply be !znotifier.

I don't quite follow the "can be checked before taking the lock"
comment, though: the lock is needed to make sure we atomically do
the list's "add new entry and potentially flush" operation.
The check is for the "potentially" part of that operation, so
I'm not seeing how it would help to move it out of the lock.

Having said all that, I like the earlier suggestion of just
batching on the error_queue itself. If that works out without any
issues, all of this stuff may not be needed, so let me give that
a shot first.

> This adds unnecessary notification latency to delivery of current
> notification. The latest notification can be appended to tmp_list and
> sent up immediately.

True, if it fits within the MAX cookies limit.

> > +               (void)mod_timer(&rs->rs_cookie_timer, RDS_REAP_TIMEOUT);
> 
> Resetting the timeout on each queued notification causes unbounded
> notification latency for previous notifications on the queue.

I'm not sure I get that comment. RDS_REAP_TIMEOUT is the upper bound
for sending a notification. If, in the interim, we get back a TCP
ack that lets us reap a bunch of messages, we'd go and flush the
queue anyway, so the RDS_REAP_TIMEOUT will not matter. Can you 
elaborate on your concern?


> > +
> > +               sock_put(rds_rs_to_sk(rs));
> > +               rm->m_rs = NULL;
> 
> These two lines are now called only if znotifier is true, but
> used to be called whenever rm->m_rs != NULL. Intentional?

Good catch; no, that's a bug. Thanks for flagging.

Also, ACK to all the comments for patch 5/7. Will fix for next round.
Willem de Bruijn Jan. 28, 2018, 6:46 p.m. UTC | #3
On Sun, Jan 28, 2018 at 5:15 PM, Sowmini Varadhan
<sowmini.varadhan@oracle.com> wrote:
>
> Thanks for taking the time to go through the code!
>
>>
>> An alternative that does not require a timer is to batch on the sk
>> error queue itself, like tcp zerocopy. That queues the first notification
>> skb on the error queue without any notification latency.
>>
>> Then, if a subsequent notification comes in while another is pending
>> with < MAX zcookies, it coalesces the new notification onto the pending
>> skb and consumes the other. For RDS notifications, the implementation
>> is an extra skb_put + uint32_t assignment.
>
> This is an interesting idea, let me give it a try.
>
>> Optionally, the socket can trigger another sk_error_report on each
>> new notification.
>
> I was trying to avoid that: an upcall for each message is not a good
> idea when you have high traffic and are able to free multiple rds_messages
> at a time.

Agreed. It was only a suggestion if that would be a reason
for you to not try the above idea.

>> > +static void rds_rm_zerocopy_callback(struct rds_sock *rs,
>> > +                                    struct rds_znotifier *znotifier,
>> > +                                    bool force)
>> > +{
>> > +       struct sock *sk = rds_rs_to_sk(rs);
>> > +       struct sk_buff *skb;
>> > +       struct sock_exterr_skb *serr;
>> > +       unsigned long flags;
>> > +       u32 *ptr;
>> > +       int ncookies = 0, i;
>> > +       struct rds_znotifier *znotif, *ztmp, *first;
>> > +       LIST_HEAD(tmp_list);
>> > +
>> > +       spin_lock_irqsave(&rs->rs_lock, flags);
>> > +       ncookies = rs->rs_ncookies;
>> > +       if (ncookies < SO_EE_ORIGIN_MAX_ZCOOKIES && !force) {
>> > +               if (znotifier) { /* add this cookie to the list and return */
>>
>> can be checked before taking the lock.
>>
>> More importantly, when is this ever NULL?
>
> It is NULL when invoked from the timer callback (rs_zcopy_notify()
> going off because we haven't had any traffic for the expiration interval,
> so we want to send out pending notifications, but don't have a znotifier
> in this case). But you are right in that:
>
>> This function is a callback
>> for a zerocopy struct of type znotifier. Is it doing double duty to flush
>> any outstanding if znotifier == NULL && force == true? If so, the first
>> condition probably never occurs unless force == true and thus the
>> second is redundant.
>
> Yes, force can simply be !znotifier.
>
> I don't quite follow the "can be checked before taking the lock"
> comment, though: the lock is needed to make sure we atomically do
> the list's "add new entry and potentially flush" operation.
> The check is for the "potentially" part of that operation, so
> I'm not seeing how it would help to move it out of the lock.
>
> Having said all that, I like the earlier suggestion of just
> batching on the error_queue itself. If that works out without any
> issues, all of this stuff may not be needed, so let me give that
> a shot first.

Sounds great!

>
>> This adds unnecessary notification latency to delivery of the current
>> notification. The latest notification can be appended to tmp_list and
>> sent up immediately.
>
> True, if it fits within the MAX cookies limit.
>
>> > +               (void)mod_timer(&rs->rs_cookie_timer, RDS_REAP_TIMEOUT);
>>
>> Resetting the timeout on each queued notification causes unbounded
>> notification latency for previous notifications on the queue.
>
> I'm not sure I get that comment. RDS_REAP_TIMEOUT is the upper bound
> for sending a notification. If, in the interim, we get back a TCP
> ack that lets us reap a bunch of messages, we'd go and flush the
> queue anyway, so the RDS_REAP_TIMEOUT will not matter. Can you
> elaborate on your concern?

I meant that if the packet rate is low enough to require the timer
to fire, then by resetting the timer on each notification, the maximum
timeout for the first enqueued notification is the max cookie limit
* timeout, as opposed to a single timeout if set once and never modified.

Some physical device drivers do the same for their interrupt
moderation. You really want the configured timeout to be the
upper bound.
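
For what it's worth, the usual pattern arms the timer only when it
is not already pending, e.g. (sketch only; note also that mod_timer()
takes an absolute expiry in jiffies, so the relative RDS_REAP_TIMEOUT
needs to be added to jiffies):

	/* arm the reap timer only if it is not already running, so
	 * that RDS_REAP_TIMEOUT is a true upper bound for the oldest
	 * queued cookie instead of being pushed out by each new
	 * completion
	 */
	if (!timer_pending(&rs->rs_cookie_timer))
		mod_timer(&rs->rs_cookie_timer, jiffies + RDS_REAP_TIMEOUT);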

Patch

diff --git a/include/uapi/linux/errqueue.h b/include/uapi/linux/errqueue.h
index dc64cfa..28812ed 100644
--- a/include/uapi/linux/errqueue.h
+++ b/include/uapi/linux/errqueue.h
@@ -20,11 +20,13 @@  struct sock_extended_err {
 #define SO_EE_ORIGIN_ICMP6	3
 #define SO_EE_ORIGIN_TXSTATUS	4
 #define SO_EE_ORIGIN_ZEROCOPY	5
+#define SO_EE_ORIGIN_ZCOOKIE	6
 #define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS
 
 #define SO_EE_OFFENDER(ee)	((struct sockaddr*)((ee)+1))
 
 #define SO_EE_CODE_ZEROCOPY_COPIED	1
+#define	SO_EE_ORIGIN_MAX_ZCOOKIES	8
 
 /**
  *	struct scm_timestamping - timestamps exposed through cmsg
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index b405f77..49a81e8 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -66,6 +66,8 @@  static int rds_release(struct socket *sock)
 	rs = rds_sk_to_rs(sk);
 
 	sock_orphan(sk);
+
+	del_timer_sync(&rs->rs_cookie_timer);
 	/* Note - rds_clear_recv_queue grabs rs_recv_lock, so
 	 * that ensures the recv path has completed messing
 	 * with the socket. */
@@ -183,6 +185,8 @@  static unsigned int rds_poll(struct file *file, struct socket *sock,
 		mask |= (POLLIN | POLLRDNORM);
 	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
 		mask |= (POLLOUT | POLLWRNORM);
+	if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
+		mask |= POLLERR;
 	read_unlock_irqrestore(&rs->rs_recv_lock, flags);
 
 	/* clear state any time we wake a seen-congested socket */
@@ -511,6 +515,9 @@  static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
 	INIT_LIST_HEAD(&rs->rs_send_queue);
 	INIT_LIST_HEAD(&rs->rs_recv_queue);
 	INIT_LIST_HEAD(&rs->rs_notify_queue);
+	INIT_LIST_HEAD(&rs->rs_znotify_queue);
+	rs->rs_ncookies = 0;
+	timer_setup(&rs->rs_cookie_timer, rs_zcopy_notify, 0);
 	INIT_LIST_HEAD(&rs->rs_cong_list);
 	spin_lock_init(&rs->rs_rdma_lock);
 	rs->rs_rdma_keys = RB_ROOT;
diff --git a/net/rds/message.c b/net/rds/message.c
index ef3daaf..7ca968a 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -33,6 +33,9 @@ 
 #include <linux/kernel.h>
 #include <linux/slab.h>
 #include <linux/export.h>
+#include <linux/skbuff.h>
+#include <linux/list.h>
+#include <linux/errqueue.h>
 
 #include "rds.h"
 
@@ -53,28 +56,115 @@  void rds_message_addref(struct rds_message *rm)
 }
 EXPORT_SYMBOL_GPL(rds_message_addref);
 
+static void rds_rm_zerocopy_callback(struct rds_sock *rs,
+				     struct rds_znotifier *znotifier,
+				     bool force)
+{
+	struct sock *sk = rds_rs_to_sk(rs);
+	struct sk_buff *skb;
+	struct sock_exterr_skb *serr;
+	unsigned long flags;
+	u32 *ptr;
+	int ncookies = 0, i;
+	struct rds_znotifier *znotif, *ztmp, *first;
+	LIST_HEAD(tmp_list);
+
+	spin_lock_irqsave(&rs->rs_lock, flags);
+	ncookies = rs->rs_ncookies;
+	if (ncookies < SO_EE_ORIGIN_MAX_ZCOOKIES && !force) {
+		if (znotifier) { /* add this cookie to the list and return */
+			list_add_tail(&znotifier->z_list,
+				      &rs->rs_znotify_queue);
+			rs->rs_ncookies++;
+		}
+		spin_unlock_irqrestore(&rs->rs_lock, flags);
+		return;
+	}
+	if (!ncookies) { /* timer finds a reaped list */
+		spin_unlock_irqrestore(&rs->rs_lock, flags);
+		return;
+	}
+	/* reap existing cookie list if we have hit the max, then add
+	 * new cookie to the list for next round of reaping.
+	 */
+	list_splice(&rs->rs_znotify_queue, &tmp_list); /* reap now */
+	INIT_LIST_HEAD(&rs->rs_znotify_queue);
+	rs->rs_ncookies = 0;
+	if (znotifier) { /* for next round */
+		list_add_tail(&znotifier->z_list, &rs->rs_znotify_queue);
+		rs->rs_ncookies++;
+	}
+	spin_unlock_irqrestore(&rs->rs_lock, flags);
+
+	first = list_first_entry(&tmp_list, struct rds_znotifier, z_list);
+	znotif = list_next_entry(first, z_list);
+	list_del(&first->z_list);
+
+	skb = rds_skb_from_znotifier(first);
+	ptr = skb_put(skb, ncookies * sizeof(u32));
+	i = 0;
+	ptr[i++] = first->z_cookie;
+
+	list_for_each_entry_safe(znotif, ztmp, &tmp_list, z_list) {
+		list_del(&znotif->z_list);
+		ptr[i++] = znotif->z_cookie;
+		mm_unaccount_pinned_pages(&znotif->z_mmp);
+		consume_skb(rds_skb_from_znotifier(znotif));
+	}
+	WARN_ON(!list_empty(&tmp_list));
+
+	serr = SKB_EXT_ERR(skb);
+	serr->ee.ee_errno = 0;
+	serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
+	serr->ee.ee_data = ncookies;
+	serr->ee.ee_info = 0;
+	serr->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;
+
+	if (sock_queue_err_skb(sk, skb))
+		consume_skb(skb);
+}
+
+void rs_zcopy_notify(struct timer_list *t)
+{
+	struct rds_sock *rs = from_timer(rs, t, rs_cookie_timer);
+
+	rds_rm_zerocopy_callback(rs, NULL, true);
+}
+
 /*
  * This relies on dma_map_sg() not touching sg[].page during merging.
  */
 static void rds_message_purge(struct rds_message *rm)
 {
 	unsigned long i, flags;
+	bool zcopy = false;
 
 	if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
 		return;
 
+	spin_lock_irqsave(&rm->m_rs_lock, flags);
+	if (rm->data.op_mmp_znotifier && rm->m_rs) {
+		struct rds_sock *rs = rm->m_rs;
+
+		zcopy = true;
+		rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier, false);
+		rm->data.op_mmp_znotifier = NULL;
+		(void)mod_timer(&rs->rs_cookie_timer, RDS_REAP_TIMEOUT);
+
+		sock_put(rds_rs_to_sk(rs));
+		rm->m_rs = NULL;
+	}
+	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
+
 	for (i = 0; i < rm->data.op_nents; i++) {
 		rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
 		/* XXX will have to put_page for page refs */
-		__free_page(sg_page(&rm->data.op_sg[i]));
+		if (!zcopy)
+			__free_page(sg_page(&rm->data.op_sg[i]));
+		else
+			put_page(sg_page(&rm->data.op_sg[i]));
 	}
 	rm->data.op_nents = 0;
-	spin_lock_irqsave(&rm->m_rs_lock, flags);
-	if (rm->m_rs) {
-		sock_put(rds_rs_to_sk(rm->m_rs));
-		rm->m_rs = NULL;
-	}
-	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 
 	if (rm->rdma.op_active)
 		rds_rdma_free_op(&rm->rdma);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 374ae83..c375dd8 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -356,6 +356,19 @@  static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
 #define RDS_MSG_PAGEVEC		7
 #define RDS_MSG_FLUSH		8
 
+struct rds_znotifier {
+	struct list_head	z_list;
+	struct mmpin		z_mmp;
+	u32			z_cookie;
+};
+
+#define	RDS_ZCOPY_SKB(__skb)	((struct rds_znotifier *)&((__skb)->cb[0]))
+
+static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z)
+{
+	return container_of((void *)z, struct sk_buff, cb);
+}
+
 struct rds_message {
 	refcount_t		m_refcount;
 	struct list_head	m_sock_item;
@@ -436,6 +449,7 @@  struct rds_message {
 			unsigned int		op_count;
 			unsigned int		op_dmasg;
 			unsigned int		op_dmaoff;
+			struct rds_znotifier	*op_mmp_znotifier;
 			struct scatterlist	*op_sg;
 		} data;
 	};
@@ -588,6 +602,11 @@  struct rds_sock {
 	/* Socket receive path trace points*/
 	u8			rs_rx_traces;
 	u8			rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
+
+	struct list_head	rs_znotify_queue; /* zerocopy completion */
+	int			rs_ncookies;
+	struct timer_list	rs_cookie_timer;
+#define	RDS_REAP_TIMEOUT	((HZ / 100) + 1)
 };
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
@@ -785,6 +804,7 @@  int rds_message_next_extension(struct rds_header *hdr,
 void rds_message_put(struct rds_message *rm);
 void rds_message_wait(struct rds_message *rm);
 void rds_message_unmapped(struct rds_message *rm);
+void rs_zcopy_notify(struct timer_list *t);
 
 static inline void rds_message_make_checksum(struct rds_header *hdr)
 {
diff --git a/net/rds/recv.c b/net/rds/recv.c
index b25bcfe..b080961 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -594,6 +594,8 @@  int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 
 	if (msg_flags & MSG_OOB)
 		goto out;
+	if (msg_flags & MSG_ERRQUEUE)
+		return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);
 
 	while (1) {
 		/* If there are pending notifications, do those - and nothing else */