Patchwork [v2,1/5] net: spread hub on AioContexts

login
register
mail settings
Submitter pingfan liu
Date March 7, 2013, 2:53 a.m.
Message ID <1362624800-10682-2-git-send-email-qemulist@gmail.com>
Download mbox | patch
Permalink /patch/225709/
State New
Headers show

Comments

pingfan liu - March 7, 2013, 2:53 a.m.
From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>

Forward packet to other hub ports by their AioContext.

Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
---
 hw/qdev-properties-system.c |    1 +
 include/block/aio.h         |    1 +
 include/net/net.h           |    5 +++++
 include/net/queue.h         |   14 ++++++++++++++
 main-loop.c                 |    5 +++++
 net/hub.c                   |   33 ++++++++++++++++++++++++++++++---
 net/net.c                   |    1 +
 net/queue.c                 |    4 ++--
 8 files changed, 59 insertions(+), 5 deletions(-)
Paolo Bonzini - March 12, 2013, 8:50 a.m.
Il 07/03/2013 03:53, Liu Ping Fan ha scritto:
> From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
> 
> Forward packet to other hub ports by their AioContext.
> 
> Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
> ---
>  hw/qdev-properties-system.c |    1 +
>  include/block/aio.h         |    1 +
>  include/net/net.h           |    5 +++++
>  include/net/queue.h         |   14 ++++++++++++++
>  main-loop.c                 |    5 +++++
>  net/hub.c                   |   33 ++++++++++++++++++++++++++++++---
>  net/net.c                   |    1 +
>  net/queue.c                 |    4 ++--
>  8 files changed, 59 insertions(+), 5 deletions(-)
> 
> diff --git a/hw/qdev-properties-system.c b/hw/qdev-properties-system.c
> index ce3af22..587a335 100644
> --- a/hw/qdev-properties-system.c
> +++ b/hw/qdev-properties-system.c
> @@ -307,6 +307,7 @@ static void set_vlan(Object *obj, Visitor *v, void *opaque,
>                    name, prop->info->name);
>          return;
>      }
> +    hubport->info->bind_ctx(hubport, qemu_get_aio_context());
>      *ptr = hubport;
>  }
>  
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 5b54d38..bcb5126 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -229,6 +229,7 @@ bool qemu_aio_wait(void);
>  void qemu_aio_set_event_notifier(EventNotifier *notifier,
>                                   EventNotifierHandler *io_read,
>                                   AioFlushEventNotifierHandler *io_flush);
> +AioContext *qemu_get_aio_context(void);
>  
>  #ifdef CONFIG_POSIX
>  void qemu_aio_set_fd_handler(int fd,
> diff --git a/include/net/net.h b/include/net/net.h
> index cb049a1..9c2b357 100644
> --- a/include/net/net.h
> +++ b/include/net/net.h
> @@ -8,6 +8,9 @@
>  #include "net/queue.h"
>  #include "migration/vmstate.h"
>  #include "qapi-types.h"
> +#include "qemu/thread.h"
> +#include "block/aio.h"
> +
>  
>  #define MAX_QUEUE_NUM 1024
>  
> @@ -44,6 +47,7 @@ typedef ssize_t (NetReceiveIOV)(NetClientState *, const struct iovec *, int);
>  typedef void (NetCleanup) (NetClientState *);
>  typedef void (LinkStatusChanged)(NetClientState *);
>  typedef void (NetClientDestructor)(NetClientState *);
> +typedef void (NetClientBindCtx)(NetClientState *, AioContext *);
>  
>  typedef struct NetClientInfo {
>      NetClientOptionsKind type;
> @@ -55,6 +59,7 @@ typedef struct NetClientInfo {
>      NetCleanup *cleanup;
>      LinkStatusChanged *link_status_changed;
>      NetPoll *poll;
> +    NetClientBindCtx *bind_ctx;
>  } NetClientInfo;
>  
>  struct NetClientState {
> diff --git a/include/net/queue.h b/include/net/queue.h
> index fc02b33..f60e57f 100644
> --- a/include/net/queue.h
> +++ b/include/net/queue.h
> @@ -38,6 +38,20 @@ NetQueue *qemu_new_net_queue(void *opaque);
>  
>  void qemu_del_net_queue(NetQueue *queue);
>  
> +void qemu_net_queue_append(NetQueue *queue,
> +                                  NetClientState *sender,
> +                                  unsigned flags,
> +                                  const uint8_t *buf,
> +                                  size_t size,
> +                                  NetPacketSent *sent_cb);
> +
> +void qemu_net_queue_append_iov(NetQueue *queue,
> +                                      NetClientState *sender,
> +                                      unsigned flags,
> +                                      const struct iovec *iov,
> +                                      int iovcnt,
> +                                      NetPacketSent *sent_cb);
> +
>  ssize_t qemu_net_queue_send(NetQueue *queue,
>                              NetClientState *sender,
>                              unsigned flags,
> diff --git a/main-loop.c b/main-loop.c
> index 8c9b58c..eb80ff3 100644
> --- a/main-loop.c
> +++ b/main-loop.c
> @@ -109,6 +109,11 @@ static int qemu_signal_init(void)
>  
>  static AioContext *qemu_aio_context;
>  
> +AioContext *qemu_get_aio_context(void)
> +{
> +    return qemu_aio_context;
> +}
> +
>  void qemu_notify_event(void)
>  {
>      if (!qemu_aio_context) {
> diff --git a/net/hub.c b/net/hub.c
> index df32074..73c1f26 100644
> --- a/net/hub.c
> +++ b/net/hub.c
> @@ -31,6 +31,8 @@ typedef struct NetHubPort {
>      QLIST_ENTRY(NetHubPort) next;
>      NetHub *hub;
>      int id;
> +    EventNotifier e;
> +    AioContext *ctx;
>  } NetHubPort;
>  
>  struct NetHub {
> @@ -52,11 +54,20 @@ static ssize_t net_hub_receive(NetHub *hub, NetHubPort *source_port,
>              continue;
>          }
>  
> -        qemu_send_packet(&port->nc, buf, len);
> +        qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
> +                            QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
> +        event_notifier_set(&port->e);

Why are the context and the EventNotifier a property of the port, rather
than applicable to the NetClientState?

Please explain the design and where you want to go.  We do not have a
crystal ball :) and lack of explanation can only prevent people from
looking at your patches.

The absolute best would be if every tiny step along the way provides a
benefit.  For example, the patches that introduced AioContext enabled
asynchronous I/O on Windows.  Without such an incentive, it is very
difficult that patches progress beyond the RFC phase.

Paolo

>      }
>      return len;
>  }
>  
> +static void hub_port_deliver_packet(void *opaque)
> +{
> +    NetHubPort *port = (NetHubPort *)opaque;
> +
> +    qemu_net_queue_flush(port->nc.peer->send_queue);
> +}
> +
>  static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
>                                     const struct iovec *iov, int iovcnt)
>  {
> @@ -68,7 +79,9 @@ static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
>              continue;
>          }
>  
> -        qemu_sendv_packet(&port->nc, iov, iovcnt);
> +        qemu_net_queue_append_iov(port->nc.peer->send_queue, &port->nc,
> +                            QEMU_NET_PACKET_FLAG_NONE, iov, iovcnt, NULL);
> +        event_notifier_set(&port->e);
>      }
>      return len;
>  }
> @@ -126,9 +139,22 @@ static void net_hub_port_cleanup(NetClientState *nc)
>  {
>      NetHubPort *port = DO_UPCAST(NetHubPort, nc, nc);
>  
> +    if (port->ctx) {
> +        aio_set_fd_handler(port->ctx, event_notifier_get_fd(&port->e),
> +                        NULL, NULL, NULL, NULL);
> +    }
>      QLIST_REMOVE(port, next);
>  }
>  
> +static void net_hub_port_bind_ctx(NetClientState *nc, AioContext *ctx)
> +{
> +    NetHubPort *port = DO_UPCAST(NetHubPort, nc, nc);
> +
> +    port->ctx = ctx;
> +    aio_set_fd_handler(ctx, event_notifier_get_fd(&port->e),
> +                        hub_port_deliver_packet, NULL, NULL, port);
> +}
> +
>  static NetClientInfo net_hub_port_info = {
>      .type = NET_CLIENT_OPTIONS_KIND_HUBPORT,
>      .size = sizeof(NetHubPort),
> @@ -136,6 +162,7 @@ static NetClientInfo net_hub_port_info = {
>      .receive = net_hub_port_receive,
>      .receive_iov = net_hub_port_receive_iov,
>      .cleanup = net_hub_port_cleanup,
> +    .bind_ctx = net_hub_port_bind_ctx,
>  };
>  
>  static NetHubPort *net_hub_port_new(NetHub *hub, const char *name)
> @@ -155,7 +182,7 @@ static NetHubPort *net_hub_port_new(NetHub *hub, const char *name)
>      port = DO_UPCAST(NetHubPort, nc, nc);
>      port->id = id;
>      port->hub = hub;
> -
> +    event_notifier_init(&port->e, 0);
>      QLIST_INSERT_HEAD(&hub->ports, port, next);
>  
>      return port;
> diff --git a/net/net.c b/net/net.c
> index f3d67f8..104c5b2 100644
> --- a/net/net.c
> +++ b/net/net.c
> @@ -781,6 +781,7 @@ static int net_client_init1(const void *object, int is_netdev, Error **errp)
>              (opts->kind != NET_CLIENT_OPTIONS_KIND_NIC ||
>               !opts->nic->has_netdev)) {
>              peer = net_hub_add_port(u.net->has_vlan ? u.net->vlan : 0, NULL);
> +            peer->info->bind_ctx(peer, qemu_get_aio_context());
>          }
>  
>          if (net_client_init_fun[opts->kind](opts, name, peer) < 0) {
> diff --git a/net/queue.c b/net/queue.c
> index 859d02a..67959f8 100644
> --- a/net/queue.c
> +++ b/net/queue.c
> @@ -87,7 +87,7 @@ void qemu_del_net_queue(NetQueue *queue)
>      g_free(queue);
>  }
>  
> -static void qemu_net_queue_append(NetQueue *queue,
> +void qemu_net_queue_append(NetQueue *queue,
>                                    NetClientState *sender,
>                                    unsigned flags,
>                                    const uint8_t *buf,
> @@ -110,7 +110,7 @@ static void qemu_net_queue_append(NetQueue *queue,
>      QTAILQ_INSERT_TAIL(&queue->packets, packet, entry);
>  }
>  
> -static void qemu_net_queue_append_iov(NetQueue *queue,
> +void qemu_net_queue_append_iov(NetQueue *queue,
>                                        NetClientState *sender,
>                                        unsigned flags,
>                                        const struct iovec *iov,
>
pingfan liu - March 13, 2013, 2:26 a.m.
On Tue, Mar 12, 2013 at 4:50 PM, Paolo Bonzini <pbonzini@redhat.com> wrote:
> Il 07/03/2013 03:53, Liu Ping Fan ha scritto:
>> From: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
>>
>> Forward packet to other hub ports by their AioContext.
>>
>> Signed-off-by: Liu Ping Fan <pingfank@linux.vnet.ibm.com>
>> ---
>>  hw/qdev-properties-system.c |    1 +
>>  include/block/aio.h         |    1 +
>>  include/net/net.h           |    5 +++++
>>  include/net/queue.h         |   14 ++++++++++++++
>>  main-loop.c                 |    5 +++++
>>  net/hub.c                   |   33 ++++++++++++++++++++++++++++++---
>>  net/net.c                   |    1 +
>>  net/queue.c                 |    4 ++--
>>  8 files changed, 59 insertions(+), 5 deletions(-)
>>
>> diff --git a/hw/qdev-properties-system.c b/hw/qdev-properties-system.c
>> index ce3af22..587a335 100644
>> --- a/hw/qdev-properties-system.c
>> +++ b/hw/qdev-properties-system.c
>> @@ -307,6 +307,7 @@ static void set_vlan(Object *obj, Visitor *v, void *opaque,
>>                    name, prop->info->name);
>>          return;
>>      }
>> +    hubport->info->bind_ctx(hubport, qemu_get_aio_context());
>>      *ptr = hubport;
>>  }
>>
>> diff --git a/include/block/aio.h b/include/block/aio.h
>> index 5b54d38..bcb5126 100644
>> --- a/include/block/aio.h
>> +++ b/include/block/aio.h
>> @@ -229,6 +229,7 @@ bool qemu_aio_wait(void);
>>  void qemu_aio_set_event_notifier(EventNotifier *notifier,
>>                                   EventNotifierHandler *io_read,
>>                                   AioFlushEventNotifierHandler *io_flush);
>> +AioContext *qemu_get_aio_context(void);
>>
>>  #ifdef CONFIG_POSIX
>>  void qemu_aio_set_fd_handler(int fd,
>> diff --git a/include/net/net.h b/include/net/net.h
>> index cb049a1..9c2b357 100644
>> --- a/include/net/net.h
>> +++ b/include/net/net.h
>> @@ -8,6 +8,9 @@
>>  #include "net/queue.h"
>>  #include "migration/vmstate.h"
>>  #include "qapi-types.h"
>> +#include "qemu/thread.h"
>> +#include "block/aio.h"
>> +
>>
>>  #define MAX_QUEUE_NUM 1024
>>
>> @@ -44,6 +47,7 @@ typedef ssize_t (NetReceiveIOV)(NetClientState *, const struct iovec *, int);
>>  typedef void (NetCleanup) (NetClientState *);
>>  typedef void (LinkStatusChanged)(NetClientState *);
>>  typedef void (NetClientDestructor)(NetClientState *);
>> +typedef void (NetClientBindCtx)(NetClientState *, AioContext *);
>>
>>  typedef struct NetClientInfo {
>>      NetClientOptionsKind type;
>> @@ -55,6 +59,7 @@ typedef struct NetClientInfo {
>>      NetCleanup *cleanup;
>>      LinkStatusChanged *link_status_changed;
>>      NetPoll *poll;
>> +    NetClientBindCtx *bind_ctx;
>>  } NetClientInfo;
>>
>>  struct NetClientState {
>> diff --git a/include/net/queue.h b/include/net/queue.h
>> index fc02b33..f60e57f 100644
>> --- a/include/net/queue.h
>> +++ b/include/net/queue.h
>> @@ -38,6 +38,20 @@ NetQueue *qemu_new_net_queue(void *opaque);
>>
>>  void qemu_del_net_queue(NetQueue *queue);
>>
>> +void qemu_net_queue_append(NetQueue *queue,
>> +                                  NetClientState *sender,
>> +                                  unsigned flags,
>> +                                  const uint8_t *buf,
>> +                                  size_t size,
>> +                                  NetPacketSent *sent_cb);
>> +
>> +void qemu_net_queue_append_iov(NetQueue *queue,
>> +                                      NetClientState *sender,
>> +                                      unsigned flags,
>> +                                      const struct iovec *iov,
>> +                                      int iovcnt,
>> +                                      NetPacketSent *sent_cb);
>> +
>>  ssize_t qemu_net_queue_send(NetQueue *queue,
>>                              NetClientState *sender,
>>                              unsigned flags,
>> diff --git a/main-loop.c b/main-loop.c
>> index 8c9b58c..eb80ff3 100644
>> --- a/main-loop.c
>> +++ b/main-loop.c
>> @@ -109,6 +109,11 @@ static int qemu_signal_init(void)
>>
>>  static AioContext *qemu_aio_context;
>>
>> +AioContext *qemu_get_aio_context(void)
>> +{
>> +    return qemu_aio_context;
>> +}
>> +
>>  void qemu_notify_event(void)
>>  {
>>      if (!qemu_aio_context) {
>> diff --git a/net/hub.c b/net/hub.c
>> index df32074..73c1f26 100644
>> --- a/net/hub.c
>> +++ b/net/hub.c
>> @@ -31,6 +31,8 @@ typedef struct NetHubPort {
>>      QLIST_ENTRY(NetHubPort) next;
>>      NetHub *hub;
>>      int id;
>> +    EventNotifier e;
>> +    AioContext *ctx;
>>  } NetHubPort;
>>
>>  struct NetHub {
>> @@ -52,11 +54,20 @@ static ssize_t net_hub_receive(NetHub *hub, NetHubPort *source_port,
>>              continue;
>>          }
>>
>> -        qemu_send_packet(&port->nc, buf, len);
>> +        qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
>> +                            QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
>> +        event_notifier_set(&port->e);
>
> Why are the context and the EventNotifier a property of the port, rather
> than applicable to the NetClientState?
>
Yes, embed context into NetClientState is more reasonable, but as for
EventNotifier, considering about if we port tap onto context, the tap
do not have EventNotifier.

> Please explain the design and where you want to go.  We do not have a
> crystal ball :) and lack of explanation can only prevent people from
> looking at your patches.
>
Thanks for your advice. I have documented(will continue to update) the
whole motivation and plan on
http://wiki.qemu.org/Features/network_reentrant#benefit_of_network_layer_re-entrant
For this patch, hub model is one of the obstacle to make network layer
multi-thread.

Regards,
Pingfan
> The absolute best would be if every tiny step along the way provides a
> benefit.  For example, the patches that introduced AioContext enabled
> asynchronous I/O on Windows.  Without such an incentive, it is very
> difficult that patches progress beyond the RFC phase.
>
> Paolo
>
>>      }
>>      return len;
>>  }
>>
>> +static void hub_port_deliver_packet(void *opaque)
>> +{
>> +    NetHubPort *port = (NetHubPort *)opaque;
>> +
>> +    qemu_net_queue_flush(port->nc.peer->send_queue);
>> +}
>> +
>>  static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
>>                                     const struct iovec *iov, int iovcnt)
>>  {
>> @@ -68,7 +79,9 @@ static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
>>              continue;
>>          }
>>
>> -        qemu_sendv_packet(&port->nc, iov, iovcnt);
>> +        qemu_net_queue_append_iov(port->nc.peer->send_queue, &port->nc,
>> +                            QEMU_NET_PACKET_FLAG_NONE, iov, iovcnt, NULL);
>> +        event_notifier_set(&port->e);
>>      }
>>      return len;
>>  }
>> @@ -126,9 +139,22 @@ static void net_hub_port_cleanup(NetClientState *nc)
>>  {
>>      NetHubPort *port = DO_UPCAST(NetHubPort, nc, nc);
>>
>> +    if (port->ctx) {
>> +        aio_set_fd_handler(port->ctx, event_notifier_get_fd(&port->e),
>> +                        NULL, NULL, NULL, NULL);
>> +    }
>>      QLIST_REMOVE(port, next);
>>  }
>>
>> +static void net_hub_port_bind_ctx(NetClientState *nc, AioContext *ctx)
>> +{
>> +    NetHubPort *port = DO_UPCAST(NetHubPort, nc, nc);
>> +
>> +    port->ctx = ctx;
>> +    aio_set_fd_handler(ctx, event_notifier_get_fd(&port->e),
>> +                        hub_port_deliver_packet, NULL, NULL, port);
>> +}
>> +
>>  static NetClientInfo net_hub_port_info = {
>>      .type = NET_CLIENT_OPTIONS_KIND_HUBPORT,
>>      .size = sizeof(NetHubPort),
>> @@ -136,6 +162,7 @@ static NetClientInfo net_hub_port_info = {
>>      .receive = net_hub_port_receive,
>>      .receive_iov = net_hub_port_receive_iov,
>>      .cleanup = net_hub_port_cleanup,
>> +    .bind_ctx = net_hub_port_bind_ctx,
>>  };
>>
>>  static NetHubPort *net_hub_port_new(NetHub *hub, const char *name)
>> @@ -155,7 +182,7 @@ static NetHubPort *net_hub_port_new(NetHub *hub, const char *name)
>>      port = DO_UPCAST(NetHubPort, nc, nc);
>>      port->id = id;
>>      port->hub = hub;
>> -
>> +    event_notifier_init(&port->e, 0);
>>      QLIST_INSERT_HEAD(&hub->ports, port, next);
>>
>>      return port;
>> diff --git a/net/net.c b/net/net.c
>> index f3d67f8..104c5b2 100644
>> --- a/net/net.c
>> +++ b/net/net.c
>> @@ -781,6 +781,7 @@ static int net_client_init1(const void *object, int is_netdev, Error **errp)
>>              (opts->kind != NET_CLIENT_OPTIONS_KIND_NIC ||
>>               !opts->nic->has_netdev)) {
>>              peer = net_hub_add_port(u.net->has_vlan ? u.net->vlan : 0, NULL);
>> +            peer->info->bind_ctx(peer, qemu_get_aio_context());
>>          }
>>
>>          if (net_client_init_fun[opts->kind](opts, name, peer) < 0) {
>> diff --git a/net/queue.c b/net/queue.c
>> index 859d02a..67959f8 100644
>> --- a/net/queue.c
>> +++ b/net/queue.c
>> @@ -87,7 +87,7 @@ void qemu_del_net_queue(NetQueue *queue)
>>      g_free(queue);
>>  }
>>
>> -static void qemu_net_queue_append(NetQueue *queue,
>> +void qemu_net_queue_append(NetQueue *queue,
>>                                    NetClientState *sender,
>>                                    unsigned flags,
>>                                    const uint8_t *buf,
>> @@ -110,7 +110,7 @@ static void qemu_net_queue_append(NetQueue *queue,
>>      QTAILQ_INSERT_TAIL(&queue->packets, packet, entry);
>>  }
>>
>> -static void qemu_net_queue_append_iov(NetQueue *queue,
>> +void qemu_net_queue_append_iov(NetQueue *queue,
>>                                        NetClientState *sender,
>>                                        unsigned flags,
>>                                        const struct iovec *iov,
>>
>
Paolo Bonzini - March 13, 2013, 10:37 a.m.
Il 13/03/2013 03:26, liu ping fan ha scritto:
>>> >> +        qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
>>> >> +                            QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
>>> >> +        event_notifier_set(&port->e);
>> >
>> > Why are the context and the EventNotifier a property of the port, rather
>> > than applicable to the NetClientState?
>> >
> Yes, embed context into NetClientState is more reasonable, but as for
> EventNotifier, considering about if we port tap onto context, the tap
> do not have EventNotifier.
> 

There doesn't even need to be an EventNotifier, instead you can pass the
NetClientState's AioContext to the queue and use a bottom half on the
AioContext.

Furthermore, the bottom half should be completely transparent.  Callers
can keep using qemu_net_queue_flush, qemu_net_queue_flush schedules the
bottom half (perhaps, with an optimization, it only does that if the
queue is not empty), the handler actually performs the flush.

Paolo

Patch

diff --git a/hw/qdev-properties-system.c b/hw/qdev-properties-system.c
index ce3af22..587a335 100644
--- a/hw/qdev-properties-system.c
+++ b/hw/qdev-properties-system.c
@@ -307,6 +307,7 @@  static void set_vlan(Object *obj, Visitor *v, void *opaque,
                   name, prop->info->name);
         return;
     }
+    hubport->info->bind_ctx(hubport, qemu_get_aio_context());
     *ptr = hubport;
 }
 
diff --git a/include/block/aio.h b/include/block/aio.h
index 5b54d38..bcb5126 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -229,6 +229,7 @@  bool qemu_aio_wait(void);
 void qemu_aio_set_event_notifier(EventNotifier *notifier,
                                  EventNotifierHandler *io_read,
                                  AioFlushEventNotifierHandler *io_flush);
+AioContext *qemu_get_aio_context(void);
 
 #ifdef CONFIG_POSIX
 void qemu_aio_set_fd_handler(int fd,
diff --git a/include/net/net.h b/include/net/net.h
index cb049a1..9c2b357 100644
--- a/include/net/net.h
+++ b/include/net/net.h
@@ -8,6 +8,9 @@ 
 #include "net/queue.h"
 #include "migration/vmstate.h"
 #include "qapi-types.h"
+#include "qemu/thread.h"
+#include "block/aio.h"
+
 
 #define MAX_QUEUE_NUM 1024
 
@@ -44,6 +47,7 @@  typedef ssize_t (NetReceiveIOV)(NetClientState *, const struct iovec *, int);
 typedef void (NetCleanup) (NetClientState *);
 typedef void (LinkStatusChanged)(NetClientState *);
 typedef void (NetClientDestructor)(NetClientState *);
+typedef void (NetClientBindCtx)(NetClientState *, AioContext *);
 
 typedef struct NetClientInfo {
     NetClientOptionsKind type;
@@ -55,6 +59,7 @@  typedef struct NetClientInfo {
     NetCleanup *cleanup;
     LinkStatusChanged *link_status_changed;
     NetPoll *poll;
+    NetClientBindCtx *bind_ctx;
 } NetClientInfo;
 
 struct NetClientState {
diff --git a/include/net/queue.h b/include/net/queue.h
index fc02b33..f60e57f 100644
--- a/include/net/queue.h
+++ b/include/net/queue.h
@@ -38,6 +38,20 @@  NetQueue *qemu_new_net_queue(void *opaque);
 
 void qemu_del_net_queue(NetQueue *queue);
 
+void qemu_net_queue_append(NetQueue *queue,
+                                  NetClientState *sender,
+                                  unsigned flags,
+                                  const uint8_t *buf,
+                                  size_t size,
+                                  NetPacketSent *sent_cb);
+
+void qemu_net_queue_append_iov(NetQueue *queue,
+                                      NetClientState *sender,
+                                      unsigned flags,
+                                      const struct iovec *iov,
+                                      int iovcnt,
+                                      NetPacketSent *sent_cb);
+
 ssize_t qemu_net_queue_send(NetQueue *queue,
                             NetClientState *sender,
                             unsigned flags,
diff --git a/main-loop.c b/main-loop.c
index 8c9b58c..eb80ff3 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -109,6 +109,11 @@  static int qemu_signal_init(void)
 
 static AioContext *qemu_aio_context;
 
+AioContext *qemu_get_aio_context(void)
+{
+    return qemu_aio_context;
+}
+
 void qemu_notify_event(void)
 {
     if (!qemu_aio_context) {
diff --git a/net/hub.c b/net/hub.c
index df32074..73c1f26 100644
--- a/net/hub.c
+++ b/net/hub.c
@@ -31,6 +31,8 @@  typedef struct NetHubPort {
     QLIST_ENTRY(NetHubPort) next;
     NetHub *hub;
     int id;
+    EventNotifier e;
+    AioContext *ctx;
 } NetHubPort;
 
 struct NetHub {
@@ -52,11 +54,20 @@  static ssize_t net_hub_receive(NetHub *hub, NetHubPort *source_port,
             continue;
         }
 
-        qemu_send_packet(&port->nc, buf, len);
+        qemu_net_queue_append(port->nc.peer->send_queue, &port->nc,
+                            QEMU_NET_PACKET_FLAG_NONE, buf, len, NULL);
+        event_notifier_set(&port->e);
     }
     return len;
 }
 
+static void hub_port_deliver_packet(void *opaque)
+{
+    NetHubPort *port = (NetHubPort *)opaque;
+
+    qemu_net_queue_flush(port->nc.peer->send_queue);
+}
+
 static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
                                    const struct iovec *iov, int iovcnt)
 {
@@ -68,7 +79,9 @@  static ssize_t net_hub_receive_iov(NetHub *hub, NetHubPort *source_port,
             continue;
         }
 
-        qemu_sendv_packet(&port->nc, iov, iovcnt);
+        qemu_net_queue_append_iov(port->nc.peer->send_queue, &port->nc,
+                            QEMU_NET_PACKET_FLAG_NONE, iov, iovcnt, NULL);
+        event_notifier_set(&port->e);
     }
     return len;
 }
@@ -126,9 +139,22 @@  static void net_hub_port_cleanup(NetClientState *nc)
 {
     NetHubPort *port = DO_UPCAST(NetHubPort, nc, nc);
 
+    if (port->ctx) {
+        aio_set_fd_handler(port->ctx, event_notifier_get_fd(&port->e),
+                        NULL, NULL, NULL, NULL);
+    }
     QLIST_REMOVE(port, next);
 }
 
+static void net_hub_port_bind_ctx(NetClientState *nc, AioContext *ctx)
+{
+    NetHubPort *port = DO_UPCAST(NetHubPort, nc, nc);
+
+    port->ctx = ctx;
+    aio_set_fd_handler(ctx, event_notifier_get_fd(&port->e),
+                        hub_port_deliver_packet, NULL, NULL, port);
+}
+
 static NetClientInfo net_hub_port_info = {
     .type = NET_CLIENT_OPTIONS_KIND_HUBPORT,
     .size = sizeof(NetHubPort),
@@ -136,6 +162,7 @@  static NetClientInfo net_hub_port_info = {
     .receive = net_hub_port_receive,
     .receive_iov = net_hub_port_receive_iov,
     .cleanup = net_hub_port_cleanup,
+    .bind_ctx = net_hub_port_bind_ctx,
 };
 
 static NetHubPort *net_hub_port_new(NetHub *hub, const char *name)
@@ -155,7 +182,7 @@  static NetHubPort *net_hub_port_new(NetHub *hub, const char *name)
     port = DO_UPCAST(NetHubPort, nc, nc);
     port->id = id;
     port->hub = hub;
-
+    event_notifier_init(&port->e, 0);
     QLIST_INSERT_HEAD(&hub->ports, port, next);
 
     return port;
diff --git a/net/net.c b/net/net.c
index f3d67f8..104c5b2 100644
--- a/net/net.c
+++ b/net/net.c
@@ -781,6 +781,7 @@  static int net_client_init1(const void *object, int is_netdev, Error **errp)
             (opts->kind != NET_CLIENT_OPTIONS_KIND_NIC ||
              !opts->nic->has_netdev)) {
             peer = net_hub_add_port(u.net->has_vlan ? u.net->vlan : 0, NULL);
+            peer->info->bind_ctx(peer, qemu_get_aio_context());
         }
 
         if (net_client_init_fun[opts->kind](opts, name, peer) < 0) {
diff --git a/net/queue.c b/net/queue.c
index 859d02a..67959f8 100644
--- a/net/queue.c
+++ b/net/queue.c
@@ -87,7 +87,7 @@  void qemu_del_net_queue(NetQueue *queue)
     g_free(queue);
 }
 
-static void qemu_net_queue_append(NetQueue *queue,
+void qemu_net_queue_append(NetQueue *queue,
                                   NetClientState *sender,
                                   unsigned flags,
                                   const uint8_t *buf,
@@ -110,7 +110,7 @@  static void qemu_net_queue_append(NetQueue *queue,
     QTAILQ_INSERT_TAIL(&queue->packets, packet, entry);
 }
 
-static void qemu_net_queue_append_iov(NetQueue *queue,
+void qemu_net_queue_append_iov(NetQueue *queue,
                                       NetClientState *sender,
                                       unsigned flags,
                                       const struct iovec *iov,