Patchwork [v2,2/6] net: introduce lock to protect NetClientState's peer's access

login
register
mail settings
Submitter pingfan liu
Date June 13, 2013, 9:03 a.m.
Message ID <1371114186-8854-3-git-send-email-qemulist@gmail.com>
Download mbox | patch
Permalink /patch/251015/
State New
Headers show

Comments

pingfan liu - June 13, 2013, 9:03 a.m.
From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>

Introduce nc->peer_lock to shield off the race of nc->peer's reader and
deleter. With it, after deleter finish, no new qemu_send_packet_xx()
will append packet to peer->send_queue, therefore no new reference from
packet->sender to nc will exist in nc->peer->send_queue.

Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
---
 include/net/net.h |  6 +++++
 net/net.c         | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++---
 net/queue.c       |  4 +--
 3 files changed, 83 insertions(+), 6 deletions(-)
Stefan Hajnoczi - June 18, 2013, 12:25 p.m.
On Thu, Jun 13, 2013 at 05:03:02PM +0800, Liu Ping Fan wrote:
> @@ -67,6 +67,10 @@ struct NetClientState {
>      NetClientInfo *info;
>      int link_down;
>      QTAILQ_ENTRY(NetClientState) next;
> +    /* protect the race access of peer only between reader and writer.
> +         * to resolve the writer's race condition, resort on biglock.
> +         */

Indentation

> @@ -301,6 +303,38 @@ static void qemu_free_net_client(NetClientState *nc)
>      }
>  }
>  
> +/* elimate the reference and sync with exit of rx/tx action.

s/elimate/Eliminate/

> + * And flush out peer's queue.
> + */
> +static void qemu_net_client_detach_flush(NetClientState *nc)
> +{
> +    NetClientState *peer;
> +
> +    /* reader of self's peer field , fixme? the deleters are not concurrent,
> +         * so this pair lock can save.
> +         */

Indentation, also please resolve the fixme.

> @@ -394,6 +433,28 @@ int qemu_can_send_packet(NetClientState *sender)
>      return 1;
>  }
>  
> +int qemu_can_send_packet(NetClientState *sender)
> +{
> +    int ret = 1;
> +
> +    qemu_mutex_lock(&sender->peer_lock);
> +    if (!sender->peer) {
> +        goto unlock;
> +    }
> +
> +    if (sender->peer->receive_disabled) {
> +        ret = 0;
> +        goto unlock;
> +    } else if (sender->peer->info->can_receive &&
> +               !sender->peer->info->can_receive(sender->peer)) {
> +        ret = 0;
> +        goto unlock;
> +    }

Just call qemu_can_send_packet_nolock() instead of duplicating code?
pingfan liu - June 20, 2013, 6:30 a.m.
On Tue, Jun 18, 2013 at 8:25 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> On Thu, Jun 13, 2013 at 05:03:02PM +0800, Liu Ping Fan wrote:
>> @@ -67,6 +67,10 @@ struct NetClientState {
>>      NetClientInfo *info;
>>      int link_down;
>>      QTAILQ_ENTRY(NetClientState) next;
>> +    /* protect the race access of peer only between reader and writer.
>> +         * to resolve the writer's race condition, resort on biglock.
>> +         */
>
> Indentation
>
Will fix.
>> @@ -301,6 +303,38 @@ static void qemu_free_net_client(NetClientState *nc)
>>      }
>>  }
>>
>> +/* elimate the reference and sync with exit of rx/tx action.
>
> s/elimate/Eliminate/
>
Will fix
>> + * And flush out peer's queue.
>> + */
>> +static void qemu_net_client_detach_flush(NetClientState *nc)
>> +{
>> +    NetClientState *peer;
>> +
>> +    /* reader of self's peer field , fixme? the deleters are not concurrent,
>> +         * so this pair lock can save.
>> +         */
>
> Indentation, also please resolve the fixme.
>
So, here can I take the assumption that the deleters are serialized by
biglock, and remove the lock following this comment?

>> @@ -394,6 +433,28 @@ int qemu_can_send_packet(NetClientState *sender)
>>      return 1;
>>  }
>>
>> +int qemu_can_send_packet(NetClientState *sender)
>> +{
>> +    int ret = 1;
>> +
>> +    qemu_mutex_lock(&sender->peer_lock);
>> +    if (!sender->peer) {
>> +        goto unlock;
>> +    }
>> +
>> +    if (sender->peer->receive_disabled) {
>> +        ret = 0;
>> +        goto unlock;
>> +    } else if (sender->peer->info->can_receive &&
>> +               !sender->peer->info->can_receive(sender->peer)) {
>> +        ret = 0;
>> +        goto unlock;
>> +    }
>
> Just call qemu_can_send_packet_nolock() instead of duplicating code?

Yes.

Thx & regards,
pingfan
Stefan Hajnoczi - June 20, 2013, 7:46 a.m.
On Thu, Jun 20, 2013 at 02:30:30PM +0800, liu ping fan wrote:
> On Tue, Jun 18, 2013 at 8:25 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
> > On Thu, Jun 13, 2013 at 05:03:02PM +0800, Liu Ping Fan wrote:
> >> + * And flush out peer's queue.
> >> + */
> >> +static void qemu_net_client_detach_flush(NetClientState *nc)
> >> +{
> >> +    NetClientState *peer;
> >> +
> >> +    /* reader of self's peer field , fixme? the deleters are not concurrent,
> >> +         * so this pair lock can save.
> >> +         */
> >
> > Indentation, also please resolve the fixme.
> >
> So, here can I take the assumption that the deleters are serialized by
> biglock, and remove the lock following this comment?

Ah, I understand the comment now.  Is there any advantage to dropping
the lock?  IMO it's clearer to take the lock consistently instead of
"optimizing" cases we think only get called from the main loop.
pingfan liu - June 20, 2013, 9:17 a.m.
On Thu, Jun 20, 2013 at 3:46 PM, Stefan Hajnoczi <stefanha@redhat.com> wrote:
> On Thu, Jun 20, 2013 at 02:30:30PM +0800, liu ping fan wrote:
>> On Tue, Jun 18, 2013 at 8:25 PM, Stefan Hajnoczi <stefanha@gmail.com> wrote:
>> > On Thu, Jun 13, 2013 at 05:03:02PM +0800, Liu Ping Fan wrote:
>> >> + * And flush out peer's queue.
>> >> + */
>> >> +static void qemu_net_client_detach_flush(NetClientState *nc)
>> >> +{
>> >> +    NetClientState *peer;
>> >> +
>> >> +    /* reader of self's peer field , fixme? the deleters are not concurrent,
>> >> +         * so this pair lock can save.
>> >> +         */
>> >
>> > Indentation, also please resolve the fixme.
>> >
>> So, here can I take the assumption that the deleters are serialized by
>> biglock, and remove the lock following this comment?
>
> Ah, I understand the comment now.  Is there any advantage to dropping

:), only two atomic instruction in rare path.
> the lock?  IMO it's clearer to take the lock consistently instead of
> "optimizing" cases we think only get called from the main loop.

Reasonable, will keep them.

Patch

diff --git a/include/net/net.h b/include/net/net.h
index 2f72b26..ea46f13 100644
--- a/include/net/net.h
+++ b/include/net/net.h
@@ -67,6 +67,10 @@  struct NetClientState {
     NetClientInfo *info;
     int link_down;
     QTAILQ_ENTRY(NetClientState) next;
+    /* protect the race access of peer only between reader and writer.
+         * to resolve the writer's race condition, resort on biglock.
+         */
+    QemuMutex peer_lock;
     NetClientState *peer;
     NetQueue *send_queue;
     char *model;
@@ -79,6 +83,7 @@  struct NetClientState {
 
 typedef struct NICState {
     NetClientState *ncs;
+    NetClientState **pending_peer;
     NICConf *conf;
     void *opaque;
     bool peer_deleted;
@@ -106,6 +111,7 @@  NetClientState *qemu_find_vlan_client_by_name(Monitor *mon, int vlan_id,
                                               const char *client_str);
 typedef void (*qemu_nic_foreach)(NICState *nic, void *opaque);
 void qemu_foreach_nic(qemu_nic_foreach func, void *opaque);
+int qemu_can_send_packet_nolock(NetClientState *sender);
 int qemu_can_send_packet(NetClientState *nc);
 ssize_t qemu_sendv_packet(NetClientState *nc, const struct iovec *iov,
                           int iovcnt);
diff --git a/net/net.c b/net/net.c
index 43a74e4..717db12 100644
--- a/net/net.c
+++ b/net/net.c
@@ -204,6 +204,7 @@  static void qemu_net_client_setup(NetClientState *nc,
         nc->peer = peer;
         peer->peer = nc;
     }
+    qemu_mutex_init(&nc->peer_lock);
     QTAILQ_INSERT_TAIL(&net_clients, nc, next);
 
     nc->send_queue = qemu_new_net_queue(nc);
@@ -243,6 +244,7 @@  NICState *qemu_new_nic(NetClientInfo *info,
     nic->ncs = (void *)nic + info->size;
     nic->conf = conf;
     nic->opaque = opaque;
+    nic->pending_peer = g_malloc0(sizeof(NetClientState *) * queues);
 
     for (i = 0; i < queues; i++) {
         qemu_net_client_setup(&nic->ncs[i], info, peers[i], model, name,
@@ -301,6 +303,38 @@  static void qemu_free_net_client(NetClientState *nc)
     }
 }
 
+/* elimate the reference and sync with exit of rx/tx action.
+ * And flush out peer's queue.
+ */
+static void qemu_net_client_detach_flush(NetClientState *nc)
+{
+    NetClientState *peer;
+
+    /* reader of self's peer field , fixme? the deleters are not concurrent,
+         * so this pair lock can save.
+         */
+    qemu_mutex_lock(&nc->peer_lock);
+    peer = nc->peer;
+    qemu_mutex_unlock(&nc->peer_lock);
+
+    /* writer of peer's peer field*/
+    if (peer) {
+        /* exclude the race with tx to @nc */
+        qemu_mutex_lock(&peer->peer_lock);
+        peer->peer = NULL;
+        qemu_mutex_unlock(&peer->peer_lock);
+    }
+
+    /* writer of self's peer field*/
+    /*  exclude the race with tx from @nc */
+    qemu_mutex_lock(&nc->peer_lock);
+    nc->peer = NULL;
+    if (peer) {
+        qemu_net_queue_purge(peer->send_queue, nc);
+    }
+    qemu_mutex_unlock(&nc->peer_lock);
+}
+
 void qemu_del_net_client(NetClientState *nc)
 {
     NetClientState *ncs[MAX_QUEUE_NUM];
@@ -331,7 +365,9 @@  void qemu_del_net_client(NetClientState *nc)
         }
 
         for (i = 0; i < queues; i++) {
+            qemu_net_client_detach_flush(ncs[i]);
             qemu_cleanup_net_client(ncs[i]);
+            nic->pending_peer[i] = ncs[i];
         }
 
         return;
@@ -340,6 +376,7 @@  void qemu_del_net_client(NetClientState *nc)
     assert(nc->info->type != NET_CLIENT_OPTIONS_KIND_NIC);
 
     for (i = 0; i < queues; i++) {
+        qemu_net_client_detach_flush(ncs[i]);
         qemu_cleanup_net_client(ncs[i]);
         qemu_free_net_client(ncs[i]);
     }
@@ -352,17 +389,19 @@  void qemu_del_nic(NICState *nic)
     /* If this is a peer NIC and peer has already been deleted, free it now. */
     if (nic->peer_deleted) {
         for (i = 0; i < queues; i++) {
-            qemu_free_net_client(qemu_get_subqueue(nic, i)->peer);
+            qemu_free_net_client(nic->pending_peer[i]);
         }
     }
 
     for (i = queues - 1; i >= 0; i--) {
         NetClientState *nc = qemu_get_subqueue(nic, i);
 
+        qemu_net_client_detach_flush(nc);
         qemu_cleanup_net_client(nc);
         qemu_free_net_client(nc);
     }
 
+    g_free(nic->pending_peer);
     g_free(nic);
 }
 
@@ -379,7 +418,7 @@  void qemu_foreach_nic(qemu_nic_foreach func, void *opaque)
     }
 }
 
-int qemu_can_send_packet(NetClientState *sender)
+int qemu_can_send_packet_nolock(NetClientState *sender)
 {
     if (!sender->peer) {
         return 1;
@@ -394,6 +433,28 @@  int qemu_can_send_packet(NetClientState *sender)
     return 1;
 }
 
+int qemu_can_send_packet(NetClientState *sender)
+{
+    int ret = 1;
+
+    qemu_mutex_lock(&sender->peer_lock);
+    if (!sender->peer) {
+        goto unlock;
+    }
+
+    if (sender->peer->receive_disabled) {
+        ret = 0;
+        goto unlock;
+    } else if (sender->peer->info->can_receive &&
+               !sender->peer->info->can_receive(sender->peer)) {
+        ret = 0;
+        goto unlock;
+    }
+unlock:
+    qemu_mutex_unlock(&sender->peer_lock);
+    return ret;
+}
+
 ssize_t qemu_deliver_packet(NetClientState *sender,
                             unsigned flags,
                             const uint8_t *data,
@@ -457,19 +518,24 @@  static ssize_t qemu_send_packet_async_with_flags(NetClientState *sender,
                                                  NetPacketSent *sent_cb)
 {
     NetQueue *queue;
+    ssize_t sz;
 
 #ifdef DEBUG_NET
     printf("qemu_send_packet_async:\n");
     hex_dump(stdout, buf, size);
 #endif
 
+    qemu_mutex_lock(&sender->peer_lock);
     if (sender->link_down || !sender->peer) {
+        qemu_mutex_unlock(&sender->peer_lock);
         return size;
     }
 
     queue = sender->peer->send_queue;
 
-    return qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
+    sz = qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
+    qemu_mutex_unlock(&sender->peer_lock);
+    return sz;
 }
 
 ssize_t qemu_send_packet_async(NetClientState *sender,
@@ -537,16 +603,21 @@  ssize_t qemu_sendv_packet_async(NetClientState *sender,
                                 NetPacketSent *sent_cb)
 {
     NetQueue *queue;
+    ssize_t sz;
 
+    qemu_mutex_lock(&sender->peer_lock);
     if (sender->link_down || !sender->peer) {
+        qemu_mutex_unlock(&sender->peer_lock);
         return iov_size(iov, iovcnt);
     }
 
     queue = sender->peer->send_queue;
 
-    return qemu_net_queue_send_iov(queue, sender,
+    sz = qemu_net_queue_send_iov(queue, sender,
                                    QEMU_NET_PACKET_FLAG_NONE,
                                    iov, iovcnt, sent_cb);
+    qemu_mutex_unlock(&sender->peer_lock);
+    return sz;
 }
 
 ssize_t
diff --git a/net/queue.c b/net/queue.c
index c6d4241..7d6c52e 100644
--- a/net/queue.c
+++ b/net/queue.c
@@ -192,7 +192,7 @@  ssize_t qemu_net_queue_send(NetQueue *queue,
 {
     ssize_t ret;
 
-    if (queue->delivering || !qemu_can_send_packet(sender)) {
+    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
         qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
         return 0;
     }
@@ -217,7 +217,7 @@  ssize_t qemu_net_queue_send_iov(NetQueue *queue,
 {
     ssize_t ret;
 
-    if (queue->delivering || !qemu_can_send_packet(sender)) {
+    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
         qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
         return 0;
     }