
[ovs-dev,5/5] utilities: add netlink flow operation USDT probes and upcall_cost script

Message ID 163949560111.1639090.504580532338976345.stgit@ebuild
State Superseded
Series adding USDT points to OVS

Checks

Context Check Description
ovsrobot/apply-robot success apply and check: success
ovsrobot/github-robot-_Build_and_Test success github build: passed

Commit Message

Eelco Chaudron Dec. 14, 2021, 3:27 p.m. UTC
This patch adds a series of NetLink flow operation USDT probes.
These probes are in turn used in the upcall_cost Python script,
which, in addition to some kernel tracepoints, gives insight into
the time spent on processing upcalls.

Signed-off-by: Eelco Chaudron <echaudro@redhat.com>
---
 Documentation/topics/usdt-probes.rst  |   86 ++
 lib/dpif-netlink.c                    |   16 
 utilities/automake.mk                 |    4 
 utilities/usdt_scripts/upcall_cost.py | 1774 +++++++++++++++++++++++++++++++++
 4 files changed, 1879 insertions(+), 1 deletion(-)
 create mode 100755 utilities/usdt_scripts/upcall_cost.py

Comments

Paolo Valerio Dec. 21, 2021, 3:53 p.m. UTC | #1
Hi Eelco,

I did some tests, and the script seems to work as expected.
My comments inline.

Eelco Chaudron <echaudro@redhat.com> writes:

> This patch adds a series of NetLink flow operation USDT probes.
> These probes are in turn used in the upcall_cost Python script,
> which, in addition to some kernel tracepoints, gives insight into
> the time spent on processing upcalls.
>
> Signed-off-by: Eelco Chaudron <echaudro@redhat.com>
> ---
>  Documentation/topics/usdt-probes.rst  |   86 ++
>  lib/dpif-netlink.c                    |   16 
>  utilities/automake.mk                 |    4 
>  utilities/usdt_scripts/upcall_cost.py | 1774 +++++++++++++++++++++++++++++++++
>  4 files changed, 1879 insertions(+), 1 deletion(-)
>  create mode 100755 utilities/usdt_scripts/upcall_cost.py
>
> diff --git a/Documentation/topics/usdt-probes.rst b/Documentation/topics/usdt-probes.rst
> index 1f6446e12..cfe57f1a3 100644
> --- a/Documentation/topics/usdt-probes.rst
> +++ b/Documentation/topics/usdt-probes.rst
> @@ -200,11 +200,96 @@ used naming convention.
>  
>  Available probes in ``ovs_vswitchd``:
>  
> +- dpif_netlink_operate\_\_:op_flow_del
> +- dpif_netlink_operate\_\_:op_flow_execute
> +- dpif_netlink_operate\_\_:op_flow_get
> +- dpif_netlink_operate\_\_:op_flow_put
>  - dpif_recv:recv_upcall
>  - main:poll_block
>  - main:run_start
>  
>  
> +dpif_netlink_operate\_\_:op_flow_del
> +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> +
> +**Description**:
> +
> +This probe gets triggered when the Netlink datapath is about to execute the
> +DPIF_OP_FLOW_DEL operation as part of the dpif ``operate()`` callback.
> +
> +**Arguments**:
> +
> +- *arg0*: ``(struct dpif_netlink *) dpif``
> +- *arg1*: ``(struct dpif_flow_del *) del``
> +- *arg2*: ``(struct dpif_netlink_flow *) flow``
> +- *arg3*: ``(struct ofpbuf *) aux->request``
> +
> +**Script references**:
> +
> +- *None*
> +
> +
> +dpif_netlink_operate\_\_:op_flow_execute
> +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> +
> +**Description**:
> +
> +This probe gets triggered when the Netlink datapath is about to execute the
> +DPIF_OP_FLOW_EXECUTE operation as part of the dpif ``operate()`` callback.
> +
> +**Arguments**:
> +
> +- *arg0*: ``(struct dpif_netlink *) dpif``
> +- *arg1*: ``(struct dpif_execute *) op->execute``
> +- *arg2*: ``dp_packet_data(op->execute.packet)``
> +- *arg3*: ``dp_packet_size(op->execute.packet)``
> +- *arg4*: ``(struct ofpbuf *) aux->request``
> +
> +**Script references**:
> +
> +- ``/utilities/usdt_scripts/upcall_cost.py``
> +
> +
> +dpif_netlink_operate\_\_:op_flow_get
> +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> +
> +**Description**:
> +
> +This probe gets triggered when the Netlink datapath is about to execute the
> +DPIF_OP_FLOW_GET operation as part of the dpif ``operate()`` callback.
> +
> +**Arguments**:
> +
> +- *arg0*: ``(struct dpif_netlink *) dpif``
> +- *arg1*: ``(struct dpif_flow_get *) get``
> +- *arg2*: ``(struct dpif_netlink_flow *) flow``
> +- *arg3*: ``(struct ofpbuf *) aux->request``
> +
> +**Script references**:
> +
> +- *None*
> +
> +
> +dpif_netlink_operate\_\_:op_flow_put
> +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> +
> +**Description**:
> +
> +This probe gets triggered when the Netlink datapath is about to execute the
> +DPIF_OP_FLOW_PUT operation as part of the dpif ``operate()`` callback.
> +
> +**Arguments**:
> +
> +- *arg0*: ``(struct dpif_netlink *) dpif``
> +- *arg1*: ``(struct dpif_flow_put *) put``
> +- *arg2*: ``(struct dpif_netlink_flow *) flow``
> +- *arg3*: ``(struct ofpbuf *) aux->request``
> +
> +**Script references**:
> +
> +- ``/utilities/usdt_scripts/upcall_cost.py``
> +
> +
>  probe dpif_recv:recv_upcall
>  ~~~~~~~~~~~~~~~~~~~~~~~~~~~
>  
> @@ -227,6 +312,7 @@ sent to ``ovs-vswitchd``.
>  
>  **Script references**:
>  
> +- ``/utilities/usdt_scripts/upcall_cost.py``
>  - ``/utilities/usdt_scripts/upcall_monitor.py``
>  
>  
> diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
> index 424a28401..52c229568 100644
> --- a/lib/dpif-netlink.c
> +++ b/lib/dpif-netlink.c
> @@ -56,6 +56,7 @@
>  #include "openvswitch/poll-loop.h"
>  #include "openvswitch/shash.h"
>  #include "openvswitch/thread.h"
> +#include "openvswitch/usdt_probes.h"
>  #include "openvswitch/vlog.h"
>  #include "packets.h"
>  #include "random.h"
> @@ -2052,6 +2053,9 @@ dpif_netlink_operate__(struct dpif_netlink *dpif,
>                  aux->txn.reply = &aux->reply;
>              }
>              dpif_netlink_flow_to_ofpbuf(&flow, &aux->request);
> +
> +            OVS_USDT_PROBE(dpif_netlink_operate__, op_flow_put,
> +                           dpif, put, &flow, &aux->request);
>              break;
>  
>          case DPIF_OP_FLOW_DEL:
> @@ -2062,6 +2066,9 @@ dpif_netlink_operate__(struct dpif_netlink *dpif,
>                  aux->txn.reply = &aux->reply;
>              }
>              dpif_netlink_flow_to_ofpbuf(&flow, &aux->request);
> +
> +            OVS_USDT_PROBE(dpif_netlink_operate__, op_flow_del,
> +                           dpif, del, &flow, &aux->request);
>              break;
>  
>          case DPIF_OP_EXECUTE:
> @@ -2082,6 +2089,12 @@ dpif_netlink_operate__(struct dpif_netlink *dpif,
>              } else {
>                  dpif_netlink_encode_execute(dpif->dp_ifindex, &op->execute,
>                                              &aux->request);
> +
> +                OVS_USDT_PROBE(dpif_netlink_operate__, op_flow_execute,
> +                               dpif, &op->execute,
> +                               dp_packet_data(op->execute.packet),
> +                               dp_packet_size(op->execute.packet),
> +                               &aux->request);
>              }
>              break;
>  
> @@ -2090,6 +2103,9 @@ dpif_netlink_operate__(struct dpif_netlink *dpif,
>              dpif_netlink_init_flow_get(dpif, get, &flow);
>              aux->txn.reply = get->buffer;
>              dpif_netlink_flow_to_ofpbuf(&flow, &aux->request);
> +
> +            OVS_USDT_PROBE(dpif_netlink_operate__, op_flow_get,
> +                           dpif, get, &flow, &aux->request);
>              break;
>  
>          default:
> diff --git a/utilities/automake.mk b/utilities/automake.mk
> index 382f8e789..7d0bddcd8 100644
> --- a/utilities/automake.mk
> +++ b/utilities/automake.mk
> @@ -63,6 +63,7 @@ EXTRA_DIST += \
>  	utilities/docker/debian/Dockerfile \
>  	utilities/docker/debian/build-kernel-modules.sh \
>  	utilities/usdt_scripts/bridge_loop.bt \
> +	utilities/usdt_scripts/upcall_cost.py \
>  	utilities/usdt_scripts/upcall_monitor.py
>  MAN_ROOTS += \
>  	utilities/ovs-testcontroller.8.in \
> @@ -131,6 +132,7 @@ FLAKE8_PYFILES += utilities/ovs-pcap.in \
>  	utilities/ovs-check-dead-ifs.in \
>  	utilities/ovs-tcpdump.in \
>  	utilities/ovs-pipegen.py \
> -	utilities/usdt_scripts/upcall_monitor.py
> +	utilities/usdt_scripts/upcall_monitor.py \
> +	utilities/usdt_scripts/upcall_cost.py
>  
>  include utilities/bugtool/automake.mk
> diff --git a/utilities/usdt_scripts/upcall_cost.py b/utilities/usdt_scripts/upcall_cost.py
> new file mode 100755
> index 000000000..1af8b981f
> --- /dev/null
> +++ b/utilities/usdt_scripts/upcall_cost.py
> @@ -0,0 +1,1774 @@
> +#!/usr/bin/env python3
> +#
> +# Copyright (c) 2021 Red Hat, Inc.
> +#
> +# Licensed under the Apache License, Version 2.0 (the "License");
> +# you may not use this file except in compliance with the License.
> +# You may obtain a copy of the License at:
> +#
> +#     http://www.apache.org/licenses/LICENSE-2.0
> +#
> +# Unless required by applicable law or agreed to in writing, software
> +# distributed under the License is distributed on an "AS IS" BASIS,
> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +# See the License for the specific language governing permissions and
> +# limitations under the License.
> +#
> +# Script information:
> +# -------------------
> +# upcall_cost.py uses various user space and kernel space probes to determine
> +# the costs (in time) for handling the first packet in user space. It
> +# calculates the following costs:
> +#
> +# - Time it takes from the kernel sending the upcall till it's received by the
> +#   ovs-vswitchd process.
> +# - Time it takes from ivs-vswitchd sending the execute actions command till

typo: ovs-vswitchd

> +#   the kernel receives it.
> +# - The total time it takes from the kernel sending the upcall until it
> +#   receives the packet execute command.
> +# - The total time of the above, minus the time it takes for the actual lookup.
> +#
> +# In addition, it will also report the number of packets batched, as OVS will
> +# first try to read UPCALL_MAX_BATCH(64) packets from kernel space and then
> +# do the flow lookups and execution. So the smaller the batch size, the more
> +# realistic are the cost estimates.
> +#
> +# The script does not need any options to attach to a running instance of
> +# ovs-vswitchd. However, it's recommended to always run the script with the
> +# --write-events option. This way, if something does go wrong, the collected
> +# data is saved. Use the --help option to see all the available options.
> +#
> +# Note: In addition to the bcc tools for your specific setup, you need the
> +#       following Python packages:
> +#         pip install alive-progress halo psutil scapy strenum text_histogram3
> +#
> +
> +try:
> +    from bcc import BPF, USDT, USDTException
> +except ModuleNotFoundError:
> +    print("WARNING: Can't find the BPF Compiler Collection (BCC) tools!")
> +    print("         This is NOT problem if you analyzing previously collected"
> +          " data.\n")
> +from alive_progress import alive_bar
> +from collections import namedtuple
> +from halo import Halo
> +from scapy.all import TCP, UDP
> +from scapy.layers.l2 import Ether
> +from strenum import StrEnum
> +from text_histogram3 import histogram
> +from time import process_time
> +
> +import argparse
> +import ast
> +import psutil
> +import re
> +import struct
> +import subprocess
> +import sys
> +import time
> +
> +#
> +# Global definitions
> +#
> +DP_TUNNEL_PORT = -1
> +
> +
> +#
> +# Actual eBPF source code
> +#
> +ebpf_source = """
> +#include <linux/sched.h>
> +#include <linux/skbuff.h>
> +#include <uapi/linux/bpf.h>
> +
> +#define MAX_PACKET <MAX_PACKET_VAL>
> +#define MAX_KEY    <MAX_KEY_VAL>
> +
> +enum {
> +    EVENT_RECV_UPCALL = 0,
> +    EVENT_DP_UPCALL,
> +    EVENT_OP_FLOW_PUT,
> +    EVENT_OP_FLOW_EXECUTE,
> +    EVENT_OVS_PKT_EXEC,
> +    _EVENT_MAX_EVENT
> +};
> +
> +#define barrier_var(var) asm volatile("" : "=r"(var) : "0"(var))
> +
> +struct event_t {
> +    u32 event;
> +    u32 cpu;
> +    u32 pid;
> +    u32 upcall_type;
> +    u64 ts;
> +    u32 pkt_frag_size;
> +    u32 pkt_size;
> +    u64 key_size;
> +    char comm[TASK_COMM_LEN];
> +    char dpif_name[32];
> +    char dev_name[16];
> +    unsigned char pkt[MAX_PACKET];
> +    unsigned char key[MAX_KEY];
> +};
> +
> +BPF_RINGBUF_OUTPUT(events, <BUFFER_PAGE_CNT>);
> +BPF_TABLE("percpu_array", uint32_t, uint64_t, dropcnt, _EVENT_MAX_EVENT);
> +
> +static struct event_t *init_event(u32 type)
> +{
> +    struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t));
> +
> +    if (!event) {
> +        uint64_t *value = dropcnt.lookup(&type);
> +        if (value)
> +            __sync_fetch_and_add(value, 1);
> +
> +        return NULL;
> +    }
> +
> +    event->event = type;
> +    event->ts = bpf_ktime_get_ns();
> +    event->cpu = bpf_get_smp_processor_id();
> +    event->pid = bpf_get_current_pid_tgid();
> +    bpf_get_current_comm(&event->comm, sizeof(event->comm));
> +
> +    return event;
> +}
> +
> +int trace__recv_upcall(struct pt_regs *ctx) {
> +    uint32_t upcall_type;
> +    uint64_t addr;
> +    uint64_t size;
> +
> +    bpf_usdt_readarg(2, ctx, &upcall_type);
> +    if (upcall_type != 0)
> +        return 0;
> +
> +    struct event_t *event = init_event(EVENT_RECV_UPCALL);
> +    if (!event)
> +        return 1;
> +
> +    bpf_usdt_readarg(1, ctx, &addr);
> +    bpf_probe_read(&event->dpif_name, sizeof(event->dpif_name), (void *)addr);

nit: if it's ok, we could use bpf_probe_read_str() here too.
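For example (untested sketch; this assumes the bcc version in use provides
the bpf_probe_read_str() helper for BPF C code):

    bpf_usdt_readarg(1, ctx, &addr);
    /* Stop copying at the terminating NUL instead of always copying
     * sizeof(event->dpif_name) bytes. */
    bpf_probe_read_str(&event->dpif_name, sizeof(event->dpif_name),
                       (void *)addr);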

> +
> +    event->upcall_type = upcall_type;
> +    bpf_usdt_readarg(4, ctx, &event->pkt_size);
> +    bpf_usdt_readarg(6, ctx, &event->key_size);
> +
> +    if (event->pkt_size > MAX_PACKET)
> +        size = MAX_PACKET;
> +    else
> +        size = event->pkt_size;
> +    bpf_usdt_readarg(3, ctx, &addr);
> +    bpf_probe_read(&event->pkt, size, (void *)addr);
> +
> +    if (event->key_size > MAX_KEY)
> +        size = MAX_KEY;
> +    else
> +        size = event->key_size;
> +    bpf_usdt_readarg(5, ctx, &addr);
> +    bpf_probe_read(&event->key, size, (void *)addr);
> +
> +    events.ringbuf_submit(event, 0);
> +    return 0;
> +};
> +
> +
> +int trace__op_flow_put(struct pt_regs *ctx) {
> +    uint64_t addr;
> +    uint64_t size;
> +
> +    struct event_t *event = init_event(EVENT_OP_FLOW_PUT);
> +    if (!event) {
> +        return 1;
> +    }
> +
> +    events.ringbuf_submit(event, 0);
> +    return 0;
> +};
> +
> +
> +int trace__op_flow_execute(struct pt_regs *ctx) {
> +    uint64_t addr;
> +    uint64_t size;
> +
> +    struct event_t *event = init_event(EVENT_OP_FLOW_EXECUTE);
> +    if (!event) {
> +        return 1;
> +    }
> +
> +    bpf_usdt_readarg(4, ctx, &event->pkt_size);
> +
> +    if (event->pkt_size > MAX_PACKET)
> +        size = MAX_PACKET;
> +    else
> +        size = event->pkt_size;
> +    bpf_usdt_readarg(3, ctx, &addr);
> +    bpf_probe_read(&event->pkt, size, (void *)addr);
> +
> +    events.ringbuf_submit(event, 0);
> +    return 0;
> +};
> +
> +
> +TRACEPOINT_PROBE(openvswitch, ovs_dp_upcall) {
> +    uint64_t size;
> +    struct sk_buff *skb = args->skbaddr;
> +
> +    if (args->upcall_cmd != 1 || skb == NULL || skb->data == NULL)
> +        return 0;
> +
> +    struct event_t *event = init_event(EVENT_DP_UPCALL);
> +    if (!event) {
> +        return 1;
> +    }
> +
> +    event->upcall_type = args->upcall_cmd;
> +    event->pkt_size = args->len;
> +
> +    TP_DATA_LOC_READ_CONST(&event->dpif_name, dp_name,
> +                           sizeof(event->dpif_name));
> +    TP_DATA_LOC_READ_CONST(&event->dev_name, dev_name,
> +                           sizeof(event->dev_name));
> +
> +    if (skb->data_len != 0) {
> +        event->pkt_frag_size = (skb->len - skb->data_len) & 0xfffffff;
> +        size = event->pkt_frag_size;
> +    } else {
> +        event->pkt_frag_size = 0;
> +        size = event->pkt_size;
> +    }
> +
> +    /* Prevent clang from using register mirroring (or any optimization) on
> +     * the 'size' variable. */
> +    barrier_var(size);
> +
> +    if (size > MAX_PACKET)
> +        size = MAX_PACKET;
> +    bpf_probe_read_kernel(event->pkt, size, skb->data);
> +
> +    events.ringbuf_submit(event, 0);
> +    return 0;
> +}
> +
> +int kprobe__ovs_packet_cmd_execute(struct pt_regs *ctx, struct sk_buff *skb)
> +{
> +    uint64_t size;
> +
> +    if (skb == NULL || skb->data == NULL)
> +        return 0;
> +
> +    struct event_t *event = init_event(EVENT_OVS_PKT_EXEC);
> +    if (!event) {
> +        return 1;
> +    }
> +
> +    events.ringbuf_submit(event, 0);
> +    return 0;
> +}
> +"""
> +
> +
> +#
> +# Event types
> +#
> +class EventType(StrEnum):
> +    RECV_UPCALL = 'dpif_recv__recv_upcall'
> +    DP_UPCALL = 'openvswitch__dp_upcall'
> +    OP_FLOW_PUT = 'netlink_opperate__op_flow_put'
> +    OP_FLOW_EXECUTE = 'netlink_opperate__op_flow_execute'

besides the typo, shouldn't this be (for both PUT and EXECUTE)
dpif_netlink_operate__* ? (not sure about the number of underscores,
though, considering that the provider is dpif_netlink_operate__ :)
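If that is the intent, the rename would presumably look something like the
below (sketch only; the exact number of underscores between provider and
probe name would still need checking):

    OP_FLOW_PUT = 'dpif_netlink_operate__op_flow_put'
    OP_FLOW_EXECUTE = 'dpif_netlink_operate__op_flow_execute'

Since these values are only used for printing and as dictionary keys inside
the script, the rename should be purely cosmetic.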

> +    OVS_PKT_EXEC = 'ktrace__ovs_packet_cmd_execute'
> +
> +    def short_name(name, length=22):
> +        if len(name) < length:
> +            return name
> +
> +        return '..' + name[-(length - 2):]
> +
> +    def from_trace(trace_event):
> +        if trace_event == 0:
> +            return EventType.RECV_UPCALL
> +        elif trace_event == 1:
> +            return EventType.DP_UPCALL
> +        elif trace_event == 2:
> +            return EventType.OP_FLOW_PUT
> +        elif trace_event == 3:
> +            return EventType.OP_FLOW_EXECUTE
> +        elif trace_event == 4:
> +            return EventType.OVS_PKT_EXEC
> +
> +        raise ValueError
> +
> +
> +#
> +# Simple event class
> +#
> +class Event(object):
> +    def __init__(self, ts, pid, comm, cpu, event_type):
> +        self.ts = ts
> +        self.pid = pid
> +        self.comm = comm
> +        self.cpu = cpu
> +        self.event_type = event_type
> +
> +    def __str__(self):
> +        return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}".format(
> +            EventType.short_name(self.event_type),
> +            self.comm,
> +            self.pid,
> +            self.cpu,
> +            self.ts / 1000000000)
> +
> +    def __repr__(self):
> +        more = ""
> +        if self.__class__.__name__ != "Event":
> +            more = ", ..."
> +
> +        return "{}({}, {}, {}, {}, {}{})".format(self.__class__.__name__,
> +                                                 self.ts, self.pid,
> +                                                 self.comm, self.cpu,
> +                                                 self.event_type, more)
> +
> +    def handle_event(event):
> +        event = Event(event.ts, event.pid, event.comm.decode("utf-8"),
> +                      event.cpu, EventType.from_trace(event.event))
> +
> +        if not options.quiet:
> +            print(event)
> +
> +        return event
> +
> +
> +#
> +# dp_upcall event class
> +#
> +class DpUpcall(Event):
> +    def __init__(self, ts, pid, comm, cpu, dpif_name, port, pkt, pkt_len,
> +                 pkt_frag_len):
> +        super(DpUpcall, self).__init__(ts, pid, comm, cpu, EventType.DP_UPCALL)
> +        self.dpif_name = dpif_name
> +        self.dp_port = get_dp_mapping(dpif_name, port)
> +        if self.dp_port is None:
> +            #
> +            # As we only identify interfaces at startup, new interfaces could
> +            # have been added, causing the lookup to fail. Just something to
> +            # keep in mind when running this in a dynamic environment.
> +            #
> +            raise LookupError("Can't find datapath port mapping!")
> +        self.pkt = pkt
> +        self.pkt_len = pkt_len
> +        self.pkt_frag_len = pkt_frag_len
> +
> +    def __str__(self):
> +        return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}: " \
> +               "{:<17} {:4} {:4} {:4}".format(self.event_type,
> +                                              self.comm,
> +                                              self.pid,
> +                                              self.cpu,
> +                                              self.ts / 1000000000,
> +                                              self.dpif_name,
> +                                              self.dp_port,
> +                                              self.pkt_len,
> +                                              self.pkt_frag_len)
> +
> +    def handle_event(event):
> +        if event.pkt_size < options.packet_size:
> +            pkt_len = event.pkt_size
> +        else:
> +            pkt_len = options.packet_size
> +
> +        pkt_data = bytes(event.pkt)[:pkt_len]
> +
> +        if len(pkt_data) <= 0 or event.pkt_size == 0:
> +            return
> +
> +        try:
> +            event = DpUpcall(event.ts, event.pid, event.comm.decode("utf-8"),
> +                             event.cpu, event.dpif_name.decode("utf-8"),
> +                             event.dev_name.decode("utf-8"),
> +                             pkt_data,
> +                             event.pkt_size,
> +                             event.pkt_frag_size)
> +        except LookupError:
> +            #
> +            # If we can't do the port lookup, ignore this event.
> +            #
> +            return None
> +
> +        if not options.quiet:
> +            print(event)
> +
> +        return event
> +
> +
> +#
> +# recv_upcall event class
> +#
> +class RecvUpcall(Event):
> +    def __init__(self, ts, pid, comm, cpu, dpif_name, key, pkt, pkt_len):
> +        super(RecvUpcall, self).__init__(ts, pid, comm, cpu,
> +                                         EventType.RECV_UPCALL)
> +
> +        if dpif_name.startswith("system@"):
> +            dpif_name = dpif_name[len("system@"):]
> +        self.dpif_name = dpif_name
> +
> +        nla = RecvUpcall.decode_nlm(key, dump=False)
> +        if "OVS_KEY_ATTR_IN_PORT" in nla:
> +            self.dp_port = struct.unpack('=L', nla["OVS_KEY_ATTR_IN_PORT"])[0]
> +        elif "OVS_KEY_ATTR_TUNNEL" in nla:
> +            self.dp_port = DP_TUNNEL_PORT
> +        else:
> +            self.dp_port = RecvUpcall.get_system_dp_port(self.dpif_name)
> +
> +        if self.dp_port is None:
> +            raise LookupError("Can't find RecvUpcall dp port mapping!")
> +
> +        self.pkt = pkt
> +        self.pkt_len = pkt_len
> +
> +    def __str__(self):
> +        return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}: {:<17} {:4} {:4}". \
> +            format(
> +                self.event_type,
> +                self.comm,
> +                self.pid,
> +                self.cpu,
> +                self.ts / 1000000000,
> +                self.dpif_name,
> +                self.dp_port,
> +                self.pkt_len)
> +
> +    def get_system_dp_port(dpif_name):
> +        dp_map = get_dp_mapping(dpif_name, "ovs-system", return_map=True)
> +        if dpif_name not in dp_map:
> +            return None
> +        try:
> +            return dp_map[dpif_name]["ovs-system"]
> +        except KeyError:
> +            return None
> +
> +    def decode_nlm(msg, indent=4, dump=True):
> +        bytes_left = len(msg)
> +        result = {}
> +
> +        while bytes_left:
> +            if bytes_left < 4:
> +                if dump:
> +                    print("{}WARN: decode truncated; can't read header".format(
> +                        ' ' * indent))
> +                break
> +
> +            nla_len, nla_type = struct.unpack("=HH", msg[:4])
> +
> +            if nla_len < 4:
> +                if dump:
> +                    print("{}WARN: decode truncated; nla_len < 4".format(
> +                        ' ' * indent))
> +                break
> +
> +            nla_data = msg[4:nla_len]
> +            trunc = ""
> +
> +            if nla_len > bytes_left:
> +                trunc = "..."
> +                nla_data = nla_data[:(bytes_left - 4)]
> +                if RecvUpcall.get_ovs_key_attr_str(nla_type) == \
> +                   "OVS_KEY_ATTR_TUNNEL":
> +                    #
> +                    # If we have truncated tunnel information, we still would
> +                    # like to know. This is due to the special tunnel handling
> +                    # needed for port matching.
> +                    #
> +                    result[RecvUpcall.get_ovs_key_attr_str(nla_type)] = bytes()
> +            else:
> +                result[RecvUpcall.get_ovs_key_attr_str(nla_type)] = nla_data
> +
> +            if dump:
> +                print("{}nla_len {}, nla_type {}[{}], data: {}{}".format(
> +                    ' ' * indent, nla_len,
> +                    RecvUpcall.get_ovs_key_attr_str(nla_type),
> +                    nla_type,
> +                    "".join("{:02x} ".format(b) for b in nla_data), trunc))
> +
> +            if trunc != "":
> +                if dump:
> +                    print("{}WARN: decode truncated; nla_len > msg_len[{}] ".
> +                          format(' ' * indent, bytes_left))
> +                break
> +
> +            # Update next offset, but make sure it's aligned correctly.
> +            next_offset = (nla_len + 3) & ~(3)
> +            msg = msg[next_offset:]
> +            bytes_left -= next_offset
> +
> +        return result
> +
> +    def get_ovs_key_attr_str(attr):
> +        ovs_key_attr = ["OVS_KEY_ATTR_UNSPEC",
> +                        "OVS_KEY_ATTR_ENCAP",
> +                        "OVS_KEY_ATTR_PRIORITY",
> +                        "OVS_KEY_ATTR_IN_PORT",
> +                        "OVS_KEY_ATTR_ETHERNET",
> +                        "OVS_KEY_ATTR_VLAN",
> +                        "OVS_KEY_ATTR_ETHERTYPE",
> +                        "OVS_KEY_ATTR_IPV4",
> +                        "OVS_KEY_ATTR_IPV6",
> +                        "OVS_KEY_ATTR_TCP",
> +                        "OVS_KEY_ATTR_UDP",
> +                        "OVS_KEY_ATTR_ICMP",
> +                        "OVS_KEY_ATTR_ICMPV6",
> +                        "OVS_KEY_ATTR_ARP",
> +                        "OVS_KEY_ATTR_ND",
> +                        "OVS_KEY_ATTR_SKB_MARK",
> +                        "OVS_KEY_ATTR_TUNNEL",
> +                        "OVS_KEY_ATTR_SCTP",
> +                        "OVS_KEY_ATTR_TCP_FLAGS",
> +                        "OVS_KEY_ATTR_DP_HASH",
> +                        "OVS_KEY_ATTR_RECIRC_ID",
> +                        "OVS_KEY_ATTR_MPLS",
> +                        "OVS_KEY_ATTR_CT_STATE",
> +                        "OVS_KEY_ATTR_CT_ZONE",
> +                        "OVS_KEY_ATTR_CT_MARK",
> +                        "OVS_KEY_ATTR_CT_LABELS",
> +                        "OVS_KEY_ATTR_CT_ORIG_TUPLE_IPV4",
> +                        "OVS_KEY_ATTR_CT_ORIG_TUPLE_IPV6",
> +                        "OVS_KEY_ATTR_NSH"]
> +
> +        if attr < 0 or attr >= len(ovs_key_attr):
> +            return "<UNKNOWN>"
> +
> +        return ovs_key_attr[attr]
> +
> +    def handle_event(event):
> +        #
> +        # For us, only upcalls with a packet, flow_key, and upcall_type
> +        # DPIF_UC_MISS are of interest.
> +        #
> +        if event.pkt_size <= 0 or event.key_size <= 0 or \
> +           event.upcall_type != 0:
> +            return
> +
> +        if event.key_size < options.flow_key_size:
> +            key_len = event.key_size
> +        else:
> +            key_len = options.flow_key_size
> +
> +        if event.pkt_size < options.packet_size:
> +            pkt_len = event.pkt_size
> +        else:
> +            pkt_len = options.packet_size
> +
> +        try:
> +            event = RecvUpcall(event.ts, event.pid, event.comm.decode("utf-8"),
> +                               event.cpu, event.dpif_name.decode("utf-8"),
> +                               bytes(event.key)[:key_len],
> +                               bytes(event.pkt)[:pkt_len],
> +                               event.pkt_size)
> +        except LookupError:
> +            return None
> +
> +        if not options.quiet:
> +            print(event)
> +
> +        return event
> +
> +
> +#
> +# op_flow_execute event class
> +#
> +class OpFlowExecute(Event):
> +    def __init__(self, ts, pid, comm, cpu, pkt, pkt_len):
> +        super(OpFlowExecute, self).__init__(ts, pid, comm, cpu,
> +                                            EventType.OP_FLOW_EXECUTE)
> +        self.pkt = pkt
> +        self.pkt_len = pkt_len
> +
> +    def __str__(self):
> +        return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}: " \
> +               "{:<17} {:4} {:4}".format(EventType.short_name(self.event_type),
> +                                         self.comm,
> +                                         self.pid,
> +                                         self.cpu,
> +                                         self.ts / 1000000000,
> +                                         "",
> +                                         "",
> +                                         self.pkt_len)
> +
> +    def handle_event(event):
> +        if event.pkt_size < options.packet_size:
> +            pkt_len = event.pkt_size
> +        else:
> +            pkt_len = options.packet_size
> +
> +        pkt_data = bytes(event.pkt)[:pkt_len]
> +
> +        if len(pkt_data) <= 0 or event.pkt_size == 0:
> +            return
> +
> +        event = OpFlowExecute(event.ts, event.pid, event.comm.decode("utf-8"),
> +                              event.cpu, pkt_data, event.pkt_size)
> +
> +        if not options.quiet:
> +            print(event)
> +
> +        return event
> +
> +
> +#
> +# get_dp_mapping()
> +#
> +def get_dp_mapping(dp, port, return_map=False, dp_map=None):
> +    if options.unit_test:
> +        return port
> +
> +    if dp_map is not None:
> +        get_dp_mapping.dp_port_map_cache = dp_map
> +
> +    #
> +    # Build a cache, so we do not have to execute the ovs command each time.
> +    #
> +    if not hasattr(get_dp_mapping, "dp_port_map_cache"):
> +        try:
> +            output = subprocess.check_output(['ovs-appctl', 'dpctl/show'],
> +                                             encoding='utf8').split("\n")
> +        except subprocess.CalledProcessError:
> +            output = ""
> +            pass
> +
> +        current_dp = None
> +        get_dp_mapping.dp_port_map_cache = {}
> +
> +        for line in output:
> +            match = re.match("^system@(.*):$", line)
> +            if match is not None:
> +                current_dp = match.group(1)
> +
> +            match = re.match("^  port ([0-9]+): ([^ /]*)", line)
> +            if match is not None and current_dp is not None:
> +                try:
> +                    get_dp_mapping.dp_port_map_cache[
> +                        current_dp][match.group(2)] = int(match.group(1))
> +                except KeyError:
> +                    get_dp_mapping.dp_port_map_cache[current_dp] = \
> +                        {match.group(2): int(match.group(1))}
> +
> +    if return_map:
> +        return get_dp_mapping.dp_port_map_cache
> +
> +    if dp not in get_dp_mapping.dp_port_map_cache or \
> +       port not in get_dp_mapping.dp_port_map_cache[dp]:
> +        return None
> +
> +    return get_dp_mapping.dp_port_map_cache[dp][port]
> +
> +
> +#
> +# event_to_dict()
> +#
> +def event_to_dict(event):
> +    event_dict = {}
> +
> +    for field, _ in event._fields_:
> +        if isinstance(getattr(event, field), (int, bytes)):
> +            event_dict[field] = getattr(event, field)
> +        else:
> +            if (field == "key" and event.key_size == 0) or \
> +               (field == "pkt" and event.pkt_size == 0):
> +                data = bytes()
> +            else:
> +                data = bytes(getattr(event, field))
> +
> +            event_dict[field] = data
> +
> +    return event_dict
> +
> +
> +#
> +# receive_event_bcc()
> +#
> +def receive_event_bcc(ctx, data, size):
> +    global events_received
> +    events_received += 1
> +
> +    event = b['events'].event(data)
> +
> +    if export_file is not None:
> +        export_file.write("event = {}\n".format(event_to_dict(event)))
> +
> +    receive_event(event)
> +
> +
> +#
> +# receive_event()
> +#
> +def receive_event(event):
> +    global event_count
> +
> +    if event.event == 0:
> +        trace_event = RecvUpcall.handle_event(event)
> +    elif event.event == 1:
> +        trace_event = DpUpcall.handle_event(event)
> +    elif event.event == 2:
> +        trace_event = Event.handle_event(event)
> +    elif event.event == 3:
> +        trace_event = OpFlowExecute.handle_event(event)
> +    elif event.event == 4:
> +        trace_event = Event.handle_event(event)
> +
> +    try:
> +        event_count['total'][EventType.from_trace(event.event)] += 1
> +    except KeyError:
> +        event_count['total'][EventType.from_trace(event.event)] = 1
> +        event_count['valid'][EventType.from_trace(event.event)] = 0
> +
> +    if trace_event is not None:
> +        event_count['valid'][EventType.from_trace(event.event)] += 1
> +        trace_data.append(trace_event)
> +
> +
> +#
> +# collect_event_sets()
> +#
> +def collect_event_sets(events, collect_stats=False, profile=False,
> +                       spinner=False):
> +    t1_time = 0
> +
> +    def t1_start():
> +        nonlocal t1_time
> +        t1_time = process_time()
> +
> +    def t1_stop(description):
> +        print("* PROFILING: {:<50}: {:.06f} seconds".format(
> +            description, process_time() - t1_time))
> +
> +    warn_parcial_match = False
> +    warn_frag = False
> +
> +    if profile:
> +        t1_start()
> +    #
> +    # First let's create a dict of per handler thread events.
> +    #
> +    threads = {}
> +    threads_result = {}
> +    for idx, event in enumerate(events):
> +        if event.event_type == EventType.DP_UPCALL:
> +            continue
> +        if event.pid not in threads:
> +            threads[event.pid] = []
> +        threads[event.pid].append([idx, event])
> +
> +    if profile:
> +        t1_stop("Creating per thread dictionary")
> +        t1_start()
> +    #
> +    # Now split them into per-upcall sets, but remember that
> +    # RecvUpcall events can be batched.
> +    #
> +    batch_stats = []
> +    for thread, items in threads.items():
> +        thread_set = []
> +        batch = []
> +        ovs_pkt_exec_set = []
> +        batching = True
> +        collecting = 0
> +        has_flow_put = False
> +        has_flow_exec = False
> +
> +        def next_batch():
> +            nonlocal batching, batch, collecting, has_flow_put, has_flow_exec
> +            nonlocal ovs_pkt_exec_set, thread_set
> +
> +            if len(batch) > 0:
> +                #
> +                # If we are done with the batch, see if we need to match up
> +                # any batched OVS_PKT_EXEC events.
> +                #
> +                for event in batch:
> +                    if len(ovs_pkt_exec_set) <= 0:
> +                        break
> +                    if any(isinstance(item,
> +                                      OpFlowExecute) for item in event[2]):
> +                        event[2].append(ovs_pkt_exec_set.pop(0))
> +                #
> +                # Append the batch to the thread-specific set.
> +                #
> +                thread_set = thread_set + batch
> +                if collect_stats:
> +                    batch_stats.append(len(batch))
> +            batching = True
> +            batch = []
> +            ovs_pkt_exec_set = []
> +            has_flow_put = False
> +            has_flow_exec = False
> +            collecting = 0
> +
> +        def next_batch_set():
> +            nonlocal has_flow_put, has_flow_exec, collecting
> +            has_flow_put = False
> +            has_flow_exec = False
> +            collecting += 1
> +
> +        for item in items:
> +            idx, event = item
> +
> +            if batching:
> +                if event.event_type == EventType.RECV_UPCALL:
> +                    batch.append(item + [[]])
> +                elif len(batch) > 0:
> +                    batching = False
> +                    collecting = 0
> +                else:
> +                    continue
> +
> +            if not batching:
> +                if event.event_type == EventType.RECV_UPCALL:
> +                    next_batch()
> +                    batch.append(item + [[]])
> +                else:
> +                    if event.event_type == EventType.OP_FLOW_PUT:
> +                        if has_flow_put:
> +                            next_batch_set()
> +                            if collecting >= len(batch):
> +                                next_batch()
> +                                continue
> +
> +                        batch[collecting][2].append(item[1])
> +                        has_flow_put = True
> +
> +                    elif event.event_type == EventType.OP_FLOW_EXECUTE:
> +                        if has_flow_exec:
> +                            next_batch_set()
> +                            if collecting >= len(batch):
> +                                next_batch()
> +                                continue
> +
> +                        if (event.pkt_len == batch[collecting][1].pkt_len
> +                           and event.pkt == batch[collecting][1].pkt):
> +                            batch[collecting][2].append(item[1])
> +                            has_flow_put = True
> +                            has_flow_exec = True
> +                        else:
> +                            #
> +                            # If we end up here it could be that an upcall in a
> +                            # batch did not generate an EXECUTE and we are out
> +                            # of sync. Try to match it to the next batch entry.
> +                            #
> +                            next_idx = collecting + 1
> +                            while True:
> +                                if next_idx >= len(batch):
> +                                    next_batch()
> +                                    break
> +                                if (event.pkt_len == batch[next_idx][1].pkt_len
> +                                   and event.pkt == batch[next_idx][1].pkt):
> +
> +                                    batch[next_idx][2] = batch[collecting][2]
> +                                    batch[collecting][2] = []
> +                                    collecting = next_idx
> +                                    batch[collecting][2].append(item[1])
> +                                    has_flow_put = True
> +                                    has_flow_exec = True
> +                                    break
> +
> +                                next_idx += 1
> +
> +                    elif event.event_type == EventType.OVS_PKT_EXEC:
> +                        #
> +                        # The OVS_PKT_EXEC might also be batched, so we keep
> +                        # them in a separate list and assign them to the
> +                        # correct set when completing the set.
> +                        #
> +                        ovs_pkt_exec_set.append(item[1])
> +                        continue
> +
> +                    if collecting >= len(batch):
> +                        next_batch()
> +        next_batch()
> +        threads_result[thread] = thread_set
> +
> +    if profile:
> +        t1_stop("Creating upcall sets")
> +        t1_start()
> +
> +    #
> +    # Move thread results from list to dictionary
> +    #
> +    thread_stats = {}
> +    for thread, sets in threads_result.items():
> +        if len(sets) > 0:
> +            thread_stats[sets[0][1].comm] = len(sets)
> +
> +        threads_result[thread] = {}
> +        for upcall in sets:
> +            threads_result[thread][upcall[0]] = [upcall[1]] + upcall[2]
> +
> +    if profile:
> +        t1_stop("Moving upcall list to dictionary")
> +        t1_start()
> +
> +    if options.debug & 0x4000000 != 0:
> +        print()
> +        for thread, sets in threads_result.items():
> +            for idx, idx_set in sets.items():
> +                print("DBG: {}".format(idx_set))
> +
> +    #
> +    # Create two lists, one with DP_UPCALLs and one with RECV_UPCALLs
> +    #
> +    dp_upcall_list = []
> +    recv_upcall_list = []
> +    for idx, event in enumerate(events):
> +        if event.event_type == EventType.DP_UPCALL:
> +            dp_upcall_list.append([idx, event])
> +        elif event.event_type == EventType.RECV_UPCALL:
> +            recv_upcall_list.append([idx, event])
> +
> +    if profile:
> +        t1_stop("Creating DP_UPCALL and RECV_UPCALL lists")
> +        t1_start()
> +
> +    if options.debug & 0x4000000 != 0:
> +        print()
> +        for dp_upcall in dp_upcall_list:
> +            print("DBG: {}".format(dp_upcall))
> +        print()
> +        for recv_upcall in recv_upcall_list:
> +            print("DBG: {}".format(recv_upcall))
> +
> +    #
> +    # Now find the matching DP_UPCALL and RECV_UPCALL events
> +    #
> +    event_sets = []
> +    if spinner:
> +        print()
> +    with alive_bar(len(dp_upcall_list),
> +                   title="- Matching DP_UPCALLs to RECV_UPCALLs",
> +                   spinner=None, disable=not spinner) as bar:
> +
> +        for (idx, event) in dp_upcall_list:
> +            remove_indexes = []
> +            this_set = None
> +            #
> +            # TODO: This part needs some optimization, as it's slow in the
> +            #       PVP test scenario. This is because a lot of DP_UPCALLS
> +            #       will not have a matching RECV_UPCALL leading to walking
> +            #       the entire recv_upcall_list list.
> +            #
> +            #       Probably some dictionary, but in the PVP scenario packets
> +            #       come from a limited set of ports, and the length is all the
> +            #       same. So we do need the key to be recv.dport +
> +            #       len(recv.pkt) + recv.pkt, however, the recv.pkt compare
> +            #       needs to happen on  min(len(event.pkt), len(recv.pkt)).
> +            #
> +            for idx_in_list, (idx_recv, recv) in enumerate(recv_upcall_list):
> +                match = False
> +
> +                if idx_recv < idx:
> +                    remove_indexes.append(idx_in_list)
> +                    continue
> +                #
> +                # If the RecvUpcall is a tunnel port, we can not map it to
> +                # the correct tunnel. For now, we assume the first matching
> +                # packet is the correct one. For more details see the OVS
> +                # ukey_to_flow_netdev() function.
> +                #
> +                if (event.dp_port == recv.dp_port or
> +                    recv.dp_port == DP_TUNNEL_PORT) \
> +                   and event.pkt_len == recv.pkt_len:
> +
> +                    compare_len = min(len(event.pkt), len(recv.pkt))
> +
> +                    if len(event.pkt) != len(recv.pkt) \
> +                       and event.pkt_frag_len == 0:
> +                        warn_parcial_match = True
> +                    elif event.pkt_frag_len != 0:
> +                        warn_frag = True
> +                        compare_len = min(compare_len, event.pkt_frag_len)
> +
> +                    if event.pkt[:compare_len] == recv.pkt[:compare_len]:
> +                        match = True
> +                    else:
> +                        #
> +                        # There are still some corner cases because the
> +                        # kernel dp_upcall tracepoint is hit before the
> +                        # packet is prepared/modified for the upcall
> +                        # (e.g. packet checksum updates, VLAN insertion).
> +                        # For now, we try to handle the checksum part, but
> +                        # we might need to add more exceptions in the future.
> +                        #
> +                        diff_bytes = sum(i != j for i, j in zip(
> +                            event.pkt[:compare_len], recv.pkt[:compare_len]))
> +
> +                        if diff_bytes <= 2 and compare_len > 56:
> +                            # This could be a TCP or UDP checksum
> +                            event_pkt = Ether(bytes(event.pkt)[:compare_len])
> +                            recv_pkt = Ether(bytes(recv.pkt)[:compare_len])
> +                            if (event_pkt.haslayer(TCP) and
> +                                recv_pkt.haslayer(TCP)) or (
> +                                    event_pkt.haslayer(UDP) and
> +                                    recv_pkt.haslayer(UDP)):
> +
> +                                if event_pkt.haslayer(TCP):
> +                                    event_chksum = event_pkt[TCP].chksum
> +                                    recv_chksum = recv_pkt[TCP].chksum
> +                                else:
> +                                    event_chksum = event_pkt[UDP].chksum
> +                                    recv_chksum = recv_pkt[UDP].chksum
> +
> +                                if event_chksum & 0xff != recv_chksum & 0xff:
> +                                    diff_bytes -= 1
> +                                if event_chksum & 0xff00 != \
> +                                   recv_chksum & 0xff00:
> +                                    diff_bytes -= 1
> +
> +                        if diff_bytes == 0:
> +                            match = True
> +
> +                    if match:
> +                        this_set = {event.event_type: event}
> +                        for sevent in threads_result[recv.pid][idx_recv]:
> +                            this_set[sevent.event_type] = sevent
> +                        event_sets.append(this_set)
> +                        remove_indexes.append(idx_in_list)
> +
> +                        if options.debug & 0x4000000 != 0:
> +                            print("DBG: Matched DpUpcall({:6}) => "
> +                                  "RecvUpcall({:6})".format(idx, idx_recv))
> +
> +                        break
> +
> +                    elif options.debug & 0x8000000 != 0:
> +                        print("DBG: COMPARE DpUpcall({:6}) != "
> +                              "RecvUpcall({:6})".format(idx, idx_recv))
> +                        event_pkt = Ether(bytes(event.pkt)[:compare_len])
> +                        recv_pkt = Ether(bytes(recv.pkt)[:compare_len])
> +                        print(re.sub('^', 'DBG:' + ' ' * 4,
> +                                     event_pkt.show(dump=True),
> +                                     flags=re.MULTILINE))
> +                        print(re.sub('^', 'DBG:' + ' ' * 4,
> +                                     recv_pkt.show(dump=True),
> +                                     flags=re.MULTILINE))
> +
> +                elif options.debug & 0x8000000 != 0:
> +                    print("DBG: COMPATE DpUpcall({:6}) != "
> +                          "RecvUpcall({:6}) -> port {}, {} -> "
> +                          "len = {}, {}".format(idx, idx_recv,
> +                                                event.dp_port,
> +                                                recv.dp_port,
> +                                                event.pkt_len,
> +                                                recv.pkt_len))
> +
> +            bar()
> +            for remove_idx in sorted(remove_indexes, reverse=True):
> +                del recv_upcall_list[remove_idx]
> +
> +    if profile:
> +        t1_stop("Matching DP_UPCALLs to a set")
> +
> +    if warn_parcial_match:
> +        print("WARNING: Packets not fully captured for matching!\n         "
> +              "Increase the packet buffer with the '--packet-size' option.")
> +    if warn_frag:
> +        print("WARNING: SKB from kernel had fragments, we could only copy/"
> +              "compare the first part!")
> +
> +    if collect_stats:
> +        return event_sets, batch_stats, thread_stats
> +
> +    return event_sets
> +
> +
> +#
> +# unit_test()
> +#
> +def unit_test():
> +    pkt1 = b'\x01\x02\x03\x04\x05'
> +    pkt2 = b'\x01\x02\x03\x04\x06'
> +    pkt3 = b'\x01\x02\x03\x04\x07'
> +    key = b'\x08\x00\x03\x00\x01\x00\x00\x00'  # Port 1
> +    #
> +    # Basic test with all events in line
> +    #
> +    t1_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
> +                 RecvUpcall(2, 1, "hndl", 1, "systen", key, pkt1, len(pkt1)),
> +                 Event(3, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(4, 1, "hndl", 1, pkt1, len(pkt1)),
> +                 Event(5, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
> +    t1_result = [{EventType.DP_UPCALL: t1_events[0],
> +                  EventType.RECV_UPCALL: t1_events[1],
> +                  EventType.OP_FLOW_PUT: t1_events[2],
> +                  EventType.OP_FLOW_EXECUTE: t1_events[3],
> +                  EventType.OVS_PKT_EXEC: t1_events[4]}]
> +    #
> +    # Basic test with missing flow put
> +    #
> +    t2_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
> +                 RecvUpcall(2, 1, "hndl", 1, "systen", key, pkt1, len(pkt1)),
> +                 OpFlowExecute(4, 1, "hndl", 1, pkt1, len(pkt1)),
> +                 Event(5, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
> +    t2_result = [{EventType.DP_UPCALL: t2_events[0],
> +                  EventType.RECV_UPCALL: t2_events[1],
> +                  EventType.OP_FLOW_EXECUTE: t2_events[2],
> +                  EventType.OVS_PKT_EXEC: t2_events[3]}]
> +    #
> +    # Test with RecvUpcall's being batched
> +    #
> +    t3_events = [DpUpcall(1, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
> +                 DpUpcall(2, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
> +                 DpUpcall(3, 101, "ping", 3, "system", 1, pkt3, len(pkt3), 0),
> +                 RecvUpcall(4, 1, "hndl", 1, "systen", key, pkt1, len(pkt1)),
> +                 RecvUpcall(5, 1, "hndl", 1, "systen", key, pkt3, len(pkt3)),
> +                 RecvUpcall(6, 1, "hndl", 1, "systen", key, pkt2, len(pkt2)),
> +                 Event(7, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(8, 1, "hndl", 1, pkt1, len(pkt1)),
> +                 Event(9, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
> +                 Event(10, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(11, 1, "hndl", 1, pkt3, len(pkt3)),
> +                 Event(12, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
> +                 Event(13, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(14, 1, "hndl", 1, pkt2, len(pkt2)),
> +                 Event(15, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
> +    t3_result = [{EventType.DP_UPCALL: t3_events[0],
> +                  EventType.RECV_UPCALL: t3_events[3],
> +                  EventType.OP_FLOW_PUT: t3_events[6],
> +                  EventType.OP_FLOW_EXECUTE: t3_events[7],
> +                  EventType.OVS_PKT_EXEC: t3_events[8]},
> +                 {EventType.DP_UPCALL: t3_events[1],
> +                  EventType.RECV_UPCALL: t3_events[5],
> +                  EventType.OP_FLOW_PUT: t3_events[12],
> +                  EventType.OP_FLOW_EXECUTE: t3_events[13],
> +                  EventType.OVS_PKT_EXEC: t3_events[14]},
> +                 {EventType.DP_UPCALL: t3_events[2],
> +                  EventType.RECV_UPCALL: t3_events[4],
> +                  EventType.OP_FLOW_PUT: t3_events[9],
> +                  EventType.OP_FLOW_EXECUTE: t3_events[10],
> +                  EventType.OVS_PKT_EXEC: t3_events[11]}]
> +    #
> +    # Test with RecvUpcall's single + batch
> +    #
> +    t4_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
> +                 RecvUpcall(2, 1, "hndl", 1, "systen", key, pkt1, len(pkt1)),
> +                 Event(3, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(4, 1, "hndl", 1, pkt1, len(pkt1)),
> +                 Event(5, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
> +                 DpUpcall(6, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
> +                 DpUpcall(7, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
> +                 DpUpcall(8, 101, "ping", 3, "system", 1, pkt3, len(pkt3), 0),
> +                 RecvUpcall(9, 1, "hndl", 1, "systen", key, pkt1, len(pkt1)),
> +                 RecvUpcall(10, 1, "hndl", 1, "systen", key, pkt3, len(pkt3)),
> +                 RecvUpcall(11, 1, "hndl", 1, "systen", key, pkt2, len(pkt2)),
> +                 Event(12, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(13, 1, "hndl", 1, pkt1, len(pkt1)),
> +                 Event(14, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
> +                 Event(15, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(16, 1, "hndl", 1, pkt3, len(pkt3)),
> +                 Event(17, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
> +                 Event(18, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(14, 1, "hndl", 1, pkt2, len(pkt2)),
> +                 Event(19, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
> +    t4_result = [{EventType.DP_UPCALL: t4_events[0],
> +                  EventType.RECV_UPCALL: t4_events[1],
> +                  EventType.OP_FLOW_PUT: t4_events[2],
> +                  EventType.OP_FLOW_EXECUTE: t4_events[3],
> +                  EventType.OVS_PKT_EXEC: t4_events[4]},
> +                 {EventType.DP_UPCALL: t4_events[5],
> +                  EventType.RECV_UPCALL: t4_events[8],
> +                  EventType.OP_FLOW_PUT: t4_events[11],
> +                  EventType.OP_FLOW_EXECUTE: t4_events[12],
> +                  EventType.OVS_PKT_EXEC: t4_events[13]},
> +                 {EventType.DP_UPCALL: t4_events[6],
> +                  EventType.RECV_UPCALL: t4_events[10],
> +                  EventType.OP_FLOW_PUT: t4_events[17],
> +                  EventType.OP_FLOW_EXECUTE: t4_events[18],
> +                  EventType.OVS_PKT_EXEC: t4_events[19]},
> +                 {EventType.DP_UPCALL: t4_events[7],
> +                  EventType.RECV_UPCALL: t4_events[9],
> +                  EventType.OP_FLOW_PUT: t4_events[14],
> +                  EventType.OP_FLOW_EXECUTE: t4_events[15],
> +                  EventType.OVS_PKT_EXEC: t4_events[16]}]
> +    #
> +    # Test with two threads interleaved
> +    #
> +    t5_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
> +                 DpUpcall(2, 100, "ping", 1, "system", 1, pkt2, len(pkt2), 0),
> +                 RecvUpcall(3, 1, "hndl", 1, "systen", key, pkt1, len(pkt1)),
> +                 RecvUpcall(4, 2, "hndl", 2, "systen", key, pkt2, len(pkt2)),
> +                 Event(5, 2, "hndl", 2, EventType.OP_FLOW_PUT),
> +                 Event(6, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(7, 2, "hndl", 1, pkt2, len(pkt2)),
> +                 OpFlowExecute(8, 1, "hndl", 1, pkt1, len(pkt1)),
> +                 Event(9, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
> +                 Event(10, 2, "hndl", 1, EventType.OVS_PKT_EXEC)]
> +    t5_result = [{EventType.DP_UPCALL: t5_events[0],
> +                  EventType.RECV_UPCALL: t5_events[2],
> +                  EventType.OP_FLOW_PUT: t5_events[5],
> +                  EventType.OP_FLOW_EXECUTE: t5_events[7],
> +                  EventType.OVS_PKT_EXEC: t5_events[8]},
> +                 {EventType.DP_UPCALL: t5_events[1],
> +                  EventType.RECV_UPCALL: t5_events[3],
> +                  EventType.OP_FLOW_PUT: t5_events[4],
> +                  EventType.OP_FLOW_EXECUTE: t5_events[6],
> +                  EventType.OVS_PKT_EXEC: t5_events[9]}]
> +    #
> +    # Test batch with missing events
> +    #
> +    t6_events = [DpUpcall(1, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
> +                 DpUpcall(2, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
> +                 RecvUpcall(3, 1, "hndl", 1, "systen", key, pkt1, len(pkt1)),
> +                 RecvUpcall(4, 1, "hndl", 1, "systen", key, pkt2, len(pkt2)),
> +                 Event(5, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(6, 1, "hndl", 1, pkt2, len(pkt2)),
> +                 Event(7, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
> +    t6_result = [{EventType.DP_UPCALL: t6_events[0],
> +                  EventType.RECV_UPCALL: t6_events[2]},
> +                 {EventType.DP_UPCALL: t6_events[1],
> +                  EventType.RECV_UPCALL: t6_events[3],
> +                  EventType.OP_FLOW_PUT: t6_events[4],
> +                  EventType.OP_FLOW_EXECUTE: t6_events[5],
> +                  EventType.OVS_PKT_EXEC: t6_events[6]}]
> +    #
> +    # Test with RecvUpcall's and OVS_PKT_EXEC being batched
> +    #
> +    t7_events = [DpUpcall(1, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
> +                 DpUpcall(2, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
> +                 DpUpcall(3, 101, "ping", 3, "system", 1, pkt3, len(pkt3), 0),
> +                 RecvUpcall(4, 1, "hndl", 1, "systen", key, pkt1, len(pkt1)),
> +                 RecvUpcall(5, 1, "hndl", 1, "systen", key, pkt2, len(pkt2)),
> +                 RecvUpcall(6, 1, "hndl", 1, "systen", key, pkt3, len(pkt3)),
> +                 Event(7, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(8, 1, "hndl", 1, pkt1, len(pkt1)),
> +                 Event(9, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(10, 1, "hndl", 1, pkt2, len(pkt2)),
> +                 Event(11, 1, "hndl", 1, EventType.OP_FLOW_PUT),
> +                 OpFlowExecute(12, 1, "hndl", 1, pkt3, len(pkt3)),
> +                 Event(13, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
> +                 Event(14, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
> +                 Event(15, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
> +    t7_result = [{EventType.DP_UPCALL: t7_events[0],
> +                  EventType.RECV_UPCALL: t7_events[3],
> +                  EventType.OP_FLOW_PUT: t7_events[6],
> +                  EventType.OP_FLOW_EXECUTE: t7_events[7],
> +                  EventType.OVS_PKT_EXEC: t7_events[12]},
> +                 {EventType.DP_UPCALL: t7_events[1],
> +                  EventType.RECV_UPCALL: t7_events[4],
> +                  EventType.OP_FLOW_PUT: t7_events[8],
> +                  EventType.OP_FLOW_EXECUTE: t7_events[9],
> +                  EventType.OVS_PKT_EXEC: t7_events[13]},
> +                 {EventType.DP_UPCALL: t7_events[2],
> +                  EventType.RECV_UPCALL: t7_events[5],
> +                  EventType.OP_FLOW_PUT: t7_events[10],
> +                  EventType.OP_FLOW_EXECUTE: t7_events[11],
> +                  EventType.OVS_PKT_EXEC: t7_events[14]}]
> +    #
> +    # Actual test sets
> +    #
> +    test_set = [["Simple single event", t1_events, t1_result],
> +                ["Single event, missing flow_put", t2_events, t2_result],
> +                ["Batched events", t3_events, t3_result],
> +                ["Single + batched events", t4_events, t4_result],
> +                ["Two sets, different threads", t5_events, t5_result],
> +                ["Batch with missing exec", t6_events, t6_result],
> +                ["Batched events including exec", t7_events, t7_result]]
> +
> +    print("Running some simple unit tests:")
> +
> +    for test in test_set:
> +        print("- {:<32}  ".format(test[0]), end="")
> +        result = collect_event_sets(test[1][:])
> +        if result == test[2]:
> +            print("PASS")
> +        else:
> +            print("FAIL")
> +            print("  OUTPUT  :")
> +            for event_set in result:
> +                hdr = "    - "
> +                for event_type, event in event_set.items():
> +                    print("{} {:<16}: {}".format(hdr, event_type.name, event))
> +                    hdr = "      "
> +            print("  EXPECTED:")
> +            for event_set in test[2]:
> +                hdr = "    - "
> +                for event_type, event in event_set.items():
> +                    print("{} {:<16}: {}".format(hdr, event_type.name, event))
> +                    hdr = "      "
> +
> +
> +#
> +# show_key_value()
> +#
> +def show_key_value(data_set, description=None):
> +    if description is not None:
> +        print("\n=> {}:".format(description))
> +
> +    for k, v in data_set.items():
> +        print("  {:36}: {:>10}".format(str(k), str(v)))
> +
> +
> +#
> +# show_batch_histogram()
> +#
> +def show_batch_histogram(data_set, description=None):
> +    nr_of_buckets = 64
> +
> +    if description is not None:
> +        print("\n=> {}:".format(description))
> +
> +    if len(data_set) == 0:
> +        print("# NumSamples = 0")
> +        return
> +
> +    min_val = nr_of_buckets
> +    max_val = 0
> +    entries = 0
> +    high_buckets = 0
> +    buckets = [0] * nr_of_buckets
> +
> +    for entry in data_set:
> +        min_val = min(min_val, entry)
> +        max_val = max(max_val, entry)
> +        if entry == 0:
> +            continue
> +        elif entry > nr_of_buckets:
> +            high_buckets += 1
> +        else:
> +            buckets[entry - 1] += 1
> +
> +        entries += 1
> +
> +    if max(buckets + [high_buckets]) > 4:
> +        scale = int(max(buckets + [high_buckets]) / 4)
> +    else:
> +        scale = 1
> +
> +    print("# NumSamples = {}; Min = {}; Max = {}".format(entries, min_val,
> +                                                         max_val))
> +    print("# each ∎ represents a count of {}".format(scale))
> +
> +    for idx in range(int(nr_of_buckets / 2)):
> +        idx_2nd = idx + int(nr_of_buckets / 2)
> +        print("{:5} [{:8}]: {:22}  {:5} [{:8}]: {:22}".format(
> +            idx + 1, buckets[idx], "∎" * int(buckets[idx] / scale),
> +            idx_2nd + 1, buckets[idx_2nd],
> +            "∎" * int(buckets[idx_2nd] / scale)))
> +
> +    if high_buckets > 0:
> +        print("{:>5} [{:8}]: {:22}".format(">" + str(nr_of_buckets),
> +                                           high_buckets,
> +                                           "∎" * int(high_buckets / scale)))
> +
> +
> +#
> +# show_histogram()
> +#
> +def show_histogram(data_set, description=None, options=None,
> +                   minimum=None, maximum=None, buckets=None, custbuckets=None):
> +    if description is not None:
> +        print("\n=> {}:".format(description))
> +
> +    if options is not None:
> +        if buckets is None:
> +            buckets = options.histogram_buckets
> +        if options is not None and options.sets:
> +            print(data_set)
> +
> +    if len(data_set) == 0:
> +        print("# NumSamples = 0")
> +    elif len(data_set) == 1:
> +        print("# NumSamples = 1; Min = {0:.4f}; Max = {0:.4f}".
> +              format(data_set[0]))
> +    elif len(set(data_set)) == 1 and maximum is None and minimum is None and \
> +            custbuckets is None:
> +        histogram(data_set, buckets=buckets, minimum=list(set(data_set))[0],
> +                  maximum=list(set(data_set))[0] + 1)
> +    else:
> +        histogram(data_set, buckets=buckets,
> +                  minimum=minimum, maximum=maximum, custbuckets=custbuckets)
> +
> +
> +#
> +# buffer_size_type()
> +#
> +def buffer_size_type(astr, min=64, max=2048):
> +    value = int(astr)
> +    if min <= value <= max:
> +        return value
> +    else:
> +        raise argparse.ArgumentTypeError(
> +            'value not in range {}-{}'.format(min, max))
> +
> +
> +#
> +# next_power_of_two()
> +#
> +def next_port_of_two(val):

mismatch between the comment and the function name.
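Maybe just rename it to match the comment, something like (untested):

    def next_power_of_two(val):
        np = 1
        while np < val:
            np *= 2
        return np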

> +    np = 1
> +    while np < val:
> +        np *= 2
> +    return np
> +
> +
> +#
> +# main()
> +#
> +def main():
> +    #
> +    # Don't like these globals, but ctx passing does not seem to work with the
> +    # existing open_ring_buffer() API :(
> +    #
> +    global b
> +    global options
> +    global trace_data
> +    global events_received
> +    global event_count
> +    global export_file
> +
> +    #
> +    # Argument parsing
> +    #
> +    parser = argparse.ArgumentParser()
> +
> +    parser.add_argument("-b", "--histogram-buckets",
> +                        help="Number of buckets per histogram, default 20",
> +                        type=int, default=20, metavar="BUCKETS")
> +    parser.add_argument("--buffer-page-count",
> +                        help="Number of BPF ring buffer pages, default 1024",
> +                        type=int, default=1024, metavar="NUMBER")
> +    parser.add_argument("-D", "--debug",
> +                        help="Enable eBPF debugging",
> +                        type=lambda x: int(x, 0), const=0x3f, default=0,
> +                        nargs='?')
> +    parser.add_argument("-f", "--flow-key-size",
> +                        help="Set maximum flow key size to capture, "
> +                        "default 64", type=buffer_size_type, default=64,
> +                        metavar="[64-2048]")
> +    parser.add_argument("--handler-filter",
> +                        help="Post processing handler thread filter",
> +                        type=str, default=None, metavar="HANDLERS")
> +    parser.add_argument("-P", "--packet-size",
> +                        help="Set maximum packet size to capture, "
> +                        "default 256", type=buffer_size_type, default=256,
> +                        metavar="[64-2048]")
> +    parser.add_argument("-p", "--pid", metavar="VSWITCHD_PID",
> +                        help="ovs-vswitch's PID",
> +                        type=int, default=None)
> +    parser.add_argument("-q", "--quiet", action="store_true",
> +                        help="Do not show individual events")
> +    parser.add_argument("-r", "--read-events",
> +                        help="Read events from FILE instead of installing "
> +                        "tracepoints", type=str, default=None, metavar="FILE")
> +    parser.add_argument("--sets", action="store_true",
> +                        help="Dump content of data sets")
> +    parser.add_argument("-s", "--stop",
> +                        help="Stop after receiving EVENTS number of trace "
> +                        "events",
> +                        type=int, default=0, metavar="EVENTS")
> +    parser.add_argument("--unit-test", action="store_true",
> +                        help=argparse.SUPPRESS)
> +    parser.add_argument("-w", "--write-events",
> +                        help="Write events to FILE",
> +                        type=str, default=None, metavar="FILE")
> +
> +    options = parser.parse_args()
> +
> +    if options.unit_test:
> +        unit_test()
> +        sys.exit(0)
> +
> +    #
> +    # Find the PID of the ovs-vswitchd daemon if not specified.
> +    #
> +    if options.pid is None and options.read_events is None:
> +        for proc in psutil.process_iter():
> +            if 'ovs-vswitchd' in proc.name():
> +                if options.pid is not None:
> +                    print("ERROR: Multiple ovs-vswitchd daemons running, "
> +                          "use the -p option!")
> +                    sys.exit(-1)
> +
> +                options.pid = proc.pid
> +
> +    #
> +    # Error checking on input parameters.
> +    #
> +    if options.pid is None and options.read_events is None:
> +        print("ERROR: Failed to find ovs-vswitchd's PID!")
> +        sys.exit(-1)
> +
> +    if options.read_events is not None and options.write_events is not None:
> +        print("ERROR: Either supply the read or write events option, "
> +              "not both!")
> +        sys.exit(-1)
> +
> +    if options.handler_filter is not None and options.read_events is None:
> +        print("ERROR: The --handler-filter option is only valid with the "
> +              "--read-events option!")
> +        sys.exit(-1)
> +
> +    options.buffer_page_count = next_port_of_two(options.buffer_page_count)
> +
> +    #
> +    # Open write handle if needed.
> +    #
> +    if options.write_events is not None:
> +        try:
> +            export_file = open(options.write_events, "w")
> +        except (FileNotFoundError, IOError, PermissionError) as e:
> +            print("ERROR: Can't create export file \"{}\": {}".format(
> +                options.write_events, e.strerror))
> +            sys.exit(-1)
> +    else:
> +        export_file = None
> +
> +    trace_data = []
> +    event_count = {'total': {}, 'valid': {}, 'miss': {}}
> +    if options.read_events is None:
> +        #
> +        # Call get_dp_mapping() to prepare the cache
> +        #
> +        dp_port_map = get_dp_mapping("ovs-system", "eth0", return_map=True)
> +        if export_file is not None:
> +            export_file.write("dp_port_map = {}\n".format(dp_port_map))
> +
> +        #
> +        # Attach the usdt probe
> +        #
> +        u = USDT(pid=int(options.pid))
> +        try:
> +            u.enable_probe(probe="recv_upcall", fn_name="trace__recv_upcall")
> +            u.enable_probe(probe="op_flow_put", fn_name="trace__op_flow_put")
> +            u.enable_probe(probe="op_flow_execute",
> +                           fn_name="trace__op_flow_execute")
> +        except USDTException as e:
> +            print("ERROR: {}"
> +                  "ovs-vswitchd!".format(
> +                      (re.sub('^', ' ' * 7, str(e),
> +                              flags=re.MULTILINE)).strip().
> +                      replace("--with-dtrace or --enable-dtrace",
> +                              "--enable-usdt-probes")))
> +            sys.exit(-1)
> +
> +        #
> +        # Uncomment to see how arguments are decoded.
> +        #   print(u.get_text())
> +        #
> +        print("- Compiling eBPF programs...")
> +
> +        #
> +        # Attach probes to the running process
> +        #
> +        source = ebpf_source.replace("<MAX_PACKET_VAL>",
> +                                     str(options.packet_size))
> +        source = source.replace("<MAX_KEY_VAL>", str(options.flow_key_size))
> +        source = source.replace("<BUFFER_PAGE_CNT>",
> +                                str(options.buffer_page_count))
> +
> +        b = BPF(text=source, usdt_contexts=[u], debug=options.debug & 0xffffff)
> +
> +        #
> +        # Dump out all events
> +        #
> +        print("- Capturing events [Press ^C to stop]...")
> +        events_received = 0
> +
> +        b['events'].open_ring_buffer(receive_event_bcc)

What do you think about adding a header (even unaligned is ok AFAIC)
before printing the events (when not quiet) like you did with:

=> Events received per type (usable/total) [missed events]

> +        while 1:
> +            try:
> +                b.ring_buffer_poll()
> +                if options.stop != 0 and events_received >= options.stop:
> +                    break
> +                time.sleep(0.5)
> +            except KeyboardInterrupt:
> +                break
> +
> +        dropcnt = b.get_table("dropcnt")
> +        export_misses = {}
> +        for k in dropcnt.keys():
> +            event = EventType.from_trace(k.value)
> +            count = dropcnt.sum(k).value
> +            if count > 0:
> +                if event not in event_count['total']:
> +                    event_count['total'][event] = 0
> +                    event_count['valid'][event] = 0
> +                event_count['miss'][event] = count
> +                export_misses[k.value] = count
> +
> +        if options.write_events is not None:
> +            if sum(event_count['miss'].values()) > 0:
> +                export_file.write("event_miss = {}\n".format(export_misses))
> +
> +            export_file.close()
> +
> +        print()
> +    else:
> +        #
> +        # Here we are requested to read event from an event export
> +        #
> +        thread_filter = None
> +        if options.handler_filter is not None:
> +            thread_filter = options.handler_filter.split(',')
> +
> +        try:
> +            dp_port_mapping_valid = False
> +            with open(options.read_events, 'r') as fd:
> +                events_received = 0
> +
> +                if options.quiet:
> +                    spinner = Halo(spinner="dots", color="cyan",
> +                                   text="Reading events from \"{}\"...".format(
> +                                       options.read_events))
> +                    spinner.start()
> +                else:
> +                    print("- Reading events from \"{}\"...".format(
> +                        options.read_events))
> +                for entry in fd:
> +                    if options.stop != 0 and events_received >= options.stop:
> +                        break
> +
> +                    entry.rstrip()
> +                    if entry.startswith('dp_port_map = {'):
> +                        if not dp_port_mapping_valid:
> +                            dp_port_mapping_valid = True
> +                            get_dp_mapping("", "",
> +                                           dp_map=ast.literal_eval(entry[14:]))
> +                    elif (entry.startswith('event = {') and
> +                          dp_port_mapping_valid):
> +                        event = ast.literal_eval(entry[8:])
> +                        event = namedtuple("EventObject",
> +                                           event.keys())(*event.values())
> +
> +                        if thread_filter is not None \
> +                           and EventType.from_trace(event.event) != \
> +                           EventType.DP_UPCALL \
> +                           and event.comm.decode("utf-8") not in thread_filter:
> +                            # Skip none filtered threads
> +                            continue
> +
> +                        if len(event.pkt) > 0:
> +                            options.packet_size = len(event.pkt)
> +                        if len(event.key) > 0:
> +                            options.flow_key_size = len(event.key)
> +                        receive_event(event)
> +                        events_received += 1
> +                    elif entry.startswith('event_miss = {'):
> +                        misses = ast.literal_eval(entry[13:])
> +                        for e, count in misses.items():
> +                            event = EventType.from_trace(e)
> +                            if count > 0:
> +                                if event not in event_count['total']:
> +                                    event_count['total'][event] = 0
> +                                    event_count['valid'][event] = 0
> +                                event_count['miss'][event] = count
> +
> +            if options.quiet:
> +                spinner.stop()
> +                print("- Reading events from \"{}\"...".format(
> +                    options.read_events))
> +
> +        except (FileNotFoundError, PermissionError):
> +            print("ERROR: Can't open file \"{}\" for reading!".format(
> +                options.read_events))
> +            sys.exit(-1)
> +
> +    #
> +    # Start analyzing the data
> +    #
> +    print("- Analyzing results ({} events)...".format(len(trace_data)))
> +
> +    if events_received > 0:
> +        if sum(event_count['miss'].values()) > 0:
> +            print("\nWARNING: Not all events were captured!\n         "
> +                  "Increase the BPF ring buffer size with the "
> +                  "--buffer-page-count option.")
> +
> +        print("\n=> Events received per type (usable/total) [missed events]:")
> +        for event, total in sorted(event_count['total'].items()):
> +            miss = event_count['miss'][event] if event in event_count['miss'] \
> +                else 0
> +            print("  {:36}: {:10}/{:10} [{:10}]".format(
> +                event, event_count['valid'][event], total, miss))
> +
> +    collection, batch_stats, thread_stats = collect_event_sets(
> +        trace_data, collect_stats=True, spinner=True)
> +
> +    if len(collection) <= 0:
> +        print("No upcall data sets where found!!")
> +        sys.exit(0)
> +
> +    print("\n- Analyzing {} event sets...".format(len(collection)))
> +
> +    if options.debug & 0x1000000 != 0:
> +        for upcall in collection:
> +            print("DBG: {}{}{}{}{}".format(
> +                "U" if EventType.DP_UPCALL in upcall else "-",
> +                "u" if EventType.RECV_UPCALL in upcall else "-",
> +                "p" if EventType.OP_FLOW_PUT in upcall else "-",
> +                "e" if EventType.OP_FLOW_EXECUTE in upcall else "-",
> +                "E" if EventType.OVS_PKT_EXEC in upcall else "-"))
> +            if options.debug & 0x2000000 != 0:
> +                try:
> +                    print("DBG:  - {}".format(upcall[EventType.DP_UPCALL]))
> +                    print("DBG:  - {}".format(upcall[EventType.RECV_UPCALL]))
> +                    print("DBG:  - {}".format(upcall[EventType.OP_FLOW_PUT]))
> +                    print("DBG:  - {}".format(
> +                        upcall[EventType.OP_FLOW_EXECUTE]))
> +                    print("DBG:  - {}".format(upcall[EventType.OVS_PKT_EXEC]))
> +                except LookupError:
> +                    continue
> +
> +    show_key_value(thread_stats, description="Upcalls handled per thread")
> +    show_batch_histogram(batch_stats,
> +                         description="Histogram of upcalls per batch")
> +
> +    kernel_to_vswitchd = []
> +    kernel_to_kernel_exec = []
> +    vswitchd_to_kernel = []
> +    time_minus_lookup = []
> +    for upcall in collection:
> +        kernel_to_vswitchd.append((upcall[EventType.RECV_UPCALL].ts -
> +                                   upcall[EventType.DP_UPCALL].ts) /
> +                                  1000)
> +
> +        if EventType.OP_FLOW_PUT in upcall and \
> +           EventType.OVS_PKT_EXEC in upcall:
> +            time_minus_lookup.append(
> +                ((upcall[EventType.OVS_PKT_EXEC].ts -
> +                  upcall[EventType.DP_UPCALL].ts) -
> +                 (upcall[EventType.OP_FLOW_PUT].ts -
> +                  upcall[EventType.RECV_UPCALL].ts)) / 1000)
> +
> +        if EventType.OP_FLOW_EXECUTE in upcall and \
> +           EventType.OVS_PKT_EXEC in upcall:
> +            vswitchd_to_kernel.append((upcall[EventType.OVS_PKT_EXEC].ts
> +                                       - upcall[EventType.OP_FLOW_EXECUTE].ts)
> +                                      / 1000)
> +
> +        if EventType.OVS_PKT_EXEC in upcall:
> +            kernel_to_kernel_exec.append((upcall[EventType.OVS_PKT_EXEC].ts -
> +                                          upcall[EventType.DP_UPCALL].ts) /
> +                                         1000)
> +
> +    show_histogram(kernel_to_vswitchd,
> +                   description="Kernel upcall action to vswitchd receive "
> +                   "(microseconds)",
> +                   options=options)
> +    show_histogram(vswitchd_to_kernel,
> +                   description="vsiwtchd execute to kernel receive "

typo: vswitchd

> +                   "(microseconds)",
> +                   options=options)
> +    show_histogram(time_minus_lookup,
> +                   description="Upcall overhead (total time minus lookup) "
> +                   "(microseconds)",
> +                   options=options)
> +    show_histogram(kernel_to_kernel_exec,
> +                   description="Kernel upcall to kernel packet execute "
> +                   "(microseconds)",
> +                   options=options)
> +
> +
> +#
> +# Start main() as the default entry point...
> +#
> +if __name__ == '__main__':
> +    main()
>
Eelco Chaudron Dec. 21, 2021, 4:51 p.m. UTC | #2
On 21 Dec 2021, at 16:53, Paolo Valerio wrote:

> Hi Eelco,
>
> I did some tests, and the script seems to work as expected.
> My comments inline.


Thanks for the review! See comments inline below.


> Eelco Chaudron <echaudro@redhat.com> writes:
>
>> This patch adds a series of NetLink flow operation USDT probes.
>> These probes are in turn used in the upcall_cost Python script,
>> which in addition of some kernel tracepoints, give an insight into
>> the time spent on processing upcall.
>>
>> Signed-off-by: Eelco Chaudron <echaudro@redhat.com>

<SNIP>

>> +# Script information:
>> +# -------------------
>> +# upcall_cost.py uses various user space and kernel space probes to determine
>> +# the costs (in time) for handling the first packet in user space. It
>> +# calculates the following costs:
>> +#
>> +# - Time it takes from the kernel sending the upcall till it's received by the
>> +#   ovs-vswitchd process.
>> +# - Time it takes from ivs-vswitchd sending the execute actions command till
>
> typo: ovs-vswitchd

Will fix in v2

<SNIP>

>> +int trace__recv_upcall(struct pt_regs *ctx) {
>> +    uint32_t upcall_type;
>> +    uint64_t addr;
>> +    uint64_t size;
>> +
>> +    bpf_usdt_readarg(2, ctx, &upcall_type);
>> +    if (upcall_type != 0)
>> +        return 0;
>> +
>> +    struct event_t *event = init_event(EVENT_RECV_UPCALL);
>> +    if (!event)
>> +        return 1;
>> +
>> +    bpf_usdt_readarg(1, ctx, &addr);
>> +    bpf_probe_read(&event->dpif_name, sizeof(event->dpif_name), (void *)addr);
>
> nit: if it's ok, we could use bpf_probe_read_str() here too.

ACK, will fix in v2.

<SNIP>

>> +
>> +#
>> +# Event types
>> +#
>> +class EventType(StrEnum):
>> +    RECV_UPCALL = 'dpif_recv__recv_upcall'
>> +    DP_UPCALL = 'openvswitch__dp_upcall'
>> +    OP_FLOW_PUT = 'netlink_opperate__op_flow_put'
>> +    OP_FLOW_EXECUTE = 'netlink_opperate__op_flow_execute'
>
> besides the typo, shouldn't this be (for both PUT and EXECUTE)
> dpif_netlink_operate__* ? (not sure about the number of underscores,
> though, considering that the provider is dpif_netlink_operate__ :)

I kept them short to not overrun on line length, but I guess it truncates to 79 for the number of events, so we should be fine.

Will change in v2 to dpif_netlink_operate__*.
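
I.e., something along these lines (exact names still to be double-checked
against the provider/probe names in v2):

    OP_FLOW_PUT = 'dpif_netlink_operate__op_flow_put'
    OP_FLOW_EXECUTE = 'dpif_netlink_operate__op_flow_execute'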


<SNIP>

>> +
>> +        #
>> +        # Dump out all events
>> +        #
>> +        print("- Capturing events [Press ^C to stop]...")
>> +        events_received = 0
>> +
>> +        b['events'].open_ring_buffer(receive_event_bcc)
>
> What do you think about adding a header (even unaligned is ok AFAIC)
> before printing the events (when not quiet) like you did with:
>
> => Events received per type (usable/total) [missed events]


Good idea, thought I had this, but I guess I got confused with upcall_monitor ;)

Header will be something like this:

EVENT                    COMM                  PID  CPU  TIME                EVENT DATA[dpif_name/dp_port/pkt_len/pkt_frag_len]
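
A rough sketch of how it could be printed right before the capture loop
(untested, and the column widths still need to be lined up with the
per-event output):

    if not options.quiet:
        print("{:<24} {:<16} {:>8}  {:>3}  {:<19} {}".format(
            "EVENT", "COMM", "PID", "CPU", "TIME",
            "EVENT DATA[dpif_name/dp_port/pkt_len/pkt_frag_len]"))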


<SNIP>

>> +
>> +    show_histogram(kernel_to_vswitchd,
>> +                   description="Kernel upcall action to vswitchd receive "
>> +                   "(microseconds)",
>> +                   options=options)
>> +    show_histogram(vswitchd_to_kernel,
>> +                   description="vsiwtchd execute to kernel receive "
>
> typo: vswitchd

Will fix in V2.

<SNIP>
diff mbox series

Patch

diff --git a/Documentation/topics/usdt-probes.rst b/Documentation/topics/usdt-probes.rst
index 1f6446e12..cfe57f1a3 100644
--- a/Documentation/topics/usdt-probes.rst
+++ b/Documentation/topics/usdt-probes.rst
@@ -200,11 +200,96 @@  used naming convention.
 
 Available probes in ``ovs_vswitchd``:
 
+- dpif_netlink_operate\_\_:op_flow_del
+- dpif_netlink_operate\_\_:op_flow_execute
+- dpif_netlink_operate\_\_:op_flow_get
+- dpif_netlink_operate\_\_:op_flow_put
 - dpif_recv:recv_upcall
 - main:poll_block
 - main:run_start
 
 
+dpif_netlink_operate\_\_:op_flow_del
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+**Description**:
+
+This probe gets triggered when the Netlink datapath is about to execute the
+DPIF_OP_FLOW_DEL operation as part of the dpif ``operate()`` callback.
+
+**Arguments**:
+
+- *arg0*: ``(struct dpif_netlink *) dpif``
+- *arg1*: ``(struct dpif_flow_del *) del``
+- *arg2*: ``(struct dpif_netlink_flow *) flow``
+- *arg3*: ``(struct ofpbuf *) aux->request``
+
+**Script references**:
+
+- *None*
+
+
+dpif_netlink_operate\_\_:op_flow_execute
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+**Description**:
+
+This probe gets triggered when the Netlink datapath is about to execute the
+DPIF_OP_FLOW_EXECUTE operation as part of the dpif ``operate()`` callback.
+
+**Arguments**:
+
+- *arg0*: ``(struct dpif_netlink *) dpif``
+- *arg1*: ``(struct dpif_execute *) op->execute``
+- *arg2*: ``dp_packet_data(op->execute.packet)``
+- *arg3*: ``dp_packet_size(op->execute.packet)``
+- *arg4*: ``(struct ofpbuf *) aux->request``
+
+**Script references**:
+
+- ``/utilities/usdt_scripts/upcall_cost.py``
+
+
+dpif_netlink_operate\_\_:op_flow_get
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+**Description**:
+
+This probe gets triggered when the Netlink datapath is about to execute the
+DPIF_OP_FLOW_GET operation as part of the dpif ``operate()`` callback.
+
+**Arguments**:
+
+- *arg0*: ``(struct dpif_netlink *) dpif``
+- *arg1*: ``(struct dpif_flow_get *) get``
+- *arg2*: ``(struct dpif_netlink_flow *) flow``
+- *arg3*: ``(struct ofpbuf *) aux->request``
+
+**Script references**:
+
+- *None*
+
+
+dpif_netlink_operate\_\_:op_flow_put
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+**Description**:
+
+This probe gets triggered when the Netlink datapath is about to execute the
+DPIF_OP_FLOW_PUT operation as part of the dpif ``operate()`` callback.
+
+**Arguments**:
+
+- *arg0*: ``(struct dpif_netlink *) dpif``
+- *arg1*: ``(struct dpif_flow_put *) put``
+- *arg2*: ``(struct dpif_netlink_flow *) flow``
+- *arg3*: ``(struct ofpbuf *) aux->request``
+
+**Script references**:
+
+- ``/utilities/usdt_scripts/upcall_cost.py``
+
+
 probe dpif_recv:recv_upcall
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
@@ -227,6 +312,7 @@  sent to ``ovs-vswitchd``.
 
 **Script references**:
 
+- ``/utilities/usdt_scripts/upcall_cost.py``
 - ``/utilities/usdt_scripts/upcall_monitor.py``
 
 
diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
index 424a28401..52c229568 100644
--- a/lib/dpif-netlink.c
+++ b/lib/dpif-netlink.c
@@ -56,6 +56,7 @@ 
 #include "openvswitch/poll-loop.h"
 #include "openvswitch/shash.h"
 #include "openvswitch/thread.h"
+#include "openvswitch/usdt_probes.h"
 #include "openvswitch/vlog.h"
 #include "packets.h"
 #include "random.h"
@@ -2052,6 +2053,9 @@  dpif_netlink_operate__(struct dpif_netlink *dpif,
                 aux->txn.reply = &aux->reply;
             }
             dpif_netlink_flow_to_ofpbuf(&flow, &aux->request);
+
+            OVS_USDT_PROBE(dpif_netlink_operate__, op_flow_put,
+                           dpif, put, &flow, &aux->request);
             break;
 
         case DPIF_OP_FLOW_DEL:
@@ -2062,6 +2066,9 @@  dpif_netlink_operate__(struct dpif_netlink *dpif,
                 aux->txn.reply = &aux->reply;
             }
             dpif_netlink_flow_to_ofpbuf(&flow, &aux->request);
+
+            OVS_USDT_PROBE(dpif_netlink_operate__, op_flow_del,
+                           dpif, del, &flow, &aux->request);
             break;
 
         case DPIF_OP_EXECUTE:
@@ -2082,6 +2089,12 @@  dpif_netlink_operate__(struct dpif_netlink *dpif,
             } else {
                 dpif_netlink_encode_execute(dpif->dp_ifindex, &op->execute,
                                             &aux->request);
+
+                OVS_USDT_PROBE(dpif_netlink_operate__, op_flow_execute,
+                               dpif, &op->execute,
+                               dp_packet_data(op->execute.packet),
+                               dp_packet_size(op->execute.packet),
+                               &aux->request);
             }
             break;
 
@@ -2090,6 +2103,9 @@  dpif_netlink_operate__(struct dpif_netlink *dpif,
             dpif_netlink_init_flow_get(dpif, get, &flow);
             aux->txn.reply = get->buffer;
             dpif_netlink_flow_to_ofpbuf(&flow, &aux->request);
+
+            OVS_USDT_PROBE(dpif_netlink_operate__, op_flow_get,
+                           dpif, get, &flow, &aux->request);
             break;
 
         default:
diff --git a/utilities/automake.mk b/utilities/automake.mk
index 382f8e789..7d0bddcd8 100644
--- a/utilities/automake.mk
+++ b/utilities/automake.mk
@@ -63,6 +63,7 @@  EXTRA_DIST += \
 	utilities/docker/debian/Dockerfile \
 	utilities/docker/debian/build-kernel-modules.sh \
 	utilities/usdt_scripts/bridge_loop.bt \
+	utilities/usdt_scripts/upcall_cost.py \
 	utilities/usdt_scripts/upcall_monitor.py
 MAN_ROOTS += \
 	utilities/ovs-testcontroller.8.in \
@@ -131,6 +132,7 @@  FLAKE8_PYFILES += utilities/ovs-pcap.in \
 	utilities/ovs-check-dead-ifs.in \
 	utilities/ovs-tcpdump.in \
 	utilities/ovs-pipegen.py \
-	utilities/usdt_scripts/upcall_monitor.py
+	utilities/usdt_scripts/upcall_monitor.py \
+	utilities/usdt_scripts/upcall_cost.py
 
 include utilities/bugtool/automake.mk
diff --git a/utilities/usdt_scripts/upcall_cost.py b/utilities/usdt_scripts/upcall_cost.py
new file mode 100755
index 000000000..1af8b981f
--- /dev/null
+++ b/utilities/usdt_scripts/upcall_cost.py
@@ -0,0 +1,1774 @@ 
+#!/usr/bin/env python3
+#
+# Copyright (c) 2021 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Script information:
+# -------------------
+# upcall_cost.py uses various user space and kernel space probes to determine
+# the costs (in time) for handling the first packet in user space. It
+# calculates the following costs:
+#
+# - Time it takes from the kernel sending the upcall till it's received by the
+#   ovs-vswitchd process.
+# - Time it takes from ivs-vswitchd sending the execute actions command till
+#   the kernel receives it.
+# - The total time it takes from the kernel sending the upcall until it
+#   receives the packet execute command.
+# - The total time of the above, minus the time it takes for the actual lookup.
+#
+# In addition, it will also report the number of packets batched, as OVS will
+# first try to read UPCALL_MAX_BATCH(64) packets from kernel space and then
+# do the flow lookups and execution. So the smaller the batch size, the more
+# realistic the cost estimates are.
+#
+# The script does not need any options to attach to a running instance of
+# ovs-vswitchd. However, it's recommended always run the script with the
+# --write-events option. This way, if something does go wrong, the collected
+# data is saved. Use the --help option to see all the available options.
+#
+# Note: In addition to the bcc tools for your specific setup, you need the
+#       following Python packages:
+#         pip install alive-progress halo psutil scapy strenum text_histogram3
+#
+
+try:
+    from bcc import BPF, USDT, USDTException
+except ModuleNotFoundError:
+    print("WARNING: Can't find the BPF Compiler Collection (BCC) tools!")
+    print("         This is NOT problem if you analyzing previously collected"
+          " data.\n")
+from alive_progress import alive_bar
+from collections import namedtuple
+from halo import Halo
+from scapy.all import TCP, UDP
+from scapy.layers.l2 import Ether
+from strenum import StrEnum
+from text_histogram3 import histogram
+from time import process_time
+
+import argparse
+import ast
+import psutil
+import re
+import struct
+import subprocess
+import sys
+import time
+
+#
+# Global definitions
+#
+DP_TUNNEL_PORT = -1
+
+
+#
+# Actual eBPF source code
+#
+ebpf_source = """
+#include <linux/sched.h>
+#include <linux/skbuff.h>
+#include <uapi/linux/bpf.h>
+
+#define MAX_PACKET <MAX_PACKET_VAL>
+#define MAX_KEY    <MAX_KEY_VAL>
+
+enum {
+    EVENT_RECV_UPCALL = 0,
+    EVENT_DP_UPCALL,
+    EVENT_OP_FLOW_PUT,
+    EVENT_OP_FLOW_EXECUTE,
+    EVENT_OVS_PKT_EXEC,
+    _EVENT_MAX_EVENT
+};
+
+#define barrier_var(var) asm volatile("" : "=r"(var) : "0"(var))
+
+struct event_t {
+    u32 event;
+    u32 cpu;
+    u32 pid;
+    u32 upcall_type;
+    u64 ts;
+    u32 pkt_frag_size;
+    u32 pkt_size;
+    u64 key_size;
+    char comm[TASK_COMM_LEN];
+    char dpif_name[32];
+    char dev_name[16];
+    unsigned char pkt[MAX_PACKET];
+    unsigned char key[MAX_KEY];
+};
+
+BPF_RINGBUF_OUTPUT(events, <BUFFER_PAGE_CNT>);
+BPF_TABLE("percpu_array", uint32_t, uint64_t, dropcnt, _EVENT_MAX_EVENT);
+
+static struct event_t *init_event(u32 type)
+{
+    struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t));
+
+    if (!event) {
+        uint64_t *value = dropcnt.lookup(&type);
+        if (value)
+            __sync_fetch_and_add(value, 1);
+
+        return NULL;
+    }
+
+    event->event = type;
+    event->ts = bpf_ktime_get_ns();
+    event->cpu =  bpf_get_smp_processor_id();
+    event->pid = bpf_get_current_pid_tgid();
+    bpf_get_current_comm(&event->comm, sizeof(event->comm));
+
+    return event;
+}
+
+int trace__recv_upcall(struct pt_regs *ctx) {
+    uint32_t upcall_type;
+    uint64_t addr;
+    uint64_t size;
+
+    bpf_usdt_readarg(2, ctx, &upcall_type);
+    if (upcall_type != 0)
+        return 0;
+
+    struct event_t *event = init_event(EVENT_RECV_UPCALL);
+    if (!event)
+        return 1;
+
+    bpf_usdt_readarg(1, ctx, &addr);
+    bpf_probe_read(&event->dpif_name, sizeof(event->dpif_name), (void *)addr);
+
+    event->upcall_type = upcall_type;
+    bpf_usdt_readarg(4, ctx, &event->pkt_size);
+    bpf_usdt_readarg(6, ctx, &event->key_size);
+
+    if (event->pkt_size > MAX_PACKET)
+        size = MAX_PACKET;
+    else
+        size = event->pkt_size;
+    bpf_usdt_readarg(3, ctx, &addr);
+    bpf_probe_read(&event->pkt, size, (void *)addr);
+
+    if (event->key_size > MAX_KEY)
+        size = MAX_KEY;
+    else
+        size = event->key_size;
+    bpf_usdt_readarg(5, ctx, &addr);
+    bpf_probe_read(&event->key, size, (void *)addr);
+
+    events.ringbuf_submit(event, 0);
+    return 0;
+};
+
+
+int trace__op_flow_put(struct pt_regs *ctx) {
+    uint64_t addr;
+    uint64_t size;
+
+    struct event_t *event = init_event(EVENT_OP_FLOW_PUT);
+    if (!event) {
+        return 1;
+    }
+
+    events.ringbuf_submit(event, 0);
+    return 0;
+};
+
+
+int trace__op_flow_execute(struct pt_regs *ctx) {
+    uint64_t addr;
+    uint64_t size;
+
+    struct event_t *event = init_event(EVENT_OP_FLOW_EXECUTE);
+    if (!event) {
+        return 1;
+    }
+
+    bpf_usdt_readarg(4, ctx, &event->pkt_size);
+
+    if (event->pkt_size > MAX_PACKET)
+        size = MAX_PACKET;
+    else
+        size = event->pkt_size;
+    bpf_usdt_readarg(3, ctx, &addr);
+    bpf_probe_read(&event->pkt, size, (void *)addr);
+
+    events.ringbuf_submit(event, 0);
+    return 0;
+};
+
+
+TRACEPOINT_PROBE(openvswitch, ovs_dp_upcall) {
+    uint64_t size;
+    struct sk_buff *skb = args->skbaddr;
+
+    if (args->upcall_cmd != 1 || skb == NULL || skb->data == NULL)
+        return 0;
+
+    struct event_t *event = init_event(EVENT_DP_UPCALL);
+    if (!event) {
+        return 1;
+    }
+
+    event->upcall_type = args->upcall_cmd;
+    event->pkt_size = args->len;
+
+    TP_DATA_LOC_READ_CONST(&event->dpif_name, dp_name,
+                           sizeof(event->dpif_name));
+    TP_DATA_LOC_READ_CONST(&event->dev_name, dev_name,
+                           sizeof(event->dev_name));
+
+    if (skb->data_len != 0) {
+        event->pkt_frag_size = (skb->len - skb->data_len) & 0xfffffff;
+        size = event->pkt_frag_size;
+    } else {
+        event->pkt_frag_size = 0;
+        size = event->pkt_size;
+    }
+
+    /* Prevent clang from using register mirroring (or any optimization) on
+     * the 'size' variable. */
+    barrier_var(size);
+
+    if (size > MAX_PACKET)
+        size = MAX_PACKET;
+    bpf_probe_read_kernel(event->pkt, size, skb->data);
+
+    events.ringbuf_submit(event, 0);
+    return 0;
+}
+
+int kprobe__ovs_packet_cmd_execute(struct pt_regs *ctx, struct sk_buff *skb)
+{
+    uint64_t size;
+
+    if (skb == NULL || skb->data == NULL)
+        return 0;
+
+    struct event_t *event = init_event(EVENT_OVS_PKT_EXEC);
+    if (!event) {
+        return 1;
+    }
+
+    events.ringbuf_submit(event, 0);
+    return 0;
+}
+"""
+
+
+#
+# Event types
+#
+class EventType(StrEnum):
+    RECV_UPCALL = 'dpif_recv__recv_upcall'
+    DP_UPCALL = 'openvswitch__dp_upcall'
+    OP_FLOW_PUT = 'netlink_opperate__op_flow_put'
+    OP_FLOW_EXECUTE = 'netlink_opperate__op_flow_execute'
+    OVS_PKT_EXEC = 'ktrace__ovs_packet_cmd_execute'
+
+    def short_name(name, length=22):
+        if len(name) < length:
+            return name
+
+        return '..' + name[-(length - 2):]
+
+    def from_trace(trace_event):
+        if trace_event == 0:
+            return EventType.RECV_UPCALL
+        elif trace_event == 1:
+            return EventType.DP_UPCALL
+        elif trace_event == 2:
+            return EventType.OP_FLOW_PUT
+        elif trace_event == 3:
+            return EventType.OP_FLOW_EXECUTE
+        elif trace_event == 4:
+            return EventType.OVS_PKT_EXEC
+
+        raise ValueError
+
+
+#
+# Simple event class
+#
+class Event(object):
+    def __init__(self, ts, pid, comm, cpu, event_type):
+        self.ts = ts
+        self.pid = pid
+        self.comm = comm
+        self.cpu = cpu
+        self.event_type = event_type
+
+    def __str__(self):
+        return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}".format(
+            EventType.short_name(self.event_type),
+            self.comm,
+            self.pid,
+            self.cpu,
+            self.ts / 1000000000)
+
+    def __repr__(self):
+        more = ""
+        if self.__class__.__name__ != "Event":
+            more = ", ..."
+
+        return "{}({}, {}, {}, {}, {}{})".format(self.__class__.__name__,
+                                                 self.ts, self.pid,
+                                                 self.comm, self.cpu,
+                                                 self.event_type, more)
+
+    def handle_event(event):
+        event = Event(event.ts, event.pid, event.comm.decode("utf-8"),
+                      event.cpu, EventType.from_trace(event.event))
+
+        if not options.quiet:
+            print(event)
+
+        return event
+
+
+#
+# dp_upcall event class
+#
+class DpUpcall(Event):
+    def __init__(self, ts, pid, comm, cpu, dpif_name, port, pkt, pkt_len,
+                 pkt_frag_len):
+        super(DpUpcall, self).__init__(ts, pid, comm, cpu, EventType.DP_UPCALL)
+        self.dpif_name = dpif_name
+        self.dp_port = get_dp_mapping(dpif_name, port)
+        if self.dp_port is None:
+            #
+            # As we only identify interfaces at startup, new interfaces could
+            # have been added, causing the lookup to fail. Just something to
+            # keep in mind when running this in a dynamic environment.
+            #
+            raise LookupError("Can't find datapath port mapping!")
+        self.pkt = pkt
+        self.pkt_len = pkt_len
+        self.pkt_frag_len = pkt_frag_len
+
+    def __str__(self):
+        return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}: " \
+               "{:<17} {:4} {:4} {:4}".format(self.event_type,
+                                              self.comm,
+                                              self.pid,
+                                              self.cpu,
+                                              self.ts / 1000000000,
+                                              self.dpif_name,
+                                              self.dp_port,
+                                              self.pkt_len,
+                                              self.pkt_frag_len)
+
+    def handle_event(event):
+        if event.pkt_size < options.packet_size:
+            pkt_len = event.pkt_size
+        else:
+            pkt_len = options.packet_size
+
+        pkt_data = bytes(event.pkt)[:pkt_len]
+
+        if len(pkt_data) <= 0 or event.pkt_size == 0:
+            return
+
+        try:
+            event = DpUpcall(event.ts, event.pid, event.comm.decode("utf-8"),
+                             event.cpu, event.dpif_name.decode("utf-8"),
+                             event.dev_name.decode("utf-8"),
+                             pkt_data,
+                             event.pkt_size,
+                             event.pkt_frag_size)
+        except LookupError:
+            #
+            # If we can't do the port lookup, ignore this event.
+            #
+            return None
+
+        if not options.quiet:
+            print(event)
+
+        return event
+
+
+#
+# recv_upcall event class
+#
+class RecvUpcall(Event):
+    def __init__(self, ts, pid, comm, cpu, dpif_name, key, pkt, pkt_len):
+        super(RecvUpcall, self).__init__(ts, pid, comm, cpu,
+                                         EventType.RECV_UPCALL)
+
+        if dpif_name.startswith("system@"):
+            dpif_name = dpif_name[len("system@"):]
+        self.dpif_name = dpif_name
+
+        nla = RecvUpcall.decode_nlm(key, dump=False)
+        if "OVS_KEY_ATTR_IN_PORT" in nla:
+            self.dp_port = struct.unpack('=L', nla["OVS_KEY_ATTR_IN_PORT"])[0]
+        elif "OVS_KEY_ATTR_TUNNEL" in nla:
+            self.dp_port = DP_TUNNEL_PORT
+        else:
+            self.dp_port = RecvUpcall.get_system_dp_port(self.dpif_name)
+
+        if self.dp_port is None:
+            raise LookupError("Can't find RecvUpcall dp port mapping!")
+
+        self.pkt = pkt
+        self.pkt_len = pkt_len
+
+    def __str__(self):
+        return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}: {:<17} {:4} {:4}". \
+            format(
+                self.event_type,
+                self.comm,
+                self.pid,
+                self.cpu,
+                self.ts / 1000000000,
+                self.dpif_name,
+                self.dp_port,
+                self.pkt_len)
+
+    def get_system_dp_port(dpif_name):
+        dp_map = get_dp_mapping(dpif_name, "ovs-system", return_map=True)
+        if dpif_name not in dp_map:
+            return None
+        try:
+            return dp_map[dpif_name]["ovs-system"]
+        except KeyError:
+            return None
+
+    def decode_nlm(msg, indent=4, dump=True):
+        bytes_left = len(msg)
+        result = {}
+
+        while bytes_left:
+            if bytes_left < 4:
+                if dump:
+                    print("{}WARN: decode truncated; can't read header".format(
+                        ' ' * indent))
+                break
+
+            nla_len, nla_type = struct.unpack("=HH", msg[:4])
+
+            if nla_len < 4:
+                if dump:
+                    print("{}WARN: decode truncated; nla_len < 4".format(
+                        ' ' * indent))
+                break
+
+            nla_data = msg[4:nla_len]
+            trunc = ""
+
+            if nla_len > bytes_left:
+                trunc = "..."
+                nla_data = nla_data[:(bytes_left - 4)]
+                if RecvUpcall.get_ovs_key_attr_str(nla_type) == \
+                   "OVS_KEY_ATTR_TUNNEL":
+                    #
+                    # If we have truncated tunnel information, we still would
+                    # like to know. This is due to the special tunnel handling
+                    # needed for port matching.
+                    #
+                    result[RecvUpcall.get_ovs_key_attr_str(nla_type)] = bytes()
+            else:
+                result[RecvUpcall.get_ovs_key_attr_str(nla_type)] = nla_data
+
+            if dump:
+                print("{}nla_len {}, nla_type {}[{}], data: {}{}".format(
+                    ' ' * indent, nla_len,
+                    RecvUpcall.get_ovs_key_attr_str(nla_type),
+                    nla_type,
+                    "".join("{:02x} ".format(b) for b in nla_data), trunc))
+
+            if trunc != "":
+                if dump:
+                    print("{}WARN: decode truncated; nla_len > msg_len[{}] ".
+                          format(' ' * indent, bytes_left))
+                break
+
+            # Update next offset, but make sure it's aligned correctly.
+            next_offset = (nla_len + 3) & ~(3)
+            msg = msg[next_offset:]
+            bytes_left -= next_offset
+
+        return result
+
+    def get_ovs_key_attr_str(attr):
+        ovs_key_attr = ["OVS_KEY_ATTR_UNSPEC",
+                        "OVS_KEY_ATTR_ENCAP",
+                        "OVS_KEY_ATTR_PRIORITY",
+                        "OVS_KEY_ATTR_IN_PORT",
+                        "OVS_KEY_ATTR_ETHERNET",
+                        "OVS_KEY_ATTR_VLAN",
+                        "OVS_KEY_ATTR_ETHERTYPE",
+                        "OVS_KEY_ATTR_IPV4",
+                        "OVS_KEY_ATTR_IPV6",
+                        "OVS_KEY_ATTR_TCP",
+                        "OVS_KEY_ATTR_UDP",
+                        "OVS_KEY_ATTR_ICMP",
+                        "OVS_KEY_ATTR_ICMPV6",
+                        "OVS_KEY_ATTR_ARP",
+                        "OVS_KEY_ATTR_ND",
+                        "OVS_KEY_ATTR_SKB_MARK",
+                        "OVS_KEY_ATTR_TUNNEL",
+                        "OVS_KEY_ATTR_SCTP",
+                        "OVS_KEY_ATTR_TCP_FLAGS",
+                        "OVS_KEY_ATTR_DP_HASH",
+                        "OVS_KEY_ATTR_RECIRC_ID",
+                        "OVS_KEY_ATTR_MPLS",
+                        "OVS_KEY_ATTR_CT_STATE",
+                        "OVS_KEY_ATTR_CT_ZONE",
+                        "OVS_KEY_ATTR_CT_MARK",
+                        "OVS_KEY_ATTR_CT_LABELS",
+                        "OVS_KEY_ATTR_CT_ORIG_TUPLE_IPV4",
+                        "OVS_KEY_ATTR_CT_ORIG_TUPLE_IPV6",
+                        "OVS_KEY_ATTR_NSH"]
+
+        if attr < 0 or attr > len(ovs_key_attr):
+            return "<UNKNOWN>"
+
+        return ovs_key_attr[attr]
+
+    def handle_event(event):
+        #
+        # For us, only upcalls with a packet, flow_key, and upcall_type
+        # DPIF_UC_MISS are of interest.
+        #
+        if event.pkt_size <= 0 or event.key_size <= 0 or \
+           event.upcall_type != 0:
+            return
+
+        if event.key_size < options.flow_key_size:
+            key_len = event.key_size
+        else:
+            key_len = options.flow_key_size
+
+        if event.pkt_size < options.packet_size:
+            pkt_len = event.pkt_size
+        else:
+            pkt_len = options.packet_size
+
+        try:
+            event = RecvUpcall(event.ts, event.pid, event.comm.decode("utf-8"),
+                               event.cpu, event.dpif_name.decode("utf-8"),
+                               bytes(event.key)[:key_len],
+                               bytes(event.pkt)[:pkt_len],
+                               event.pkt_size)
+        except LookupError:
+            return None
+
+        if not options.quiet:
+            print(event)
+
+        return event
+
+
+#
+# op_flow_execute event class
+#
+class OpFlowExecute(Event):
+    def __init__(self, ts, pid, comm, cpu, pkt, pkt_len):
+        super(OpFlowExecute, self).__init__(ts, pid, comm, cpu,
+                                            EventType.OP_FLOW_EXECUTE)
+        self.pkt = pkt
+        self.pkt_len = pkt_len
+
+    def __str__(self):
+        return "[{:<22}] {:<16} {:8} [{:03}] {:18.9f}: " \
+               "{:<17} {:4} {:4}".format(EventType.short_name(self.event_type),
+                                         self.comm,
+                                         self.pid,
+                                         self.cpu,
+                                         self.ts / 1000000000,
+                                         "",
+                                         "",
+                                         self.pkt_len)
+
+    def handle_event(event):
+        if event.pkt_size < options.packet_size:
+            pkt_len = event.pkt_size
+        else:
+            pkt_len = options.packet_size
+
+        pkt_data = bytes(event.pkt)[:pkt_len]
+
+        if len(pkt_data) <= 0 or event.pkt_size == 0:
+            return
+
+        event = OpFlowExecute(event.ts, event.pid, event.comm.decode("utf-8"),
+                              event.cpu, pkt_data, event.pkt_size)
+
+        if not options.quiet:
+            print(event)
+
+        return event
+
+
+#
+# get_dp_mapping()
+#
+def get_dp_mapping(dp, port, return_map=False, dp_map=None):
+    if options.unit_test:
+        return port
+
+    if dp_map is not None:
+        get_dp_mapping.dp_port_map_cache = dp_map
+
+    #
+    # Build a cache, so we do not have to execute the ovs command each time.
+    #
+    if not hasattr(get_dp_mapping, "dp_port_map_cache"):
+        try:
+            output = subprocess.check_output(['ovs-appctl', 'dpctl/show'],
+                                             encoding='utf8').split("\n")
+        except subprocess.CalledProcessError:
+            output = ""
+            pass
+
+        current_dp = None
+        get_dp_mapping.dp_port_map_cache = {}
+
+        for line in output:
+            match = re.match("^system@(.*):$", line)
+            if match is not None:
+                current_dp = match.group(1)
+
+            match = re.match("^  port ([0-9]+): ([^ /]*)", line)
+            if match is not None and current_dp is not None:
+                try:
+                    get_dp_mapping.dp_port_map_cache[
+                        current_dp][match.group(2)] = int(match.group(1))
+                except KeyError:
+                    get_dp_mapping.dp_port_map_cache[current_dp] = \
+                        {match.group(2): int(match.group(1))}
+
+    if return_map:
+        return get_dp_mapping.dp_port_map_cache
+
+    if dp not in get_dp_mapping.dp_port_map_cache or \
+       port not in get_dp_mapping.dp_port_map_cache[dp]:
+        return None
+
+    return get_dp_mapping.dp_port_map_cache[dp][port]
+
+
+#
+# event_to_dict()
+#
+def event_to_dict(event):
+    event_dict = {}
+
+    for field, _ in event._fields_:
+        if isinstance(getattr(event, field), (int, bytes)):
+            event_dict[field] = getattr(event, field)
+        else:
+            if (field == "key" and event.key_size == 0) or \
+               (field == "pkt" and event.pkt_size == 0):
+                data = bytes()
+            else:
+                data = bytes(getattr(event, field))
+
+            event_dict[field] = data
+
+    return event_dict
+
+
+#
+# receive_event_bcc()
+#
+def receive_event_bcc(ctx, data, size):
+    global events_received
+    events_received += 1
+
+    event = b['events'].event(data)
+
+    if export_file is not None:
+        export_file.write("event = {}\n".format(event_to_dict(event)))
+
+    receive_event(event)
+
+
+#
+# receive_event()
+#
+def receive_event(event):
+    global event_count
+
+    if event.event == 0:
+        trace_event = RecvUpcall.handle_event(event)
+    elif event.event == 1:
+        trace_event = DpUpcall.handle_event(event)
+    elif event.event == 2:
+        trace_event = Event.handle_event(event)
+    elif event.event == 3:
+        trace_event = OpFlowExecute.handle_event(event)
+    elif event.event == 4:
+        trace_event = Event.handle_event(event)
+
+    try:
+        event_count['total'][EventType.from_trace(event.event)] += 1
+    except KeyError:
+        event_count['total'][EventType.from_trace(event.event)] = 1
+        event_count['valid'][EventType.from_trace(event.event)] = 0
+
+    if trace_event is not None:
+        event_count['valid'][EventType.from_trace(event.event)] += 1
+        trace_data.append(trace_event)
+
+
+#
+# collect_event_sets()
+#
+def collect_event_sets(events, collect_stats=False, profile=False,
+                       spinner=False):
+    t1_time = 0
+
+    def t1_start():
+        nonlocal t1_time
+        t1_time = process_time()
+
+    def t1_stop(description):
+        print("* PROFILING: {:<50}: {:.06f} seconds".format(
+            description, process_time() - t1_time))
+
+    warn_parcial_match = False
+    warn_frag = False
+
+    if profile:
+        t1_start()
+    #
+    # First let's create a dict of per handler thread events.
+    #
+    threads = {}
+    threads_result = {}
+    for idx, event in enumerate(events):
+        if event.event_type == EventType.DP_UPCALL:
+            continue
+        if event.pid not in threads:
+            threads[event.pid] = []
+        threads[event.pid].append([idx, event])
+
+    if profile:
+        t1_stop("Creating per thread dictionary")
+        t1_start()
+    #
+    # Now split them into per-upcall sets, but remember that
+    # RecvUpcall events can be batched.
+    #
+    batch_stats = []
+    for thread, items in threads.items():
+        thread_set = []
+        batch = []
+        ovs_pkt_exec_set = []
+        batching = True
+        collecting = 0
+        has_flow_put = False
+        has_flow_exec = False
+
+        def next_batch():
+            nonlocal batching, batch, collecting, has_flow_put, has_flow_exec
+            nonlocal ovs_pkt_exec_set, thread_set
+
+            if len(batch) > 0:
+                #
+                # If we are done with the batch, see if we need to match up
+                # any batched OVS_PKT_EXEC events.
+                #
+                for event in batch:
+                    if len(ovs_pkt_exec_set) <= 0:
+                        break
+                    if any(isinstance(item,
+                                      OpFlowExecute) for item in event[2]):
+                        event[2].append(ovs_pkt_exec_set.pop(0))
+                #
+                # Append the batch to the thread-specific set.
+                #
+                thread_set = thread_set + batch
+                if collect_stats:
+                    batch_stats.append(len(batch))
+            batching = True
+            batch = []
+            ovs_pkt_exec_set = []
+            has_flow_put = False
+            has_flow_exec = False
+            collecting = 0
+
+        def next_batch_set():
+            nonlocal has_flow_put, has_flow_exec, collecting
+            has_flow_put = False
+            has_flow_exec = False
+            collecting += 1
+
+        for item in items:
+            idx, event = item
+
+            if batching:
+                if event.event_type == EventType.RECV_UPCALL:
+                    batch.append(item + [[]])
+                elif len(batch) > 0:
+                    batching = False
+                    collecting = 0
+                else:
+                    continue
+
+            if not batching:
+                if event.event_type == EventType.RECV_UPCALL:
+                    next_batch()
+                    batch.append(item + [[]])
+                else:
+                    if event.event_type == EventType.OP_FLOW_PUT:
+                        if has_flow_put:
+                            next_batch_set()
+                            if collecting >= len(batch):
+                                next_batch()
+                                continue
+
+                        batch[collecting][2].append(item[1])
+                        has_flow_put = True
+
+                    elif event.event_type == EventType.OP_FLOW_EXECUTE:
+                        if has_flow_exec:
+                            next_batch_set()
+                            if collecting >= len(batch):
+                                next_batch()
+                                continue
+
+                        if (event.pkt_len == batch[collecting][1].pkt_len
+                           and event.pkt == batch[collecting][1].pkt):
+                            batch[collecting][2].append(item[1])
+                            has_flow_put = True
+                            has_flow_exec = True
+                        else:
+                            #
+                            # If we end up here it could be that an upcall in a
+                            # batch did not generate an EXECUTE and we are out
+                            # of sync. Try to match it to the next batch entry.
+                            #
+                            next_idx = collecting + 1
+                            while True:
+                                if next_idx >= len(batch):
+                                    next_batch()
+                                    break
+                                if (event.pkt_len == batch[next_idx][1].pkt_len
+                                   and event.pkt == batch[next_idx][1].pkt):
+
+                                    batch[next_idx][2] = batch[collecting][2]
+                                    batch[collecting][2] = []
+                                    collecting = next_idx
+                                    batch[collecting][2].append(item[1])
+                                    has_flow_put = True
+                                    has_flow_exec = True
+                                    break
+
+                                next_idx += 1
+
+                    elif event.event_type == EventType.OVS_PKT_EXEC:
+                        #
+                        # The OVS_PKT_EXEC might also be batched, so we keep
+                        # them in a separate list and assign them to the
+                        # correct set when completing the set.
+                        #
+                        ovs_pkt_exec_set.append(item[1])
+                        continue
+
+                    if collecting >= len(batch):
+                        next_batch()
+        next_batch()
+        threads_result[thread] = thread_set
+
+    if profile:
+        t1_stop("Creating upcall sets")
+        t1_start()
+
+    #
+    # Move thread results from list to dictionary
+    #
+    thread_stats = {}
+    for thread, sets in threads_result.items():
+        if len(sets) > 0:
+            thread_stats[sets[0][1].comm] = len(sets)
+
+        threads_result[thread] = {}
+        for upcall in sets:
+            threads_result[thread][upcall[0]] = [upcall[1]] + upcall[2]
+
+    if profile:
+        t1_stop("Moving upcall list to dictionary")
+        t1_start()
+
+    if options.debug & 0x4000000 != 0:
+        print()
+        for thread, sets in threads_result.items():
+            for idx, idx_set in sets.items():
+                print("DBG: {}".format(idx_set))
+
+    #
+    # Create two lists, one with DP_UPCALLs and one with RECV_UPCALLs
+    #
+    dp_upcall_list = []
+    recv_upcall_list = []
+    for idx, event in enumerate(events):
+        if event.event_type == EventType.DP_UPCALL:
+            dp_upcall_list.append([idx, event])
+        elif event.event_type == EventType.RECV_UPCALL:
+            recv_upcall_list.append([idx, event])
+
+    if profile:
+        t1_stop("Creating DP_UPCALL and RECV_UPCALL lists")
+        t1_start()
+
+    if options.debug & 0x4000000 != 0:
+        print()
+        for dp_upcall in dp_upcall_list:
+            print("DBG: {}".format(dp_upcall))
+        print()
+        for recv_upcall in recv_upcall_list:
+            print("DBG: {}".format(recv_upcall))
+
+    #
+    # Now find the matching DP_UPCALL and RECV_UPCALL events
+    #
+    event_sets = []
+    if spinner:
+        print()
+    with alive_bar(len(dp_upcall_list),
+                   title="- Matching DP_UPCALLs to RECV_UPCALLs",
+                   spinner=None, disable=not spinner) as bar:
+
+        for (idx, event) in dp_upcall_list:
+            remove_indexes = []
+            this_set = None
+            #
+            # TODO: This part needs some optimization, as it's slow in the
+            #       PVP test scenario. This is because a lot of DP_UPCALLS
+            #       will not have a matching RECV_UPCALL leading to walking
+            #       the entire recv_upcall_list list.
+            #
+            #       A dictionary would probably help, but in the PVP
+            #       scenario packets come from a limited set of ports and
+            #       are all the same length. The key would need to be
+            #       recv.dp_port + len(recv.pkt) + recv.pkt; however, the
+            #       recv.pkt compare needs to happen on
+            #       min(len(event.pkt), len(recv.pkt)).
+            #
+            for idx_in_list, (idx_recv, recv) in enumerate(recv_upcall_list):
+                match = False
+
+                if idx_recv < idx:
+                    remove_indexes.append(idx_in_list)
+                    continue
+                #
+                # If the RecvUpcall is a tunnel port, we can not map it to
+                # the correct tunnel. For now, we assume the first matching
+                # packet is the correct one. For more details see the OVS
+                # ukey_to_flow_netdev() function.
+                #
+                if (event.dp_port == recv.dp_port or
+                    recv.dp_port == DP_TUNNEL_PORT) \
+                   and event.pkt_len == recv.pkt_len:
+
+                    compare_len = min(len(event.pkt), len(recv.pkt))
+
+                    if len(event.pkt) != len(recv.pkt) \
+                       and event.pkt_frag_len == 0:
+                        warn_partial_match = True
+                    elif event.pkt_frag_len != 0:
+                        warn_frag = True
+                        compare_len = min(compare_len, event.pkt_frag_len)
+
+                    if event.pkt[:compare_len] == recv.pkt[:compare_len]:
+                        match = True
+                    else:
+                        #
+                        # There are still some corner cases due to the fact
+                        # that the kernel dp_upcall tracepoint is hit before
+                        # the packet is prepared/modified for the upcall.
+                        # Example cases are packet checksum updates, VLAN
+                        # insertion, etc. For now, we try to handle the
+                        # checksum part, but we might need to add more
+                        # exceptions in the future.
+                        #
+                        diff_bytes = sum(i != j for i, j in zip(
+                            event.pkt[:compare_len], recv.pkt[:compare_len]))
+
+                        if diff_bytes <= 2 and compare_len > 56:
+                            # This could be a TCP or UDP checksum
+                            event_pkt = Ether(bytes(event.pkt)[:compare_len])
+                            recv_pkt = Ether(bytes(recv.pkt)[:compare_len])
+                            if (event_pkt.haslayer(TCP) and
+                                recv_pkt.haslayer(TCP)) or (
+                                    event_pkt.haslayer(UDP) and
+                                    recv_pkt.haslayer(UDP)):
+
+                                if event_pkt.haslayer(TCP):
+                                    event_chksum = event_pkt[TCP].chksum
+                                    recv_chksum = recv_pkt[TCP].chksum
+                                else:
+                                    event_chksum = event_pkt[UDP].chksum
+                                    recv_chksum = recv_pkt[UDP].chksum
+
+                                if event_chksum & 0xff != recv_chksum & 0xff:
+                                    diff_bytes -= 1
+                                if event_chksum & 0xff00 != \
+                                   recv_chksum & 0xff00:
+                                    diff_bytes -= 1
+
+                        if diff_bytes == 0:
+                            match = True
+
+                    if match:
+                        this_set = {event.event_type: event}
+                        for sevent in threads_result[recv.pid][idx_recv]:
+                            this_set[sevent.event_type] = sevent
+                        event_sets.append(this_set)
+                        remove_indexes.append(idx_in_list)
+
+                        if options.debug & 0x4000000 != 0:
+                            print("DBG: Matched DpUpcall({:6}) => "
+                                  "RecvUpcall({:6})".format(idx, idx_recv))
+
+                        break
+
+                    elif options.debug & 0x8000000 != 0:
+                        print("DBG: COMPARE DpUpcall({:6}) != "
+                              "RecvUpcall({:6})".format(idx, idx_recv))
+                        event_pkt = Ether(bytes(event.pkt)[:compare_len])
+                        recv_pkt = Ether(bytes(recv.pkt)[:compare_len])
+                        print(re.sub('^', 'DBG:' + ' ' * 4,
+                                     event_pkt.show(dump=True),
+                                     flags=re.MULTILINE))
+                        print(re.sub('^', 'DBG:' + ' ' * 4,
+                                     recv_pkt.show(dump=True),
+                                     flags=re.MULTILINE))
+
+                elif options.debug & 0x8000000 != 0:
+                    print("DBG: COMPARE DpUpcall({:6}) != "
+                          "RecvUpcall({:6}) -> port {}, {} -> "
+                          "len = {}, {}".format(idx, idx_recv,
+                                                event.dp_port,
+                                                recv.dp_port,
+                                                event.pkt_len,
+                                                recv.pkt_len))
+
+            bar()
+            for remove_idx in sorted(remove_indexes, reverse=True):
+                del recv_upcall_list[remove_idx]
+
+    if profile:
+        t1_stop("Matching DP_UPCALLs to a set")
+
+    if warn_partial_match:
+        print("WARNING: Packets not fully captured for matching!\n         "
+              "Increase the packet buffer with the '--packet-size' option.")
+    if warn_frag:
+        print("WARNING: SKB from kernel had fragments, we could only copy/"
+              "compare the first part!")
+
+    if collect_stats:
+        return event_sets, batch_stats, thread_stats
+
+    return event_sets
+
+
+#
+# unit_test()
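+# Run collect_event_sets() against a number of hand-crafted event lists
+# (single upcalls, batched upcalls, interleaved threads, missing events)
+# and compare the outcome with the expected result sets.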
+#
+def unit_test():
+    pkt1 = b'\x01\x02\x03\x04\x05'
+    pkt2 = b'\x01\x02\x03\x04\x06'
+    pkt3 = b'\x01\x02\x03\x04\x07'
+    key = b'\x08\x00\x03\x00\x01\x00\x00\x00'  # Port 1
+    #
+    # Basic test with all events in line
+    #
+    t1_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 RecvUpcall(2, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 Event(3, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(4, 1, "hndl", 1, pkt1, len(pkt1)),
+                 Event(5, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+    t1_result = [{EventType.DP_UPCALL: t1_events[0],
+                  EventType.RECV_UPCALL: t1_events[1],
+                  EventType.OP_FLOW_PUT: t1_events[2],
+                  EventType.OP_FLOW_EXECUTE: t1_events[3],
+                  EventType.OVS_PKT_EXEC: t1_events[4]}]
+    #
+    # Basic test with missing flow put
+    #
+    t2_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 RecvUpcall(2, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 OpFlowExecute(4, 1, "hndl", 1, pkt1, len(pkt1)),
+                 Event(5, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+    t2_result = [{EventType.DP_UPCALL: t2_events[0],
+                  EventType.RECV_UPCALL: t2_events[1],
+                  EventType.OP_FLOW_EXECUTE: t2_events[2],
+                  EventType.OVS_PKT_EXEC: t2_events[3]}]
+    #
+    # Test with RecvUpcall's being batched
+    #
+    t3_events = [DpUpcall(1, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 DpUpcall(2, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
+                 DpUpcall(3, 101, "ping", 3, "system", 1, pkt3, len(pkt3), 0),
+                 RecvUpcall(4, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 RecvUpcall(5, 1, "hndl", 1, "system", key, pkt3, len(pkt3)),
+                 RecvUpcall(6, 1, "hndl", 1, "system", key, pkt2, len(pkt2)),
+                 Event(7, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(8, 1, "hndl", 1, pkt1, len(pkt1)),
+                 Event(9, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+                 Event(10, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(11, 1, "hndl", 1, pkt3, len(pkt3)),
+                 Event(12, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+                 Event(13, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(14, 1, "hndl", 1, pkt2, len(pkt2)),
+                 Event(15, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+    t3_result = [{EventType.DP_UPCALL: t3_events[0],
+                  EventType.RECV_UPCALL: t3_events[3],
+                  EventType.OP_FLOW_PUT: t3_events[6],
+                  EventType.OP_FLOW_EXECUTE: t3_events[7],
+                  EventType.OVS_PKT_EXEC: t3_events[8]},
+                 {EventType.DP_UPCALL: t3_events[1],
+                  EventType.RECV_UPCALL: t3_events[5],
+                  EventType.OP_FLOW_PUT: t3_events[12],
+                  EventType.OP_FLOW_EXECUTE: t3_events[13],
+                  EventType.OVS_PKT_EXEC: t3_events[14]},
+                 {EventType.DP_UPCALL: t3_events[2],
+                  EventType.RECV_UPCALL: t3_events[4],
+                  EventType.OP_FLOW_PUT: t3_events[9],
+                  EventType.OP_FLOW_EXECUTE: t3_events[10],
+                  EventType.OVS_PKT_EXEC: t3_events[11]}]
+    #
+    # Test with RecvUpcall's single + batch
+    #
+    t4_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 RecvUpcall(2, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 Event(3, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(4, 1, "hndl", 1, pkt1, len(pkt1)),
+                 Event(5, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+                 DpUpcall(6, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 DpUpcall(7, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
+                 DpUpcall(8, 101, "ping", 3, "system", 1, pkt3, len(pkt3), 0),
+                 RecvUpcall(9, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 RecvUpcall(10, 1, "hndl", 1, "system", key, pkt3, len(pkt3)),
+                 RecvUpcall(11, 1, "hndl", 1, "system", key, pkt2, len(pkt2)),
+                 Event(12, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(13, 1, "hndl", 1, pkt1, len(pkt1)),
+                 Event(14, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+                 Event(15, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(16, 1, "hndl", 1, pkt3, len(pkt3)),
+                 Event(17, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+                 Event(18, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(19, 1, "hndl", 1, pkt2, len(pkt2)),
+                 Event(20, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+    t4_result = [{EventType.DP_UPCALL: t4_events[0],
+                  EventType.RECV_UPCALL: t4_events[1],
+                  EventType.OP_FLOW_PUT: t4_events[2],
+                  EventType.OP_FLOW_EXECUTE: t4_events[3],
+                  EventType.OVS_PKT_EXEC: t4_events[4]},
+                 {EventType.DP_UPCALL: t4_events[5],
+                  EventType.RECV_UPCALL: t4_events[8],
+                  EventType.OP_FLOW_PUT: t4_events[11],
+                  EventType.OP_FLOW_EXECUTE: t4_events[12],
+                  EventType.OVS_PKT_EXEC: t4_events[13]},
+                 {EventType.DP_UPCALL: t4_events[6],
+                  EventType.RECV_UPCALL: t4_events[10],
+                  EventType.OP_FLOW_PUT: t4_events[17],
+                  EventType.OP_FLOW_EXECUTE: t4_events[18],
+                  EventType.OVS_PKT_EXEC: t4_events[19]},
+                 {EventType.DP_UPCALL: t4_events[7],
+                  EventType.RECV_UPCALL: t4_events[9],
+                  EventType.OP_FLOW_PUT: t4_events[14],
+                  EventType.OP_FLOW_EXECUTE: t4_events[15],
+                  EventType.OVS_PKT_EXEC: t4_events[16]}]
+    #
+    # Test with two threads interleaved
+    #
+    t5_events = [DpUpcall(1, 100, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 DpUpcall(2, 100, "ping", 1, "system", 1, pkt2, len(pkt2), 0),
+                 RecvUpcall(3, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 RecvUpcall(4, 2, "hndl", 2, "system", key, pkt2, len(pkt2)),
+                 Event(5, 2, "hndl", 2, EventType.OP_FLOW_PUT),
+                 Event(6, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(7, 2, "hndl", 1, pkt2, len(pkt2)),
+                 OpFlowExecute(8, 1, "hndl", 1, pkt1, len(pkt1)),
+                 Event(9, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+                 Event(10, 2, "hndl", 1, EventType.OVS_PKT_EXEC)]
+    t5_result = [{EventType.DP_UPCALL: t5_events[0],
+                  EventType.RECV_UPCALL: t5_events[2],
+                  EventType.OP_FLOW_PUT: t5_events[5],
+                  EventType.OP_FLOW_EXECUTE: t5_events[7],
+                  EventType.OVS_PKT_EXEC: t5_events[8]},
+                 {EventType.DP_UPCALL: t5_events[1],
+                  EventType.RECV_UPCALL: t5_events[3],
+                  EventType.OP_FLOW_PUT: t5_events[4],
+                  EventType.OP_FLOW_EXECUTE: t5_events[6],
+                  EventType.OVS_PKT_EXEC: t5_events[9]}]
+    #
+    # Test batch with missing events
+    #
+    t6_events = [DpUpcall(1, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 DpUpcall(2, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
+                 RecvUpcall(3, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 RecvUpcall(4, 1, "hndl", 1, "system", key, pkt2, len(pkt2)),
+                 Event(5, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(6, 1, "hndl", 1, pkt2, len(pkt2)),
+                 Event(7, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+    t6_result = [{EventType.DP_UPCALL: t6_events[0],
+                  EventType.RECV_UPCALL: t6_events[2]},
+                 {EventType.DP_UPCALL: t6_events[1],
+                  EventType.RECV_UPCALL: t6_events[3],
+                  EventType.OP_FLOW_PUT: t6_events[4],
+                  EventType.OP_FLOW_EXECUTE: t6_events[5],
+                  EventType.OVS_PKT_EXEC: t6_events[6]}]
+    #
+    # Test with RecvUpcall's and OVS_PKT_EXEC being batched
+    #
+    t7_events = [DpUpcall(1, 101, "ping", 1, "system", 1, pkt1, len(pkt1), 0),
+                 DpUpcall(2, 102, "ping", 2, "system", 1, pkt2, len(pkt2), 0),
+                 DpUpcall(3, 101, "ping", 3, "system", 1, pkt3, len(pkt3), 0),
+                 RecvUpcall(4, 1, "hndl", 1, "system", key, pkt1, len(pkt1)),
+                 RecvUpcall(5, 1, "hndl", 1, "system", key, pkt2, len(pkt2)),
+                 RecvUpcall(6, 1, "hndl", 1, "system", key, pkt3, len(pkt3)),
+                 Event(7, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(8, 1, "hndl", 1, pkt1, len(pkt1)),
+                 Event(9, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(10, 1, "hndl", 1, pkt2, len(pkt2)),
+                 Event(11, 1, "hndl", 1, EventType.OP_FLOW_PUT),
+                 OpFlowExecute(12, 1, "hndl", 1, pkt3, len(pkt3)),
+                 Event(13, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+                 Event(14, 1, "hndl", 1, EventType.OVS_PKT_EXEC),
+                 Event(15, 1, "hndl", 1, EventType.OVS_PKT_EXEC)]
+    t7_result = [{EventType.DP_UPCALL: t7_events[0],
+                  EventType.RECV_UPCALL: t7_events[3],
+                  EventType.OP_FLOW_PUT: t7_events[6],
+                  EventType.OP_FLOW_EXECUTE: t7_events[7],
+                  EventType.OVS_PKT_EXEC: t7_events[12]},
+                 {EventType.DP_UPCALL: t7_events[1],
+                  EventType.RECV_UPCALL: t7_events[4],
+                  EventType.OP_FLOW_PUT: t7_events[8],
+                  EventType.OP_FLOW_EXECUTE: t7_events[9],
+                  EventType.OVS_PKT_EXEC: t7_events[13]},
+                 {EventType.DP_UPCALL: t7_events[2],
+                  EventType.RECV_UPCALL: t7_events[5],
+                  EventType.OP_FLOW_PUT: t7_events[10],
+                  EventType.OP_FLOW_EXECUTE: t7_events[11],
+                  EventType.OVS_PKT_EXEC: t7_events[14]}]
+    #
+    # Actual test sets
+    #
+    test_set = [["Simple single event", t1_events, t1_result],
+                ["Single event, missing flow_put", t2_events, t2_result],
+                ["Batched events", t3_events, t3_result],
+                ["Single + batched events", t4_events, t4_result],
+                ["Two sets, different threads", t5_events, t5_result],
+                ["Batch with missing exec", t6_events, t6_result],
+                ["Batched events including exec", t7_events, t7_result]]
+
+    print("Running some simple unit tests:")
+
+    for test in test_set:
+        print("- {:<32}  ".format(test[0]), end="")
+        result = collect_event_sets(test[1][:])
+        if result == test[2]:
+            print("PASS")
+        else:
+            print("FAIL")
+            print("  OUTPUT  :")
+            for event_set in result:
+                hdr = "    - "
+                for event_type, event in event_set.items():
+                    print("{} {:<16}: {}".format(hdr, event_type.name, event))
+                    hdr = "      "
+            print("  EXPECTED:")
+            for event_set in test[2]:
+                hdr = "    - "
+                for event_type, event in event_set.items():
+                    print("{} {:<16}: {}".format(hdr, event_type.name, event))
+                    hdr = "      "
+
+
+#
+# show_key_value()
+#
+def show_key_value(data_set, description=None):
+    if description is not None:
+        print("\n=> {}:".format(description))
+
+    for k, v in data_set.items():
+        print("  {:36}: {:>10}".format(str(k), str(v)))
+
+
+#
+# show_batch_histogram()
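+# Print a histogram of the number of upcalls handled per batch, using 64
+# fixed one-wide buckets plus an overflow bucket for larger batches.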
+#
+def show_batch_histogram(data_set, description=None):
+    nr_of_buckets = 64
+
+    if description is not None:
+        print("\n=> {}:".format(description))
+
+    if len(data_set) == 0:
+        print("# NumSamples = 0")
+        return
+
+    min_val = nr_of_buckets
+    max_val = 0
+    entries = 0
+    high_buckets = 0
+    buckets = [0] * nr_of_buckets
+
+    for entry in data_set:
+        min_val = min(min_val, entry)
+        max_val = max(max_val, entry)
+        if entry == 0:
+            continue
+        elif entry > nr_of_buckets:
+            high_buckets += 1
+        else:
+            buckets[entry - 1] += 1
+
+        entries += 1
+
+    if max(buckets + [high_buckets]) > 4:
+        scale = int(max(buckets + [high_buckets]) / 4)
+    else:
+        scale = 1
+
+    print("# NumSamples = {}; Min = {}; Max = {}".format(entries, min_val,
+                                                         max_val))
+    print("# each ∎ represents a count of {}".format(scale))
+
+    for idx in range(int(nr_of_buckets / 2)):
+        idx_2nd = idx + int(nr_of_buckets / 2)
+        print("{:5} [{:8}]: {:22}  {:5} [{:8}]: {:22}".format(
+            idx + 1, buckets[idx], "∎" * int(buckets[idx] / scale),
+            idx_2nd + 1, buckets[idx_2nd],
+            "∎" * int(buckets[idx_2nd] / scale)))
+
+    if high_buckets > 0:
+        print("{:>5} [{:8}]: {:22}".format(">" + str(nr_of_buckets),
+                                           high_buckets,
+                                           "∎" * int(high_buckets / scale)))
+
+
+#
+# show_histogram()
+#
+def show_histogram(data_set, description=None, options=None,
+                   minimum=None, maximum=None, buckets=None, custbuckets=None):
+    if description is not None:
+        print("\n=> {}:".format(description))
+
+    if options is not None:
+        if buckets is None:
+            buckets = options.histogram_buckets
+        if options.sets:
+            print(data_set)
+
+    if len(data_set) == 0:
+        print("# NumSamples = 0")
+    elif len(data_set) == 1:
+        print("# NumSamples = 1; Min = {0:.4f}; Max = {0:.4f}".
+              format(data_set[0]))
+    elif len(set(data_set)) == 1 and maximum is None and minimum is None and \
+            custbuckets is None:
+        histogram(data_set, buckets=buckets, minimum=list(set(data_set))[0],
+                  maximum=list(set(data_set))[0] + 1)
+    else:
+        histogram(data_set, buckets=buckets,
+                  minimum=minimum, maximum=maximum, custbuckets=custbuckets)
+
+
+#
+# buffer_size_type()
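+# argparse type checker for the --packet-size and --flow-key-size options;
+# it accepts an integer in the (default) 64-2048 byte range.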
+#
+def buffer_size_type(astr, min=64, max=2048):
+    value = int(astr)
+    if min <= value <= max:
+        return value
+    else:
+        raise argparse.ArgumentTypeError(
+            'value not in range {}-{}'.format(min, max))
+
+
+#
+# next_power_of_two()
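+# Round a value up to the next power of two, as the BPF ring buffer expects
+# a power-of-two page count.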
+#
+def next_power_of_two(val):
+    np = 1
+    while np < val:
+        np *= 2
+    return np
+
+
+#
+# main()
+#
+def main():
+    #
+    # Don't like these globals, but ctx passing does not seem to work with the
+    # existing open_ring_buffer() API :(
+    #
+    global b
+    global options
+    global trace_data
+    global events_received
+    global event_count
+    global export_file
+
+    #
+    # Argument parsing
+    #
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument("-b", "--histogram-buckets",
+                        help="Number of buckets per histogram, default 20",
+                        type=int, default=20, metavar="BUCKETS")
+    parser.add_argument("--buffer-page-count",
+                        help="Number of BPF ring buffer pages, default 1024",
+                        type=int, default=1024, metavar="NUMBER")
+    parser.add_argument("-D", "--debug",
+                        help="Enable eBPF debugging",
+                        type=lambda x: int(x, 0), const=0x3f, default=0,
+                        nargs='?')
+    parser.add_argument("-f", "--flow-key-size",
+                        help="Set maximum flow key size to capture, "
+                        "default 64", type=buffer_size_type, default=64,
+                        metavar="[64-2048]")
+    parser.add_argument("--handler-filter",
+                        help="Post processing handler thread filter",
+                        type=str, default=None, metavar="HANDLERS")
+    parser.add_argument("-P", "--packet-size",
+                        help="Set maximum packet size to capture, "
+                        "default 256", type=buffer_size_type, default=256,
+                        metavar="[64-2048]")
+    parser.add_argument("-p", "--pid", metavar="VSWITCHD_PID",
+                        help="ovs-vswitchd's PID",
+                        type=int, default=None)
+    parser.add_argument("-q", "--quiet", action="store_true",
+                        help="Do not show individual events")
+    parser.add_argument("-r", "--read-events",
+                        help="Read events from FILE instead of installing "
+                        "tracepoints", type=str, default=None, metavar="FILE")
+    parser.add_argument("--sets", action="store_true",
+                        help="Dump content of data sets")
+    parser.add_argument("-s", "--stop",
+                        help="Stop after receiving EVENTS number of trace "
+                        "events",
+                        type=int, default=0, metavar="EVENTS")
+    parser.add_argument("--unit-test", action="store_true",
+                        help=argparse.SUPPRESS)
+    parser.add_argument("-w", "--write-events",
+                        help="Write events to FILE",
+                        type=str, default=None, metavar="FILE")
+
+    options = parser.parse_args()
+
+    if options.unit_test:
+        unit_test()
+        sys.exit(0)
+
+    #
+    # Find the PID of the ovs-vswitchd daemon if not specified.
+    #
+    if options.pid is None and options.read_events is None:
+        for proc in psutil.process_iter():
+            if 'ovs-vswitchd' in proc.name():
+                if options.pid is not None:
+                    print("ERROR: Multiple ovs-vswitchd daemons running, "
+                          "use the -p option!")
+                    sys.exit(-1)
+
+                options.pid = proc.pid
+
+    #
+    # Error checking on input parameters.
+    #
+    if options.pid is None and options.read_events is None:
+        print("ERROR: Failed to find ovs-vswitchd's PID!")
+        sys.exit(-1)
+
+    if options.read_events is not None and options.write_events is not None:
+        print("ERROR: Either supply the read or write events option, "
+              "not both!")
+        sys.exit(-1)
+
+    if options.handler_filter is not None and options.read_events is None:
+        print("ERROR: The --handler-filter option is only valid with the "
+              "--read-events option!")
+        sys.exit(-1)
+
+    options.buffer_page_count = next_power_of_two(options.buffer_page_count)
+
+    #
+    # Open write handle if needed.
+    #
+    if options.write_events is not None:
+        try:
+            export_file = open(options.write_events, "w")
+        except (FileNotFoundError, IOError, PermissionError) as e:
+            print("ERROR: Can't create export file \"{}\": {}".format(
+                options.write_events, e.strerror))
+            sys.exit(-1)
+    else:
+        export_file = None
+
+    trace_data = []
+    event_count = {'total': {}, 'valid': {}, 'miss': {}}
+    if options.read_events is None:
+        #
+        # Call get_dp_mapping() to prepare the cache
+        #
+        dp_port_map = get_dp_mapping("ovs-system", "eth0", return_map=True)
+        if export_file is not None:
+            export_file.write("dp_port_map = {}\n".format(dp_port_map))
+
+        #
+        # Attach the usdt probe
+        #
+        u = USDT(pid=int(options.pid))
+        try:
+            u.enable_probe(probe="recv_upcall", fn_name="trace__recv_upcall")
+            u.enable_probe(probe="op_flow_put", fn_name="trace__op_flow_put")
+            u.enable_probe(probe="op_flow_execute",
+                           fn_name="trace__op_flow_execute")
+        except USDTException as e:
+            print("ERROR: {}\n"
+                  "       Are the USDT probes enabled in "
+                  "ovs-vswitchd?".format(
+                      (re.sub('^', ' ' * 7, str(e),
+                              flags=re.MULTILINE)).strip().
+                      replace("--with-dtrace or --enable-dtrace",
+                              "--enable-usdt-probes")))
+            sys.exit(-1)
+
+        #
+        # Uncomment to see how arguments are decoded.
+        #   print(u.get_text())
+        #
+        print("- Compiling eBPF programs...")
+
+        #
+        # Attach probes to the running process
+        #
+        source = ebpf_source.replace("<MAX_PACKET_VAL>",
+                                     str(options.packet_size))
+        source = source.replace("<MAX_KEY_VAL>", str(options.flow_key_size))
+        source = source.replace("<BUFFER_PAGE_CNT>",
+                                str(options.buffer_page_count))
+
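+        #
+        # Note: only the lower 24 bits of the --debug value are passed to
+        # BCC; the higher bits select this script's own debug output.
+        #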
+        b = BPF(text=source, usdt_contexts=[u], debug=options.debug & 0xffffff)
+
+        #
+        # Dump out all events
+        #
+        print("- Capturing events [Press ^C to stop]...")
+        events_received = 0
+
+        b['events'].open_ring_buffer(receive_event_bcc)
+        while 1:
+            try:
+                b.ring_buffer_poll()
+                if options.stop != 0 and events_received >= options.stop:
+                    break
+                time.sleep(0.5)
+            except KeyboardInterrupt:
+                break
+
+        dropcnt = b.get_table("dropcnt")
+        export_misses = {}
+        for k in dropcnt.keys():
+            event = EventType.from_trace(k.value)
+            count = dropcnt.sum(k).value
+            if count > 0:
+                if event not in event_count['total']:
+                    event_count['total'][event] = 0
+                    event_count['valid'][event] = 0
+                event_count['miss'][event] = count
+                export_misses[k.value] = count
+
+        if options.write_events is not None:
+            if sum(event_count['miss'].values()) > 0:
+                export_file.write("event_miss = {}\n".format(export_misses))
+
+            export_file.close()
+
+        print()
+    else:
+        #
+        # Here we are requested to read events from an event export file
+        #
+        thread_filter = None
+        if options.handler_filter is not None:
+            thread_filter = options.handler_filter.split(',')
+
+        try:
+            dp_port_mapping_valid = False
+            with open(options.read_events, 'r') as fd:
+                events_received = 0
+
+                if options.quiet:
+                    spinner = Halo(spinner="dots", color="cyan",
+                                   text="Reading events from \"{}\"...".format(
+                                       options.read_events))
+                    spinner.start()
+                else:
+                    print("- Reading events from \"{}\"...".format(
+                        options.read_events))
+                for entry in fd:
+                    if options.stop != 0 and events_received >= options.stop:
+                        break
+
+                    entry = entry.rstrip()
+                    if entry.startswith('dp_port_map = {'):
+                        if not dp_port_mapping_valid:
+                            dp_port_mapping_valid = True
+                            get_dp_mapping("", "",
+                                           dp_map=ast.literal_eval(entry[14:]))
+                    elif (entry.startswith('event = {') and
+                          dp_port_mapping_valid):
+                        event = ast.literal_eval(entry[8:])
+                        event = namedtuple("EventObject",
+                                           event.keys())(*event.values())
+
+                        if thread_filter is not None \
+                           and EventType.from_trace(event.event) != \
+                           EventType.DP_UPCALL \
+                           and event.comm.decode("utf-8") not in thread_filter:
+                            # Skip threads not in the filter
+                            continue
+
+                        if len(event.pkt) > 0:
+                            options.packet_size = len(event.pkt)
+                        if len(event.key) > 0:
+                            options.flow_key_size = len(event.key)
+                        receive_event(event)
+                        events_received += 1
+                    elif entry.startswith('event_miss = {'):
+                        misses = ast.literal_eval(entry[13:])
+                        for e, count in misses.items():
+                            event = EventType.from_trace(e)
+                            if count > 0:
+                                if event not in event_count['total']:
+                                    event_count['total'][event] = 0
+                                    event_count['valid'][event] = 0
+                                event_count['miss'][event] = count
+
+            if options.quiet:
+                spinner.stop()
+                print("- Reading events from \"{}\"...".format(
+                    options.read_events))
+
+        except (FileNotFoundError, PermissionError):
+            print("ERROR: Can't open file \"{}\" for reading!".format(
+                options.read_events))
+            sys.exit(-1)
+
+    #
+    # Start analyzing the data
+    #
+    print("- Analyzing results ({} events)...".format(len(trace_data)))
+
+    if events_received > 0:
+        if sum(event_count['miss'].values()) > 0:
+            print("\nWARNING: Not all events were captured!\n         "
+                  "Increase the BPF ring buffer size with the "
+                  "--buffer-page-count option.")
+
+        print("\n=> Events received per type (usable/total) [missed events]:")
+        for event, total in sorted(event_count['total'].items()):
+            miss = event_count['miss'][event] if event in event_count['miss'] \
+                else 0
+            print("  {:36}: {:10}/{:10} [{:10}]".format(
+                event, event_count['valid'][event], total, miss))
+
+    collection, batch_stats, thread_stats = collect_event_sets(
+        trace_data, collect_stats=True, spinner=True)
+
+    if len(collection) <= 0:
+        print("No upcall data sets were found!")
+        sys.exit(0)
+
+    print("\n- Analyzing {} event sets...".format(len(collection)))
+
+    if options.debug & 0x1000000 != 0:
+        for upcall in collection:
+            print("DBG: {}{}{}{}{}".format(
+                "U" if EventType.DP_UPCALL in upcall else "-",
+                "u" if EventType.RECV_UPCALL in upcall else "-",
+                "p" if EventType.OP_FLOW_PUT in upcall else "-",
+                "e" if EventType.OP_FLOW_EXECUTE in upcall else "-",
+                "E" if EventType.OVS_PKT_EXEC in upcall else "-"))
+            if options.debug & 0x2000000 != 0:
+                try:
+                    print("DBG:  - {}".format(upcall[EventType.DP_UPCALL]))
+                    print("DBG:  - {}".format(upcall[EventType.RECV_UPCALL]))
+                    print("DBG:  - {}".format(upcall[EventType.OP_FLOW_PUT]))
+                    print("DBG:  - {}".format(
+                        upcall[EventType.OP_FLOW_EXECUTE]))
+                    print("DBG:  - {}".format(upcall[EventType.OVS_PKT_EXEC]))
+                except LookupError:
+                    continue
+
+    show_key_value(thread_stats, description="Upcalls handled per thread")
+    show_batch_histogram(batch_stats,
+                         description="Histogram of upcalls per batch")
+
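+    #
+    # For every upcall set, calculate the time spent between the individual
+    # events:
+    #  - kernel_to_vswitchd:    dp_upcall tracepoint to recv_upcall probe
+    #  - vswitchd_to_kernel:    op_flow_execute probe to kernel packet exec
+    #  - time_minus_lookup:     total upcall time minus the lookup time
+    #                           (recv_upcall to op_flow_put)
+    #  - kernel_to_kernel_exec: dp_upcall tracepoint to kernel packet exec
+    #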
+    kernel_to_vswitchd = []
+    kernel_to_kernel_exec = []
+    vswitchd_to_kernel = []
+    time_minus_lookup = []
+    for upcall in collection:
+        kernel_to_vswitchd.append((upcall[EventType.RECV_UPCALL].ts -
+                                   upcall[EventType.DP_UPCALL].ts) /
+                                  1000)
+
+        if EventType.OP_FLOW_PUT in upcall and \
+           EventType.OVS_PKT_EXEC in upcall:
+            time_minus_lookup.append(
+                ((upcall[EventType.OVS_PKT_EXEC].ts -
+                  upcall[EventType.DP_UPCALL].ts) -
+                 (upcall[EventType.OP_FLOW_PUT].ts -
+                  upcall[EventType.RECV_UPCALL].ts)) / 1000)
+
+        if EventType.OP_FLOW_EXECUTE in upcall and \
+           EventType.OVS_PKT_EXEC in upcall:
+            vswitchd_to_kernel.append((upcall[EventType.OVS_PKT_EXEC].ts
+                                       - upcall[EventType.OP_FLOW_EXECUTE].ts)
+                                      / 1000)
+
+        if EventType.OVS_PKT_EXEC in upcall:
+            kernel_to_kernel_exec.append((upcall[EventType.OVS_PKT_EXEC].ts -
+                                          upcall[EventType.DP_UPCALL].ts) /
+                                         1000)
+
+    show_histogram(kernel_to_vswitchd,
+                   description="Kernel upcall action to vswitchd receive "
+                   "(microseconds)",
+                   options=options)
+    show_histogram(vswitchd_to_kernel,
+                   description="vswitchd execute to kernel receive "
+                   "(microseconds)",
+                   options=options)
+    show_histogram(time_minus_lookup,
+                   description="Upcall overhead (total time minus lookup) "
+                   "(microseconds)",
+                   options=options)
+    show_histogram(kernel_to_kernel_exec,
+                   description="Kernel upcall to kernel packet execute "
+                   "(microseconds)",
+                   options=options)
+
+
+#
+# Start main() as the default entry point...
+#
+if __name__ == '__main__':
+    main()