Patchwork [RFC,v5,09/14] net: introduce lock to protect NetClientState's peer's access

login
register
mail settings
Submitter pingfan liu
Date April 26, 2013, 2:47 a.m.
Message ID <1366944455-14239-10-git-send-email-qemulist@gmail.com>
Download mbox | patch
Permalink /patch/239638/
State New
Headers show

Comments

pingfan liu - April 26, 2013, 2:47 a.m.
From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>

Introduce nc->peer_lock to shield off the race of nc->peer's reader and
deleter. With it, after deleter finish, no new qemu_send_packet_xx()
will append packet to peer->send_queue, therefore no new reference from
packet->sender to nc will exist in nc->peer->send_queue.

Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
---
 include/net/net.h |    7 +++++
 net/net.c         |   79 ++++++++++++++++++++++++++++++++++++++++++++++++++---
 net/queue.c       |    4 +-
 3 files changed, 84 insertions(+), 6 deletions(-)

Patch

diff --git a/include/net/net.h b/include/net/net.h
index 88332d2..54f91ea 100644
--- a/include/net/net.h
+++ b/include/net/net.h
@@ -5,6 +5,7 @@ 
 #include "qemu-common.h"
 #include "qapi/qmp/qdict.h"
 #include "qemu/option.h"
+#include "qemu/thread.h"
 #include "net/queue.h"
 #include "migration/vmstate.h"
 #include "qapi-types.h"
