diff mbox

[v3,10/12] netfilter: add a netbuffer filter

Message ID 1438590616-21142-11-git-send-email-yanghy@cn.fujitsu.com
State New
Headers show

Commit Message

Yang Hongyang Aug. 3, 2015, 8:30 a.m. UTC
This filter buffers and releases packets. It can be used with
MicroCheckpointing or other Remus-like VM FT solutions, and it can also
be used to simulate network delay.
It has an optional interval option; if supplied, the filter releases
the buffered packets once per interval.

Usage:
 -netdev tap,id=bn0
 -netfilter buffer,id=f0,netdev=bn0,chain=in,interval=1000

NOTE:
 the interval is specified in microseconds.

Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com>
---
v3: check packet's sender and sender->peer when flush it
---
 net/Makefile.objs   |   1 +
 net/filter-buffer.c | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 net/filter.c        |   2 +
 net/filters.h       |  17 ++++++
 qapi-schema.json    |  18 +++++-
 5 files changed, 199 insertions(+), 1 deletion(-)
 create mode 100644 net/filter-buffer.c
 create mode 100644 net/filters.h

Comments

Jason Wang Aug. 4, 2015, 5:03 a.m. UTC | #1
On 08/03/2015 04:30 PM, Yang Hongyang wrote:
> This filter is to buffer/release packets, this feature can be used
> when using MicroCheckpointing, or other Remus like VM FT solutions, you
> can also use it to simulate the network delay.
> It has an interval option, if supplied, this filter will release
> packets by interval.
>
> Usage:
>  -netdev tap,id=bn0
>  -netfilter buffer,id=f0,netdev=bn0,chain=in,interval=1000
>
> NOTE:
>  the scale of interval is microsecond.
>
> Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com>
> ---
> v3: check packet's sender and sender->peer when flush it
> ---
>  net/Makefile.objs   |   1 +
>  net/filter-buffer.c | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>  net/filter.c        |   2 +
>  net/filters.h       |  17 ++++++
>  qapi-schema.json    |  18 +++++-
>  5 files changed, 199 insertions(+), 1 deletion(-)
>  create mode 100644 net/filter-buffer.c
>  create mode 100644 net/filters.h
>
> diff --git a/net/Makefile.objs b/net/Makefile.objs
> index 914aec0..5fa2f97 100644
> --- a/net/Makefile.objs
> +++ b/net/Makefile.objs
> @@ -14,3 +14,4 @@ common-obj-$(CONFIG_SLIRP) += slirp.o
>  common-obj-$(CONFIG_VDE) += vde.o
>  common-obj-$(CONFIG_NETMAP) += netmap.o
>  common-obj-y += filter.o
> +common-obj-y += filter-buffer.o
> diff --git a/net/filter-buffer.c b/net/filter-buffer.c
> new file mode 100644
> index 0000000..1547765
> --- /dev/null
> +++ b/net/filter-buffer.c
> @@ -0,0 +1,162 @@
> +/*
> + * Copyright (c) 2015 FUJITSU LIMITED
> + * Author: Yang Hongyang <yanghy@cn.fujitsu.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#include "net/filter.h"
> +#include "net/queue.h"
> +#include "filters.h"
> +#include "qemu-common.h"
> +#include "qemu/error-report.h"
> +#include "qemu/main-loop.h"
> +#include "qemu/timer.h"
> +#include "qemu/iov.h"
> +
> +typedef struct FILTERBUFFERState {
> +    NetFilterState nf;
> +    NetQueue *incoming_queue;
> +    NetQueue *inflight_queue;
> +    QEMUBH *flush_bh;
> +    int64_t interval;
> +    QEMUTimer release_timer;
> +} FILTERBUFFERState;
> +
> +static void packet_send_completed(NetClientState *nc, ssize_t len)
> +{
> +    return;
> +}
> +
> +static void filter_buffer_flush(NetFilterState *nf)
> +{
> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
> +    NetQueue *queue = s->inflight_queue;
> +    NetPacket *packet;
> +
> +    while (queue && !QTAILQ_EMPTY(&queue->packets)) {
> +        packet = QTAILQ_FIRST(&queue->packets);
> +        QTAILQ_REMOVE(&queue->packets, packet, entry);
> +        queue->nq_count--;
> +
> +        if (packet->sender && packet->sender->peer) {
> +            qemu_net_queue_send(packet->sender->peer->incoming_queue,
> +                                packet->sender,
> +                                packet->flags,
> +                                packet->data,
> +                                packet->size,
> +                                packet->sent_cb);
> +        }
> +
> +        /*
> +         * now that we pass the packet to sender->peer->incoming_queue, we
> +         * don't care the reture value here, because the peer's queue will
> +         * take care of this packet
> +         */
> +        g_free(packet);

So it looks like the packet is still not passed to the next filter?

