diff mbox

[v2,4/5] net: introduce lock to protect NetClientState's peer's access

Message ID 1362624800-10682-5-git-send-email-qemulist@gmail.com
State New
Headers show

Commit Message

pingfan liu March 7, 2013, 2:53 a.m. UTC
From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>

Introduce nc->send_lock, which closes the race between readers of
nc->peer and its deleter. With it, once the deleter has finished, no new
qemu_send_packet_xx() can reach ->send_queue, so no new reference
(packet->sender) to nc will be appended to nc->peer->send_queue.

Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
---
 include/net/net.h |    4 +++
 net/hub.c         |   18 ++++++++++++
 net/net.c         |   77 ++++++++++++++++++++++++++++++++++++++++++++++++++---
 net/queue.c       |    4 +-
 4 files changed, 97 insertions(+), 6 deletions(-)

Comments

Paolo Bonzini March 12, 2013, 8:55 a.m. UTC | #1
Il 07/03/2013 03:53, Liu Ping Fan ha scritto:
> From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
> 
> Introduce nc->send_lock, it shield off the race of nc->peer's reader and
> deleter. With it, after deleter finish, no new qemu_send_packet_xx()
> can reach ->send_queue, so no new reference(packet->sender) to nc will
> be appended to nc->peer->send_queue.
> 
> Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
> ---
>  include/net/net.h |    4 +++
>  net/hub.c         |   18 ++++++++++++
>  net/net.c         |   77 ++++++++++++++++++++++++++++++++++++++++++++++++++---
>  net/queue.c       |    4 +-
>  4 files changed, 97 insertions(+), 6 deletions(-)
> 
> diff --git a/include/net/net.h b/include/net/net.h
> index 9c2b357..45779d2 100644
> --- a/include/net/net.h
> +++ b/include/net/net.h
> @@ -66,6 +66,8 @@ struct NetClientState {
>      NetClientInfo *info;
>      int link_down;
>      QTAILQ_ENTRY(NetClientState) next;
> +    /* protect the race access of peer between deleter and sender */
> +    QemuMutex send_lock;
>      NetClientState *peer;
>      NetQueue *send_queue;
>      char *model;
> @@ -78,6 +80,7 @@ struct NetClientState {
>  
>  typedef struct NICState {
>      NetClientState *ncs;
> +    NetClientState **pending_peer;
>      NICConf *conf;
>      void *opaque;
>      bool peer_deleted;
> @@ -105,6 +108,7 @@ NetClientState *qemu_find_vlan_client_by_name(Monitor *mon, int vlan_id,
>                                                const char *client_str);
>  typedef void (*qemu_nic_foreach)(NICState *nic, void *opaque);
>  void qemu_foreach_nic(qemu_nic_foreach func, void *opaque);
> +int qemu_can_send_packet_nolock(NetClientState *sender);
>  int qemu_can_send_packet(NetClientState *nc);
>  ssize_t qemu_sendv_packet(NetClientState *nc, const struct iovec *iov,
>                            int iovcnt);
> diff --git a/net/hub.c b/net/hub.c
> index 47fe72c..d953a99 100644
> --- a/net/hub.c
> +++ b/net/hub.c
> @@ -57,8 +57,14 @@ static ssize_t net_hub_receive(NetHub *hub, NetHubPort *source_port,
>              continue;
>          }
>  
> +        qemu_mutex_lock(&port->nc.send_lock);
> +        if (!port->nc.peer) {
> +            qemu_mutex_unlock(&port->nc.send_lock);
> +            continue;
> +        }
>          qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
>                              QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
> +        qemu_mutex_unlock(&port->nc.send_lock);

Do you really need to lock everything?  Can you just wrap the peer with
a ref/unref, like

/*
 * Fetch @nc's peer while holding a reference on it.
 *
 * nc->peer is read under nc->send_lock and, when it is set,
 * net_client_ref() is called on it before the lock is released.
 *
 * Returns the peer, or NULL when @nc has no peer.  A non-NULL result is
 * meant to be released by the caller with net_client_unref().
 */
NetClientState *net_client_get_peer(NetClientState *nc)
{
    NetClientState *held;

    qemu_mutex_lock(&nc->send_lock);
    held = nc->peer;
    if (!held) {
        /* No peer attached: nothing to reference. */
        qemu_mutex_unlock(&nc->send_lock);
        return held;
    }
    /* Take the reference before dropping the lock. */
    net_client_ref(held);
    qemu_mutex_unlock(&nc->send_lock);
    return held;
}

and then

-        qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
+        peer = net_client_get_peer(&port->nc);
+        if (!peer) {
+            continue;
+        }
+        qemu_net_queue_append(peer->send_queue, &port->nc,
                             QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
+        net_client_unref(peer);

Paolo

>          event_notifier_set(&port->e);
>      }
>      qemu_mutex_unlock(&hub->lock);
> @@ -69,7 +75,13 @@ static void hub_port_deliver_packet(void *opaque)
>  {
>      NetHubPort *port = (NetHubPort *)opaque;
>  
> +    qemu_mutex_lock(&port->nc.send_lock);
> +    if (!port->nc.peer) {
> +        qemu_mutex_unlock(&port->nc.send_lock);
> +        return;
> +    }
>      qemu_net_queue_flush(port->nc.peer->send_queue);
> +    qemu_mutex_unlock(&port->nc.send_lock);
>  }
>  
>  static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
> @@ -84,8 +96,14 @@ static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
>              continue;
>          }
>  
> +        qemu_mutex_lock(&port->nc.send_lock);
> +        if (!port->nc.peer) {
> +            qemu_mutex_unlock(&port->nc.send_lock);
> +            continue;
> +        }
>          qemu_net_queue_append_iov(port->nc.peer->send_queue, &port->nc,
>                              QEMU_NET_PACKET_FLAG_NONE, iov, iovcnt, NULL);
> +        qemu_mutex_unlock(&port->nc.send_lock);
>          event_notifier_set(&port->e);
>      }
>      qemu_mutex_unlock(&hub->lock);
> diff --git a/net/net.c b/net/net.c
> index 104c5b2..441362e 100644
> --- a/net/net.c
> +++ b/net/net.c
> @@ -207,6 +207,7 @@ static void qemu_net_client_setup(NetClientState *nc,
>          nc->peer = peer;
>          peer->peer = nc;
>      }
> +    qemu_mutex_init(&nc->send_lock);
>      QTAILQ_INSERT_TAIL(&net_clients, nc, next);
>  
>      nc->send_queue = qemu_new_net_queue(nc);
> @@ -246,6 +247,7 @@ NICState *qemu_new_nic(NetClientInfo *info,
>      nic->ncs = (void *)nic + info->size;
>      nic->conf = conf;
>      nic->opaque = opaque;
> +    nic->pending_peer = g_malloc0(sizeof(NetClientState *) * queues);
>  
>      for (i = 0; i < queues; i++) {
>          qemu_net_client_setup(&nic->ncs[i], info, peers[i], model, name,
> @@ -304,6 +306,36 @@ static void qemu_free_net_client(NetClientState *nc)
>      }
>  }
>  
> +/* elimate the reference and sync with exit of rx/tx action.
> + * And flush out peer's queue.
> + */
> +static void qemu_net_client_detach_flush(NetClientState *nc)
> +{
> +    NetClientState *peer;
> +
> +    /* Fixme? Assume this function the only place to detach peer from @nc?
> +     * Then reader and deleter are sequent. So here can we save the lock?
> +     */
> +    qemu_mutex_lock(&nc->send_lock);
> +    peer = nc->peer;
> +    qemu_mutex_unlock(&nc->send_lock);
> +
> +    if (peer) {
> +        /* exclude the race with tx to @nc */
> +        qemu_mutex_lock(&peer->send_lock);
> +        peer->peer = NULL;
> +        qemu_mutex_unlock(&peer->send_lock);
> +    }
> +
> +    /*  exclude the race with tx from @nc */
> +    qemu_mutex_lock(&nc->send_lock);
> +    nc->peer = NULL;
> +    if (peer) {
> +        qemu_net_queue_purge(peer->send_queue, nc);
> +    }
> +    qemu_mutex_unlock(&nc->send_lock);
> +}
> +
>  void qemu_del_net_client(NetClientState *nc)
>  {
>      NetClientState *ncs[MAX_QUEUE_NUM];
> @@ -334,7 +366,9 @@ void qemu_del_net_client(NetClientState *nc)
>          }
>  
>          for (i = 0; i < queues; i++) {
> +            qemu_net_client_detach_flush(ncs[i]);
>              qemu_cleanup_net_client(ncs[i]);
> +            nic->pending_peer[i] = ncs[i];
>          }
>  
>          return;
> @@ -343,6 +377,7 @@ void qemu_del_net_client(NetClientState *nc)
>      assert(nc->info->type != NET_CLIENT_OPTIONS_KIND_NIC);
>  
>      for (i = 0; i < queues; i++) {
> +        qemu_net_client_detach_flush(ncs[i]);
>          qemu_cleanup_net_client(ncs[i]);
>          qemu_free_net_client(ncs[i]);
>      }
> @@ -355,17 +390,19 @@ void qemu_del_nic(NICState *nic)
>      /* If this is a peer NIC and peer has already been deleted, free it now. */
>      if (nic->peer_deleted) {
>          for (i = 0; i < queues; i++) {
> -            qemu_free_net_client(qemu_get_subqueue(nic, i)->peer);
> +            qemu_free_net_client(nic->pending_peer[i]);
>          }
>      }
>  
>      for (i = queues - 1; i >= 0; i--) {
>          NetClientState *nc = qemu_get_subqueue(nic, i);
>  
> +        qemu_net_client_detach_flush(nc);
>          qemu_cleanup_net_client(nc);
>          qemu_free_net_client(nc);
>      }
>  
> +    g_free(nic->pending_peer);
>      g_free(nic);
>  }
>  
> @@ -382,7 +419,7 @@ void qemu_foreach_nic(qemu_nic_foreach func, void *opaque)
>      }
>  }
>  
> -int qemu_can_send_packet(NetClientState *sender)
> +int qemu_can_send_packet_nolock(NetClientState *sender)
>  {
>      if (!sender->peer) {
>          return 1;
> @@ -397,6 +434,28 @@ int qemu_can_send_packet(NetClientState *sender)
>      return 1;
>  }
>  
> +int qemu_can_send_packet(NetClientState *sender)
> +{
> +    int ret = 1;
> +
> +    qemu_mutex_lock(&sender->send_lock);
> +    if (!sender->peer) {
> +        goto unlock;
> +    }
> +
> +    if (sender->peer->receive_disabled) {
> +        ret = 0;
> +        goto unlock;
> +    } else if (sender->peer->info->can_receive &&
> +               !sender->peer->info->can_receive(sender->peer)) {
> +        ret = 0;
> +        goto unlock;
> +    }
> +unlock:
> +    qemu_mutex_unlock(&sender->send_lock);
> +    return ret;
> +}
> +
>  ssize_t qemu_deliver_packet(NetClientState *sender,
>                              unsigned flags,
>                              const uint8_t *data,
> @@ -460,19 +519,24 @@ static ssize_t qemu_send_packet_async_with_flags(NetClientState *sender,
>                                                   NetPacketSent *sent_cb)
>  {
>      NetQueue *queue;
> +    ssize_t sz;
>  
>  #ifdef DEBUG_NET
>      printf("qemu_send_packet_async:\n");
>      hex_dump(stdout, buf, size);
>  #endif
>  
> +    qemu_mutex_lock(&sender->send_lock);
>      if (sender->link_down || !sender->peer) {
> +        qemu_mutex_unlock(&sender->send_lock);
>          return size;
>      }
>  
>      queue = sender->peer->send_queue;
>  
> -    return qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
> +    sz = qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
> +    qemu_mutex_unlock(&sender->send_lock);
> +    return sz;
>  }
>  
>  ssize_t qemu_send_packet_async(NetClientState *sender,
> @@ -540,16 +604,21 @@ ssize_t qemu_sendv_packet_async(NetClientState *sender,
>                                  NetPacketSent *sent_cb)
>  {
>      NetQueue *queue;
> +    ssize_t sz;
>  
> +    qemu_mutex_lock(&sender->send_lock);
>      if (sender->link_down || !sender->peer) {
> +        qemu_mutex_unlock(&sender->send_lock);
>          return iov_size(iov, iovcnt);
>      }
>  
>      queue = sender->peer->send_queue;
>  
> -    return qemu_net_queue_send_iov(queue, sender,
> +    sz = qemu_net_queue_send_iov(queue, sender,
>                                     QEMU_NET_PACKET_FLAG_NONE,
>                                     iov, iovcnt, sent_cb);
> +    qemu_mutex_unlock(&sender->send_lock);
> +    return sz;
>  }
>  
>  ssize_t
> diff --git a/net/queue.c b/net/queue.c
> index f7ff020..e141ecf 100644
> --- a/net/queue.c
> +++ b/net/queue.c
> @@ -190,7 +190,7 @@ ssize_t qemu_net_queue_send(NetQueue *queue,
>  {
>      ssize_t ret;
>  
> -    if (queue->delivering || !qemu_can_send_packet(sender)) {
> +    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
>          qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
>          return 0;
>      }
> @@ -215,7 +215,7 @@ ssize_t qemu_net_queue_send_iov(NetQueue *queue,
>  {
>      ssize_t ret;
>  
> -    if (queue->delivering || !qemu_can_send_packet(sender)) {
> +    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
>          qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
>          return 0;
>      }
>
pingfan liu March 13, 2013, 1:26 a.m. UTC | #2
On Tue, Mar 12, 2013 at 4:55 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> Il 07/03/2013 03:53, Liu Ping Fan ha scritto:
>> From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
>>
>> Introduce nc->send_lock, it shield off the race of nc->peer's reader and
>> deleter. With it, after deleter finish, no new qemu_send_packet_xx()
>> can reach ->send_queue, so no new reference(packet->sender) to nc will
>> be appended to nc->peer->send_queue.
>>
>> Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
>> ---
>>  include/net/net.h |    4 +++
>>  net/hub.c         |   18 ++++++++++++
>>  net/net.c         |   77 ++++++++++++++++++++++++++++++++++++++++++++++++++---
>>  net/queue.c       |    4 +-
>>  4 files changed, 97 insertions(+), 6 deletions(-)
>>
>> diff --git a/include/net/net.h b/include/net/net.h
>> index 9c2b357..45779d2 100644
>> --- a/include/net/net.h
>> +++ b/include/net/net.h
>> @@ -66,6 +66,8 @@ struct NetClientState {
>>      NetClientInfo *info;
>>      int link_down;
>>      QTAILQ_ENTRY(NetClientState) next;
>> +    /* protect the race access of peer between deleter and sender */
>> +    QemuMutex send_lock;
>>      NetClientState *peer;
>>      NetQueue *send_queue;
>>      char *model;
>> @@ -78,6 +80,7 @@ struct NetClientState {
>>
>>  typedef struct NICState {
>>      NetClientState *ncs;
>> +    NetClientState **pending_peer;
>>      NICConf *conf;
>>      void *opaque;
>>      bool peer_deleted;
>> @@ -105,6 +108,7 @@ NetClientState *qemu_find_vlan_client_by_name(Monitor *mon, int vlan_id,
>>                                                const char *client_str);
>>  typedef void (*qemu_nic_foreach)(NICState *nic, void *opaque);
>>  void qemu_foreach_nic(qemu_nic_foreach func, void *opaque);
>> +int qemu_can_send_packet_nolock(NetClientState *sender);
>>  int qemu_can_send_packet(NetClientState *nc);
>>  ssize_t qemu_sendv_packet(NetClientState *nc, const struct iovec *iov,
>>                            int iovcnt);
>> diff --git a/net/hub.c b/net/hub.c
>> index 47fe72c..d953a99 100644
>> --- a/net/hub.c
>> +++ b/net/hub.c
>> @@ -57,8 +57,14 @@ static ssize_t net_hub_receive(NetHub *hub, NetHubPort *source_port,
>>              continue;
>>          }
>>
>> +        qemu_mutex_lock(&port->nc.send_lock);
>> +        if (!port->nc.peer) {
>> +            qemu_mutex_unlock(&port->nc.send_lock);
>> +            continue;
>> +        }
>>          qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
>>                              QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
>> +        qemu_mutex_unlock(&port->nc.send_lock);
>
> Do you really need to lock everything?  Can you just wrap the peer with
> a ref/unref, like
>
> NetClientState *net_client_get_peer(NetClientState *nc)
> {
>     NetClientState *peer;
>     qemu_mutex_lock(&nc->send_lock);
>     peer = nc->peer;
>     if (peer) {
>         net_client_ref(peer);
>     }
>     qemu_mutex_unlock(&nc->send_lock);
>     return peer;
> }
>
> and then
>
> -        qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
> +        peer = net_client_get_peer(&port->nc);
> +        if (!peer) {
> +            continue;
> +        }
> +        qemu_net_queue_append(peer->send_queue, &port->nc,
>                              QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
> +        net_client_unref(peer);
>
Oh, it seems I did not explain this clearly in the commit log.  The
lock does not only protect against the reclaimer (which can indeed be
achieved with a refcount, as in your code), but also synchronizes the
remover with the sender.  When a NetClientState is being removed, the
remover does roughly:
             nc->peer = NULL;
              ----------> a gap opens here for an in-flight sender, and
                          a refcount cannot close it
             flush out the references to nc from its peer's send_queue;

Thanks and regards,
Pingfan

> Paolo
>
>>          event_notifier_set(&port->e);
>>      }
>>      qemu_mutex_unlock(&hub->lock);
>> @@ -69,7 +75,13 @@ static void hub_port_deliver_packet(void *opaque)
>>  {
>>      NetHubPort *port = (NetHubPort *)opaque;
>>
>> +    qemu_mutex_lock(&port->nc.send_lock);
>> +    if (!port->nc.peer) {
>> +        qemu_mutex_unlock(&port->nc.send_lock);
>> +        return;
>> +    }
>>      qemu_net_queue_flush(port->nc.peer->send_queue);
>> +    qemu_mutex_unlock(&port->nc.send_lock);
>>  }
>>
>>  static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
>> @@ -84,8 +96,14 @@ static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
>>              continue;
>>          }
>>
>> +        qemu_mutex_lock(&port->nc.send_lock);
>> +        if (!port->nc.peer) {
>> +            qemu_mutex_unlock(&port->nc.send_lock);
>> +            continue;
>> +        }
>>          qemu_net_queue_append_iov(port->nc.peer->send_queue, &port->nc,
>>                              QEMU_NET_PACKET_FLAG_NONE, iov, iovcnt, NULL);
>> +        qemu_mutex_unlock(&port->nc.send_lock);
>>          event_notifier_set(&port->e);
>>      }
>>      qemu_mutex_unlock(&hub->lock);
>> diff --git a/net/net.c b/net/net.c
>> index 104c5b2..441362e 100644
>> --- a/net/net.c
>> +++ b/net/net.c
>> @@ -207,6 +207,7 @@ static void qemu_net_client_setup(NetClientState *nc,
>>          nc->peer = peer;
>>          peer->peer = nc;
>>      }
>> +    qemu_mutex_init(&nc->send_lock);
>>      QTAILQ_INSERT_TAIL(&net_clients, nc, next);
>>
>>      nc->send_queue = qemu_new_net_queue(nc);
>> @@ -246,6 +247,7 @@ NICState *qemu_new_nic(NetClientInfo *info,
>>      nic->ncs = (void *)nic + info->size;
>>      nic->conf = conf;
>>      nic->opaque = opaque;
>> +    nic->pending_peer = g_malloc0(sizeof(NetClientState *) * queues);
>>
>>      for (i = 0; i < queues; i++) {
>>          qemu_net_client_setup(&nic->ncs[i], info, peers[i], model, name,
>> @@ -304,6 +306,36 @@ static void qemu_free_net_client(NetClientState *nc)
>>      }
>>  }
>>
>> +/* elimate the reference and sync with exit of rx/tx action.
>> + * And flush out peer's queue.
>> + */
>> +static void qemu_net_client_detach_flush(NetClientState *nc)
>> +{
>> +    NetClientState *peer;
>> +
>> +    /* Fixme? Assume this function the only place to detach peer from @nc?
>> +     * Then reader and deleter are sequent. So here can we save the lock?
>> +     */
>> +    qemu_mutex_lock(&nc->send_lock);
>> +    peer = nc->peer;
>> +    qemu_mutex_unlock(&nc->send_lock);
>> +
>> +    if (peer) {
>> +        /* exclude the race with tx to @nc */
>> +        qemu_mutex_lock(&peer->send_lock);
>> +        peer->peer = NULL;
>> +        qemu_mutex_unlock(&peer->send_lock);
>> +    }
>> +
>> +    /*  exclude the race with tx from @nc */
>> +    qemu_mutex_lock(&nc->send_lock);
>> +    nc->peer = NULL;
>> +    if (peer) {
>> +        qemu_net_queue_purge(peer->send_queue, nc);
>> +    }
>> +    qemu_mutex_unlock(&nc->send_lock);
>> +}
>> +
>>  void qemu_del_net_client(NetClientState *nc)
>>  {
>>      NetClientState *ncs[MAX_QUEUE_NUM];
>> @@ -334,7 +366,9 @@ void qemu_del_net_client(NetClientState *nc)
>>          }
>>
>>          for (i = 0; i < queues; i++) {
>> +            qemu_net_client_detach_flush(ncs[i]);
>>              qemu_cleanup_net_client(ncs[i]);
>> +            nic->pending_peer[i] = ncs[i];
>>          }
>>
>>          return;
>> @@ -343,6 +377,7 @@ void qemu_del_net_client(NetClientState *nc)
>>      assert(nc->info->type != NET_CLIENT_OPTIONS_KIND_NIC);
>>
>>      for (i = 0; i < queues; i++) {
>> +        qemu_net_client_detach_flush(ncs[i]);
>>          qemu_cleanup_net_client(ncs[i]);
>>          qemu_free_net_client(ncs[i]);
>>      }
>> @@ -355,17 +390,19 @@ void qemu_del_nic(NICState *nic)
>>      /* If this is a peer NIC and peer has already been deleted, free it now. */
>>      if (nic->peer_deleted) {
>>          for (i = 0; i < queues; i++) {
>> -            qemu_free_net_client(qemu_get_subqueue(nic, i)->peer);
>> +            qemu_free_net_client(nic->pending_peer[i]);
>>          }
>>      }
>>
>>      for (i = queues - 1; i >= 0; i--) {
>>          NetClientState *nc = qemu_get_subqueue(nic, i);
>>
>> +        qemu_net_client_detach_flush(nc);
>>          qemu_cleanup_net_client(nc);
>>          qemu_free_net_client(nc);
>>      }
>>
>> +    g_free(nic->pending_peer);
>>      g_free(nic);
>>  }
>>
>> @@ -382,7 +419,7 @@ void qemu_foreach_nic(qemu_nic_foreach func, void *opaque)
>>      }
>>  }
>>
>> -int qemu_can_send_packet(NetClientState *sender)
>> +int qemu_can_send_packet_nolock(NetClientState *sender)
>>  {
>>      if (!sender->peer) {
>>          return 1;
>> @@ -397,6 +434,28 @@ int qemu_can_send_packet(NetClientState *sender)
>>      return 1;
>>  }
>>
>> +int qemu_can_send_packet(NetClientState *sender)
>> +{
>> +    int ret = 1;
>> +
>> +    qemu_mutex_lock(&sender->send_lock);
>> +    if (!sender->peer) {
>> +        goto unlock;
>> +    }
>> +
>> +    if (sender->peer->receive_disabled) {
>> +        ret = 0;
>> +        goto unlock;
>> +    } else if (sender->peer->info->can_receive &&
>> +               !sender->peer->info->can_receive(sender->peer)) {
>> +        ret = 0;
>> +        goto unlock;
>> +    }
>> +unlock:
>> +    qemu_mutex_unlock(&sender->send_lock);
>> +    return ret;
>> +}
>> +
>>  ssize_t qemu_deliver_packet(NetClientState *sender,
>>                              unsigned flags,
>>                              const uint8_t *data,
>> @@ -460,19 +519,24 @@ static ssize_t qemu_send_packet_async_with_flags(NetClientState *sender,
>>                                                   NetPacketSent *sent_cb)
>>  {
>>      NetQueue *queue;
>> +    ssize_t sz;
>>
>>  #ifdef DEBUG_NET
>>      printf("qemu_send_packet_async:\n");
>>      hex_dump(stdout, buf, size);
>>  #endif
>>
>> +    qemu_mutex_lock(&sender->send_lock);
>>      if (sender->link_down || !sender->peer) {
>> +        qemu_mutex_unlock(&sender->send_lock);
>>          return size;
>>      }
>>
>>      queue = sender->peer->send_queue;
>>
>> -    return qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
>> +    sz = qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
>> +    qemu_mutex_unlock(&sender->send_lock);
>> +    return sz;
>>  }
>>
>>  ssize_t qemu_send_packet_async(NetClientState *sender,
>> @@ -540,16 +604,21 @@ ssize_t qemu_sendv_packet_async(NetClientState *sender,
>>                                  NetPacketSent *sent_cb)
>>  {
>>      NetQueue *queue;
>> +    ssize_t sz;
>>
>> +    qemu_mutex_lock(&sender->send_lock);
>>      if (sender->link_down || !sender->peer) {
>> +        qemu_mutex_unlock(&sender->send_lock);
>>          return iov_size(iov, iovcnt);
>>      }
>>
>>      queue = sender->peer->send_queue;
>>
>> -    return qemu_net_queue_send_iov(queue, sender,
>> +    sz = qemu_net_queue_send_iov(queue, sender,
>>                                     QEMU_NET_PACKET_FLAG_NONE,
>>                                     iov, iovcnt, sent_cb);
>> +    qemu_mutex_unlock(&sender->send_lock);
>> +    return sz;
>>  }
>>
>>  ssize_t
>> diff --git a/net/queue.c b/net/queue.c
>> index f7ff020..e141ecf 100644
>> --- a/net/queue.c
>> +++ b/net/queue.c
>> @@ -190,7 +190,7 @@ ssize_t qemu_net_queue_send(NetQueue *queue,
>>  {
>>      ssize_t ret;
>>
>> -    if (queue->delivering || !qemu_can_send_packet(sender)) {
>> +    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
>>          qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
>>          return 0;
>>      }
>> @@ -215,7 +215,7 @@ ssize_t qemu_net_queue_send_iov(NetQueue *queue,
>>  {
>>      ssize_t ret;
>>
>> -    if (queue->delivering || !qemu_can_send_packet(sender)) {
>> +    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
>>          qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
>>          return 0;
>>      }
>>
>
Paolo Bonzini March 13, 2013, 10:39 a.m. UTC | #3
Il 13/03/2013 02:26, liu ping fan ha scritto:
> On Tue, Mar 12, 2013 at 4:55 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>> Il 07/03/2013 03:53, Liu Ping Fan ha scritto:
>>> From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
>>>
>>> Introduce nc->send_lock, it shield off the race of nc->peer's reader and
>>> deleter. With it, after deleter finish, no new qemu_send_packet_xx()
>>> can reach ->send_queue, so no new reference(packet->sender) to nc will
>>> be appended to nc->peer->send_queue.
>>>
>>> Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
>>> ---
>>>  include/net/net.h |    4 +++
>>>  net/hub.c         |   18 ++++++++++++
>>>  net/net.c         |   77 ++++++++++++++++++++++++++++++++++++++++++++++++++---
>>>  net/queue.c       |    4 +-
>>>  4 files changed, 97 insertions(+), 6 deletions(-)
>>>
>>> diff --git a/include/net/net.h b/include/net/net.h
>>> index 9c2b357..45779d2 100644
>>> --- a/include/net/net.h
>>> +++ b/include/net/net.h
>>> @@ -66,6 +66,8 @@ struct NetClientState {
>>>      NetClientInfo *info;
>>>      int link_down;
>>>      QTAILQ_ENTRY(NetClientState) next;
>>> +    /* protect the race access of peer between deleter and sender */
>>> +    QemuMutex send_lock;
>>>      NetClientState *peer;
>>>      NetQueue *send_queue;
>>>      char *model;
>>> @@ -78,6 +80,7 @@ struct NetClientState {
>>>
>>>  typedef struct NICState {
>>>      NetClientState *ncs;
>>> +    NetClientState **pending_peer;
>>>      NICConf *conf;
>>>      void *opaque;
>>>      bool peer_deleted;
>>> @@ -105,6 +108,7 @@ NetClientState *qemu_find_vlan_client_by_name(Monitor *mon, int vlan_id,
>>>                                                const char *client_str);
>>>  typedef void (*qemu_nic_foreach)(NICState *nic, void *opaque);
>>>  void qemu_foreach_nic(qemu_nic_foreach func, void *opaque);
>>> +int qemu_can_send_packet_nolock(NetClientState *sender);
>>>  int qemu_can_send_packet(NetClientState *nc);
>>>  ssize_t qemu_sendv_packet(NetClientState *nc, const struct iovec *iov,
>>>                            int iovcnt);
>>> diff --git a/net/hub.c b/net/hub.c
>>> index 47fe72c..d953a99 100644
>>> --- a/net/hub.c
>>> +++ b/net/hub.c
>>> @@ -57,8 +57,14 @@ static ssize_t net_hub_receive(NetHub *hub, NetHubPort *source_port,
>>>              continue;
>>>          }
>>>
>>> +        qemu_mutex_lock(&port->nc.send_lock);
>>> +        if (!port->nc.peer) {
>>> +            qemu_mutex_unlock(&port->nc.send_lock);
>>> +            continue;
>>> +        }
>>>          qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
>>>                              QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
>>> +        qemu_mutex_unlock(&port->nc.send_lock);
>>
>> Do you really need to lock everything?  Can you just wrap the peer with
>> a ref/unref, like
>>
>> NetClientState *net_client_get_peer(NetClientState *nc)
>> {
>>     NetClientState *peer;
>>     qemu_mutex_lock(&nc->send_lock);
>>     peer = nc->peer;
>>     if (peer) {
>>         net_client_ref(peer);
>>     }
>>     qemu_mutex_unlock(&nc->send_lock);
>>     return peer;
>> }
>>
>> and then
>>
>> -        qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
>> +        peer = net_client_get_peer(&port->nc);
>> +        if (!peer) {
>> +            continue;
>> +        }
>> +        qemu_net_queue_append(peer->send_queue, &port->nc,
>>                              QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
>> +        net_client_unref(peer);
>>
> Oh, seems that I do not explain very clearly in the commit log.  The
> lock does not only protect against the reclaimer( and this can be
> achieved by refcnt as your codes),  but also sync between remover and
> sender.   If the NetClientState being removed, the remover will be
> like:
>              nc->peer = NULL;
>               ----------> Here opens the gap for in-flight sender, and
> refcnt can not work
>              flush out reference from its peer's send_queue;

What's the problem if the dying peer is still used momentarily?  The
next unref will drop the last reference and qemu_del_net_queue will free
the packet that was just appended.

Paolo
pingfan liu March 14, 2013, 2:14 a.m. UTC | #4
On Wed, Mar 13, 2013 at 6:39 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> Il 13/03/2013 02:26, liu ping fan ha scritto:
>> On Tue, Mar 12, 2013 at 4:55 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
>>> Il 07/03/2013 03:53, Liu Ping Fan ha scritto:
>>>> From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
>>>>
>>>> Introduce nc->send_lock, it shield off the race of nc->peer's reader and
>>>> deleter. With it, after deleter finish, no new qemu_send_packet_xx()
>>>> can reach ->send_queue, so no new reference(packet->sender) to nc will
>>>> be appended to nc->peer->send_queue.
>>>>
>>>> Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
>>>> ---
>>>>  include/net/net.h |    4 +++
>>>>  net/hub.c         |   18 ++++++++++++
>>>>  net/net.c         |   77 ++++++++++++++++++++++++++++++++++++++++++++++++++---
>>>>  net/queue.c       |    4 +-
>>>>  4 files changed, 97 insertions(+), 6 deletions(-)
>>>>
>>>> diff --git a/include/net/net.h b/include/net/net.h
>>>> index 9c2b357..45779d2 100644
>>>> --- a/include/net/net.h
>>>> +++ b/include/net/net.h
>>>> @@ -66,6 +66,8 @@ struct NetClientState {
>>>>      NetClientInfo *info;
>>>>      int link_down;
>>>>      QTAILQ_ENTRY(NetClientState) next;
>>>> +    /* protect the race access of peer between deleter and sender */
>>>> +    QemuMutex send_lock;
>>>>      NetClientState *peer;
>>>>      NetQueue *send_queue;
>>>>      char *model;
>>>> @@ -78,6 +80,7 @@ struct NetClientState {
>>>>
>>>>  typedef struct NICState {
>>>>      NetClientState *ncs;
>>>> +    NetClientState **pending_peer;
>>>>      NICConf *conf;
>>>>      void *opaque;
>>>>      bool peer_deleted;
>>>> @@ -105,6 +108,7 @@ NetClientState *qemu_find_vlan_client_by_name(Monitor *mon, int vlan_id,
>>>>                                                const char *client_str);
>>>>  typedef void (*qemu_nic_foreach)(NICState *nic, void *opaque);
>>>>  void qemu_foreach_nic(qemu_nic_foreach func, void *opaque);
>>>> +int qemu_can_send_packet_nolock(NetClientState *sender);
>>>>  int qemu_can_send_packet(NetClientState *nc);
>>>>  ssize_t qemu_sendv_packet(NetClientState *nc, const struct iovec *iov,
>>>>                            int iovcnt);
>>>> diff --git a/net/hub.c b/net/hub.c
>>>> index 47fe72c..d953a99 100644
>>>> --- a/net/hub.c
>>>> +++ b/net/hub.c
>>>> @@ -57,8 +57,14 @@ static ssize_t net_hub_receive(NetHub *hub, NetHubPort *source_port,
>>>>              continue;
>>>>          }
>>>>
>>>> +        qemu_mutex_lock(&port->nc.send_lock);
>>>> +        if (!port->nc.peer) {
>>>> +            qemu_mutex_unlock(&port->nc.send_lock);
>>>> +            continue;
>>>> +        }
>>>>          qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
>>>>                              QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
>>>> +        qemu_mutex_unlock(&port->nc.send_lock);
>>>
>>> Do you really need to lock everything?  Can you just wrap the peer with
>>> a ref/unref, like
>>>
>>> NetClientState *net_client_get_peer(NetClientState *nc)
>>> {
>>>     NetClientState *peer;
>>>     qemu_mutex_lock(&nc->send_lock);
>>>     peer = nc->peer;
>>>     if (peer) {
>>>         net_client_ref(peer);
>>>     }
>>>     qemu_mutex_unlock(&nc->send_lock);
>>>     return peer;
>>> }
>>>
>>> and then
>>>
>>> -        qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
>>> +        peer = net_client_get_peer(&port->nc);
>>> +        if (!peer) {
>>> +            continue;
>>> +        }
>>> +        qemu_net_queue_append(peer->send_queue, &port->nc,
>>>                              QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
>>> +        net_client_unref(peer);
>>>
>> Oh, seems that I do not explain very clearly in the commit log.  The
>> lock does not only protect against the reclaimer( and this can be
>> achieved by refcnt as your codes),  but also sync between remover and
>> sender.   If the NetClientState being removed, the remover will be
>> like:
>>              nc->peer = NULL;
>>               ----------> Here opens the gap for in-flight sender, and
>> refcnt can not work
>>              flush out reference from its peer's send_queue;
>
> What's the problem if the dying peer is still used momentarily?  The
> next unref will drop the last reference and qemu_del_net_queue will free
> the packet that was just appended.
>
The deletion of NetClientState-A does not mean that its peer will be
deleted, so the peer's qemu_del_net_queue will not necessarily run. Each
packet referring to A then still holds a ref, and A is never reclaimed.

Regards,
Pingfan
> Paolo
diff mbox

Patch

diff --git a/include/net/net.h b/include/net/net.h
index 9c2b357..45779d2 100644
--- a/include/net/net.h
+++ b/include/net/net.h
@@ -66,6 +66,8 @@  struct NetClientState {
     NetClientInfo *info;
     int link_down;
     QTAILQ_ENTRY(NetClientState) next;
+    /* protect the race access of peer between deleter and sender */
+    QemuMutex send_lock;
     NetClientState *peer;
     NetQueue *send_queue;
     char *model;
@@ -78,6 +80,7 @@  struct NetClientState {
 
 typedef struct NICState {
     NetClientState *ncs;
+    NetClientState **pending_peer;
     NICConf *conf;
     void *opaque;
     bool peer_deleted;
@@ -105,6 +108,7 @@  NetClientState *qemu_find_vlan_client_by_name(Monitor *mon, int vlan_id,
                                               const char *client_str);
 typedef void (*qemu_nic_foreach)(NICState *nic, void *opaque);
 void qemu_foreach_nic(qemu_nic_foreach func, void *opaque);
+int qemu_can_send_packet_nolock(NetClientState *sender);
 int qemu_can_send_packet(NetClientState *nc);
 ssize_t qemu_sendv_packet(NetClientState *nc, const struct iovec *iov,
                           int iovcnt);
diff --git a/net/hub.c b/net/hub.c
index 47fe72c..d953a99 100644
--- a/net/hub.c
+++ b/net/hub.c
@@ -57,8 +57,14 @@  static ssize_t net_hub_receive(NetHub *hub, NetHubPort *source_port,
             continue;
         }
 
+        qemu_mutex_lock(&port->nc.send_lock);
+        if (!port->nc.peer) {
+            qemu_mutex_unlock(&port->nc.send_lock);
+            continue;
+        }
         qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
                             QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
+        qemu_mutex_unlock(&port->nc.send_lock);
         event_notifier_set(&port->e);
     }
     qemu_mutex_unlock(&hub->lock);
@@ -69,7 +75,13 @@  static void hub_port_deliver_packet(void *opaque)
 {
     NetHubPort *port = (NetHubPort *)opaque;
 
+    qemu_mutex_lock(&port->nc.send_lock);
+    if (!port->nc.peer) {
+        qemu_mutex_unlock(&port->nc.send_lock);
+        return;
+    }
     qemu_net_queue_flush(port->nc.peer->send_queue);
+    qemu_mutex_unlock(&port->nc.send_lock);
 }
 
 static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
@@ -84,8 +96,14 @@  static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
             continue;
         }
 
+        qemu_mutex_lock(&port->nc.send_lock);
+        if (!port->nc.peer) {
+            qemu_mutex_unlock(&port->nc.send_lock);
+            continue;
+        }
         qemu_net_queue_append_iov(port->nc.peer->send_queue, &port->nc,
                             QEMU_NET_PACKET_FLAG_NONE, iov, iovcnt, NULL);
+        qemu_mutex_unlock(&port->nc.send_lock);
         event_notifier_set(&port->e);
     }
     qemu_mutex_unlock(&hub->lock);
diff --git a/net/net.c b/net/net.c
index 104c5b2..441362e 100644
--- a/net/net.c
+++ b/net/net.c
@@ -207,6 +207,7 @@  static void qemu_net_client_setup(NetClientState *nc,
         nc->peer = peer;
         peer->peer = nc;
     }
+    qemu_mutex_init(&nc->send_lock);
     QTAILQ_INSERT_TAIL(&net_clients, nc, next);
 
     nc->send_queue = qemu_new_net_queue(nc);
@@ -246,6 +247,7 @@  NICState *qemu_new_nic(NetClientInfo *info,
     nic->ncs = (void *)nic + info->size;
     nic->conf = conf;
     nic->opaque = opaque;
+    nic->pending_peer = g_malloc0(sizeof(NetClientState *) * queues);
 
     for (i = 0; i < queues; i++) {
         qemu_net_client_setup(&nic->ncs[i], info, peers[i], model, name,
@@ -304,6 +306,36 @@  static void qemu_free_net_client(NetClientState *nc)
     }
 }
 
+/* Eliminate the peer references and sync with the exit of rx/tx actions,
+ * then flush out the peer's queue.
+ */
+static void qemu_net_client_detach_flush(NetClientState *nc)
+{
+    NetClientState *peer;
+
+    /* FIXME: Is this function the only place that detaches peer from @nc?
+     * If so, reader and deleter run sequentially; can we skip the lock?
+     */
+    qemu_mutex_lock(&nc->send_lock);
+    peer = nc->peer;
+    qemu_mutex_unlock(&nc->send_lock);
+
+    if (peer) {
+        /* exclude the race with tx to @nc */
+        qemu_mutex_lock(&peer->send_lock);
+        peer->peer = NULL;
+        qemu_mutex_unlock(&peer->send_lock);
+    }
+
+    /*  exclude the race with tx from @nc */
+    qemu_mutex_lock(&nc->send_lock);
+    nc->peer = NULL;
+    if (peer) {
+        qemu_net_queue_purge(peer->send_queue, nc);
+    }
+    qemu_mutex_unlock(&nc->send_lock);
+}
+
 void qemu_del_net_client(NetClientState *nc)
 {
     NetClientState *ncs[MAX_QUEUE_NUM];
@@ -334,7 +366,9 @@  void qemu_del_net_client(NetClientState *nc)
         }
 
         for (i = 0; i < queues; i++) {
+            qemu_net_client_detach_flush(ncs[i]);
             qemu_cleanup_net_client(ncs[i]);
+            nic->pending_peer[i] = ncs[i];
         }
 
         return;
@@ -343,6 +377,7 @@  void qemu_del_net_client(NetClientState *nc)
     assert(nc->info->type != NET_CLIENT_OPTIONS_KIND_NIC);
 
     for (i = 0; i < queues; i++) {
+        qemu_net_client_detach_flush(ncs[i]);
         qemu_cleanup_net_client(ncs[i]);
         qemu_free_net_client(ncs[i]);
     }
@@ -355,17 +390,19 @@  void qemu_del_nic(NICState *nic)
     /* If this is a peer NIC and peer has already been deleted, free it now. */
     if (nic->peer_deleted) {
         for (i = 0; i < queues; i++) {
-            qemu_free_net_client(qemu_get_subqueue(nic, i)->peer);
+            qemu_free_net_client(nic->pending_peer[i]);
         }
     }
 
     for (i = queues - 1; i >= 0; i--) {
         NetClientState *nc = qemu_get_subqueue(nic, i);
 
+        qemu_net_client_detach_flush(nc);
         qemu_cleanup_net_client(nc);
         qemu_free_net_client(nc);
     }
 
+    g_free(nic->pending_peer);
     g_free(nic);
 }
 
@@ -382,7 +419,7 @@  void qemu_foreach_nic(qemu_nic_foreach func, void *opaque)
     }
 }
 
-int qemu_can_send_packet(NetClientState *sender)
+int qemu_can_send_packet_nolock(NetClientState *sender)
 {
     if (!sender->peer) {
         return 1;
@@ -397,6 +434,28 @@  int qemu_can_send_packet(NetClientState *sender)
     return 1;
 }
 
+int qemu_can_send_packet(NetClientState *sender)
+{
+    int ret = 1;
+
+    qemu_mutex_lock(&sender->send_lock);
+    if (!sender->peer) {
+        goto unlock;
+    }
+
+    if (sender->peer->receive_disabled) {
+        ret = 0;
+        goto unlock;
+    } else if (sender->peer->info->can_receive &&
+               !sender->peer->info->can_receive(sender->peer)) {
+        ret = 0;
+        goto unlock;
+    }
+unlock:
+    qemu_mutex_unlock(&sender->send_lock);
+    return ret;
+}
+
 ssize_t qemu_deliver_packet(NetClientState *sender,
                             unsigned flags,
                             const uint8_t *data,
@@ -460,19 +519,24 @@  static ssize_t qemu_send_packet_async_with_flags(NetClientState *sender,
                                                  NetPacketSent *sent_cb)
 {
     NetQueue *queue;
+    ssize_t sz;
 
 #ifdef DEBUG_NET
     printf("qemu_send_packet_async:\n");
     hex_dump(stdout, buf, size);
 #endif
 
+    qemu_mutex_lock(&sender->send_lock);
     if (sender->link_down || !sender->peer) {
+        qemu_mutex_unlock(&sender->send_lock);
         return size;
     }
 
     queue = sender->peer->send_queue;
 
-    return qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
+    sz = qemu_net_queue_send(queue, sender, flags, buf, size, sent_cb);
+    qemu_mutex_unlock(&sender->send_lock);
+    return sz;
 }
 
 ssize_t qemu_send_packet_async(NetClientState *sender,
@@ -540,16 +604,21 @@  ssize_t qemu_sendv_packet_async(NetClientState *sender,
                                 NetPacketSent *sent_cb)
 {
     NetQueue *queue;
+    ssize_t sz;
 
+    qemu_mutex_lock(&sender->send_lock);
     if (sender->link_down || !sender->peer) {
+        qemu_mutex_unlock(&sender->send_lock);
         return iov_size(iov, iovcnt);
     }
 
     queue = sender->peer->send_queue;
 
-    return qemu_net_queue_send_iov(queue, sender,
+    sz = qemu_net_queue_send_iov(queue, sender,
                                    QEMU_NET_PACKET_FLAG_NONE,
                                    iov, iovcnt, sent_cb);
+    qemu_mutex_unlock(&sender->send_lock);
+    return sz;
 }
 
 ssize_t
diff --git a/net/queue.c b/net/queue.c
index f7ff020..e141ecf 100644
--- a/net/queue.c
+++ b/net/queue.c
@@ -190,7 +190,7 @@  ssize_t qemu_net_queue_send(NetQueue *queue,
 {
     ssize_t ret;
 
-    if (queue->delivering || !qemu_can_send_packet(sender)) {
+    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
         qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
         return 0;
     }
@@ -215,7 +215,7 @@  ssize_t qemu_net_queue_send_iov(NetQueue *queue,
 {
     ssize_t ret;
 
-    if (queue->delivering || !qemu_can_send_packet(sender)) {
+    if (queue->delivering || !qemu_can_send_packet_nolock(sender)) {
         qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
         return 0;
     }