@@ -63,6 +64,10 @@  struct NetClientState {
     NetClientInfo *info;
     int link_down;
     QTAILQ_ENTRY(NetClientState) next;
+    /* protect the race access of peer only between reader and writer.
+         * to resolve the writer's race condition, resort on biglock.
+         */
+    QemuMutex peer_lock;
     NetClientState *peer;
     NetQueue *send_queue;
     char *model;
@@ -75,6 +80,7 @@  struct NetClientState {
 
 typedef struct NICState {
     NetClientState *ncs;
+    NetClientState **pending_peer;
     NICConf *conf;
     void *opaque;
     bool peer_deleted;
@@ -102,6 +108,7 @@  NetClientState *qemu_find_vlan_client_by_name(Monitor *mon, int vlan_id,
                                               const char *client_str);
 typedef void (*qemu_nic_foreach)(NICState *nic, void *opaque);
 void qemu_foreach_nic(qemu_nic_foreach func, void *opaque);
+int qemu_can_send_packet_nolock(NetClientState *sender);
 int qemu_can_send_packet(NetClientState *nc);
 ssize_t qemu_sendv_packet(NetClientState *nc, const struct iovec *iov,
                           int iovcnt);
diff --git a/net/net.c b/net/net.c
index f3d67f8..7619762 100644
--- a/net/net.c
+++ b/net/net.c
@@ -207,6 +207,7 @@  static void qemu_net_client_setup(NetClientState *nc,
         nc->peer = peer;
         peer->peer = nc;
     }
+    qemu_mutex_init(&nc->peer_lock);
     QTAILQ_INSERT_TAIL(&net_clients, nc, next);
 
     nc->send_queue = qemu_new_net_queue(nc);
@@ -246,6 +247,7 @@  NICState *qemu_new_nic(NetClientInfo *info,
     nic->ncs = (void *)nic + info->size;
     nic->conf = conf;
     nic->opaque = opaque;
+    nic->pending_peer = g_malloc0(sizeof(NetClientState *) * queues);
 
     for (i = 0; i < queues; i++) {
         qemu_net_client_setup(&nic->ncs[i], info, peers[i], model, name,
@@ -304,6 +306,38 @@  static void qemu_free_net_client(NetClientState *nc)
     }
 }
 
+/* elimate the reference and sync with exit of rx/tx action.
+ * And flush out peer's queue.
+ */
+static void qemu_net_client_detach_flush(NetClientState *nc)
+{
+    NetClientState *peer;
+
+    /* reader of self's peer field , fixme? the deleters are not concurrent,
+         * so this pair lock can save.
+         */
+    qemu_mutex_lock(&nc->peer_lock);
+    peer = nc->peer;
+    qemu_mutex_unlock(&nc->peer_lock);
+
+    /* writer of peer's peer field*/
+    if (peer) {
+        /* exclude the race with tx to @nc */
+        qemu_mutex_lock(&peer->peer_lock);
+        peer->peer = NULL;
+        qemu_mutex_unlock(&peer->peer_lock);
+    }
+
+    /* writer of self's peer field*/
+    /*  exclude the race with tx from @nc */
+    qemu_mutex_lock(&nc->peer_lock);
+    nc->peer = NULL;
+    if (peer) {
+        qemu_net_queue_purge(peer->send_queue, nc);
+    }
+    qemu_mutex_unlock(&nc->peer_lock);
+}
+
 void qemu_del_net_client(NetClientState *nc)
 {
     NetClientState *ncs[MAX_QUEUE_NUM];
@@ -334,7 +368,9 @@  void qemu_del_net_client(NetClientState *nc)
         }
 
         for (i = 0; i < queues; i++) {
+            qemu_net_client_detach_flush(ncs[i]);
             qemu_cleanup_net_client(ncs[i]);
+            nic->pending_peer[i] = ncs[i];
         }
 
         return;
@@ -343,6 +379,7 @@  void qemu_del_net_client(NetClientState *nc)
     assert(nc->info->type != NET_CLIENT_OPTIONS_KIND_NIC);
 
     for (i = 0; i < queues; i++) {
+        qemu_net_client_detach_flush(ncs[i]);
         qemu_cleanup_net_client(ncs[i]);
         qemu_free_net_client(ncs[i]);
     }
@@ -355,17 +392,19 @@  void qemu_del_nic(NICState *nic)
     /* If this is a peer NIC and peer has already been deleted, free it now. */
     if (nic->peer_deleted) {
         for (i = 0; i < queues; i++) {
-            qemu_free_net_client(qemu_get_subqueue(nic, i)->peer);
+            qemu_free_net_client(nic->pending_peer[i]);
         }
     }
 
     for (i = queues - 1; i >= 0; i--) {
         NetClientState *nc = qemu_get_subqueue(nic, i);
 
+        qemu_net_client_detach_flush(nc);
         qemu_cleanup_net_client(nc);
         qemu_free_net_client(nc);
     }
 
+    g_free(nic->pending_peer);
     g_free(nic);
 }
 
@@ -382,7 +421,7 @@  void qemu_foreach_nic(qemu_nic_foreach func, void *opaque)
     }
 }
 
-int qemu_can_send_packet(NetClientState *sender)
+int qemu_can_send_packet_nolock(NetClientState *sender)
 {
     if (!sender->peer) {
         return 1;
@@ -397,6 +436,28 @@  int qemu_can_send_packet(NetClientState *sender)
     return 1;
 }
 
+int qemu_can_send_packet(NetClientState *sender)
+{
+    int ret = 1;
+
+    qemu_mutex_lock(&sender->peer_lock);
+    if (!sender->peer) {
+        goto unlock;
+    }
+
+    if (sender->peer->receive_disabled) {
+        ret = 0;
+        goto unlock;
+    } else if (sender->peer->info->can_receive &&
+               !sender->peer->info->can_receive(sender->peer)) {
+        ret = 0;
+        goto unlock;
+    }
+unlock:
+    qemu_mutex_unlock(&sender->peer_lock);
+    return ret;
+}
+
 ssize_t qemu_deliver_packet(NetClientState *sender,
                             unsigned flags,
                             const uint8_t *data,
@@ -460,19 +521,24 @@  static ssize_t qemu_send_packet_async_with_flags(NetClientState *sender,
                                                  NetPacketSent *sent_cb)
 {
     NetQueue *queue;
+    ssize_t sz;
 
 #ifdef DEBUG_NET
     printf("qemu_send_packet_async:\n");
     hex_dump(stdout, buf, size);
 #endif
 
+    qemu_mutex_lock(&sender->peer_lock);
     if (sender->link_down || !sender->peer) {
+        qemu_mutex_unlock(&sender->peer_lock);
         return size;
     }
 
     queue = sender->peer->send_queue;
 
-    return qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
+    sz = qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
+    qemu_mutex_unlock(&sender->peer_lock);
+    return sz;
 }
 
 ssize_t qemu_send_packet_async(NetClientState *sender,
@@ -540,16 +606,21 @@  ssize_t qemu_sendv_packet_async(NetClientState *sender,
                                 NetPacketSent *sent_cb)
 {
     NetQueue *queue;
+    ssize_t sz;
 
+    qemu_mutex_lock(&sender->peer_lock);
     if (sender->link_down || !sender->peer) {
+        qemu_mutex_unlock(&sender->peer_lock);
         return iov_size(iov, iovcnt);
     }
 
     queue = sender->peer->send_queue;
 
-    return qemu_net_queue_send_iov(queue, sender,
+    sz = qemu_net_queue_send_iov(queue, sender,
                                    QEMU_NET_PACKET_FLAG_NONE,
                                    iov, iovcnt, sent_cb);
+    qemu_mutex_unlock(&sender->peer_lock);
+    return sz;
 }
 
 ssize_t
diff --git a/net/queue.c b/net/queue.c
index 2856c1d..123c338 100644
--- a/net/queue.c
+++ b/net/queue.c
@@ -190,7 +190,7 @@  ssize_t qemu_net_queue_send(NetQueue *queue,
 {
     ssize_t ret;
 
-    if (queue->delivering || !qemu_can_send_packet(sender)) {
+    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
         qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
         return 0;
     }
@@ -215,7 +215,7 @@  ssize_t qemu_net_queue_send_iov(NetQueue *queue,
 {
     ssize_t ret;
 
-    if (queue->delivering || !qemu_can_send_packet(sender)) {
+    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
         qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
         return 0;
     }