> +    }
> +
> +    g_free(queue);
> +    s->inflight_queue = NULL;
> +}
> +
> +static void filter_buffer_flush_bh(void *opaque)
> +{
> +    FILTERBUFFERState *s = opaque;
> +    NetFilterState *nf = &s->nf;
> +    filter_buffer_flush(nf);
> +}
> +
> +static void filter_buffer_release_one(NetFilterState *nf)
> +{
> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
> +
> +    /* flush inflight packets */
> +    if (s->inflight_queue) {
> +        filter_buffer_flush(nf);
> +    }
> +
> +    s->inflight_queue = s->incoming_queue;
> +    s->incoming_queue = qemu_new_net_queue(nf);

So this in fact flushes a bunch of packets. If so, the name of the
function is confusing.

> +    qemu_bh_schedule(s->flush_bh);

I don't get why a bh is needed. If we could get rid of it, there's
probably no need for inflight_queue, and we can just drain
incoming_queue here.

> +}
> +
> +static void filter_buffer_release_timer(void *opaque)
> +{
> +    FILTERBUFFERState *s = opaque;
> +    filter_buffer_release_one(&s->nf);
> +    timer_mod(&s->release_timer,
> +              qemu_clock_get_us(QEMU_CLOCK_VIRTUAL) + s->interval);
> +}
> +
> +/* filter APIs */
> +static ssize_t filter_buffer_receive_iov(NetFilterState *nf,
> +                                         NetClientState *sender,
> +                                         unsigned flags,
> +                                         const struct iovec *iov,
> +                                         int iovcnt)
> +{
> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
> +    NetQueue *queue = s->incoming_queue;
> +
> +    qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt,
> +                              packet_send_completed);
> +    return iov_size(iov, iovcnt);
> +}
> +
> +static void filter_buffer_cleanup(NetFilterState *nf)
> +{
> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
> +
> +    if (s->interval) {
> +        timer_del(&s->release_timer);
> +    }
> +
> +    /* flush inflight packets */
> +    filter_buffer_flush(nf);
> +    /* flush incoming packets */
> +    s->inflight_queue = s->incoming_queue;
> +    s->incoming_queue = NULL;
> +    filter_buffer_flush(nf);
> +
> +    if (s->flush_bh) {
> +        qemu_bh_delete(s->flush_bh);
> +        s->flush_bh = NULL;
> +    }
> +    return;
> +}
> +
> +
> +static NetFilterInfo net_filter_buffer_info = {
> +    .type = NET_FILTER_OPTIONS_KIND_BUFFER,
> +    .size = sizeof(FILTERBUFFERState),
> +    .receive_iov = filter_buffer_receive_iov,
> +    .cleanup = filter_buffer_cleanup,
> +};
> +
> +int net_init_filter_buffer(const NetFilterOptions *opts, const char *name,
> +                           int chain, NetClientState *netdev, Error **errp)
> +{
> +    NetFilterState *nf;
> +    FILTERBUFFERState *s;
> +    const NetFilterBufferOptions *bufferopt;
> +
> +    assert(opts->kind == NET_FILTER_OPTIONS_KIND_BUFFER);
> +    bufferopt = opts->buffer;
> +
> +    nf = qemu_new_net_filter(&net_filter_buffer_info,
> +                             netdev, "buffer", name, chain);
> +    s = DO_UPCAST(FILTERBUFFERState, nf, nf);
> +    s->incoming_queue = qemu_new_net_queue(nf);
> +    s->flush_bh = qemu_bh_new(filter_buffer_flush_bh, s);
> +    s->interval = bufferopt->has_interval ? bufferopt->interval : 0;
> +    if (s->interval) {
> +        timer_init_us(&s->release_timer, QEMU_CLOCK_VIRTUAL,
> +                      filter_buffer_release_timer, s);
> +        timer_mod(&s->release_timer,
> +                  qemu_clock_get_us(QEMU_CLOCK_VIRTUAL) + s->interval);
> +    }
> +
> +    return 0;
> +}
> diff --git a/net/filter.c b/net/filter.c
> index 84845b1..ba78683 100644
> --- a/net/filter.c
> +++ b/net/filter.c
> @@ -17,6 +17,7 @@
>  
>  #include "net/filter.h"
>  #include "net/net.h"
> +#include "filters.h"
>  
>  static QTAILQ_HEAD(, NetFilterState) net_filters;
>  
> @@ -168,6 +169,7 @@ typedef int (NetFilterInit)(const NetFilterOptions *opts,
>  
>  static
>  NetFilterInit * const net_filter_init_fun[NET_FILTER_OPTIONS_KIND_MAX] = {
> +    [NET_FILTER_OPTIONS_KIND_BUFFER] = net_init_filter_buffer,
>  };
>  
>  static int net_filter_init1(const NetFilter *netfilter, Error **errp)
> diff --git a/net/filters.h b/net/filters.h
> new file mode 100644
> index 0000000..3b546db
> --- /dev/null
> +++ b/net/filters.h
> @@ -0,0 +1,17 @@
> +/*
> + * Copyright (c) 2015 FUJITSU LIMITED
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later.  See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef QEMU_NET_FILTERS_H
> +#define QEMU_NET_FILTERS_H
> +
> +#include "net/net.h"
> +#include "net/filter.h"
> +
> +int net_init_filter_buffer(const NetFilterOptions *opts, const char *name,
> +                           int chain, NetClientState *netdev, Error **errp);
> +
> +#endif /* QEMU_NET_FILTERS_H */
> diff --git a/qapi-schema.json b/qapi-schema.json
> index 9d97c21..e51bb59 100644
> --- a/qapi-schema.json
> +++ b/qapi-schema.json
> @@ -2584,6 +2584,21 @@
>  { 'command': 'netfilter_del', 'data': {'id': 'str'} }
>  
>  ##
> +# @NetFilterBufferOptions
> +#
> +# a netbuffer filter for network backend.
> +#
> +# @interval: #optional release packets by interval, if no interval supplied,
> +#            will release packets when filter_buffer_release_all been called.
> +#            scale: microsecond
> +#
> +# Since 2.5
> +##
> +{ 'struct': 'NetFilterBufferOptions',
> +  'data': {
> +    '*interval':     'int64' } }
> +
> +##
>  # @NetFilterOptions
>  #
>  # A discriminated record of network filters.
> @@ -2592,7 +2607,8 @@
>  #
>  ##
>  { 'union': 'NetFilterOptions',
> -  'data': { } }
> +  'data': {
> +    'buffer':     'NetFilterBufferOptions'} }
>  
>  ##
>  # @NetFilter
Yang Hongyang Aug. 4, 2015, 6:05 a.m. UTC | #2
On 08/04/2015 01:03 PM, Jason Wang wrote:
>
>
> On 08/03/2015 04:30 PM, Yang Hongyang wrote:
>> This filter is to buffer/release packets, this feature can be used
>> when using MicroCheckpointing, or other Remus like VM FT solutions, you
>> can also use it to simulate the network delay.
>> It has an interval option, if supplied, this filter will release
>> packets by interval.
>>
>> Usage:
>>   -netdev tap,id=bn0
>>   -netfilter buffer,id=f0,netdev=bn0,chain=in,interval=1000
>>
>> NOTE:
>>   the scale of interval is microsecond.
>>
>> Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com>
>> ---
>> v3: check packet's sender and sender->peer when flush it
>> ---
>>   net/Makefile.objs   |   1 +
>>   net/filter-buffer.c | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++++
>>   net/filter.c        |   2 +
>>   net/filters.h       |  17 ++++++
>>   qapi-schema.json    |  18 +++++-
>>   5 files changed, 199 insertions(+), 1 deletion(-)
>>   create mode 100644 net/filter-buffer.c
>>   create mode 100644 net/filters.h
>>
>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>> index 914aec0..5fa2f97 100644
>> --- a/net/Makefile.objs
>> +++ b/net/Makefile.objs
>> @@ -14,3 +14,4 @@ common-obj-$(CONFIG_SLIRP) += slirp.o
>>   common-obj-$(CONFIG_VDE) += vde.o
>>   common-obj-$(CONFIG_NETMAP) += netmap.o
>>   common-obj-y += filter.o
>> +common-obj-y += filter-buffer.o
>> diff --git a/net/filter-buffer.c b/net/filter-buffer.c
>> new file mode 100644
>> index 0000000..1547765
>> --- /dev/null
>> +++ b/net/filter-buffer.c
>> @@ -0,0 +1,162 @@
>> +/*
>> + * Copyright (c) 2015 FUJITSU LIMITED
>> + * Author: Yang Hongyang <yanghy@cn.fujitsu.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> + * later.  See the COPYING file in the top-level directory.
>> + */
>> +
>> +#include "net/filter.h"
>> +#include "net/queue.h"
>> +#include "filters.h"
>> +#include "qemu-common.h"
>> +#include "qemu/error-report.h"
>> +#include "qemu/main-loop.h"
>> +#include "qemu/timer.h"
>> +#include "qemu/iov.h"
>> +
>> +typedef struct FILTERBUFFERState {
>> +    NetFilterState nf;
>> +    NetQueue *incoming_queue;
>> +    NetQueue *inflight_queue;
>> +    QEMUBH *flush_bh;
>> +    int64_t interval;
>> +    QEMUTimer release_timer;
>> +} FILTERBUFFERState;
>> +
>> +static void packet_send_completed(NetClientState *nc, ssize_t len)
>> +{
>> +    return;
>> +}
>> +
>> +static void filter_buffer_flush(NetFilterState *nf)
>> +{
>> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
>> +    NetQueue *queue = s->inflight_queue;
>> +    NetPacket *packet;
>> +
>> +    while (queue && !QTAILQ_EMPTY(&queue->packets)) {
>> +        packet = QTAILQ_FIRST(&queue->packets);
>> +        QTAILQ_REMOVE(&queue->packets, packet, entry);
>> +        queue->nq_count--;
>> +
>> +        if (packet->sender && packet->sender->peer) {
>> +            qemu_net_queue_send(packet->sender->peer->incoming_queue,
>> +                                packet->sender,
>> +                                packet->flags,
>> +                                packet->data,
>> +                                packet->size,
>> +                                packet->sent_cb);
>> +        }
>> +
>> +        /*
>> +         * now that we pass the packet to sender->peer->incoming_queue, we
>> +         * don't care the reture value here, because the peer's queue will
>> +         * take care of this packet
>> +         */
>> +        g_free(packet);
>
> So looks like the packet was still not passed to next filter?

Sorry, I didn't get your suggestion last time; I will add it in the next
version. Just to confirm: do you mean we need to pass the packet to the next
filter instead of passing it to the receiver's incoming_queue? And even if we
pass it to the next filter, the check of the sender and its peer is still
needed, because if there's no receiver, it makes no sense to pass it further?

>
>> +    }
>> +
>> +    g_free(queue);
>> +    s->inflight_queue = NULL;
>> +}
>> +
>> +static void filter_buffer_flush_bh(void *opaque)
>> +{
>> +    FILTERBUFFERState *s = opaque;
>> +    NetFilterState *nf = &s->nf;
>> +    filter_buffer_flush(nf);
>> +}
>> +
>> +static void filter_buffer_release_one(NetFilterState *nf)
>> +{
>> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
>> +
>> +    /* flush inflight packets */
>> +    if (s->inflight_queue) {
>> +        filter_buffer_flush(nf);
>> +    }
>> +
>> +    s->inflight_queue = s->incoming_queue;
>> +    s->incoming_queue = qemu_new_net_queue(nf);
>
> So this in fact flush a brunch of packets. If yes, the name of function
> is confusing

maybe filter_buffer_release ?

>
>> +    qemu_bh_schedule(s->flush_bh);
>
> Don't get why a bh is needed. If we could get rid of it, there's
> probably no need for inflight_queue, and we can just drain
> incoming_queue here.

It seems we can get rid of the bh; will do, thanks.

>
>> +}
>> +
>> +static void filter_buffer_release_timer(void *opaque)
>> +{
>> +    FILTERBUFFERState *s = opaque;
>> +    filter_buffer_release_one(&s->nf);
>> +    timer_mod(&s->release_timer,
>> +              qemu_clock_get_us(QEMU_CLOCK_VIRTUAL) + s->interval);
>> +}
>> +
>> +/* filter APIs */
>> +static ssize_t filter_buffer_receive_iov(NetFilterState *nf,
>> +                                         NetClientState *sender,
>> +                                         unsigned flags,
>> +                                         const struct iovec *iov,
>> +                                         int iovcnt)
>> +{
>> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
>> +    NetQueue *queue = s->incoming_queue;
>> +
>> +    qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt,
>> +                              packet_send_completed);
>> +    return iov_size(iov, iovcnt);
>> +}
>> +
>> +static void filter_buffer_cleanup(NetFilterState *nf)
>> +{
>> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
>> +
>> +    if (s->interval) {
>> +        timer_del(&s->release_timer);
>> +    }
>> +
>> +    /* flush inflight packets */
>> +    filter_buffer_flush(nf);
>> +    /* flush incoming packets */
>> +    s->inflight_queue = s->incoming_queue;
>> +    s->incoming_queue = NULL;
>> +    filter_buffer_flush(nf);
>> +
>> +    if (s->flush_bh) {
>> +        qemu_bh_delete(s->flush_bh);
>> +        s->flush_bh = NULL;
>> +    }
>> +    return;
>> +}
>> +
>> +
>> +static NetFilterInfo net_filter_buffer_info = {
>> +    .type = NET_FILTER_OPTIONS_KIND_BUFFER,
>> +    .size = sizeof(FILTERBUFFERState),
>> +    .receive_iov = filter_buffer_receive_iov,
>> +    .cleanup = filter_buffer_cleanup,
>> +};
>> +
>> +int net_init_filter_buffer(const NetFilterOptions *opts, const char *name,
>> +                           int chain, NetClientState *netdev, Error **errp)
>> +{
>> +    NetFilterState *nf;
>> +    FILTERBUFFERState *s;
>> +    const NetFilterBufferOptions *bufferopt;
>> +
>> +    assert(opts->kind == NET_FILTER_OPTIONS_KIND_BUFFER);
>> +    bufferopt = opts->buffer;
>> +
>> +    nf = qemu_new_net_filter(&net_filter_buffer_info,
>> +                             netdev, "buffer", name, chain);
>> +    s = DO_UPCAST(FILTERBUFFERState, nf, nf);
>> +    s->incoming_queue = qemu_new_net_queue(nf);
>> +    s->flush_bh = qemu_bh_new(filter_buffer_flush_bh, s);
>> +    s->interval = bufferopt->has_interval ? bufferopt->interval : 0;
>> +    if (s->interval) {
>> +        timer_init_us(&s->release_timer, QEMU_CLOCK_VIRTUAL,
>> +                      filter_buffer_release_timer, s);
>> +        timer_mod(&s->release_timer,
>> +                  qemu_clock_get_us(QEMU_CLOCK_VIRTUAL) + s->interval);
>> +    }
>> +
>> +    return 0;
>> +}
>> diff --git a/net/filter.c b/net/filter.c
>> index 84845b1..ba78683 100644
>> --- a/net/filter.c
>> +++ b/net/filter.c
>> @@ -17,6 +17,7 @@
>>
>>   #include "net/filter.h"
>>   #include "net/net.h"
>> +#include "filters.h"
>>
>>   static QTAILQ_HEAD(, NetFilterState) net_filters;
>>
>> @@ -168,6 +169,7 @@ typedef int (NetFilterInit)(const NetFilterOptions *opts,
>>
>>   static
>>   NetFilterInit * const net_filter_init_fun[NET_FILTER_OPTIONS_KIND_MAX] = {
>> +    [NET_FILTER_OPTIONS_KIND_BUFFER] = net_init_filter_buffer,
>>   };
>>
>>   static int net_filter_init1(const NetFilter *netfilter, Error **errp)
>> diff --git a/net/filters.h b/net/filters.h
>> new file mode 100644
>> index 0000000..3b546db
>> --- /dev/null
>> +++ b/net/filters.h
>> @@ -0,0 +1,17 @@
>> +/*
>> + * Copyright (c) 2015 FUJITSU LIMITED
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>> + * later.  See the COPYING file in the top-level directory.
>> + */
>> +
>> +#ifndef QEMU_NET_FILTERS_H
>> +#define QEMU_NET_FILTERS_H
>> +
>> +#include "net/net.h"
>> +#include "net/filter.h"
>> +
>> +int net_init_filter_buffer(const NetFilterOptions *opts, const char *name,
>> +                           int chain, NetClientState *netdev, Error **errp);
>> +
>> +#endif /* QEMU_NET_FILTERS_H */
>> diff --git a/qapi-schema.json b/qapi-schema.json
>> index 9d97c21..e51bb59 100644
>> --- a/qapi-schema.json
>> +++ b/qapi-schema.json
>> @@ -2584,6 +2584,21 @@
>>   { 'command': 'netfilter_del', 'data': {'id': 'str'} }
>>
>>   ##
>> +# @NetFilterBufferOptions
>> +#
>> +# a netbuffer filter for network backend.
>> +#
>> +# @interval: #optional release packets by interval, if no interval supplied,
>> +#            will release packets when filter_buffer_release_all been called.
>> +#            scale: microsecond
>> +#
>> +# Since 2.5
>> +##
>> +{ 'struct': 'NetFilterBufferOptions',
>> +  'data': {
>> +    '*interval':     'int64' } }
>> +
>> +##
>>   # @NetFilterOptions
>>   #
>>   # A discriminated record of network filters.
>> @@ -2592,7 +2607,8 @@
>>   #
>>   ##
>>   { 'union': 'NetFilterOptions',
>> -  'data': { } }
>> +  'data': {
>> +    'buffer':     'NetFilterBufferOptions'} }
>>
>>   ##
>>   # @NetFilter
>
> .
>
Jason Wang Aug. 4, 2015, 6:30 a.m. UTC | #3
On 08/04/2015 02:05 PM, Yang Hongyang wrote:
> On 08/04/2015 01:03 PM, Jason Wang wrote:
>>
>>
>> On 08/03/2015 04:30 PM, Yang Hongyang wrote:
>>> This filter is to buffer/release packets, this feature can be used
>>> when using MicroCheckpointing, or other Remus like VM FT solutions, you
>>> can also use it to simulate the network delay.
>>> It has an interval option, if supplied, this filter will release
>>> packets by interval.
>>>
>>> Usage:
>>>   -netdev tap,id=bn0
>>>   -netfilter buffer,id=f0,netdev=bn0,chain=in,interval=1000
>>>
>>> NOTE:
>>>   the scale of interval is microsecond.
>>>
>>> Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com>
>>> ---
>>> v3: check packet's sender and sender->peer when flush it
>>> ---
>>>   net/Makefile.objs   |   1 +
>>>   net/filter-buffer.c | 162
>>> ++++++++++++++++++++++++++++++++++++++++++++++++++++
>>>   net/filter.c        |   2 +
>>>   net/filters.h       |  17 ++++++
>>>   qapi-schema.json    |  18 +++++-
>>>   5 files changed, 199 insertions(+), 1 deletion(-)
>>>   create mode 100644 net/filter-buffer.c
>>>   create mode 100644 net/filters.h
>>>
>>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>>> index 914aec0..5fa2f97 100644
>>> --- a/net/Makefile.objs
>>> +++ b/net/Makefile.objs
>>> @@ -14,3 +14,4 @@ common-obj-$(CONFIG_SLIRP) += slirp.o
>>>   common-obj-$(CONFIG_VDE) += vde.o
>>>   common-obj-$(CONFIG_NETMAP) += netmap.o
>>>   common-obj-y += filter.o
>>> +common-obj-y += filter-buffer.o
>>> diff --git a/net/filter-buffer.c b/net/filter-buffer.c
>>> new file mode 100644
>>> index 0000000..1547765
>>> --- /dev/null
>>> +++ b/net/filter-buffer.c
>>> @@ -0,0 +1,162 @@
>>> +/*
>>> + * Copyright (c) 2015 FUJITSU LIMITED
>>> + * Author: Yang Hongyang <yanghy@cn.fujitsu.com>
>>> + *
>>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>>> + * later.  See the COPYING file in the top-level directory.
>>> + */
>>> +
>>> +#include "net/filter.h"
>>> +#include "net/queue.h"
>>> +#include "filters.h"
>>> +#include "qemu-common.h"
>>> +#include "qemu/error-report.h"
>>> +#include "qemu/main-loop.h"
>>> +#include "qemu/timer.h"
>>> +#include "qemu/iov.h"
>>> +
>>> +typedef struct FILTERBUFFERState {
>>> +    NetFilterState nf;
>>> +    NetQueue *incoming_queue;
>>> +    NetQueue *inflight_queue;
>>> +    QEMUBH *flush_bh;
>>> +    int64_t interval;
>>> +    QEMUTimer release_timer;
>>> +} FILTERBUFFERState;
>>> +
>>> +static void packet_send_completed(NetClientState *nc, ssize_t len)
>>> +{
>>> +    return;
>>> +}
>>> +
>>> +static void filter_buffer_flush(NetFilterState *nf)
>>> +{
>>> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
>>> +    NetQueue *queue = s->inflight_queue;
>>> +    NetPacket *packet;
>>> +
>>> +    while (queue && !QTAILQ_EMPTY(&queue->packets)) {
>>> +        packet = QTAILQ_FIRST(&queue->packets);
>>> +        QTAILQ_REMOVE(&queue->packets, packet, entry);
>>> +        queue->nq_count--;
>>> +
>>> +        if (packet->sender && packet->sender->peer) {
>>> +            qemu_net_queue_send(packet->sender->peer->incoming_queue,
>>> +                                packet->sender,
>>> +                                packet->flags,
>>> +                                packet->data,
>>> +                                packet->size,
>>> +                                packet->sent_cb);
>>> +        }
>>> +
>>> +        /*
>>> +         * now that we pass the packet to
>>> sender->peer->incoming_queue, we
>>> +         * don't care the reture value here, because the peer's
>>> queue will
>>> +         * take care of this packet
>>> +         */
>>> +        g_free(packet);
>>
>> So looks like the packet was still not passed to next filter?
>
> I didn't get your suggestion last time, sorry, will add it in next
> version.
> Just to confirm, do you mean we need to pass the packet to next filter
> instead of pass to the receiver's incoming_queue?

Yes. Consider that you may have two filters: the first is dump and the
second is buffer. I believe you still want to buffer the packet even if
it has been dumped.

> and even if we pass to next
> filter, the check of sender and it's peer is still needed, because if
> there's no receiver, it's nonsense to pass it further?

Yes.

>
>>
>>> +    }
>>> +
>>> +    g_free(queue);
>>> +    s->inflight_queue = NULL;
>>> +}
>>> +
>>> +static void filter_buffer_flush_bh(void *opaque)
>>> +{
>>> +    FILTERBUFFERState *s = opaque;
>>> +    NetFilterState *nf = &s->nf;
>>> +    filter_buffer_flush(nf);
>>> +}
>>> +
>>> +static void filter_buffer_release_one(NetFilterState *nf)
>>> +{
>>> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
>>> +
>>> +    /* flush inflight packets */
>>> +    if (s->inflight_queue) {
>>> +        filter_buffer_flush(nf);
>>> +    }
>>> +
>>> +    s->inflight_queue = s->incoming_queue;
>>> +    s->incoming_queue = qemu_new_net_queue(nf);
>>
>> So this in fact flush a brunch of packets. If yes, the name of function
>> is confusing
>
> maybe filter_buffer_release ?
>

Right.

>>
>>> +    qemu_bh_schedule(s->flush_bh);
>>
>> Don't get why a bh is needed. If we could get rid of it, there's
>> probably no need for inflight_queue, and we can just drain
>> incoming_queue here.
>
> Seems we can get rid of bh, will do, thanks.
Yang Hongyang Aug. 4, 2015, 7:57 a.m. UTC | #4
On 08/04/2015 02:30 PM, Jason Wang wrote:
>
>
> On 08/04/2015 02:05 PM, Yang Hongyang wrote:
>> On 08/04/2015 01:03 PM, Jason Wang wrote:
>>>
>>>
>>> On 08/03/2015 04:30 PM, Yang Hongyang wrote:
>>>> This filter is to buffer/release packets, this feature can be used
>>>> when using MicroCheckpointing, or other Remus like VM FT solutions, you
>>>> can also use it to simulate the network delay.
>>>> It has an interval option, if supplied, this filter will release
>>>> packets by interval.
>>>>
>>>> Usage:
>>>>    -netdev tap,id=bn0
>>>>    -netfilter buffer,id=f0,netdev=bn0,chain=in,interval=1000
>>>>
>>>> NOTE:
>>>>    the scale of interval is microsecond.
>>>>
>>>> Signed-off-by: Yang Hongyang <yanghy@cn.fujitsu.com>
>>>> ---
>>>> v3: check packet's sender and sender->peer when flush it
>>>> ---
>>>>    net/Makefile.objs   |   1 +
>>>>    net/filter-buffer.c | 162
>>>> ++++++++++++++++++++++++++++++++++++++++++++++++++++
>>>>    net/filter.c        |   2 +
>>>>    net/filters.h       |  17 ++++++
>>>>    qapi-schema.json    |  18 +++++-
>>>>    5 files changed, 199 insertions(+), 1 deletion(-)
>>>>    create mode 100644 net/filter-buffer.c
>>>>    create mode 100644 net/filters.h
>>>>
>>>> diff --git a/net/Makefile.objs b/net/Makefile.objs
>>>> index 914aec0..5fa2f97 100644
>>>> --- a/net/Makefile.objs
>>>> +++ b/net/Makefile.objs
>>>> @@ -14,3 +14,4 @@ common-obj-$(CONFIG_SLIRP) += slirp.o
>>>>    common-obj-$(CONFIG_VDE) += vde.o
>>>>    common-obj-$(CONFIG_NETMAP) += netmap.o
>>>>    common-obj-y += filter.o
>>>> +common-obj-y += filter-buffer.o
>>>> diff --git a/net/filter-buffer.c b/net/filter-buffer.c
>>>> new file mode 100644
>>>> index 0000000..1547765
>>>> --- /dev/null
>>>> +++ b/net/filter-buffer.c
>>>> @@ -0,0 +1,162 @@
>>>> +/*
>>>> + * Copyright (c) 2015 FUJITSU LIMITED
>>>> + * Author: Yang Hongyang <yanghy@cn.fujitsu.com>
>>>> + *
>>>> + * This work is licensed under the terms of the GNU GPL, version 2 or
>>>> + * later.  See the COPYING file in the top-level directory.
>>>> + */
>>>> +
>>>> +#include "net/filter.h"
>>>> +#include "net/queue.h"
>>>> +#include "filters.h"
>>>> +#include "qemu-common.h"
>>>> +#include "qemu/error-report.h"
>>>> +#include "qemu/main-loop.h"
>>>> +#include "qemu/timer.h"
>>>> +#include "qemu/iov.h"
>>>> +
>>>> +typedef struct FILTERBUFFERState {
>>>> +    NetFilterState nf;
>>>> +    NetQueue *incoming_queue;
>>>> +    NetQueue *inflight_queue;
>>>> +    QEMUBH *flush_bh;
>>>> +    int64_t interval;
>>>> +    QEMUTimer release_timer;
>>>> +} FILTERBUFFERState;
>>>> +
>>>> +static void packet_send_completed(NetClientState *nc, ssize_t len)
>>>> +{
>>>> +    return;
>>>> +}
>>>> +
>>>> +static void filter_buffer_flush(NetFilterState *nf)
>>>> +{
>>>> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
>>>> +    NetQueue *queue = s->inflight_queue;
>>>> +    NetPacket *packet;
>>>> +
>>>> +    while (queue && !QTAILQ_EMPTY(&queue->packets)) {
>>>> +        packet = QTAILQ_FIRST(&queue->packets);
>>>> +        QTAILQ_REMOVE(&queue->packets, packet, entry);
>>>> +        queue->nq_count--;
>>>> +
>>>> +        if (packet->sender && packet->sender->peer) {
>>>> +            qemu_net_queue_send(packet->sender->peer->incoming_queue,
>>>> +                                packet->sender,
>>>> +                                packet->flags,
>>>> +                                packet->data,
>>>> +                                packet->size,
>>>> +                                packet->sent_cb);
>>>> +        }
>>>> +
>>>> +        /*
>>>> +         * now that we pass the packet to
>>>> sender->peer->incoming_queue, we
>>>> +         * don't care the reture value here, because the peer's
>>>> queue will
>>>> +         * take care of this packet
>>>> +         */
>>>> +        g_free(packet);
>>>
>>> So looks like the packet was still not passed to next filter?
>>
>> I didn't get your suggestion last time, sorry, will add it in next
>> version.
>> Just to confirm, do you mean we need to pass the packet to next filter
>> instead of pass to the receiver's incoming_queue?
>
> Yes, consider you may have two filters. First is dump and second is
> buffer, I believe you still want to buffer the packet even if it has
> been dumped.
>
>> and even if we pass to next
>> filter, the check of sender and it's peer is still needed, because if
>> there's no receiver, it's nonsense to pass it further?
>
> Yes.

thanks.

>
>>
>>>
>>>> +    }
>>>> +
>>>> +    g_free(queue);
>>>> +    s->inflight_queue = NULL;
>>>> +}
>>>> +
>>>> +static void filter_buffer_flush_bh(void *opaque)
>>>> +{
>>>> +    FILTERBUFFERState *s = opaque;
>>>> +    NetFilterState *nf = &s->nf;
>>>> +    filter_buffer_flush(nf);
>>>> +}
>>>> +
>>>> +static void filter_buffer_release_one(NetFilterState *nf)
>>>> +{
>>>> +    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
>>>> +
>>>> +    /* flush inflight packets */
>>>> +    if (s->inflight_queue) {
>>>> +        filter_buffer_flush(nf);
>>>> +    }
>>>> +
>>>> +    s->inflight_queue = s->incoming_queue;
>>>> +    s->incoming_queue = qemu_new_net_queue(nf);
>>>
>>> So this in fact flush a brunch of packets. If yes, the name of function
>>> is confusing
>>
>> maybe filter_buffer_release ?
>>
>
> Right.

With the bh removed, this function can be dropped; we can just call
filter_buffer_flush directly. Thanks.

>
>>>
>>>> +    qemu_bh_schedule(s->flush_bh);
>>>
>>> Don't get why a bh is needed. If we could get rid of it, there's
>>> probably no need for inflight_queue, and we can just drain
>>> incoming_queue here.
>>
>> Seems we can get rid of bh, will do, thanks.
>
> .
>
diff mbox

Patch

diff --git a/net/Makefile.objs b/net/Makefile.objs
index 914aec0..5fa2f97 100644
--- a/net/Makefile.objs
+++ b/net/Makefile.objs
@@ -14,3 +14,4 @@  common-obj-$(CONFIG_SLIRP) += slirp.o
 common-obj-$(CONFIG_VDE) += vde.o
 common-obj-$(CONFIG_NETMAP) += netmap.o
 common-obj-y += filter.o
+common-obj-y += filter-buffer.o
diff --git a/net/filter-buffer.c b/net/filter-buffer.c
new file mode 100644
index 0000000..1547765
--- /dev/null
+++ b/net/filter-buffer.c
@@ -0,0 +1,162 @@ 
+/*
+ * Copyright (c) 2015 FUJITSU LIMITED
+ * Author: Yang Hongyang <yanghy@cn.fujitsu.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#include "net/filter.h"
+#include "net/queue.h"
+#include "filters.h"
+#include "qemu-common.h"
+#include "qemu/error-report.h"
+#include "qemu/main-loop.h"
+#include "qemu/timer.h"
+#include "qemu/iov.h"
+
+typedef struct FILTERBUFFERState {
+    NetFilterState nf;
+    NetQueue *incoming_queue;
+    NetQueue *inflight_queue;
+    QEMUBH *flush_bh;
+    int64_t interval;
+    QEMUTimer release_timer;
+} FILTERBUFFERState;
+
+static void packet_send_completed(NetClientState *nc, ssize_t len)
+{
+    return;
+}
+
+static void filter_buffer_flush(NetFilterState *nf)
+{
+    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
+    NetQueue *queue = s->inflight_queue;
+    NetPacket *packet;
+
+    while (queue && !QTAILQ_EMPTY(&queue->packets)) {
+        packet = QTAILQ_FIRST(&queue->packets);
+        QTAILQ_REMOVE(&queue->packets, packet, entry);
+        queue->nq_count--;
+
+        if (packet->sender && packet->sender->peer) {
+            qemu_net_queue_send(packet->sender->peer->incoming_queue,
+                                packet->sender,
+                                packet->flags,
+                                packet->data,
+                                packet->size,
+                                packet->sent_cb);
+        }
+
+        /*
+         * now that we pass the packet to sender->peer->incoming_queue, we
+         * don't care about the return value here, because the peer's
+         * queue will take care of this packet
+         */
+        g_free(packet);
+    }
+
+    g_free(queue);
+    s->inflight_queue = NULL;
+}
+
+static void filter_buffer_flush_bh(void *opaque)
+{
+    FILTERBUFFERState *s = opaque;
+    NetFilterState *nf = &s->nf;
+    filter_buffer_flush(nf);
+}
+
+static void filter_buffer_release_one(NetFilterState *nf)
+{
+    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
+
+    /* flush inflight packets */
+    if (s->inflight_queue) {
+        filter_buffer_flush(nf);
+    }
+
+    s->inflight_queue = s->incoming_queue;
+    s->incoming_queue = qemu_new_net_queue(nf);
+    qemu_bh_schedule(s->flush_bh);
+}
+
+static void filter_buffer_release_timer(void *opaque)
+{
+    FILTERBUFFERState *s = opaque;
+    filter_buffer_release_one(&s->nf);
+    timer_mod(&s->release_timer,
+              qemu_clock_get_us(QEMU_CLOCK_VIRTUAL) + s->interval);
+}
+
+/* filter APIs */
+static ssize_t filter_buffer_receive_iov(NetFilterState *nf,
+                                         NetClientState *sender,
+                                         unsigned flags,
+                                         const struct iovec *iov,
+                                         int iovcnt)
+{
+    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
+    NetQueue *queue = s->incoming_queue;
+
+    qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt,
+                              packet_send_completed);
+    return iov_size(iov, iovcnt);
+}
+
+static void filter_buffer_cleanup(NetFilterState *nf)
+{
+    FILTERBUFFERState *s = DO_UPCAST(FILTERBUFFERState, nf, nf);
+
+    if (s->interval) {
+        timer_del(&s->release_timer);
+    }
+
+    /* flush inflight packets */
+    filter_buffer_flush(nf);
+    /* flush incoming packets */
+    s->inflight_queue = s->incoming_queue;
+    s->incoming_queue = NULL;
+    filter_buffer_flush(nf);
+
+    if (s->flush_bh) {
+        qemu_bh_delete(s->flush_bh);
+        s->flush_bh = NULL;
+    }
+    return;
+}
+
+
+static NetFilterInfo net_filter_buffer_info = {
+    .type = NET_FILTER_OPTIONS_KIND_BUFFER,
+    .size = sizeof(FILTERBUFFERState),
+    .receive_iov = filter_buffer_receive_iov,
+    .cleanup = filter_buffer_cleanup,
+};
+
+int net_init_filter_buffer(const NetFilterOptions *opts, const char *name,
+                           int chain, NetClientState *netdev, Error **errp)
+{
+    NetFilterState *nf;
+    FILTERBUFFERState *s;
+    const NetFilterBufferOptions *bufferopt;
+
+    assert(opts->kind == NET_FILTER_OPTIONS_KIND_BUFFER);
+    bufferopt = opts->buffer;
+
+    nf = qemu_new_net_filter(&net_filter_buffer_info,
+                             netdev, "buffer", name, chain);
+    s = DO_UPCAST(FILTERBUFFERState, nf, nf);
+    s->incoming_queue = qemu_new_net_queue(nf);
+    s->flush_bh = qemu_bh_new(filter_buffer_flush_bh, s);
+    s->interval = bufferopt->has_interval ? bufferopt->interval : 0;
+    if (s->interval) {
+        timer_init_us(&s->release_timer, QEMU_CLOCK_VIRTUAL,
+                      filter_buffer_release_timer, s);
+        timer_mod(&s->release_timer,
+                  qemu_clock_get_us(QEMU_CLOCK_VIRTUAL) + s->interval);
+    }
+
+    return 0;
+}
diff --git a/net/filter.c b/net/filter.c
index 84845b1..ba78683 100644
--- a/net/filter.c
+++ b/net/filter.c
@@ -17,6 +17,7 @@ 
 
 #include "net/filter.h"
 #include "net/net.h"
