diff mbox series

[ovs-dev,v9,10/11] netdev-offload-tc: Add psample receive handler

Message ID 20201215033812.145975-11-cmi@nvidia.com
State Changes Requested
Headers show
Series Add offload support for sFlow | expand

Commit Message

Chris Mi Dec. 15, 2020, 3:38 a.m. UTC
Create a dedicated thread to poll psample netlink socket, receive
sampled packet, parse it to sFlow format and send it to sFlow
monitoring host.

Signed-off-by: Chris Mi <cmi@nvidia.com>
Reviewed-by: Eli Britstein <elibr@nvidia.com>
---
 lib/netdev-offload-tc.c | 127 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 126 insertions(+), 1 deletion(-)

Comments

Eelco Chaudron Jan. 12, 2021, 7:53 p.m. UTC | #1
On 15 Dec 2020, at 4:38, Chris Mi wrote:

> Create a dedicated thread to poll psample netlink socket, receive
> sampled packet, parse it to sFlow format and send it to sFlow
> monitoring host.
>
> Signed-off-by: Chris Mi <cmi@nvidia.com>
> Reviewed-by: Eli Britstein <elibr@nvidia.com>
> ---
>  lib/netdev-offload-tc.c | 127 
> +++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 126 insertions(+), 1 deletion(-)
>
> diff --git a/lib/netdev-offload-tc.c b/lib/netdev-offload-tc.c
> index 61c5a14e6..e5bf719a4 100644
> --- a/lib/netdev-offload-tc.c
> +++ b/lib/netdev-offload-tc.c
> @@ -19,12 +19,14 @@
>  #include <errno.h>
>  #include <linux/if_ether.h>
>  #include <linux/psample.h>
> +#include <poll.h>
>
>  #include "dpif.h"
>  #include "hash.h"
>  #include "openvswitch/hmap.h"
>  #include "openvswitch/match.h"
>  #include "openvswitch/ofpbuf.h"
> +#include "openvswitch/poll-loop.h"
>  #include "openvswitch/thread.h"
>  #include "openvswitch/types.h"
>  #include "openvswitch/util.h"
> @@ -2295,6 +2297,123 @@ netdev_tc_psample_init(void)
>      return sock;
>  }
>
> +struct netdev_tc_psample {
> +    struct nlattr *packet;      /* packet data */
> +    int dp_group_id;            /* mapping id for sFlow offload */
> +    int iifindex;               /* input ifindex */
> +    int group_seq;              /* group sequence */
> +};
> +
> +static int
> +netdev_tc_psample_from_ofpbuf(struct netdev_tc_psample *psample,
> +                              const struct ofpbuf *buf)
> +{
> +    static const struct nl_policy ovs_psample_policy[] = {
> +        [PSAMPLE_ATTR_IIFINDEX] = { .type = NL_A_U16 },
> +        [PSAMPLE_ATTR_SAMPLE_GROUP] = { .type = NL_A_U32 },
> +        [PSAMPLE_ATTR_GROUP_SEQ] = { .type = NL_A_U32 },
> +        [PSAMPLE_ATTR_DATA] = { .type = NL_A_UNSPEC },
> +    };
> +    struct nlattr *a[ARRAY_SIZE(ovs_psample_policy)];
> +    struct genlmsghdr *genl;
> +    struct nlmsghdr *nlmsg;
> +    struct ofpbuf b;
> +
> +    b = ofpbuf_const_initializer(buf->data, buf->size);
> +    nlmsg = ofpbuf_try_pull(&b, sizeof *nlmsg);
> +    genl = ofpbuf_try_pull(&b, sizeof *genl);
> +    if (!nlmsg || !genl || nlmsg->nlmsg_type != psample_family
> +        || !nl_policy_parse(&b, 0, ovs_psample_policy, a,
> +                            ARRAY_SIZE(ovs_psample_policy))) {
> +        return EINVAL;
> +    }
> +
> +    psample->iifindex = nl_attr_get_u16(a[PSAMPLE_ATTR_IIFINDEX]);
> +    psample->dp_group_id = 
> nl_attr_get_u32(a[PSAMPLE_ATTR_SAMPLE_GROUP]);
> +    psample->group_seq = nl_attr_get_u16(a[PSAMPLE_ATTR_GROUP_SEQ]);
> +    psample->packet = a[PSAMPLE_ATTR_DATA];
> +
> +    return 0;
> +}
> +
> +static int
> +netdev_tc_psample_parse_packet(struct netdev_tc_psample *psample,
> +                               struct dpif_upcall_sflow *dupcall)
> +{
> +    const struct gid_node *node;
> +
> +    dp_packet_use_stub(&dupcall->packet,
> +                       CONST_CAST(struct nlattr *,
> +                                  nl_attr_get(psample->packet)) - 1,
> +                       nl_attr_get_size(psample->packet) +
> +                       sizeof(struct nlattr));
> +    dp_packet_set_data(&dupcall->packet,
> +                       (char *) dp_packet_data(&dupcall->packet) +
> +                       sizeof(struct nlattr));
> +    dp_packet_set_size(&dupcall->packet, 
> nl_attr_get_size(psample->packet));
> +
> +    node = gid_find(psample->dp_group_id);
> +    dupcall->sflow_attr = &node->sflow;
> +    dupcall->iifindex = psample->iifindex;
> +
> +    return 0;
> +}
> +
> +static int
> +netdev_tc_psample_poll(struct dpif_upcall_sflow *dupcall,
> +                       struct nl_sock *sock)
> +{
> +    for (;;) {
> +        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 
> 5);
> +        struct netdev_tc_psample psample;
> +        uint64_t buf_stub[4096 / 8];
> +        struct ofpbuf buf;
> +        int error;
> +
> +        ofpbuf_use_stub(&buf, buf_stub, sizeof buf_stub);
> +        error = nl_sock_recv(sock, &buf, NULL, false);
> +
> +        if (!error) {
> +            error = netdev_tc_psample_from_ofpbuf(&psample, &buf);
> +            if (!error) {
> +                    ofpbuf_uninit(&buf);
> +                    error = netdev_tc_psample_parse_packet(&psample, 
> dupcall);
> +                    return error;
> +            }
> +        } else if (error != EAGAIN) {
> +            VLOG_WARN_RL(&rl, "%s: error reading or parsing netlink 
> (%s)",
> +                         __func__, ovs_strerror(error));
> +            nl_sock_drain(sock);
> +            error = ENOBUFS;
> +        }
> +
> +        ofpbuf_uninit(&buf);
> +        if (error) {
> +            return error;
> +        }
> +    }
> +}
> +
> +static void *
> +netdev_tc_psample_handler(void *arg)
> +{
> +    struct nl_sock *sock = CONST_CAST(struct nl_sock *, arg);
> +
> +    struct dpif_upcall_sflow dupcall;
> +    int err;
> +
> +    while (true) {
> +        err = netdev_tc_psample_poll(&dupcall, sock);
> +        if (!err) {

I have not (fully) reviewed the patches below:

[PATCH v9 03/11] dpif: Introduce register sFlow upcall
[PATCH v9 04/11] ofproto: Add upcall callback to proces
[PATCH v9 05/11] netdev-offload: Introduce register sFlow upcall 
callback API
[PATCH v9 06/11] netdev-offload-tc: Implement register sFlow upcall 
callback API
[PATCH v9 07/11] dpif-netlink: Implement register sFlow Upcall callback 
API

Reason being that adding all this additional infrastructure does not 
make sense to me.
We should just use the existing register_upcall_cb, and call it when we 
have an sflow packet via netlink.

We can easily format the data such that 
upcall_cb()->upcall_receive()->process_upcall() will
handle the sflow packet as it came from a normal upcall.

What do you think?

> +            upcall_cb(&dupcall);
> +        }
> +        nl_sock_wait(sock, POLLIN);
> +        poll_block();
> +    }
> +
> +    return NULL;
> +}
> +
>  static int
>  netdev_tc_init_flow_api(struct netdev *netdev)
>  {
> @@ -2324,12 +2443,18 @@ netdev_tc_init_flow_api(struct netdev *netdev)
>      tc_add_del_qdisc(ifindex, false, 0, hook);
>
>      if (ovsthread_once_start(&once)) {
> +        struct nl_sock *sock;
> +
>          probe_tc_block_support(ifindex);
>          /* Need to re-fetch block id as it depends on feature 
> availability. */
>          block_id = get_block_id_from_netdev(netdev);
>
>          probe_multi_mask_per_prio(ifindex);
> -        netdev_tc_psample_init();
> +        sock = netdev_tc_psample_init();
> +        if (sock) {
> +            ovs_thread_create("psample_handler", 
> netdev_tc_psample_handler,
> +                              sock);
> +        }
>          ovsthread_once_done(&once);
>      }
>
> -- 
> 2.26.2
Chris Mi Jan. 22, 2021, 3:38 a.m. UTC | #2
On 1/13/2021 3:53 AM, Eelco Chaudron wrote:
>
>
> On 15 Dec 2020, at 4:38, Chris Mi wrote:
>
>> Create a dedicated thread to poll psample netlink socket, receive
>> sampled packet, parse it to sFlow format and send it to sFlow
>> monitoring host.
>>
>> Signed-off-by: Chris Mi <cmi@nvidia.com>
>> Reviewed-by: Eli Britstein <elibr@nvidia.com>
>> ---
>>  lib/netdev-offload-tc.c | 127 +++++++++++++++++++++++++++++++++++++++-
>>  1 file changed, 126 insertions(+), 1 deletion(-)
>>
>> diff --git a/lib/netdev-offload-tc.c b/lib/netdev-offload-tc.c
>> index 61c5a14e6..e5bf719a4 100644
>> --- a/lib/netdev-offload-tc.c
>> +++ b/lib/netdev-offload-tc.c
>> @@ -19,12 +19,14 @@
>>  #include <errno.h>
>>  #include <linux/if_ether.h>
>>  #include <linux/psample.h>
>> +#include <poll.h>
>>
>>  #include "dpif.h"
>>  #include "hash.h"
>>  #include "openvswitch/hmap.h"
>>  #include "openvswitch/match.h"
>>  #include "openvswitch/ofpbuf.h"
>> +#include "openvswitch/poll-loop.h"
>>  #include "openvswitch/thread.h"
>>  #include "openvswitch/types.h"
>>  #include "openvswitch/util.h"
>> @@ -2295,6 +2297,123 @@ netdev_tc_psample_init(void)
>>      return sock;
>>  }
>>
>> +struct netdev_tc_psample {
>> +    struct nlattr *packet;      /* packet data */
>> +    int dp_group_id;            /* mapping id for sFlow offload */
>> +    int iifindex;               /* input ifindex */
>> +    int group_seq;              /* group sequence */
>> +};
>> +
>> +static int
>> +netdev_tc_psample_from_ofpbuf(struct netdev_tc_psample *psample,
>> +                              const struct ofpbuf *buf)
>> +{
>> +    static const struct nl_policy ovs_psample_policy[] = {
>> +        [PSAMPLE_ATTR_IIFINDEX] = { .type = NL_A_U16 },
>> +        [PSAMPLE_ATTR_SAMPLE_GROUP] = { .type = NL_A_U32 },
>> +        [PSAMPLE_ATTR_GROUP_SEQ] = { .type = NL_A_U32 },
>> +        [PSAMPLE_ATTR_DATA] = { .type = NL_A_UNSPEC },
>> +    };
>> +    struct nlattr *a[ARRAY_SIZE(ovs_psample_policy)];
>> +    struct genlmsghdr *genl;
>> +    struct nlmsghdr *nlmsg;
>> +    struct ofpbuf b;
>> +
>> +    b = ofpbuf_const_initializer(buf->data, buf->size);
>> +    nlmsg = ofpbuf_try_pull(&b, sizeof *nlmsg);
>> +    genl = ofpbuf_try_pull(&b, sizeof *genl);
>> +    if (!nlmsg || !genl || nlmsg->nlmsg_type != psample_family
>> +        || !nl_policy_parse(&b, 0, ovs_psample_policy, a,
>> +                            ARRAY_SIZE(ovs_psample_policy))) {
>> +        return EINVAL;
>> +    }
>> +
>> +    psample->iifindex = nl_attr_get_u16(a[PSAMPLE_ATTR_IIFINDEX]);
>> +    psample->dp_group_id = 
>> nl_attr_get_u32(a[PSAMPLE_ATTR_SAMPLE_GROUP]);
>> +    psample->group_seq = nl_attr_get_u16(a[PSAMPLE_ATTR_GROUP_SEQ]);
>> +    psample->packet = a[PSAMPLE_ATTR_DATA];
>> +
>> +    return 0;
>> +}
>> +
>> +static int
>> +netdev_tc_psample_parse_packet(struct netdev_tc_psample *psample,
>> +                               struct dpif_upcall_sflow *dupcall)
>> +{
>> +    const struct gid_node *node;
>> +
>> +    dp_packet_use_stub(&dupcall->packet,
>> +                       CONST_CAST(struct nlattr *,
>> + nl_attr_get(psample->packet)) - 1,
>> +                       nl_attr_get_size(psample->packet) +
>> +                       sizeof(struct nlattr));
>> +    dp_packet_set_data(&dupcall->packet,
>> +                       (char *) dp_packet_data(&dupcall->packet) +
>> +                       sizeof(struct nlattr));
>> +    dp_packet_set_size(&dupcall->packet, 
>> nl_attr_get_size(psample->packet));
>> +
>> +    node = gid_find(psample->dp_group_id);
>> +    dupcall->sflow_attr = &node->sflow;
>> +    dupcall->iifindex = psample->iifindex;
>> +
>> +    return 0;
>> +}
>> +
>> +static int
>> +netdev_tc_psample_poll(struct dpif_upcall_sflow *dupcall,
>> +                       struct nl_sock *sock)
>> +{
>> +    for (;;) {
>> +        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>> +        struct netdev_tc_psample psample;
>> +        uint64_t buf_stub[4096 / 8];
>> +        struct ofpbuf buf;
>> +        int error;
>> +
>> +        ofpbuf_use_stub(&buf, buf_stub, sizeof buf_stub);
>> +        error = nl_sock_recv(sock, &buf, NULL, false);
>> +
>> +        if (!error) {
>> +            error = netdev_tc_psample_from_ofpbuf(&psample, &buf);
>> +            if (!error) {
>> +                    ofpbuf_uninit(&buf);
>> +                    error = netdev_tc_psample_parse_packet(&psample, 
>> dupcall);
>> +                    return error;
>> +            }
>> +        } else if (error != EAGAIN) {
>> +            VLOG_WARN_RL(&rl, "%s: error reading or parsing netlink 
>> (%s)",
>> +                         __func__, ovs_strerror(error));
>> +            nl_sock_drain(sock);
>> +            error = ENOBUFS;
>> +        }
>> +
>> +        ofpbuf_uninit(&buf);
>> +        if (error) {
>> +            return error;
>> +        }
>> +    }
>> +}
>> +
>> +static void *
>> +netdev_tc_psample_handler(void *arg)
>> +{
>> +    struct nl_sock *sock = CONST_CAST(struct nl_sock *, arg);
>> +
>> +    struct dpif_upcall_sflow dupcall;
>> +    int err;
>> +
>> +    while (true) {
>> +        err = netdev_tc_psample_poll(&dupcall, sock);
>> +        if (!err) {
>
> I have not (fully) reviewed the patches below:
>
> [PATCH v9 03/11] dpif: Introduce register sFlow upcall
> [PATCH v9 04/11] ofproto: Add upcall callback to proces
> [PATCH v9 05/11] netdev-offload: Introduce register sFlow upcall 
> callback API
> [PATCH v9 06/11] netdev-offload-tc: Implement register sFlow upcall 
> callback API
> [PATCH v9 07/11] dpif-netlink: Implement register sFlow Upcall 
> callback API
>
> Reason being that adding all this additional infrastructure does not 
> make sense to me.
> We should just use the existing register_upcall_cb, and call it when 
> we have an sflow packet via netlink.
>
> We can easily format the data such that 
> upcall_cb()->upcall_receive()->process_upcall() will
> handle the sflow packet as it came from a normal upcall.
>
> What do you think?
Actually, even if we re-use upcall_cb, patch #5, #6 and #7 are needed. 
Only patch #3 is not needed.
I think we need more change for ofproto to make upcall_cb works for 
sFlow offload.
And many of the arguments of upcall_cb are not needed for sFlow offload.
If you let me choose, I'll think creating a new callback for sFlow 
offload  is simple and clean.
>
>> +            upcall_cb(&dupcall);
>> +        }
>> +        nl_sock_wait(sock, POLLIN);
>> +        poll_block();
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>>  static int
>>  netdev_tc_init_flow_api(struct netdev *netdev)
>>  {
>> @@ -2324,12 +2443,18 @@ netdev_tc_init_flow_api(struct netdev *netdev)
>>      tc_add_del_qdisc(ifindex, false, 0, hook);
>>
>>      if (ovsthread_once_start(&once)) {
>> +        struct nl_sock *sock;
>> +
>>          probe_tc_block_support(ifindex);
>>          /* Need to re-fetch block id as it depends on feature 
>> availability. */
>>          block_id = get_block_id_from_netdev(netdev);
>>
>>          probe_multi_mask_per_prio(ifindex);
>> -        netdev_tc_psample_init();
>> +        sock = netdev_tc_psample_init();
>> +        if (sock) {
>> +            ovs_thread_create("psample_handler", 
>> netdev_tc_psample_handler,
>> +                              sock);
>> +        }
>>          ovsthread_once_done(&once);
>>      }
>>
>> -- 
>> 2.26.2
>
Eelco Chaudron Jan. 25, 2021, 9:04 a.m. UTC | #3
On 22 Jan 2021, at 4:38, Chris Mi wrote:

> On 1/13/2021 3:53 AM, Eelco Chaudron wrote:
>>
>>
>> On 15 Dec 2020, at 4:38, Chris Mi wrote:
>>
>>> Create a dedicated thread to poll psample netlink socket, receive
>>> sampled packet, parse it to sFlow format and send it to sFlow
>>> monitoring host.
>>>
>>> Signed-off-by: Chris Mi <cmi@nvidia.com>
>>> Reviewed-by: Eli Britstein <elibr@nvidia.com>
>>> ---
>>>  lib/netdev-offload-tc.c | 127 
>>> +++++++++++++++++++++++++++++++++++++++-
>>>  1 file changed, 126 insertions(+), 1 deletion(-)
>>>
>>> diff --git a/lib/netdev-offload-tc.c b/lib/netdev-offload-tc.c
>>> index 61c5a14e6..e5bf719a4 100644
>>> --- a/lib/netdev-offload-tc.c
>>> +++ b/lib/netdev-offload-tc.c
>>> @@ -19,12 +19,14 @@
>>>  #include <errno.h>
>>>  #include <linux/if_ether.h>
>>>  #include <linux/psample.h>
>>> +#include <poll.h>
>>>
>>>  #include "dpif.h"
>>>  #include "hash.h"
>>>  #include "openvswitch/hmap.h"
>>>  #include "openvswitch/match.h"
>>>  #include "openvswitch/ofpbuf.h"
>>> +#include "openvswitch/poll-loop.h"
>>>  #include "openvswitch/thread.h"
>>>  #include "openvswitch/types.h"
>>>  #include "openvswitch/util.h"
>>> @@ -2295,6 +2297,123 @@ netdev_tc_psample_init(void)
>>>      return sock;
>>>  }
>>>
>>> +struct netdev_tc_psample {
>>> +    struct nlattr *packet;      /* packet data */
>>> +    int dp_group_id;            /* mapping id for 
>>> sFlow offload */
>>> +    int iifindex;               /* input ifindex 
>>> */
>>> +    int group_seq;              /* group sequence 
>>> */
>>> +};
>>> +
>>> +static int
>>> +netdev_tc_psample_from_ofpbuf(struct netdev_tc_psample *psample,
>>> +                              const 
>>> struct ofpbuf *buf)
>>> +{
>>> +    static const struct nl_policy ovs_psample_policy[] = {
>>> +        [PSAMPLE_ATTR_IIFINDEX] = { .type = NL_A_U16 },
>>> +        [PSAMPLE_ATTR_SAMPLE_GROUP] = { .type = NL_A_U32 },
>>> +        [PSAMPLE_ATTR_GROUP_SEQ] = { .type = NL_A_U32 },
>>> +        [PSAMPLE_ATTR_DATA] = { .type = NL_A_UNSPEC },
>>> +    };
>>> +    struct nlattr *a[ARRAY_SIZE(ovs_psample_policy)];
>>> +    struct genlmsghdr *genl;
>>> +    struct nlmsghdr *nlmsg;
>>> +    struct ofpbuf b;
>>> +
>>> +    b = ofpbuf_const_initializer(buf->data, buf->size);
>>> +    nlmsg = ofpbuf_try_pull(&b, sizeof *nlmsg);
>>> +    genl = ofpbuf_try_pull(&b, sizeof *genl);
>>> +    if (!nlmsg || !genl || nlmsg->nlmsg_type != psample_family
>>> +        || !nl_policy_parse(&b, 0, ovs_psample_policy, a,
>>> +                            
>>> ARRAY_SIZE(ovs_psample_policy))) {
>>> +        return EINVAL;
>>> +    }
>>> +
>>> +    psample->iifindex = 
>>> nl_attr_get_u16(a[PSAMPLE_ATTR_IIFINDEX]);
>>> +    psample->dp_group_id = 
>>> nl_attr_get_u32(a[PSAMPLE_ATTR_SAMPLE_GROUP]);
>>> +    psample->group_seq = 
>>> nl_attr_get_u16(a[PSAMPLE_ATTR_GROUP_SEQ]);
>>> +    psample->packet = a[PSAMPLE_ATTR_DATA];
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +static int
>>> +netdev_tc_psample_parse_packet(struct netdev_tc_psample *psample,
>>> +                               struct 
>>> dpif_upcall_sflow *dupcall)
>>> +{
>>> +    const struct gid_node *node;
>>> +
>>> +    dp_packet_use_stub(&dupcall->packet,
>>> +                       CONST_CAST(struct 
>>> nlattr *,
>>> + nl_attr_get(psample->packet)) - 1,
>>> +                       
>>> nl_attr_get_size(psample->packet) +
>>> +                       sizeof(struct 
>>> nlattr));
>>> +    dp_packet_set_data(&dupcall->packet,
>>> +                       (char *) 
>>> dp_packet_data(&dupcall->packet) +
>>> +                       sizeof(struct 
>>> nlattr));
>>> +    dp_packet_set_size(&dupcall->packet, 
>>> nl_attr_get_size(psample->packet));
>>> +
>>> +    node = gid_find(psample->dp_group_id);
>>> +    dupcall->sflow_attr = &node->sflow;
>>> +    dupcall->iifindex = psample->iifindex;
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +static int
>>> +netdev_tc_psample_poll(struct dpif_upcall_sflow *dupcall,
>>> +                       struct nl_sock *sock)
>>> +{
>>> +    for (;;) {
>>> +        static struct vlog_rate_limit rl = 
>>> VLOG_RATE_LIMIT_INIT(1, 5);
>>> +        struct netdev_tc_psample psample;
>>> +        uint64_t buf_stub[4096 / 8];
>>> +        struct ofpbuf buf;
>>> +        int error;
>>> +
>>> +        ofpbuf_use_stub(&buf, buf_stub, sizeof buf_stub);
>>> +        error = nl_sock_recv(sock, &buf, NULL, false);
>>> +
>>> +        if (!error) {
>>> +            error = 
>>> netdev_tc_psample_from_ofpbuf(&psample, &buf);
>>> +            if (!error) {
>>> +                    ofpbuf_uninit(&buf);
>>> +                    error = 
>>> netdev_tc_psample_parse_packet(&psample, dupcall);
>>> +                    return error;
>>> +            }
>>> +        } else if (error != EAGAIN) {
>>> +            VLOG_WARN_RL(&rl, "%s: error reading or 
>>> parsing netlink (%s)",
>>> +                         __func__, 
>>> ovs_strerror(error));
>>> +            nl_sock_drain(sock);
>>> +            error = ENOBUFS;
>>> +        }
>>> +
>>> +        ofpbuf_uninit(&buf);
>>> +        if (error) {
>>> +            return error;
>>> +        }
>>> +    }
>>> +}
>>> +
>>> +static void *
>>> +netdev_tc_psample_handler(void *arg)
>>> +{
>>> +    struct nl_sock *sock = CONST_CAST(struct nl_sock *, arg);
>>> +
>>> +    struct dpif_upcall_sflow dupcall;
>>> +    int err;
>>> +
>>> +    while (true) {
>>> +        err = netdev_tc_psample_poll(&dupcall, sock);
>>> +        if (!err) {
>>
>> I have not (fully) reviewed the patches below:
>>
>> [PATCH v9 03/11] dpif: Introduce register sFlow upcall
>> [PATCH v9 04/11] ofproto: Add upcall callback to proces
>> [PATCH v9 05/11] netdev-offload: Introduce register sFlow upcall 
>> callback API
>> [PATCH v9 06/11] netdev-offload-tc: Implement register sFlow upcall 
>> callback API
>> [PATCH v9 07/11] dpif-netlink: Implement register sFlow Upcall 
>> callback API
>>
>> Reason being that adding all this additional infrastructure does not 
>> make sense to me.
>> We should just use the existing register_upcall_cb, and call it when 
>> we have an sflow packet via netlink.
>>
>> We can easily format the data such that 
>> upcall_cb()->upcall_receive()->process_upcall() will
>> handle the sflow packet as it came from a normal upcall.
>>
>> What do you think?
> Actually, even if we re-use upcall_cb, patch #5, #6 and #7 are needed. 
> Only patch #3 is not needed.
> I think we need more change for ofproto to make upcall_cb works for 
> sFlow offload.
> And many of the arguments of upcall_cb are not needed for sFlow 
> offload.
> If you let me choose, I'll think creating a new callback for sFlow 
> offload  is simple and clean.

I would like some other people’s feedback on this because if we add 
this callback (for a slow patch action), we might end up with more in 
the future. Ilya?

>>> +            upcall_cb(&dupcall);
>>> +        }
>>> +        nl_sock_wait(sock, POLLIN);
>>> +        poll_block();
>>> +    }
>>> +
>>> +    return NULL;
>>> +}
>>> +
>>>  static int
>>>  netdev_tc_init_flow_api(struct netdev *netdev)
>>>  {
>>> @@ -2324,12 +2443,18 @@ netdev_tc_init_flow_api(struct netdev 
>>> *netdev)
>>>      tc_add_del_qdisc(ifindex, false, 0, hook);
>>>
>>>      if (ovsthread_once_start(&once)) {
>>> +        struct nl_sock *sock;
>>> +
>>>          probe_tc_block_support(ifindex);
>>>          /* Need to re-fetch block id as it depends on 
>>> feature availability. */
>>>          block_id = get_block_id_from_netdev(netdev);
>>>
>>>          probe_multi_mask_per_prio(ifindex);
>>> -        netdev_tc_psample_init();
>>> +        sock = netdev_tc_psample_init();
>>> +        if (sock) {
>>> +            ovs_thread_create("psample_handler", 
>>> netdev_tc_psample_handler,
>>> +                              sock);
>>> +        }
>>>          ovsthread_once_done(&once);
>>>      }
>>>
>>> -- 
>>> 2.26.2
>>
Chris Mi Feb. 2, 2021, 9:11 a.m. UTC | #4
Hi Ilya,

On 1/25/2021 5:04 PM, Eelco Chaudron wrote:
>
>
> On 22 Jan 2021, at 4:38, Chris Mi wrote:
>
>> On 1/13/2021 3:53 AM, Eelco Chaudron wrote:
>>>
>>>
>>> On 15 Dec 2020, at 4:38, Chris Mi wrote:
>>>
>>>> Create a dedicated thread to poll psample netlink socket, receive
>>>> sampled packet, parse it to sFlow format and send it to sFlow
>>>> monitoring host.
>>>>
>>>> Signed-off-by: Chris Mi <cmi@nvidia.com>
>>>> Reviewed-by: Eli Britstein <elibr@nvidia.com>
>>>> ---
>>>>  lib/netdev-offload-tc.c | 127 
>>>> +++++++++++++++++++++++++++++++++++++++-
>>>>  1 file changed, 126 insertions(+), 1 deletion(-)
>>>>
>>>> diff --git a/lib/netdev-offload-tc.c b/lib/netdev-offload-tc.c
>>>> index 61c5a14e6..e5bf719a4 100644
>>>> --- a/lib/netdev-offload-tc.c
>>>> +++ b/lib/netdev-offload-tc.c
>>>> @@ -19,12 +19,14 @@
>>>>  #include <errno.h>
>>>>  #include <linux/if_ether.h>
>>>>  #include <linux/psample.h>
>>>> +#include <poll.h>
>>>>
>>>>  #include "dpif.h"
>>>>  #include "hash.h"
>>>>  #include "openvswitch/hmap.h"
>>>>  #include "openvswitch/match.h"
>>>>  #include "openvswitch/ofpbuf.h"
>>>> +#include "openvswitch/poll-loop.h"
>>>>  #include "openvswitch/thread.h"
>>>>  #include "openvswitch/types.h"
>>>>  #include "openvswitch/util.h"
>>>> @@ -2295,6 +2297,123 @@ netdev_tc_psample_init(void)
>>>>      return sock;
>>>>  }
>>>>
>>>> +struct netdev_tc_psample {
>>>> +    struct nlattr *packet;      /* packet data */
>>>> +    int dp_group_id;            /* mapping id for sFlow offload */
>>>> +    int iifindex;               /* input ifindex */
>>>> +    int group_seq;              /* group sequence */
>>>> +};
>>>> +
>>>> +static int
>>>> +netdev_tc_psample_from_ofpbuf(struct netdev_tc_psample *psample,
>>>> +                              const struct ofpbuf *buf)
>>>> +{
>>>> +    static const struct nl_policy ovs_psample_policy[] = {
>>>> +        [PSAMPLE_ATTR_IIFINDEX] = { .type = NL_A_U16 },
>>>> +        [PSAMPLE_ATTR_SAMPLE_GROUP] = { .type = NL_A_U32 },
>>>> +        [PSAMPLE_ATTR_GROUP_SEQ] = { .type = NL_A_U32 },
>>>> +        [PSAMPLE_ATTR_DATA] = { .type = NL_A_UNSPEC },
>>>> +    };
>>>> +    struct nlattr *a[ARRAY_SIZE(ovs_psample_policy)];
>>>> +    struct genlmsghdr *genl;
>>>> +    struct nlmsghdr *nlmsg;
>>>> +    struct ofpbuf b;
>>>> +
>>>> +    b = ofpbuf_const_initializer(buf->data, buf->size);
>>>> +    nlmsg = ofpbuf_try_pull(&b, sizeof *nlmsg);
>>>> +    genl = ofpbuf_try_pull(&b, sizeof *genl);
>>>> +    if (!nlmsg || !genl || nlmsg->nlmsg_type != psample_family
>>>> +        || !nl_policy_parse(&b, 0, ovs_psample_policy, a,
>>>> + ARRAY_SIZE(ovs_psample_policy))) {
>>>> +        return EINVAL;
>>>> +    }
>>>> +
>>>> +    psample->iifindex = nl_attr_get_u16(a[PSAMPLE_ATTR_IIFINDEX]);
>>>> +    psample->dp_group_id = 
>>>> nl_attr_get_u32(a[PSAMPLE_ATTR_SAMPLE_GROUP]);
>>>> +    psample->group_seq = nl_attr_get_u16(a[PSAMPLE_ATTR_GROUP_SEQ]);
>>>> +    psample->packet = a[PSAMPLE_ATTR_DATA];
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +static int
>>>> +netdev_tc_psample_parse_packet(struct netdev_tc_psample *psample,
>>>> +                               struct dpif_upcall_sflow *dupcall)
>>>> +{
>>>> +    const struct gid_node *node;
>>>> +
>>>> +    dp_packet_use_stub(&dupcall->packet,
>>>> +                       CONST_CAST(struct nlattr *,
>>>> + nl_attr_get(psample->packet)) - 1,
>>>> +                       nl_attr_get_size(psample->packet) +
>>>> +                       sizeof(struct nlattr));
>>>> +    dp_packet_set_data(&dupcall->packet,
>>>> +                       (char *) dp_packet_data(&dupcall->packet) +
>>>> +                       sizeof(struct nlattr));
>>>> +    dp_packet_set_size(&dupcall->packet, 
>>>> nl_attr_get_size(psample->packet));
>>>> +
>>>> +    node = gid_find(psample->dp_group_id);
>>>> +    dupcall->sflow_attr = &node->sflow;
>>>> +    dupcall->iifindex = psample->iifindex;
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +static int
>>>> +netdev_tc_psample_poll(struct dpif_upcall_sflow *dupcall,
>>>> +                       struct nl_sock *sock)
>>>> +{
>>>> +    for (;;) {
>>>> +        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 
>>>> 5);
>>>> +        struct netdev_tc_psample psample;
>>>> +        uint64_t buf_stub[4096 / 8];
>>>> +        struct ofpbuf buf;
>>>> +        int error;
>>>> +
>>>> +        ofpbuf_use_stub(&buf, buf_stub, sizeof buf_stub);
>>>> +        error = nl_sock_recv(sock, &buf, NULL, false);
>>>> +
>>>> +        if (!error) {
>>>> +            error = netdev_tc_psample_from_ofpbuf(&psample, &buf);
>>>> +            if (!error) {
>>>> +                    ofpbuf_uninit(&buf);
>>>> +                    error = 
>>>> netdev_tc_psample_parse_packet(&psample, dupcall);
>>>> +                    return error;
>>>> +            }
>>>> +        } else if (error != EAGAIN) {
>>>> +            VLOG_WARN_RL(&rl, "%s: error reading or parsing 
>>>> netlink (%s)",
>>>> +                         __func__, ovs_strerror(error));
>>>> +            nl_sock_drain(sock);
>>>> +            error = ENOBUFS;
>>>> +        }
>>>> +
>>>> +        ofpbuf_uninit(&buf);
>>>> +        if (error) {
>>>> +            return error;
>>>> +        }
>>>> +    }
>>>> +}
>>>> +
>>>> +static void *
>>>> +netdev_tc_psample_handler(void *arg)
>>>> +{
>>>> +    struct nl_sock *sock = CONST_CAST(struct nl_sock *, arg);
>>>> +
>>>> +    struct dpif_upcall_sflow dupcall;
>>>> +    int err;
>>>> +
>>>> +    while (true) {
>>>> +        err = netdev_tc_psample_poll(&dupcall, sock);
>>>> +        if (!err) {
>>>
>>> I have not (fully) reviewed the patches below:
>>>
>>> [PATCH v9 03/11] dpif: Introduce register sFlow upcall
>>> [PATCH v9 04/11] ofproto: Add upcall callback to proces
>>> [PATCH v9 05/11] netdev-offload: Introduce register sFlow upcall 
>>> callback API
>>> [PATCH v9 06/11] netdev-offload-tc: Implement register sFlow upcall 
>>> callback API
>>> [PATCH v9 07/11] dpif-netlink: Implement register sFlow Upcall 
>>> callback API
>>>
>>> Reason being that adding all this additional infrastructure does not 
>>> make sense to me.
>>> We should just use the existing register_upcall_cb, and call it when 
>>> we have an sflow packet via netlink.
>>>
>>> We can easily format the data such that 
>>> upcall_cb()->upcall_receive()->process_upcall() will
>>> handle the sflow packet as it came from a normal upcall.
>>>
>>> What do you think?
>> Actually, even if we re-use upcall_cb, patch #5, #6 and #7 are 
>> needed. Only patch #3 is not needed.
>> I think we need more change for ofproto to make upcall_cb works for 
>> sFlow offload.
>> And many of the arguments of upcall_cb are not needed for sFlow offload.
>> If you let me choose, I'll think creating a new callback for sFlow 
>> offload  is simple and clean.
>
> I would like some other people’s feedback on this because if we add 
> this callback (for a slow patch action), we might end up with more in 
> the future. Ilya?
Could I know what's your opinion about it? And I'm wondering if you have 
time to review other patches?

Thanks,
Chris
>
>>>> +            upcall_cb(&dupcall);
>>>> +        }
>>>> +        nl_sock_wait(sock, POLLIN);
>>>> +        poll_block();
>>>> +    }
>>>> +
>>>> +    return NULL;
>>>> +}
>>>> +
>>>>  static int
>>>>  netdev_tc_init_flow_api(struct netdev *netdev)
>>>>  {
>>>> @@ -2324,12 +2443,18 @@ netdev_tc_init_flow_api(struct netdev *netdev)
>>>>      tc_add_del_qdisc(ifindex, false, 0, hook);
>>>>
>>>>      if (ovsthread_once_start(&once)) {
>>>> +        struct nl_sock *sock;
>>>> +
>>>>          probe_tc_block_support(ifindex);
>>>>          /* Need to re-fetch block id as it depends on feature 
>>>> availability. */
>>>>          block_id = get_block_id_from_netdev(netdev);
>>>>
>>>>          probe_multi_mask_per_prio(ifindex);
>>>> -        netdev_tc_psample_init();
>>>> +        sock = netdev_tc_psample_init();
>>>> +        if (sock) {
>>>> +            ovs_thread_create("psample_handler", 
>>>> netdev_tc_psample_handler,
>>>> +                              sock);
>>>> +        }
>>>>          ovsthread_once_done(&once);
>>>>      }
>>>>
>>>> -- 
>>>> 2.26.2
>>>
>
diff mbox series

Patch

diff --git a/lib/netdev-offload-tc.c b/lib/netdev-offload-tc.c
index 61c5a14e6..e5bf719a4 100644
--- a/lib/netdev-offload-tc.c
+++ b/lib/netdev-offload-tc.c
@@ -19,12 +19,14 @@ 
 #include <errno.h>
 #include <linux/if_ether.h>
 #include <linux/psample.h>
+#include <poll.h>
 
 #include "dpif.h"
 #include "hash.h"
 #include "openvswitch/hmap.h"
 #include "openvswitch/match.h"
 #include "openvswitch/ofpbuf.h"
+#include "openvswitch/poll-loop.h"
 #include "openvswitch/thread.h"
 #include "openvswitch/types.h"
 #include "openvswitch/util.h"
@@ -2295,6 +2297,123 @@  netdev_tc_psample_init(void)
     return sock;
 }
 
+struct netdev_tc_psample {
+    struct nlattr *packet;      /* packet data */
+    int dp_group_id;            /* mapping id for sFlow offload */
+    int iifindex;               /* input ifindex */
+    int group_seq;              /* group sequence */
+};
+
+static int
+netdev_tc_psample_from_ofpbuf(struct netdev_tc_psample *psample,
+                              const struct ofpbuf *buf)
+{
+    static const struct nl_policy ovs_psample_policy[] = {
+        [PSAMPLE_ATTR_IIFINDEX] = { .type = NL_A_U16 },
+        [PSAMPLE_ATTR_SAMPLE_GROUP] = { .type = NL_A_U32 },
+        [PSAMPLE_ATTR_GROUP_SEQ] = { .type = NL_A_U32 },
+        [PSAMPLE_ATTR_DATA] = { .type = NL_A_UNSPEC },
+    };
+    struct nlattr *a[ARRAY_SIZE(ovs_psample_policy)];
+    struct genlmsghdr *genl;
+    struct nlmsghdr *nlmsg;
+    struct ofpbuf b;
+
+    b = ofpbuf_const_initializer(buf->data, buf->size);
+    nlmsg = ofpbuf_try_pull(&b, sizeof *nlmsg);
+    genl = ofpbuf_try_pull(&b, sizeof *genl);
+    if (!nlmsg || !genl || nlmsg->nlmsg_type != psample_family
+        || !nl_policy_parse(&b, 0, ovs_psample_policy, a,
+                            ARRAY_SIZE(ovs_psample_policy))) {
+        return EINVAL;
+    }
+
+    psample->iifindex = nl_attr_get_u16(a[PSAMPLE_ATTR_IIFINDEX]);
+    psample->dp_group_id = nl_attr_get_u32(a[PSAMPLE_ATTR_SAMPLE_GROUP]);
+    psample->group_seq = nl_attr_get_u16(a[PSAMPLE_ATTR_GROUP_SEQ]);
+    psample->packet = a[PSAMPLE_ATTR_DATA];
+
+    return 0;
+}
+
+static int
+netdev_tc_psample_parse_packet(struct netdev_tc_psample *psample,
+                               struct dpif_upcall_sflow *dupcall)
+{
+    const struct gid_node *node;
+
+    dp_packet_use_stub(&dupcall->packet,
+                       CONST_CAST(struct nlattr *,
+                                  nl_attr_get(psample->packet)) - 1,
+                       nl_attr_get_size(psample->packet) +
+                       sizeof(struct nlattr));
+    dp_packet_set_data(&dupcall->packet,
+                       (char *) dp_packet_data(&dupcall->packet) +
+                       sizeof(struct nlattr));
+    dp_packet_set_size(&dupcall->packet, nl_attr_get_size(psample->packet));
+
+    node = gid_find(psample->dp_group_id);
+    dupcall->sflow_attr = &node->sflow;
+    dupcall->iifindex = psample->iifindex;
+
+    return 0;
+}
+
+static int
+netdev_tc_psample_poll(struct dpif_upcall_sflow *dupcall,
+                       struct nl_sock *sock)
+{
+    for (;;) {
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
+        struct netdev_tc_psample psample;
+        uint64_t buf_stub[4096 / 8];
+        struct ofpbuf buf;
+        int error;
+
+        ofpbuf_use_stub(&buf, buf_stub, sizeof buf_stub);
+        error = nl_sock_recv(sock, &buf, NULL, false);
+
+        if (!error) {
+            error = netdev_tc_psample_from_ofpbuf(&psample, &buf);
+            if (!error) {
+                    ofpbuf_uninit(&buf);
+                    error = netdev_tc_psample_parse_packet(&psample, dupcall);
+                    return error;
+            }
+        } else if (error != EAGAIN) {
+            VLOG_WARN_RL(&rl, "%s: error reading or parsing netlink (%s)",
+                         __func__, ovs_strerror(error));
+            nl_sock_drain(sock);
+            error = ENOBUFS;
+        }
+
+        ofpbuf_uninit(&buf);
+        if (error) {
+            return error;
+        }
+    }
+}
+
+static void *
+netdev_tc_psample_handler(void *arg)
+{
+    struct nl_sock *sock = CONST_CAST(struct nl_sock *, arg);
+
+    struct dpif_upcall_sflow dupcall;
+    int err;
+
+    while (true) {
+        err = netdev_tc_psample_poll(&dupcall, sock);
+        if (!err) {
+            upcall_cb(&dupcall);
+        }
+        nl_sock_wait(sock, POLLIN);
+        poll_block();
+    }
+
+    return NULL;
+}
+
 static int
 netdev_tc_init_flow_api(struct netdev *netdev)
 {
@@ -2324,12 +2443,18 @@  netdev_tc_init_flow_api(struct netdev *netdev)
     tc_add_del_qdisc(ifindex, false, 0, hook);
 
     if (ovsthread_once_start(&once)) {
+        struct nl_sock *sock;
+
         probe_tc_block_support(ifindex);
         /* Need to re-fetch block id as it depends on feature availability. */
         block_id = get_block_id_from_netdev(netdev);
 
         probe_multi_mask_per_prio(ifindex);
-        netdev_tc_psample_init();
+        sock = netdev_tc_psample_init();
+        if (sock) {
+            ovs_thread_create("psample_handler", netdev_tc_psample_handler,
+                              sock);
+        }
         ovsthread_once_done(&once);
     }