+#include "filters.h"
 
 static QTAILQ_HEAD(, NetFilterState) net_filters;
 
@@ -168,6 +169,7 @@  typedef int (NetFilterInit)(const NetFilterOptions *opts,
 
 static
 NetFilterInit * const net_filter_init_fun[NET_FILTER_OPTIONS_KIND_MAX] = {
+    [NET_FILTER_OPTIONS_KIND_BUFFER] = net_init_filter_buffer,
 };
 
 static int net_filter_init1(const NetFilter *netfilter, Error **errp)
diff --git a/net/filters.h b/net/filters.h
new file mode 100644
index 0000000..3b546db
--- /dev/null
+++ b/net/filters.h
@@ -0,0 +1,17 @@ 
+/*
+ * Copyright (c) 2015 FUJITSU LIMITED
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or
+ * later.  See the COPYING file in the top-level directory.
+ */
+
+#ifndef QEMU_NET_FILTERS_H
+#define QEMU_NET_FILTERS_H
+
+#include "net/net.h"
+#include "net/filter.h"
+
+int net_init_filter_buffer(const NetFilterOptions *opts, const char *name,
+                           int chain, NetClientState *netdev, Error **errp);
+
+#endif /* QEMU_NET_FILTERS_H */
diff --git a/qapi-schema.json b/qapi-schema.json
index 9d97c21..e51bb59 100644
--- a/qapi-schema.json
+++ b/qapi-schema.json
@@ -2584,6 +2584,21 @@ 
 { 'command': 'netfilter_del', 'data': {'id': 'str'} }
 
 ##
+# @NetFilterBufferOptions
+#
+# A netbuffer filter for a network backend.
+#
+# @interval: #optional release packets by interval; if no interval is supplied,
+#            packets are released when filter_buffer_release_all is called.
+#            Unit: microseconds
+#
+# Since 2.5
+##
+{ 'struct': 'NetFilterBufferOptions',
+  'data': {
+    '*interval':     'int64' } }
+
+##
 # @NetFilterOptions
 #
 # A discriminated record of network filters.
@@ -2592,7 +2607,8 @@ 
 #
 ##
 { 'union': 'NetFilterOptions',
-  'data': { } }
+  'data': {
+    'buffer':     'NetFilterBufferOptions'} }
 
 ##
 # @NetFilter