Message ID | 20240220214753.2799436-2-aconole@redhat.com
---|---
State | Changes Requested
Series | debugging: Add a revalidator probe, and monitor script
Context | Check | Description
---|---|---
ovsrobot/apply-robot | success | apply and check: success
ovsrobot/github-robot-_Build_and_Test | success | github build: passed
ovsrobot/intel-ovs-compilation | success | test: success
On 20 Feb 2024, at 22:47, Aaron Conole wrote: > From: Kevin Sprague <ksprague0711@gmail.com> > > During normal operations, it is useful to understand when a particular flow > gets removed from the system. This can be useful when debugging performance > issues tied to ofproto flow changes, trying to determine deployed traffic > patterns, or while debugging dynamic systems where ports come and go. > > Prior to this change, there was a lack of visibility around flow expiration. > The existing debugging infrastructure could tell us when a flow was added to > the datapath, but not when it was removed or why. > > This change introduces a USDT probe at the point where the revalidator > determines that the flow should be removed. Additionally, we track the > reason for the flow eviction and provide that information as well. With > this change, we can track the complete flow lifecycle for the netlink > datapath by hooking the upcall tracepoint in kernel, the flow put USDT, and > the revalidator USDT, letting us watch as flows are added and removed from > the kernel datapath. > > This change only enables this information via USDT probe, so it won't be > possible to access this information any other way (see: > Documentation/topics/usdt-probes.rst). > > Also included is a script (utilities/usdt-scripts/flow_reval_monitor.py) > which serves as a demonstration of how the new USDT probe might be used > going forward. > > Acked-by: Han Zhou <hzhou@ovn.org> > Signed-off-by: Kevin Sprague <ksprague0711@gmail.com> > Co-authored-by: Aaron Conole <aconole@redhat.com> > Signed-off-by: Aaron Conole <aconole@redhat.com> Thanks for doing the v9, some small comments remain below. 
Cheers, Eelco > --- > v8 -> v9: Reorganized the flow delete reasons enum > Updated flow_reval_monitor to use pahole to extract fields > Added the purge reason with a proper USDT point > Updated documentation > Dropped all the outstanding ACKs > > Documentation/topics/usdt-probes.rst | 43 + > ofproto/ofproto-dpif-upcall.c | 48 +- > utilities/automake.mk | 3 + > utilities/usdt-scripts/flow_reval_monitor.py | 997 +++++++++++++++++++ > 4 files changed, 1085 insertions(+), 6 deletions(-) > create mode 100755 utilities/usdt-scripts/flow_reval_monitor.py > > diff --git a/Documentation/topics/usdt-probes.rst b/Documentation/topics/usdt-probes.rst > index e527f43bab..015614a6b8 100644 > --- a/Documentation/topics/usdt-probes.rst > +++ b/Documentation/topics/usdt-probes.rst > @@ -214,8 +214,10 @@ Available probes in ``ovs_vswitchd``: > - dpif_recv:recv_upcall > - main:poll_block > - main:run_start > +- revalidate:flow_result > - revalidate_ukey\_\_:entry > - revalidate_ukey\_\_:exit > +- revalidator_sweep\_\_:flow_result > - udpif_revalidator:start_dump > - udpif_revalidator:sweep_done > > @@ -443,6 +445,47 @@ sweep phase was completed. > - ``utilities/usdt-scripts/reval_monitor.py`` > > > +probe revalidate:flow_result > +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ > + > +**Description**: > +This probe is triggered when the revalidator has executed on a particular > +flow key to make a determination whether to evict a flow, and the cause > +for eviction. The revalidator runs periodically, and this probe will only > +be triggered when a flow is flagged for revalidation. > + > +**Arguments**: > + > +- *arg0*: ``(enum reval_result) result`` > +- *arg1*: ``(enum flow_del_reason) reason`` nit: variable name changed, so should be del_reason. 
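(As an aside for script authors reading these docs: the del_reason argument is just the enum ordinal, so a probe consumer can map the raw u32 back to a name. A minimal sketch — enumerator names taken from the patch below, the `FlowDelReason` name is mine:)

```python
from enum import IntEnum

# Mirror of the patch's flow_del_reason enum; ordinals are assigned in
# declaration order starting at 0, matching the C definition.
FlowDelReason = IntEnum(
    "FlowDelReason",
    [
        "FDR_NONE",
        "FDR_AVOID_CACHING",
        "FDR_BAD_ODP_FIT",
        "FDR_FLOW_IDLE",
        "FDR_FLOW_LIMIT",
        "FDR_FLOW_WILDCARDED",
        "FDR_NO_OFPROTO",
        "FDR_PURGE",
        "FDR_TOO_EXPENSIVE",
        "FDR_UPDATE_FAIL",
        "FDR_XLATION_ERROR",
    ],
    start=0,
)

# Turn a raw reason value read from the probe back into its name.
print(FlowDelReason(3).name)
```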
> +- *arg2*: ``(struct udpif *) udpif`` > +- *arg3*: ``(struct udpif_key *) ukey`` > + I think you missed my previous comment on re-ordering the arguments to be more in line with existing probes, i.e.: + OVS_USDT_PROBE(revalidator_sweep__, flow_result, udpif, ukey, + result, del_reason); > +**Script references**: > + > +- ``utilities/usdt-scripts/flow_reval_monitor.py`` > + > + > +probe revalidator_sweep\_\_:flow_result > +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ > + > +**Description**: > +This probe is placed in the path of the revalidator sweep, and is executed > +under the condition that a flow entry is in an unexpected state, or the > +flows were asked to be purged due to a user action. > + > +**Arguments**: > + > +- *arg0*: ``(enum reval_result) result`` > +- *arg1*: ``(enum flow_del_reason) reason`` nit: variable name changed, so should be del_reason. > +- *arg2*: ``(struct udpif *) udpif`` > +- *arg3*: ``(struct udpif_key *) ukey`` See comments above on argument ordering. > + > +**Script references**: > + > +- ``utilities/usdt-scripts/flow_reval_monitor.py`` > + > + > Adding your own probes > ---------------------- > > diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c > index b5cbeed878..fbc7858690 100644 > --- a/ofproto/ofproto-dpif-upcall.c > +++ b/ofproto/ofproto-dpif-upcall.c > @@ -269,6 +269,20 @@ enum ukey_state { > }; > #define N_UKEY_STATES (UKEY_DELETED + 1) > > +enum flow_del_reason { > + FDR_NONE = 0, /* No deletion reason for the flow. */ > + FDR_AVOID_CACHING, /* Flow deleted to avoid caching. */ > + FDR_BAD_ODP_FIT, /* The flow had a bad ODP flow fit. */ > + FDR_FLOW_IDLE, /* The flow went unused and was deleted. */ > + FDR_FLOW_LIMIT, /* All flows being killed. */ > + FDR_FLOW_WILDCARDED, /* The flow needed a narrower wildcard mask. */ > + FDR_NO_OFPROTO, /* The flow didn't have an associated ofproto. */ > + FDR_PURGE, /* User action caused flows to be killed. 
*/ > + FDR_TOO_EXPENSIVE, /* The flow was too expensive to revalidate. */ > + FDR_UPDATE_FAIL, /* Flow state transition was unexpected. */ > + FDR_XLATION_ERROR, /* There was an error translating the flow. */ > +}; > + > /* 'udpif_key's are responsible for tracking the little bit of state udpif > * needs to do flow expiration which can't be pulled directly from the > * datapath. They may be created by any handler or revalidator thread at any > @@ -2272,7 +2286,8 @@ populate_xcache(struct udpif *udpif, struct udpif_key *ukey, > static enum reval_result > revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, > uint16_t tcp_flags, struct ofpbuf *odp_actions, > - struct recirc_refs *recircs, struct xlate_cache *xcache) > + struct recirc_refs *recircs, struct xlate_cache *xcache, > + enum flow_del_reason *del_reason) > { > struct xlate_out *xoutp; > struct netflow *netflow; > @@ -2293,11 +2308,13 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, > netflow = NULL; > > if (xlate_ukey(udpif, ukey, tcp_flags, &ctx)) { > + *del_reason = FDR_XLATION_ERROR; > goto exit; > } > xoutp = &ctx.xout; > > if (xoutp->avoid_caching) { > + *del_reason = FDR_AVOID_CACHING; > goto exit; > } > > @@ -2311,6 +2328,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, > ofpbuf_clear(odp_actions); > > if (!ofproto) { > + *del_reason = FDR_NO_OFPROTO; > goto exit; > } > > @@ -2322,6 +2340,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, > if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, &dp_mask, &ctx.flow, > NULL) > == ODP_FIT_ERROR) { > + *del_reason = FDR_BAD_ODP_FIT; > goto exit; > } > > @@ -2331,6 +2350,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, > * down. Note that we do not know if the datapath has ignored any of the > * wildcarded bits, so we may be overly conservative here. 
*/ > if (flow_wildcards_has_extra(&dp_mask, ctx.wc)) { > + *del_reason = FDR_FLOW_WILDCARDED; > goto exit; > } > > @@ -2400,7 +2420,7 @@ static enum reval_result > revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, > const struct dpif_flow_stats *stats, > struct ofpbuf *odp_actions, uint64_t reval_seq, > - struct recirc_refs *recircs) > + struct recirc_refs *recircs, enum flow_del_reason *del_reason) > OVS_REQUIRES(ukey->mutex) > { > bool need_revalidate = ukey->reval_seq != reval_seq; > @@ -2430,8 +2450,12 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, > xlate_cache_clear(ukey->xcache); > } > result = revalidate_ukey__(udpif, ukey, push.tcp_flags, > - odp_actions, recircs, ukey->xcache); > - } /* else delete; too expensive to revalidate */ > + odp_actions, recircs, ukey->xcache, > + del_reason); > + } else { > + /* delete; too expensive to revalidate */ nit: Maybe add a trailing dot? > + *del_reason = FDR_TOO_EXPENSIVE; > + } > } else if (!push.n_packets || ukey->xcache > || !populate_xcache(udpif, ukey, push.tcp_flags)) { > result = UKEY_KEEP; > @@ -2831,6 +2855,7 @@ revalidate(struct revalidator *revalidator) > for (f = flows; f < &flows[n_dumped]; f++) { > long long int used = f->stats.used; > struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER; > + enum flow_del_reason del_reason = FDR_NONE; > struct dpif_flow_stats stats = f->stats; > enum reval_result result; > struct udpif_key *ukey; > @@ -2905,9 +2930,14 @@ revalidate(struct revalidator *revalidator) > } > if (kill_them_all || (used && used < now - max_idle)) { > result = UKEY_DELETE; > + if (kill_them_all) { > + del_reason = FDR_FLOW_LIMIT; > + } else { > + del_reason = FDR_FLOW_IDLE; > + } Maybe take the same approach as below: del_reason = kill_them_all ? 
FDR_FLOW_LIMIT : FDR_FLOW_IDLE; > } else { > result = revalidate_ukey(udpif, ukey, &stats, &odp_actions, > - reval_seq, &recircs); > + reval_seq, &recircs, &del_reason); > } > ukey->dump_seq = dump_seq; > > @@ -2916,6 +2946,8 @@ revalidate(struct revalidator *revalidator) > udpif_update_flow_pps(udpif, ukey, f); > } > > + OVS_USDT_PROBE(revalidate, flow_result, result, del_reason, udpif, > + ukey); > if (result != UKEY_KEEP) { > /* Takes ownership of 'recircs'. */ > reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs, > @@ -2968,6 +3000,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) > size_t n_ops = 0; > > CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) { > + enum flow_del_reason del_reason = FDR_NONE; > enum ukey_state ukey_state; > > /* Handler threads could be holding a ukey lock while it installs a > @@ -2986,6 +3019,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) > > if (purge || ukey_state == UKEY_INCONSISTENT) { > result = UKEY_DELETE; > + del_reason = purge ? FDR_PURGE : FDR_UPDATE_FAIL; > } else if (!seq_mismatch) { > result = UKEY_KEEP; > } else { > @@ -2993,13 +3027,15 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) > COVERAGE_INC(revalidate_missed_dp_flow); > memcpy(&stats, &ukey->stats, sizeof stats); > result = revalidate_ukey(udpif, ukey, &stats, &odp_actions, > - reval_seq, &recircs); > + reval_seq, &recircs, &del_reason); > } > if (result != UKEY_KEEP) { > /* Clears 'recircs' if filled by revalidate_ukey(). 
*/ > reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs, > &odp_actions); > } > + OVS_USDT_PROBE(revalidator_sweep__, flow_sweep_result, result, > + del_reason, udpif, ukey); > } > ovs_mutex_unlock(&ukey->mutex); > > diff --git a/utilities/automake.mk b/utilities/automake.mk > index 9a2114df40..146b8c37fb 100644 > --- a/utilities/automake.mk > +++ b/utilities/automake.mk > @@ -23,6 +23,7 @@ scripts_DATA += utilities/ovs-lib > usdt_SCRIPTS += \ > utilities/usdt-scripts/bridge_loop.bt \ > utilities/usdt-scripts/dpif_nl_exec_monitor.py \ > + utilities/usdt-scripts/flow_reval_monitor.py \ > utilities/usdt-scripts/kernel_delay.py \ > utilities/usdt-scripts/kernel_delay.rst \ > utilities/usdt-scripts/reval_monitor.py \ > @@ -72,6 +73,7 @@ EXTRA_DIST += \ > utilities/docker/debian/build-kernel-modules.sh \ > utilities/usdt-scripts/bridge_loop.bt \ > utilities/usdt-scripts/dpif_nl_exec_monitor.py \ > + utilities/usdt-scripts/flow_reval_monitor.py \ > utilities/usdt-scripts/kernel_delay.py \ > utilities/usdt-scripts/kernel_delay.rst \ > utilities/usdt-scripts/reval_monitor.py \ > @@ -146,6 +148,7 @@ FLAKE8_PYFILES += utilities/ovs-pcap.in \ > utilities/ovs-tcpdump.in \ > utilities/ovs-pipegen.py \ > utilities/usdt-scripts/dpif_nl_exec_monitor.py \ > + utilities/usdt-scripts/flow_reval_monitor.py \ > utilities/usdt-scripts/upcall_monitor.py \ > utilities/usdt-scripts/upcall_cost.py > > diff --git a/utilities/usdt-scripts/flow_reval_monitor.py b/utilities/usdt-scripts/flow_reval_monitor.py > new file mode 100755 > index 0000000000..e76e0b5995 > --- /dev/null > +++ b/utilities/usdt-scripts/flow_reval_monitor.py > @@ -0,0 +1,997 @@ > +#!/usr/bin/env python3 > +# > +# Copyright (c) 2022-2024 Redhat, Inc. > +# > +# Licensed under the Apache License, Version 2.0 (the "License"); > +# you may not use this file except in compliance with the License. 
> +# You may obtain a copy of the License at: > +# > +# http://www.apache.org/licenses/LICENSE-2.0 > +# > +# Unless required by applicable law or agreed to in writing, software > +# distributed under the License is distributed on an "AS IS" BASIS, > +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > +# See the License for the specific language governing permissions and > +# limitations under the License. > +# > +# Script information: > +# ------------------- > +# flow_reval_monitor.py uses the dpif_netlink_operate:flow_put and > +# revalidator:flow_result USDT probes to monitor flow lifetimes and > +# expiration events. By default, this will show all flow_put and flow > +# expiration events, along with their reasons. This will look like so: > +# > +# TIME UFID EVENT/REASON > +# 101536.226986736 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow_put > +# 101536.227196214 ufid:d08472b6-110e-46cb-a9e4-00008f46838e flow_put > +# 101541.516610178 ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32 flow_put > +# 101541.516967303 ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a flow_put > +# 101551.688050747 ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a flow timed out > +# 101551.688077175 ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32 flow timed out > +# 101557.695391371 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow timed out > +# 101557.695408909 ufid:d08472b6-110e-46cb-a9e4-00008f46838e flow timed out The output above has changed (the reasons). Here is some new output: TID TIME UFID EVENT/REASON 71828 1549.119959874 39f0f28f-338d-4a77-81b3-0000d6c70b6b Insert (put) flow to ovs kernel module. 71828 1549.420877223 850db41c-47ff-42c0-b48f-0000e180d81c Insert (put) flow to ovs kernel module. 71828 1550.476923456 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Insert (put) flow to ovs kernel module. 
71832 1559.650192299 850db41c-47ff-42c0-b48f-0000e180d81c Idle flow timed out 71832 1561.153332825 39f0f28f-338d-4a77-81b3-0000d6c70b6b Idle flow timed out 71832 1572.684316304 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Idle flow timed out 71828 1577.548886773 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Insert (put) flow to ovs kernel module. 71832 1587.720846962 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Idle flow timed out Maybe you can shorten the UFID to fit it in 79 chars. > +# > +# flow key data can be printed using the --flow-keys option. This will Capital F for Flow. > +# print the equivalent datapath flow string. > +# > +# When filtering flows, the syntax is the same as used by > +# `ovs-appctl dpctl/add-flow`. > +# > +# The following options are available: > +# > +# usage: flow_reval_monitor.py [-h] [--buffer-page-count NUMBER] > +# [-f [128-2048]] [-k] [-l [FLOW_STRING ...]] > +# [-p VSWITCHD_PID] [-D [DEBUG]] > +# [-P PAHOLE] > +# > +# optional arguments: > +# -h, --help show this help message and exit > +# --buffer-page-count NUMBER > +# Number of BPF ring buffer pages, default 1024 > +# -f <128..2048>, --flow-key-size=<128..2048> > +# Set the size of the flow key, default 128 > +# -k, --flow-keys Print flow keys as flow strings > +# -l [FLOW_STRING ...], --filter-flows [FLOW_STRING ...] > +# Filter against flow keys that match the specified > +# ODP-like flow. This may not include all packet > +# fields > +# -p VSWITCHD_PID, --pid VSWITCHD_PID > +# ovs-vswitchd's PID > +# -P PAHOLE, --pahole PAHOLE > +# Pahole executable to use, default pahole > +# -D [DEBUG], --debug [DEBUG] > +# Enable eBPF debugging The text above is different from the help text, or was this done on purpose? 
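(On the 79-char point above, a rough sketch of what I had in mind — the helper name is mine, not from the patch — truncating the formatted UFID for display:)

```python
# Hypothetical helper (not in the patch): shorten the formatted UFID string
# so the whole output line stays under 79 columns.
def short_ufid(ufid_str, width=18):
    """Keep at most 'width' characters of the UFID, marking the cut."""
    if len(ufid_str) <= width:
        return ufid_str
    return ufid_str[: width - 3] + "..."

print(short_ufid("5bacfca9-fe5f-43aa-97ac-00005c9dc3e3"))
```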
> +# Examples: > +# > +# To use the script on a running ovs-vswitchd to see flow keys and expiration > +# events for flows with an ipv4 source of 192.168.10.10: > +# $ ./flow_reval_monitor.py --flow-keys --filter-flows \ > +# "ipv4(src=192.168.10.10)" > +# TIME UFID EVENT/REASON > +# 105082.457322742 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow_put > +# ufid:f76fc899-376d-466b-bc74-0000b933eb97 has the following flow information: > +# in_port(2), > +# eth(src=0e:04:47:fc:74:51, dst=da:dc:c5:69:05:d7), \ > +# eth_type(0x800), \ > +# ipv4(src=192.168.10.10, dst=192.168.10.30, proto=1, tos=0, ttl=64,[...]), > +# icmp(type=8, code=0) > +# 105092.635450202 ufid:f76fc899-376d-466b-bc74-0000b933eb97 Flow timed out > +# > +# Notes: > +# 1) No options are needed to attach when there is a single running instance > +# of ovs-vswitchd. > +# 2) If you're using the flow filtering option, it will only track flows that > +# have been upcalled since the script began running. > +# 3) When using the flow filtering option, the key size will likely need to > +# be expanded to match on all the fields in the message. The default is > +# kept small to keep the buffer copy sizes down when displaying > +# flows (-k), but is hardcoded to 2048 when an actual filter (-l) is > +# applied > +# 4) The flow filtering format is a simplified form of the ODP syntax, and > +# does not support masked matches, which means you will need to filter > +# on exact details. The fields present are dependent on how the > +# classifier and OFP rules form the ODP rules - not all fields may be > +# present in a particular flow. > +# 5) The flow_put filtering only happens for flows installed into the ovs > +# kernel module. This means flows taking the HW offload path (ie: tc), > +# or on DPDK side won't get matched. 
> + > +try: > + from bcc import BPF > + from bcc import USDT > + from bcc import USDTException > +except ModuleNotFoundError: > + print("ERROR: Can't find the BPF Compiler Collection Tools.") > + print("Please install them before running this script.") > + exit(1) > + > +from enum import IntEnum > +from ipaddress import IPv4Address, IPv6Address > +from pathlib import Path > + > +import argparse > +import psutil > +import re > +import struct > +import subprocess > +import sys > + > +# > +# eBPF source code > +# > +bpf_src = """ > +#include <linux/sched.h> > + > +#define MAX_KEY <MAX_KEY_VAL> > +#define FLOW_FILTER <FILTER_BOOL> > + > +enum probe { <EVENT_ENUM> }; > + > +<OVS_INCLUDE_DEFINITIONS> > + > +struct event_t { > + u64 ts; > + u32 pid; > + u32 result; > + u32 reason; > + u32 ufid[4]; > + u64 key_size; > + unsigned char key[MAX_KEY]; > + enum probe probe; > +}; > + > +BPF_HASH(watchlist, ovs_u128); > +BPF_RINGBUF_OUTPUT(events, <BUFFER_PAGE_COUNT>); > +BPF_TABLE("percpu_array", uint32_t, uint64_t, dropcnt, 1); > +BPF_TABLE("percpu_array", uint32_t, struct udpif_key, udpk, 1); > + > +static struct event_t *get_event(enum probe p) { > + struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t)); > + > + if (!event) { > + dropcnt.increment(0); > + return NULL; > + } > + > + event->probe = p; > + event->ts = bpf_ktime_get_ns(); > + event->pid = bpf_get_current_pid_tgid(); > + > + return event; > +} > + > +static int emit_flow_result(struct udpif_key *ukey, ovs_u128 ufid, > + u32 result, u32 reason) { > + struct event_t *event = NULL; > + u64 *ufid_present = NULL; > + > + ufid_present = watchlist.lookup(&ufid); > + if (FLOW_FILTER && !ufid_present) { > + return 0; > + } > + > + event = get_event(FLOW_RESULT); > + if (!event) { > + /* If we can't reserve the space in the ring buffer, return 1. 
*/ > + return 1; > + } > + > + event->result = result; > + event->reason = reason; > + bpf_probe_read(&event->ufid, sizeof ufid, &ufid); > + events.ringbuf_submit(event, 0); > + > + return 0; > +} > + > +int usdt__flow_result(struct pt_regs *ctx) { > + struct udpif_key *ukey = NULL; > + u32 reason = 0; > + u32 result = 0; > + ovs_u128 ufid; > + u32 zero = 0; > + > + ukey = udpk.lookup(&zero); > + if (!ukey) { > + return 1; > + } > + bpf_usdt_readarg_p(4, ctx, ukey, sizeof(struct udpif_key)); > + bpf_usdt_readarg(2, ctx, &reason); > + bpf_usdt_readarg(1, ctx, &result); > + ufid = ukey->ufid; > + > + return emit_flow_result(ukey, ufid, result, reason); > +} > + > +int usdt__flow_sweep_result(struct pt_regs *ctx) { > + struct udpif_key *ukey = NULL; > + u32 reason = 0; > + u32 result = 0; > + ovs_u128 ufid; > + u32 zero = 0; > + > + ukey = udpk.lookup(&zero); > + if (!ukey) { > + return 1; > + } > + bpf_usdt_readarg_p(4, ctx, ukey, sizeof(struct udpif_key)); > + bpf_usdt_readarg(2, ctx, &reason); > + bpf_usdt_readarg(1, ctx, &result); > + ufid = ukey->ufid; > + > + return emit_flow_result(ukey, ufid, result, reason); > +} > + > +int usdt__op_flow_put(struct pt_regs *ctx) { > + struct dpif_flow_put put; > + ovs_u128 ufid; > + > + struct event_t *event = get_event(OP_FLOW_PUT); > + if (!event) { > + /* If we can't reserve the space in the ring buffer, return 1. 
*/ > + return 1; > + } > + > + bpf_usdt_readarg_p(2, ctx, &put, sizeof put); > + bpf_probe_read(&event->ufid, sizeof event->ufid, put.ufid); > + bpf_probe_read(&ufid, sizeof ufid, &event->ufid); > + if (put.key_len > MAX_KEY) { > + put.key_len = MAX_KEY; > + } > + event->key_size = put.key_len; > + bpf_probe_read(&event->key, put.key_len, put.key); > + event->reason = 0; > + events.ringbuf_submit(event, 0); > + > + watchlist.increment(ufid); > + return 0; > +} > +""" > + > +Event = IntEnum("Event", ["OP_FLOW_PUT", "FLOW_RESULT"], start=0) > +RevalResult = IntEnum( > + "reval_result", > + [ > + "UKEY_KEEP", > + "UKEY_DELETE", > + "UKEY_MODIFY", > + ], > + start=0, > +) > +FdrReasons = IntEnum( > + "flow_del_reason", > + [ > + "FDR_NONE", > + "FDR_AVOID_CACHING", > + "FDR_BAD_ODP_FIT", > + "FDR_FLOW_IDLE", > + "FDR_FLOW_LIMIT", > + "FDR_FLOW_WILDCARDED", > + "FDR_NO_OFPROTO", > + "FDR_PURGE", > + "FDR_TOO_EXPENSIVE", > + "FDR_UPDATE_FAIL", > + "FDR_XLATION_ERROR", > + ], > + start=0, > +) > + > +FdrReasonStrings = [ > + "No deletion reason", > + "Cache avoidance flag set", > + "Bad ODP flow fit", > + "Idle flow timed out", > + "Kill all flows condition detected", > + "Mask too wide - need narrower match", > + "No matching ofproto rules", > + "Too expensive to revalidate", > + "Purged with user action", > + "Flow state inconsistent after updates", > + "Flow translation error", > +] > + > + > +# > +# run_program() > +# > +def run_program(command): > + try: > + process = subprocess.run( > + command, > + stdout=subprocess.PIPE, > + stderr=subprocess.STDOUT, > + encoding="utf8", > + check=True, I noticed you're adding ',' to all final arguments in function parameter lists, and string lists. Is this some formatter cleaning this up, or a new style? It causes these to be split over multiple lines, even for function parameters, which looks odd to me. 
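(For reference, I believe this is the "magic trailing comma" honored by formatters such as black; a small sketch of the two equivalent forms — with the trailing comma present, the formatter keeps the call exploded one argument per line:)

```python
# Both calls are identical as far as Python is concerned; the trailing comma
# after the last argument is only a hint to formatters like black, telling
# them to keep the call in its multi-line layout.
compact = dict(stdout=1, stderr=2, check=True)
expanded = dict(
    stdout=1,
    stderr=2,
    check=True,  # magic trailing comma: preserves this layout under black
)
print(compact == expanded)
```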
> + ) > + > + except subprocess.CalledProcessError as perror: > + return perror.returncode, perror.stdout > + > + return 0, process.stdout > + > + > +# > +# get_ovs_definitions() > +# > +def get_ovs_definitions(objects, pahole="pahole", pid=None): > + if pid is None: > + raise ValueError("A valid pid value should be supplied!") > + > + if not isinstance(objects, list): > + objects = [objects] > + > + if len(objects) == 0: > + raise ValueError("Must supply at least one object!") > + > + vswitchd = Path(f"/proc/{pid}/exe").resolve() > + > + object_str = ",".join(objects) > + > + def run_pahole(debug_file): > + error, result = run_program( > + [pahole, "-C", object_str, "--compile", debug_file] > + ) > + > + if error: > + if f"pahole: {debug_file}: Invalid argument" not in result: > + print( > + "ERROR: Pahole failed to get ovs-vswitchd data " > + "structures!\n{}".format( > + re.sub( > + "^", " " * 7, result.rstrip(), flags=re.MULTILINE > + ) > + ) > + ) > + sys.exit(-1) > + > + return None > + > + if bool(re.search("pahole: type .* not found", result)): > + return None > + > + return result > + > + def run_readelf(bin_file): > + error, result = run_program( > + ["readelf", "-n", "--debug-dump=links", bin_file] > + ) > + > + if error: > + print( > + "ERROR: Failed 'readelf' on \"{}\"!\n{}".format( > + bin_file, re.sub("^", " " * 7, result, flags=re.MULTILINE) > + ) > + ) > + sys.exit(-1) > + > + return result > + > + def get_debug_file(bin_file): > + elf_result = run_readelf(bin_file) > + match = re.search("Build ID: ([0-9a-fA-F]+)", elf_result) > + if not match: > + print("ERROR: Can't find build ID to read debug symbols!") > + sys.exit(-1) > + > + dbg_file = "/usr/lib/debug/.build-id/{}/{}.debug".format( > + match.group(1)[:2], match.group(1)[2:] > + ) > + > + return dbg_file > + > + def get_from_shared_library(debug_file): > + ovs_libs = [ > + "libofproto", > + "libopenvswitch", > + "libovsdb", > + "libsflow", > + "libvtep", > + ] > + error, ldd_result = 
run_program(["ldd", debug_file]) > + > + if error: > + print( > + "ERROR: Failed 'ldd' on \"{}\"!\n{}".format( > + debug_file, > + re.sub("^", " " * 7, ldd_result, flags=re.MULTILINE), > + ) > + ) > + sys.exit(-1) > + > + for lib in ovs_libs: > + match = re.search( > + r"^\s*{}.* => (.*) \(.*\)$".format(lib), > + ldd_result, > + flags=re.MULTILINE, > + ) > + if match is None: > + continue > + > + result = run_pahole(match.group(1)) > + if result is None: > + result = run_pahole(get_debug_file(match.group(1))) > + > + if result: > + return result > + > + return None > + > + # > + # First try to find the debug data as part of the executable. > + # > + result = run_pahole(vswitchd) > + > + if result is None: > + print(f'INFO: Failed to find debug info in "{vswitchd}"!') > + > + # > + # Get additional .debug information if available. > + # > + dbg_file = get_debug_file(vswitchd) > + result = run_pahole(dbg_file) > + if result is None: > + print(f'INFO: Failed to find debug info in "{dbg_file}"!') > + > + # > + # Try to get information from shared libraries if used. > + # > + result = get_from_shared_library(vswitchd) > + > + if result is None: > + print(f"ERROR: Failed to find needed data structures through {pahole}") > + sys.exit(-1) > + > + # > + # We need an empty _Atomic definition to avoid compiler complaints. > + # > + result = "#define _Atomic\n" + result > + > + # > + # Remove the uint64_t definition as it conflicts with the kernel one. 
> + # > + result = re.sub("^typedef.*uint64_t;$", "", result, flags=re.MULTILINE) > + > + return result > + > + > +# > +# buffer_size_type() > +# > +def buffer_size_type(astr, min=64, max=2048): > + value = int(astr) > + if min <= value <= max: > + return value > + else: > + raise argparse.ArgumentTypeError( > + "value not in range {}-{}".format(min, max) > + ) > + > + > +# > +# format_ufid() > +# > +def format_ufid(ufid): > + if ufid is None: > + return "ufid:none" > + > + return "{:08x}-{:04x}-{:04x}-{:04x}-{:04x}{:08x}".format( > + ufid[0], > + ufid[1] >> 16, > + ufid[1] & 0xFFFF, > + ufid[2] >> 16, > + ufid[2] & 0, > + ufid[3], > + ) > + > + > +# > +# find_and_delete_from_watchlist() > +# > +def find_and_delete_from_watchlist(event): > + for k, _ in b["watchlist"].items(): > + key_ufid = struct.unpack("=IIII", k) > + if key_ufid == tuple(event.ufid): > + key = (b["watchlist"].Key * 1)(k) > + b["watchlist"].items_delete_batch(key) > + break > + > + > +# > +# handle_flow_put() > +# > +def handle_flow_put(event): > + if args.flow_keys or args.filter_flows is not None: > + key = decode_key(bytes(event.key)[: event.key_size]) > + flow_dict, flow_str = parse_flow_dict(key) > + # For each attribute that we're watching. 
> + if args.filter_flows is not None: > + if not compare_flow_to_target(args.filter_flows, flow_dict): > + find_and_delete_from_watchlist(event) > + return > + > + print( > + "{:<10} {:<18.9f} {:<36} {}".format( > + event.pid, > + event.ts / 1000000000, > + format_ufid(event.ufid), > + "Insert (put) flow to ovs kernel module.", > + ) > + ) > + > + if args.flow_keys and len(flow_str): > + flow_str_fields = flow_str.split("), ") > + flow_str = " " > + curlen = 4 > + for field in flow_str_fields: > + if curlen + len(field) > 79: > + flow_str += "\n " > + curlen = 4 > + if field[-1] != ")": > + field += ")" > + flow_str += field + ", " > + curlen += len(field) + 2 > + > + print(" - It holds the following key information:") > + print(flow_str) > + > + > +# > +# compare_flow_to_target() > +# > +def compare_flow_to_target(target, flow): > + for key in target: > + if key not in flow: > + return False > + elif target[key] is True: > + continue > + elif target[key] == flow[key]: > + continue > + elif isinstance(target[key], dict) and isinstance(flow[key], dict): > + return compare_flow_to_target(target[key], flow[key]) > + else: > + return False > + return True > + > + > +# > +# parse_flow_str() > +# > +def parse_flow_str(flow_str): > + f_list = [i.strip(", ") for i in flow_str.split(")")] > + if f_list[-1] == "": > + f_list = f_list[:-1] > + flow_dict = {} > + for e in f_list: > + split_list = e.split("(") > + k = split_list[0] > + if len(split_list) == 1: > + flow_dict[k] = True > + elif split_list[1].count("=") == 0: > + flow_dict[k] = split_list[1] > + else: > + sub_dict = {} > + sublist = [i.strip() for i in split_list[1].split(",")] > + for subkey in sublist: > + brk = subkey.find("=") > + sub_dict[subkey[:brk]] = subkey[brk + 1 :] > + flow_dict[k] = sub_dict > + return flow_dict > + > + > +# > +# print_expiration() > +# > +def print_expiration(event): > + ufid_str = format_ufid(event.ufid) > + > + if event.reason > len(FdrReasons): > + reason = f"Unknown reason 
'{event.reason}'" > + else: > + reason = FdrReasonStrings[event.reason] > + > + print( > + "{:<10} {:<18.9f} {:<36} {:<17}".format( > + event.pid, > + event.ts / 1000000000, > + ufid_str, > + reason, > + ) > + ) > + > + > +# > +# decode_key() > +# > +def decode_key(msg): > + bytes_left = len(msg) > + result = {} > + while bytes_left: > + if bytes_left < 4: > + break > + nla_len, nla_type = struct.unpack("=HH", msg[:4]) > + if nla_len < 4: > + break > + nla_data = msg[4:nla_len] > + if nla_len > bytes_left: > + nla_data = nla_data[: (bytes_left - 4)] > + break > + else: > + result[get_ovs_key_attr_str(nla_type)] = nla_data > + next_offset = (nla_len + 3) & (~3) > + msg = msg[next_offset:] > + bytes_left -= next_offset > + if bytes_left: > + print(f"INFO: Buffer truncated with {bytes_left} bytes left.") > + return result > + > + > +# > +# get_ovs_key_attr_str() > +# > +def get_ovs_key_attr_str(attr): > + ovs_key_attr = [ > + "OVS_KEY_ATTR_UNSPEC", > + "encap", > + "skb_priority", > + "in_port", > + "eth", > + "vlan", > + "eth_type", > + "ipv4", > + "ipv6", > + "tcp", > + "udp", > + "icmp", > + "icmpv6", > + "arp", > + "nd", > + "skb_mark", > + "tunnel", > + "sctp", > + "tcp_flags", > + "dp_hash", > + "recirc_id", > + "mpls", > + "ct_state", > + "ct_zone", > + "ct_mark", > + "ct_label", > + "ct_tuple4", > + "ct_tuple6", > + "nsh", > + ] > + > + if attr < 0 or attr > len(ovs_key_attr): > + return "<UNKNOWN>: {}".format(attr) > + return ovs_key_attr[attr] > + > + > +# > +# parse_flow_dict() > +# > +def parse_flow_dict(key_dict, decode=True): > + ret_str = "" > + parseable = {} > + skip = ["nsh", "tunnel", "mpls", "vlan"] > + need_byte_swap = ["ct_label"] > + ipv4addrs = ["ct_tuple4", "tunnel", "ipv4", "arp"] > + ipv6addrs = ["ipv6", "nd", "ct_tuple6"] > + macs = {"eth": [0, 1], "arp": [3, 4], "nd": [1, 2]} > + fields = [ > + ("OVS_KEY_ATTR_UNSPEC"), > + ("encap",), > + ("skb_priority", "<I"), > + ("in_port", "<I"), > + ("eth", "!6s6s", "src", "dst"), > + ("vlan",), > + 
("eth_type", "!H"), > + ("ipv4", "!4s4s4B", "src", "dst", "proto", "tos", "ttl", "frag"), > + ( > + "ipv6", > + "!16s16s4s4B", > + "src", > + "dst", > + "label", > + "proto", > + "tclass", > + "hlimit", > + "frag", > + ), > + ("tcp", "!2H", "src", "dst"), > + ("udp", "!2H", "src", "dst"), > + ("icmp", "!2B", "type", "code"), > + ("icmpv6", "!2B", "type", "code"), > + ("arp", "!4s4sH6s6s", "sip", "tip", "op", "sha", "tha"), > + ("nd", "!16s6s6s", "target", "sll", "tll"), > + ("skb_mark", "<I"), > + ("tunnel",), > + ("sctp", "!2H", "src", "dst"), > + ("tcp_flags", "!H"), > + ("dp_hash", "<I"), > + ("recirc_id", "<I"), > + ("mpls",), > + ("ct_state", "<I"), > + ("ct_zone", "<H"), > + ("ct_mark", "<I"), > + ("ct_label", "!16s"), > + ("ct_tuple4", "!4s4s2HB", "src", "dst", "tp_src", "tp_dst", "proto"), > + ("ct_tuple6", "!16s16sB2H", "src", "dst", "proto", "tp_src", "tp_dst"), > + ("nsh",), > + ] > + for k, v in key_dict.items(): > + s = "" > + if k in skip: > + continue > + if decode and int.from_bytes(v, "big") == 0: > + parseable[k] = "0" > + continue > + if decode and k in need_byte_swap: > + v = int.from_bytes(v, "little").to_bytes(len(v), "big") > + attr = -1 > + found = False > + for f in fields: > + if k == f[0]: > + attr = fields.index(f) > + found = True > + break > + if not found: > + raise KeyError("Invalid flow field '%s'" % k) > + if decode and len(fields[attr]) > 1: > + data = list( > + struct.unpack( > + fields[attr][1], v[: struct.calcsize(fields[attr][1])] > + ) > + ) > + if k in ipv4addrs: > + if data[0].count(0) < 4: > + data[0] = str(IPv4Address(data[0])) > + else: > + data[0] = b"\x00" > + if data[1].count(0) < 4: > + data[1] = str(IPv4Address(data[1])) > + else: > + data[1] = b"\x00" > + if k in ipv6addrs: > + if data[0].count(0) < 16: > + data[0] = str(IPv6Address(data[0])) > + else: > + data[0] = b"\x00" > + if data[1].count(0) < len(data[1]): > + data[1] = str(IPv6Address(data[1])) > + else: > + data[1] = b"\x00" > + if k in macs.keys(): > + 
for e in macs[k]: > + if data[e].count(0) == 6: > + mac_str = b"\x00" > + else: > + mac_str = ":".join(["%02x" % i for i in data[e]]) > + data[e] = mac_str > + if decode and len(fields[attr]) > 2: > + field_dict = dict(zip(fields[attr][2:], data)) > + s = ", ".join(k + "=" + str(v) for k, v in field_dict.items()) > + elif decode and k != "eth_type": > + s = str(data[0]) > + field_dict = s > + else: > + if decode: > + s = hex(data[0]) > + field_dict = s > + ret_str += k + "(" + s + "), " > + parseable[k] = field_dict > + ret_str = ret_str[:-2] > + return (parseable, ret_str) > + > + > +# > +# handle_event() > +# > +def handle_event(ctx, data, size): > + # Once we grab the event, we have three cases. > + # 1. It's a revalidator probe and the reason is nonzero: A flow is expiring > + # 2. It's a revalidator probe and the reason is zero: flow revalidated > + # 3. It's a flow_put probe. > + # > + # We will ignore case 2, and report all others. > + # > + event = b["events"].event(data) > + if event.probe == Event.OP_FLOW_PUT: > + handle_flow_put(event) > + elif ( > + event.probe == Event.FLOW_RESULT > + and event.result == RevalResult.UKEY_DELETE > + ): > + print_expiration(event) > + > + > +def main(): > + # > + # Don't like these globals, but ctx passing does not work with the existing > + # open_ring_buffer() API :( > + # > + global b > + global args > + > + # > + # Argument parsing > + # > + parser = argparse.ArgumentParser() > + parser.add_argument( > + "--buffer-page-count", > + help="Number of BPF ring buffer pages, default 1024", > + type=int, > + default=1024, > + metavar="NUMBER", > + ) > + parser.add_argument( > + "-f", > + "--flow-key-size", > + help="Set maximum flow key size to capture, " > + "default 128 - see notes", > + type=buffer_size_type, > + default=128, > + metavar="[128-2048]", > + ) > + parser.add_argument( > + "-k", > + "--flow-keys", > + help="Print flow keys as flow strings", > + action="store_true", > + ) > + parser.add_argument( > + "-l", > 
+ "--filter-flows", > + metavar="FLOW_STRING", > + help="Filter flows that match the specified " "ODP-like flow", > + type=str, > + default=None, > + nargs="*", > + ) > + parser.add_argument( > + "-P", > + "--pahole", > + metavar="PAHOLE", > + help="Pahole executable to use, default pahole", > + type=str, > + default="pahole", > + ) > + parser.add_argument( > + "-p", > + "--pid", > + metavar="VSWITCHD_PID", > + help="ovs-vswitchd's PID", > + type=int, > + default=None, > + ) > + parser.add_argument( > + "-D", > + "--debug", > + help="Enable eBPF debugging", > + type=int, > + const=0x3F, > + default=0, > + nargs="?", > + ) > + args = parser.parse_args() > + > + # > + # Find the PID of the ovs-vswitchd daemon if not specified. > + # > + if args.pid is None: > + for proc in psutil.process_iter(): > + if "ovs-vswitchd" in proc.name(): > + if args.pid is not None: > + print( > + "Error: Multiple ovs-vswitchd daemons running, " > + "use the -p option!" > + ) > + sys.exit(-1) > + > + args.pid = proc.pid > + # > + # Error checking on input parameters > + # > + if args.pid is None: > + print("ERROR: Failed to find ovs-vswitchd's PID!") > + sys.exit(-1) > + > + # > + # Attach the USDT probes > + # > + u = USDT(pid=int(args.pid)) > + try: > + u.enable_probe(probe="op_flow_put", fn_name="usdt__op_flow_put") > + except USDTException as e: > + print("Error attaching the dpif_netlink_operate__:op_flow_put probe.") > + print(str(e)) > + sys.exit(-1) > + > + try: > + u.enable_probe(probe="flow_result", fn_name="usdt__flow_result") > + u.enable_probe( > + probe="flow_sweep_result", fn_name="usdt__flow_sweep_result" > + ) > + except USDTException as e: > + print("Error attaching the revalidate:flow_result probe.") We should either use two try/except cases, or update the error test to “...revalidate|revalidator_sweep__:flow_result probe.” > + print(str(e)) > + sys.exit(-1) > + > + # > + # Attach the probes to the running process > + # > + source = bpf_src.replace( > + 
"<BUFFER_PAGE_COUNT>", str(args.buffer_page_count) > + ) > + > + source = source.replace( > + "<OVS_INCLUDE_DEFINITIONS>", > + get_ovs_definitions( > + ["udpif_key", "ovs_u128", "dpif_flow_put"], > + pid=args.pid, > + pahole=args.pahole, > + ), > + ) > + > + if args.filter_flows is None: > + filter_bool = 0 > + > + # Set the key size based on what the user wanted > + source = source.replace("<MAX_KEY_VAL>", str(args.flow_key_size)) > + else: > + filter_bool = 1 > + args.filter_flows = parse_flow_str(args.filter_flows[0]) > + > + # Run through the parser to make sure we only filter on fields we > + # understand > + parse_flow_dict(args.filter_flows, False) > + > + # This is hardcoded here because it doesn't make sense to shrink the > + # size, since the flow key might be missing fields that are matched in > + # the flow filter. > + source = source.replace("<MAX_KEY_VAL>", "2048") > + > + source = source.replace("<FILTER_BOOL>", str(filter_bool)) > + > + source = source.replace( > + "<EVENT_ENUM>", > + "\n".join([f"{event.name} = {event.value}," for event in Event]), > + ) > + > + b = BPF(text=source, usdt_contexts=[u], debug=args.debug) > + > + # > + # Print header > + # > + print( > + "{:<10} {:<18} {:<36} {:<17}".format( > + "TID", "TIME", "UFID", "EVENT/REASON" > + ) > + ) > + > + # > + # Dump out all events. > + # > + b["events"].open_ring_buffer(handle_event) > + while 1: > + try: > + b.ring_buffer_poll() > + except KeyboardInterrupt: > + break > + > + dropcnt = b.get_table("dropcnt") > + for k in dropcnt.keys(): > + count = dropcnt.sum(k).value > + if k.value == 0 and count > 0: > + print( > + "\n# WARNING: Not all flow operations were captured, {} were" > + " dropped!\n# Increase the BPF ring buffer size " > + "with the --buffer-page-count option.".format(count) > + ) > + > + > +# > +# Start main() as the default entry point > +# > +if __name__ == "__main__": > + main() > -- > 2.41.0
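As a quick aside for anyone wanting to sanity-check the nlattr walk that the script's decode_key() performs, the same (length, type, payload, pad-to-4) layout can be exercised standalone. This is a minimal sketch, independent of OVS; the attribute type numbers below are arbitrary test values, not real OVS_KEY_ATTR_* ids:

```python
import struct

def decode_nlattrs(msg):
    # Each attribute: u16 total length (header included), u16 type,
    # payload, then padding up to a 4-byte boundary.
    result = {}
    while len(msg) >= 4:
        nla_len, nla_type = struct.unpack("=HH", msg[:4])
        if nla_len < 4 or nla_len > len(msg):
            break  # malformed or truncated attribute
        result[nla_type] = msg[4:nla_len]
        msg = msg[(nla_len + 3) & ~3:]  # skip payload plus padding
    return result

# Two attributes with 2-byte payloads: nla_len is 6 (4-byte header
# plus payload), padded out to 8 bytes on the wire.
buf = struct.pack("=HH2s2x", 6, 3, b"\x01\x02")
buf += struct.pack("=HH2s2x", 6, 6, b"\x08\x00")
attrs = decode_nlattrs(buf)
```

decode_key() in the script additionally maps the numeric type through get_ovs_key_attr_str() and reports leftover bytes, but the alignment arithmetic is the same.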
Eelco Chaudron <echaudro@redhat.com> writes: > On 20 Feb 2024, at 22:47, Aaron Conole wrote: > >> From: Kevin Sprague <ksprague0711@gmail.com> >> >> During normal operations, it is useful to understand when a particular flow >> gets removed from the system. This can be useful when debugging performance >> issues tied to ofproto flow changes, trying to determine deployed traffic >> patterns, or while debugging dynamic systems where ports come and go. >> >> Prior to this change, there was a lack of visibility around flow expiration. >> The existing debugging infrastructure could tell us when a flow was added to >> the datapath, but not when it was removed or why. >> >> This change introduces a USDT probe at the point where the revalidator >> determines that the flow should be removed. Additionally, we track the >> reason for the flow eviction and provide that information as well. With >> this change, we can track the complete flow lifecycle for the netlink >> datapath by hooking the upcall tracepoint in kernel, the flow put USDT, and >> the revalidator USDT, letting us watch as flows are added and removed from >> the kernel datapath. >> >> This change only enables this information via USDT probe, so it won't be >> possible to access this information any other way (see: >> Documentation/topics/usdt-probes.rst). >> >> Also included is a script (utilities/usdt-scripts/flow_reval_monitor.py) >> which serves as a demonstration of how the new USDT probe might be used >> going forward. >> >> Acked-by: Han Zhou <hzhou@ovn.org> >> Signed-off-by: Kevin Sprague <ksprague0711@gmail.com> >> Co-authored-by: Aaron Conole <aconole@redhat.com> >> Signed-off-by: Aaron Conole <aconole@redhat.com> > > Thanks for doing the v9, some small comments remain below. 
> > Cheers, > > Eelco > >> --- >> v8 -> v9: Reorganized the flow delete reasons enum >> Updated flow_reval_monitor to use pahole to extract fields >> Added the purge reason with a proper USDT point >> Updated documentation >> Dropped all the outstanding ACKs >> >> Documentation/topics/usdt-probes.rst | 43 + >> ofproto/ofproto-dpif-upcall.c | 48 +- >> utilities/automake.mk | 3 + >> utilities/usdt-scripts/flow_reval_monitor.py | 997 +++++++++++++++++++ >> 4 files changed, 1085 insertions(+), 6 deletions(-) >> create mode 100755 utilities/usdt-scripts/flow_reval_monitor.py >> >> diff --git a/Documentation/topics/usdt-probes.rst b/Documentation/topics/usdt-probes.rst >> index e527f43bab..015614a6b8 100644 >> --- a/Documentation/topics/usdt-probes.rst >> +++ b/Documentation/topics/usdt-probes.rst >> @@ -214,8 +214,10 @@ Available probes in ``ovs_vswitchd``: >> - dpif_recv:recv_upcall >> - main:poll_block >> - main:run_start >> +- revalidate:flow_result >> - revalidate_ukey\_\_:entry >> - revalidate_ukey\_\_:exit >> +- revalidator_sweep\_\_:flow_result >> - udpif_revalidator:start_dump >> - udpif_revalidator:sweep_done >> >> @@ -443,6 +445,47 @@ sweep phase was completed. >> - ``utilities/usdt-scripts/reval_monitor.py`` >> >> >> +probe revalidate:flow_result >> +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ >> + >> +**Description**: >> +This probe is triggered when the revalidator has executed on a particular >> +flow key to make a determination whether to evict a flow, and the cause >> +for eviction. The revalidator runs periodically, and this probe will only >> +be triggered when a flow is flagged for revalidation. >> + >> +**Arguments**: >> + >> +- *arg0*: ``(enum reval_result) result`` >> +- *arg1*: ``(enum flow_del_reason) reason`` > > nit: variable name changed, so should be del_reason. Good catch, I'll update. 
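For anyone writing their own consumer for these probes, the raw del_reason integer can be mapped back to a name with a table mirroring the flow_del_reason enum from this patch (a sketch; only the enum values quoted in the diff are assumed):

```python
from enum import IntEnum

# Mirrors enum flow_del_reason from the patch; FDR_NONE == 0.
FdrReasons = IntEnum("FdrReasons", [
    "FDR_NONE", "FDR_AVOID_CACHING", "FDR_BAD_ODP_FIT", "FDR_FLOW_IDLE",
    "FDR_FLOW_LIMIT", "FDR_FLOW_WILDCARDED", "FDR_NO_OFPROTO", "FDR_PURGE",
    "FDR_TOO_EXPENSIVE", "FDR_UPDATE_FAIL", "FDR_XLATION_ERROR",
], start=0)

def reason_str(value):
    """Translate a raw del_reason value read from the probe."""
    try:
        return FdrReasons(value).name
    except ValueError:
        return f"<UNKNOWN: {value}>"
```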
>> +- *arg2*: ``(struct udpif *) udpif`` >> +- *arg3*: ``(struct udpif_key *) ukey`` >> + > > I think you missed my previous comment on re-ordering the arguments to > be more inline with existing probes, i.e.: > > + OVS_USDT_PROBE(revalidator_sweep__, flow_result, udpif, ukey, > + result, del_reason); Guess so. I'll fix it. >> +**Script references**: >> + >> +- ``utilities/usdt-scripts/flow_reval_monitor.py`` >> + >> + >> +probe revalidator_sweep\_\_:flow_result >> +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ >> + >> +**Description**: >> +This probe is placed in the path of the revalidator sweep, and is executed >> +under the condition that a flow entry is in an unexpected state, or the >> +flows were asked to be purged due to a user action. >> + >> +**Arguments**: >> + >> +- *arg0*: ``(enum reval_result) result`` >> +- *arg1*: ``(enum flow_del_reason) reason`` > > nit: variable name changed, so should be del_reason. Okay. >> +- *arg2*: ``(struct udpif *) udpif`` >> +- *arg3*: ``(struct udpif_key *) ukey`` > > See comments above on argument ordering. > >> + >> +**Script references**: >> + >> +- ``utilities/usdt-scripts/flow_reval_monitor.py`` >> + >> + >> Adding your own probes >> ---------------------- >> >> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c >> index b5cbeed878..fbc7858690 100644 >> --- a/ofproto/ofproto-dpif-upcall.c >> +++ b/ofproto/ofproto-dpif-upcall.c >> @@ -269,6 +269,20 @@ enum ukey_state { >> }; >> #define N_UKEY_STATES (UKEY_DELETED + 1) >> >> +enum flow_del_reason { >> + FDR_NONE = 0, /* No deletion reason for the flow. */ >> + FDR_AVOID_CACHING, /* Flow deleted to avoid caching. */ >> + FDR_BAD_ODP_FIT, /* The flow had a bad ODP flow fit. */ >> + FDR_FLOW_IDLE, /* The flow went unused and was deleted. */ >> + FDR_FLOW_LIMIT, /* All flows being killed. */ >> + FDR_FLOW_WILDCARDED, /* The flow needed a narrower wildcard mask. */ >> + FDR_NO_OFPROTO, /* The flow didn't have an associated ofproto. 
*/ >> + FDR_PURGE, /* User action caused flows to be killed. */ >> + FDR_TOO_EXPENSIVE, /* The flow was too expensive to revalidate. */ >> + FDR_UPDATE_FAIL, /* Flow state transition was unexpected. */ >> + FDR_XLATION_ERROR, /* There was an error translating the flow. */ >> +}; >> + >> /* 'udpif_key's are responsible for tracking the little bit of state udpif >> * needs to do flow expiration which can't be pulled directly from the >> * datapath. They may be created by any handler or revalidator thread at any >> @@ -2272,7 +2286,8 @@ populate_xcache(struct udpif *udpif, struct udpif_key *ukey, >> static enum reval_result >> revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, >> uint16_t tcp_flags, struct ofpbuf *odp_actions, >> - struct recirc_refs *recircs, struct xlate_cache *xcache) >> + struct recirc_refs *recircs, struct xlate_cache *xcache, >> + enum flow_del_reason *del_reason) >> { >> struct xlate_out *xoutp; >> struct netflow *netflow; >> @@ -2293,11 +2308,13 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, >> netflow = NULL; >> >> if (xlate_ukey(udpif, ukey, tcp_flags, &ctx)) { >> + *del_reason = FDR_XLATION_ERROR; >> goto exit; >> } >> xoutp = &ctx.xout; >> >> if (xoutp->avoid_caching) { >> + *del_reason = FDR_AVOID_CACHING; >> goto exit; >> } >> >> @@ -2311,6 +2328,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, >> ofpbuf_clear(odp_actions); >> >> if (!ofproto) { >> + *del_reason = FDR_NO_OFPROTO; >> goto exit; >> } >> >> @@ -2322,6 +2340,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, >> if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, &dp_mask, &ctx.flow, >> NULL) >> == ODP_FIT_ERROR) { >> + *del_reason = FDR_BAD_ODP_FIT; >> goto exit; >> } >> >> @@ -2331,6 +2350,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, >> * down. 
Note that we do not know if the datapath has ignored any of the >> * wildcarded bits, so we may be overly conservative here. */ >> if (flow_wildcards_has_extra(&dp_mask, ctx.wc)) { >> + *del_reason = FDR_FLOW_WILDCARDED; >> goto exit; >> } >> >> @@ -2400,7 +2420,7 @@ static enum reval_result >> revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, >> const struct dpif_flow_stats *stats, >> struct ofpbuf *odp_actions, uint64_t reval_seq, >> - struct recirc_refs *recircs) >> + struct recirc_refs *recircs, enum flow_del_reason *del_reason) >> OVS_REQUIRES(ukey->mutex) >> { >> bool need_revalidate = ukey->reval_seq != reval_seq; >> @@ -2430,8 +2450,12 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, >> xlate_cache_clear(ukey->xcache); >> } >> result = revalidate_ukey__(udpif, ukey, push.tcp_flags, >> - odp_actions, recircs, ukey->xcache); >> - } /* else delete; too expensive to revalidate */ >> + odp_actions, recircs, ukey->xcache, >> + del_reason); >> + } else { >> + /* delete; too expensive to revalidate */ > > nit: Maybe add a trailing dot? There wasn't one in the previous instance comment, so I preserved the comment as-is. Since I'm respinning I'll add it. 
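To make the branch under discussion concrete, the control flow of revalidate_ukey() after this change can be modelled roughly as follows (a simplified sketch: the function and parameter names here are illustrative, and the remaining branches of the real function are collapsed):

```python
def reval_outcome(need_revalidate, worth_revalidating):
    # When revalidation is required but judged too expensive, the flow
    # is deleted and the probe reports FDR_TOO_EXPENSIVE; otherwise the
    # flow is re-translated and may still be kept, modified or deleted.
    if need_revalidate:
        if worth_revalidating:
            return ("revalidate_ukey__", "FDR_NONE")
        return ("UKEY_DELETE", "FDR_TOO_EXPENSIVE")
    return ("UKEY_KEEP", "FDR_NONE")
```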
>> + *del_reason = FDR_TOO_EXPENSIVE; >> + } >> } else if (!push.n_packets || ukey->xcache >> || !populate_xcache(udpif, ukey, push.tcp_flags)) { >> result = UKEY_KEEP; >> @@ -2831,6 +2855,7 @@ revalidate(struct revalidator *revalidator) >> for (f = flows; f < &flows[n_dumped]; f++) { >> long long int used = f->stats.used; >> struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER; >> + enum flow_del_reason del_reason = FDR_NONE; >> struct dpif_flow_stats stats = f->stats; >> enum reval_result result; >> struct udpif_key *ukey; >> @@ -2905,9 +2930,14 @@ revalidate(struct revalidator *revalidator) >> } >> if (kill_them_all || (used && used < now - max_idle)) { >> result = UKEY_DELETE; >> + if (kill_them_all) { >> + del_reason = FDR_FLOW_LIMIT; >> + } else { >> + del_reason = FDR_FLOW_IDLE; >> + } > > Maybe take the same approach as below: > > del_reason = kill_them_all ? FDR_FLOW_LIMIT : FDR_FLOW_IDLE; Okay. >> } else { >> result = revalidate_ukey(udpif, ukey, &stats, &odp_actions, >> - reval_seq, &recircs); >> + reval_seq, &recircs, &del_reason); >> } >> ukey->dump_seq = dump_seq; >> >> @@ -2916,6 +2946,8 @@ revalidate(struct revalidator *revalidator) >> udpif_update_flow_pps(udpif, ukey, f); >> } >> >> + OVS_USDT_PROBE(revalidate, flow_result, result, del_reason, udpif, >> + ukey); >> if (result != UKEY_KEEP) { >> /* Takes ownership of 'recircs'. */ >> reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs, >> @@ -2968,6 +3000,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) >> size_t n_ops = 0; >> >> CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) { >> + enum flow_del_reason del_reason = FDR_NONE; >> enum ukey_state ukey_state; >> >> /* Handler threads could be holding a ukey lock while it installs a >> @@ -2986,6 +3019,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) >> >> if (purge || ukey_state == UKEY_INCONSISTENT) { >> result = UKEY_DELETE; >> + del_reason = purge ? 
FDR_PURGE : FDR_UPDATE_FAIL; >> } else if (!seq_mismatch) { >> result = UKEY_KEEP; >> } else { >> @@ -2993,13 +3027,15 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) >> COVERAGE_INC(revalidate_missed_dp_flow); >> memcpy(&stats, &ukey->stats, sizeof stats); >> result = revalidate_ukey(udpif, ukey, &stats, &odp_actions, >> - reval_seq, &recircs); >> + reval_seq, &recircs, &del_reason); >> } >> if (result != UKEY_KEEP) { >> /* Clears 'recircs' if filled by revalidate_ukey(). */ >> reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs, >> &odp_actions); >> } >> + OVS_USDT_PROBE(revalidator_sweep__, flow_sweep_result, result, >> + del_reason, udpif, ukey); >> } >> ovs_mutex_unlock(&ukey->mutex); >> >> diff --git a/utilities/automake.mk b/utilities/automake.mk >> index 9a2114df40..146b8c37fb 100644 >> --- a/utilities/automake.mk >> +++ b/utilities/automake.mk >> @@ -23,6 +23,7 @@ scripts_DATA += utilities/ovs-lib >> usdt_SCRIPTS += \ >> utilities/usdt-scripts/bridge_loop.bt \ >> utilities/usdt-scripts/dpif_nl_exec_monitor.py \ >> + utilities/usdt-scripts/flow_reval_monitor.py \ >> utilities/usdt-scripts/kernel_delay.py \ >> utilities/usdt-scripts/kernel_delay.rst \ >> utilities/usdt-scripts/reval_monitor.py \ >> @@ -72,6 +73,7 @@ EXTRA_DIST += \ >> utilities/docker/debian/build-kernel-modules.sh \ >> utilities/usdt-scripts/bridge_loop.bt \ >> utilities/usdt-scripts/dpif_nl_exec_monitor.py \ >> + utilities/usdt-scripts/flow_reval_monitor.py \ >> utilities/usdt-scripts/kernel_delay.py \ >> utilities/usdt-scripts/kernel_delay.rst \ >> utilities/usdt-scripts/reval_monitor.py \ >> @@ -146,6 +148,7 @@ FLAKE8_PYFILES += utilities/ovs-pcap.in \ >> utilities/ovs-tcpdump.in \ >> utilities/ovs-pipegen.py \ >> utilities/usdt-scripts/dpif_nl_exec_monitor.py \ >> + utilities/usdt-scripts/flow_reval_monitor.py \ >> utilities/usdt-scripts/upcall_monitor.py \ >> utilities/usdt-scripts/upcall_cost.py >> >> diff --git a/utilities/usdt-scripts/flow_reval_monitor.py 
> b/utilities/usdt-scripts/flow_reval_monitor.py >> new file mode 100755 >> index 0000000000..e76e0b5995 >> --- /dev/null >> +++ b/utilities/usdt-scripts/flow_reval_monitor.py >> @@ -0,0 +1,997 @@ >> +#!/usr/bin/env python3 >> +# >> +# Copyright (c) 2022-2024 Redhat, Inc. >> +# >> +# Licensed under the Apache License, Version 2.0 (the "License"); >> +# you may not use this file except in compliance with the License. >> +# You may obtain a copy of the License at: >> +# >> +# http://www.apache.org/licenses/LICENSE-2.0 >> +# >> +# Unless required by applicable law or agreed to in writing, software >> +# distributed under the License is distributed on an "AS IS" BASIS, >> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> +# See the License for the specific language governing permissions and >> +# limitations under the License. >> +# >> +# Script information: >> +# ------------------- >> +# flow_reval_monitor.py uses the dpif_netlink_operate:flow_put and >> +# revalidator:flow_result USDT probes to monitor flow lifetimes and >> +# expiration events. By default, this will show all flow_put and flow >> +# expiration events, along with their reasons. This will look like so: >> +# >> +# TIME UFID EVENT/REASON >> +# 101536.226986736 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow_put >> +# 101536.227196214 ufid:d08472b6-110e-46cb-a9e4-00008f46838e flow_put >> +# 101541.516610178 ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32 flow_put >> +# 101541.516967303 ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a flow_put >> +# 101551.688050747 ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a flow timed out >> +# 101551.688077175 ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32 flow timed out >> +# 101557.695391371 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow timed out >> +# 101557.695408909 ufid:d08472b6-110e-46cb-a9e4-00008f46838e flow timed out > > The output above has changed (the reasons). 
Here is some new output: > > TID TIME UFID EVENT/REASON > 71828 1549.119959874 39f0f28f-338d-4a77-81b3-0000d6c70b6b Insert (put) > flow to ovs kernel module. > 71828 1549.420877223 850db41c-47ff-42c0-b48f-0000e180d81c Insert (put) > flow to ovs kernel module. > 71828 1550.476923456 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Insert (put) > flow to ovs kernel module. > 71832 1559.650192299 850db41c-47ff-42c0-b48f-0000e180d81c Idle flow timed out > 71832 1561.153332825 39f0f28f-338d-4a77-81b3-0000d6c70b6b Idle flow timed out > 71832 1572.684316304 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Idle flow timed out > 71828 1577.548886773 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Insert (put) > flow to ovs kernel module. > 71832 1587.720846962 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Idle flow timed out > > Maybe you can shorten the UDIF to fit it in 79 chars. It's difficult to do that. The UFID is quite large, and since this is a debug tool rather than just status, I'd prefer to keep it this way. >> +# >> +# flow key data can be printed using the --flow-keys option. This will > > Capital F for Flow. Ack. >> +# print the equivalent datapath flow string. >> +# >> +# When filtering flows, the syntax is the same as used by >> +# `ovs-appctl dpctl/add-flow`. >> +# >> +# The following options are available: >> +# >> +# usage: flow_reval_monitor.py [-h] [--buffer-page-count NUMBER] >> +# [-f [128-2048]] [-k] [-l [FLOW_STRING ...]] >> +# [-p VSWITCHD_PID] [-D [DEBUG]] >> +# [-P PAHOLE] >> +# >> +# optional arguments: >> +# -h, --help show this help message and exit >> +# --buffer-page-count NUMBER >> +# Number of BPF ring buffer pages, default 1024 >> +# -f <128..2048>, --flow-key-size=<128..2048> >> +# Set the size of the flow key, default 128 >> +# -k, --flow-keys Print flow keys as flow strings >> +# -l [FLOW_STRING ...], --filter-flows [FLOW_STRING ...] >> +# Filter against flow keys that match the specified >> +# ODP-like flow. 
This may not include all packet >> +# fields >> +# -p VSWITCHD_PID, --pid VSWITCHD_PID >> +# ovs-vswitchd's PID >> +# -P PAHOLE, --pahole PAHOLE >> +# Pahole executable to use, default pahole >> +# -D [DEBUG], --debug [DEBUG] >> +# Enable eBPF debugging > > The text above is different than from the help text, or was this done on purpose? To be honest, I dislike putting this text here completely. Why keep two copies of help? I'd rather delete this section if that's okay. >> +# Examples: >> +# >> +# To use the script on a running ovs-vswitchd to see flow keys and expiration >> +# events for flows with an ipv4 source of 192.168.10.10: >> +# $ ./flow_reval_monitor.py --flow-keys --filter-flows \ >> +# "ipv4(src=192.168.10.10)" >> +# TIME UFID EVENT/REASON >> +# 105082.457322742 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow_put >> +# ufid:f76fc899-376d-466b-bc74-0000b933eb97 has the following flow information: >> +# in_port(2), >> +# eth(src=0e:04:47:fc:74:51, dst=da:dc:c5:69:05:d7), \ >> +# eth_type(0x800), \ >> +# ipv4(src=192.168.10.10, dst=192.168.10.30, proto=1, tos=0, ttl=64,[...]), >> +# icmp(type=8, code=0) >> +# 105092.635450202 ufid:f76fc899-376d-466b-bc74-0000b933eb97 Flow timed out >> +# >> +# Notes: >> +# 1) No options are needed to attach when there is a single running instance >> +# of ovs-vswitchd. >> +# 2) If you're using the flow filtering option, it will only track flows that >> +# have been upcalled since the script began running. >> +# 3) When using the flow filtering option, the key size will likely need to >> +# be expanded to match on all the fields in the message. The default is >> +# kept small to keep the buffer copy sizes down when displaying >> +# flows (-k), but is hardcoded to 2048 when an actual filter (-l) is >> +# applied >> +# 4) The flow filtering format is a simplified form of the ODP syntax, and >> +# does not support masked matches, which means you will need to filter >> +# on exact details. 
The fields present are dependent on how the >> +# classifier and OFP rules form the ODP rules - not all fields may be >> +# present in a particular flow. >> +# 5) The flow_put filtering only happens for flows installed into the ovs >> +# kernel module. This means flows taking the HW offload path (ie: tc), >> +# or on DPDK side won't get matched. >> + >> +try: >> + from bcc import BPF >> + from bcc import USDT >> + from bcc import USDTException >> +except ModuleNotFoundError: >> + print("ERROR: Can't find the BPF Compiler Collection Tools.") >> + print("Please install them before running this script.") >> + exit(1) >> + >> +from enum import IntEnum >> +from ipaddress import IPv4Address, IPv6Address >> +from pathlib import Path >> + >> +import argparse >> +import psutil >> +import re >> +import struct >> +import subprocess >> +import sys >> + >> +# >> +# eBPF source code >> +# >> +bpf_src = """ >> +#include <linux/sched.h> >> + >> +#define MAX_KEY <MAX_KEY_VAL> >> +#define FLOW_FILTER <FILTER_BOOL> >> + >> +enum probe { <EVENT_ENUM> }; >> + >> +<OVS_INCLUDE_DEFINITIONS> >> + >> +struct event_t { >> + u64 ts; >> + u32 pid; >> + u32 result; >> + u32 reason; >> + u32 ufid[4]; >> + u64 key_size; >> + unsigned char key[MAX_KEY]; >> + enum probe probe; >> +}; >> + >> +BPF_HASH(watchlist, ovs_u128); >> +BPF_RINGBUF_OUTPUT(events, <BUFFER_PAGE_COUNT>); >> +BPF_TABLE("percpu_array", uint32_t, uint64_t, dropcnt, 1); >> +BPF_TABLE("percpu_array", uint32_t, struct udpif_key, udpk, 1); >> + >> +static struct event_t *get_event(enum probe p) { >> + struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t)); >> + >> + if (!event) { >> + dropcnt.increment(0); >> + return NULL; >> + } >> + >> + event->probe = p; >> + event->ts = bpf_ktime_get_ns(); >> + event->pid = bpf_get_current_pid_tgid(); >> + >> + return event; >> +} >> + >> +static int emit_flow_result(struct udpif_key *ukey, ovs_u128 ufid, >> + u32 result, u32 reason) { >> + struct event_t *event = NULL; >> + u64 
*ufid_present = NULL; >> + >> + ufid_present = watchlist.lookup(&ufid); >> + if (FLOW_FILTER && !ufid_present) { >> + return 0; >> + } >> + >> + event = get_event(FLOW_RESULT); >> + if (!event) { >> + /* If we can't reserve the space in the ring buffer, return 1. */ >> + return 1; >> + } >> + >> + event->result = result; >> + event->reason = reason; >> + bpf_probe_read(&event->ufid, sizeof ufid, &ufid); >> + events.ringbuf_submit(event, 0); >> + >> + return 0; >> +} >> + >> +int usdt__flow_result(struct pt_regs *ctx) { >> + struct udpif_key *ukey = NULL; >> + u32 reason = 0; >> + u32 result = 0; >> + ovs_u128 ufid; >> + u32 zero = 0; >> + >> + ukey = udpk.lookup(&zero); >> + if (!ukey) { >> + return 1; >> + } >> + bpf_usdt_readarg_p(4, ctx, ukey, sizeof(struct udpif_key)); >> + bpf_usdt_readarg(2, ctx, &reason); >> + bpf_usdt_readarg(1, ctx, &result); >> + ufid = ukey->ufid; >> + >> + return emit_flow_result(ukey, ufid, result, reason); >> +} >> + >> +int usdt__flow_sweep_result(struct pt_regs *ctx) { >> + struct udpif_key *ukey = NULL; >> + u32 reason = 0; >> + u32 result = 0; >> + ovs_u128 ufid; >> + u32 zero = 0; >> + >> + ukey = udpk.lookup(&zero); >> + if (!ukey) { >> + return 1; >> + } >> + bpf_usdt_readarg_p(4, ctx, ukey, sizeof(struct udpif_key)); >> + bpf_usdt_readarg(2, ctx, &reason); >> + bpf_usdt_readarg(1, ctx, &result); >> + ufid = ukey->ufid; >> + >> + return emit_flow_result(ukey, ufid, result, reason); >> +} >> + >> +int usdt__op_flow_put(struct pt_regs *ctx) { >> + struct dpif_flow_put put; >> + ovs_u128 ufid; >> + >> + struct event_t *event = get_event(OP_FLOW_PUT); >> + if (!event) { >> + /* If we can't reserve the space in the ring buffer, return 1. 
*/ >> + return 1; >> + } >> + >> + bpf_usdt_readarg_p(2, ctx, &put, sizeof put); >> + bpf_probe_read(&event->ufid, sizeof event->ufid, put.ufid); >> + bpf_probe_read(&ufid, sizeof ufid, &event->ufid); >> + if (put.key_len > MAX_KEY) { >> + put.key_len = MAX_KEY; >> + } >> + event->key_size = put.key_len; >> + bpf_probe_read(&event->key, put.key_len, put.key); >> + event->reason = 0; >> + events.ringbuf_submit(event, 0); >> + >> + watchlist.increment(ufid); >> + return 0; >> +} >> +""" >> + >> +Event = IntEnum("Event", ["OP_FLOW_PUT", "FLOW_RESULT"], start=0) >> +RevalResult = IntEnum( >> + "reval_result", >> + [ >> + "UKEY_KEEP", >> + "UKEY_DELETE", >> + "UKEY_MODIFY", >> + ], >> + start=0, >> +) >> +FdrReasons = IntEnum( >> + "flow_del_reason", >> + [ >> + "FDR_NONE", >> + "FDR_AVOID_CACHING", >> + "FDR_BAD_ODP_FIT", >> + "FDR_FLOW_IDLE", >> + "FDR_FLOW_LIMIT", >> + "FDR_FLOW_WILDCARDED", >> + "FDR_NO_OFPROTO", >> + "FDR_PURGE", >> + "FDR_TOO_EXPENSIVE", >> + "FDR_UPDATE_FAIL", >> + "FDR_XLATION_ERROR", >> + ], >> + start=0, >> +) >> + >> +FdrReasonStrings = [ >> + "No deletion reason", >> + "Cache avoidance flag set", >> + "Bad ODP flow fit", >> + "Idle flow timed out", >> + "Kill all flows condition detected", >> + "Mask too wide - need narrower match", >> + "No matching ofproto rules", >> + "Purged with user action", >> + "Too expensive to revalidate", >> + "Flow state inconsistent after updates", >> + "Flow translation error", >> +] >> + >> + >> +# >> +# run_program() >> +# >> +def run_program(command): >> + try: >> + process = subprocess.run( >> + command, >> + stdout=subprocess.PIPE, >> + stderr=subprocess.STDOUT, >> + encoding="utf8", >> + check=True, > > I noticed you're adding , to all final arguments in function parameter > lists, and string lists. Is this some formatter cleaning this up, or a > new style? This is actually how PEP8 wants it, and ``black`` will auto format this way. So I run it. 
We need both black and flake8 to make sure we catch all the formatting stuff, but that's what the latest Python development practice is doing (from what I can tell). > It does this split over multiple lines, even for function parameters, which looks odd to me. Agreed, but see above. >> + ) >> + >> + except subprocess.CalledProcessError as perror: >> + return perror.returncode, perror.stdout >> + >> + return 0, process.stdout >> + >> + >> +# >> +# get_ovs_definitions() >> +# >> +def get_ovs_definitions(objects, pahole="pahole", pid=None): >> + if pid is None: >> + raise ValueError("A valid pid value should be supplied!") >> + >> + if not isinstance(objects, list): >> + objects = [objects] >> + >> + if len(objects) == 0: >> + raise ValueError("Must supply at least one object!") >> + >> + vswitchd = Path(f"/proc/{pid}/exe").resolve() >> + >> + object_str = ",".join(objects) >> + >> + def run_pahole(debug_file): >> + error, result = run_program( >> + [pahole, "-C", object_str, "--compile", debug_file] >> + ) >> + >> + if error: >> + if f"pahole: {debug_file}: Invalid argument" not in result: >> + print( >> + "ERROR: Pahole failed to get ovs-vswitchd data " >> + "structures!\n{}".format( >> + re.sub( >> + "^", " " * 7, result.rstrip(), flags=re.MULTILINE >> + ) >> + ) >> + ) >> + sys.exit(-1) >> + >> + return None >> + >> + if bool(re.search("pahole: type .* not found", result)): >> + return None >> + >> + return result >> + >> + def run_readelf(bin_file): >> + error, result = run_program( >> + ["readelf", "-n", "--debug-dump=links", bin_file] >> + ) >> + >> + if error: >> + print( >> + "ERROR: Failed 'readelf' on \"{}\"!\n{}".format( >> + bin_file, re.sub("^", " " * 7, result, flags=re.MULTILINE) >> + ) >> + ) >> + sys.exit(-1) >> + >> + return result >> + >> + def get_debug_file(bin_file): >> + elf_result = run_readelf(bin_file) >> + match = re.search("Build ID: ([0-9a-fA-F]+)", elf_result) >> + if not match: >> + print("ERROR: Can't find build ID to read debug 
symbols!") >> + sys.exit(-1) >> + >> + dbg_file = "/usr/lib/debug/.build-id/{}/{}.debug".format( >> + match.group(1)[:2], match.group(1)[2:] >> + ) >> + >> + return dbg_file >> + >> + def get_from_shared_library(debug_file): >> + ovs_libs = [ >> + "libofproto", >> + "libopenvswitch", >> + "libovsdb", >> + "libsflow", >> + "libvtep", >> + ] >> + error, ldd_result = run_program(["ldd", debug_file]) >> + >> + if error: >> + print( >> + "ERROR: Failed 'ldd' on \"{}\"!\n{}".format( >> + debug_file, >> + re.sub("^", " " * 7, ldd_result, flags=re.MULTILINE), >> + ) >> + ) >> + sys.exit(-1) >> + >> + for lib in ovs_libs: >> + match = re.search( >> + r"^\s*{}.* => (.*) \(.*\)$".format(lib), >> + ldd_result, >> + flags=re.MULTILINE, >> + ) >> + if match is None: >> + continue >> + >> + result = run_pahole(match.group(1)) >> + if result is None: >> + result = run_pahole(get_debug_file(match.group(1))) >> + >> + if result: >> + return result >> + >> + return None >> + >> + # >> + # First try to find the debug data as part of the executable. >> + # >> + result = run_pahole(vswitchd) >> + >> + if result is None: >> + print(f'INFO: Failed to find debug info in "{vswitchd}"!') >> + >> + # >> + # Get additional .debug information if available. >> + # >> + dbg_file = get_debug_file(vswitchd) >> + result = run_pahole(dbg_file) >> + if result is None: >> + print(f'INFO: Failed to find debug info in "{dbg_file}"!') >> + >> + # >> + # Try to get information from shared libraries if used. >> + # >> + result = get_from_shared_library(vswitchd) >> + >> + if result is None: >> + print(f"ERROR: Failed to find needed data structures through {pahole}") >> + sys.exit(-1) >> + >> + # >> + # We need an empty _Atomic definition to avoid compiler complaints. >> + # >> + result = "#define _Atomic\n" + result >> + >> + # >> + # Remove the uint64_t definition as it conflicts with the kernel one. 
>> + # >> + result = re.sub("^typedef.*uint64_t;$", "", result, flags=re.MULTILINE) >> + >> + return result >> + >> + >> +# >> +# buffer_size_type() >> +# >> +def buffer_size_type(astr, min=64, max=2048): >> + value = int(astr) >> + if min <= value <= max: >> + return value >> + else: >> + raise argparse.ArgumentTypeError( >> + "value not in range {}-{}".format(min, max) >> + ) >> + >> + >> +# >> +# format_ufid() >> +# >> +def format_ufid(ufid): >> + if ufid is None: >> + return "ufid:none" >> + >> + return "{:08x}-{:04x}-{:04x}-{:04x}-{:04x}{:08x}".format( >> + ufid[0], >> + ufid[1] >> 16, >> + ufid[1] & 0xFFFF, >> + ufid[2] >> 16, >> + ufid[2] & 0xFFFF, >> + ufid[3], >> + ) >> + >> + >> +# >> +# find_and_delete_from_watchlist() >> +# >> +def find_and_delete_from_watchlist(event): >> + for k, _ in b["watchlist"].items(): >> + key_ufid = struct.unpack("=IIII", k) >> + if key_ufid == tuple(event.ufid): >> + key = (b["watchlist"].Key * 1)(k) >> + b["watchlist"].items_delete_batch(key) >> + break >> + >> + >> +# >> +# handle_flow_put() >> +# >> +def handle_flow_put(event): >> + if args.flow_keys or args.filter_flows is not None: >> + key = decode_key(bytes(event.key)[: event.key_size]) >> + flow_dict, flow_str = parse_flow_dict(key) >> + # For each attribute that we're watching. 
>> + if args.filter_flows is not None: >> + if not compare_flow_to_target(args.filter_flows, flow_dict): >> + find_and_delete_from_watchlist(event) >> + return >> + >> + print( >> + "{:<10} {:<18.9f} {:<36} {}".format( >> + event.pid, >> + event.ts / 1000000000, >> + format_ufid(event.ufid), >> + "Insert (put) flow to ovs kernel module.", >> + ) >> + ) >> + >> + if args.flow_keys and len(flow_str): >> + flow_str_fields = flow_str.split("), ") >> + flow_str = " " >> + curlen = 4 >> + for field in flow_str_fields: >> + if curlen + len(field) > 79: >> + flow_str += "\n " >> + curlen = 4 >> + if field[-1] != ")": >> + field += ")" >> + flow_str += field + ", " >> + curlen += len(field) + 2 >> + >> + print(" - It holds the following key information:") >> + print(flow_str) >> + >> + >> +# >> +# compare_flow_to_target() >> +# >> +def compare_flow_to_target(target, flow): >> + for key in target: >> + if key not in flow: >> + return False >> + elif target[key] is True: >> + continue >> + elif target[key] == flow[key]: >> + continue >> + elif isinstance(target[key], dict) and isinstance(flow[key], dict): >> + return compare_flow_to_target(target[key], flow[key]) >> + else: >> + return False >> + return True >> + >> + >> +# >> +# parse_flow_str() >> +# >> +def parse_flow_str(flow_str): >> + f_list = [i.strip(", ") for i in flow_str.split(")")] >> + if f_list[-1] == "": >> + f_list = f_list[:-1] >> + flow_dict = {} >> + for e in f_list: >> + split_list = e.split("(") >> + k = split_list[0] >> + if len(split_list) == 1: >> + flow_dict[k] = True >> + elif split_list[1].count("=") == 0: >> + flow_dict[k] = split_list[1] >> + else: >> + sub_dict = {} >> + sublist = [i.strip() for i in split_list[1].split(",")] >> + for subkey in sublist: >> + brk = subkey.find("=") >> + sub_dict[subkey[:brk]] = subkey[brk + 1 :] >> + flow_dict[k] = sub_dict >> + return flow_dict >> + >> + >> +# >> +# print_expiration() >> +# >> +def print_expiration(event): >> + ufid_str = format_ufid(event.ufid) 
>> + >> + if event.reason > len(FdrReasons): >> + reason = f"Unknown reason '{event.reason}'" >> + else: >> + reason = FdrReasonStrings[event.reason] >> + >> + print( >> + "{:<10} {:<18.9f} {:<36} {:<17}".format( >> + event.pid, >> + event.ts / 1000000000, >> + ufid_str, >> + reason, >> + ) >> + ) >> + >> + >> +# >> +# decode_key() >> +# >> +def decode_key(msg): >> + bytes_left = len(msg) >> + result = {} >> + while bytes_left: >> + if bytes_left < 4: >> + break >> + nla_len, nla_type = struct.unpack("=HH", msg[:4]) >> + if nla_len < 4: >> + break >> + nla_data = msg[4:nla_len] >> + if nla_len > bytes_left: >> + nla_data = nla_data[: (bytes_left - 4)] >> + break >> + else: >> + result[get_ovs_key_attr_str(nla_type)] = nla_data >> + next_offset = (nla_len + 3) & (~3) >> + msg = msg[next_offset:] >> + bytes_left -= next_offset >> + if bytes_left: >> + print(f"INFO: Buffer truncated with {bytes_left} bytes left.") >> + return result >> + >> + >> +# >> +# get_ovs_key_attr_str() >> +# >> +def get_ovs_key_attr_str(attr): >> + ovs_key_attr = [ >> + "OVS_KEY_ATTR_UNSPEC", >> + "encap", >> + "skb_priority", >> + "in_port", >> + "eth", >> + "vlan", >> + "eth_type", >> + "ipv4", >> + "ipv6", >> + "tcp", >> + "udp", >> + "icmp", >> + "icmpv6", >> + "arp", >> + "nd", >> + "skb_mark", >> + "tunnel", >> + "sctp", >> + "tcp_flags", >> + "dp_hash", >> + "recirc_id", >> + "mpls", >> + "ct_state", >> + "ct_zone", >> + "ct_mark", >> + "ct_label", >> + "ct_tuple4", >> + "ct_tuple6", >> + "nsh", >> + ] >> + >> + if attr < 0 or attr > len(ovs_key_attr): >> + return "<UNKNOWN>: {}".format(attr) >> + return ovs_key_attr[attr] >> + >> + >> +# >> +# parse_flow_dict() >> +# >> +def parse_flow_dict(key_dict, decode=True): >> + ret_str = "" >> + parseable = {} >> + skip = ["nsh", "tunnel", "mpls", "vlan"] >> + need_byte_swap = ["ct_label"] >> + ipv4addrs = ["ct_tuple4", "tunnel", "ipv4", "arp"] >> + ipv6addrs = ["ipv6", "nd", "ct_tuple6"] >> + macs = {"eth": [0, 1], "arp": [3, 4], "nd": [1, 2]} 
>> + fields = [ >> + ("OVS_KEY_ATTR_UNSPEC"), >> + ("encap",), >> + ("skb_priority", "<I"), >> + ("in_port", "<I"), >> + ("eth", "!6s6s", "src", "dst"), >> + ("vlan",), >> + ("eth_type", "!H"), >> + ("ipv4", "!4s4s4B", "src", "dst", "proto", "tos", "ttl", "frag"), >> + ( >> + "ipv6", >> + "!16s16s4s4B", >> + "src", >> + "dst", >> + "label", >> + "proto", >> + "tclass", >> + "hlimit", >> + "frag", >> + ), >> + ("tcp", "!2H", "src", "dst"), >> + ("udp", "!2H", "src", "dst"), >> + ("icmp", "!2B", "type", "code"), >> + ("icmpv6", "!2B", "type", "code"), >> + ("arp", "!4s4sH6s6s", "sip", "tip", "op", "sha", "tha"), >> + ("nd", "!16s6s6s", "target", "sll", "tll"), >> + ("skb_mark", "<I"), >> + ("tunnel",), >> + ("sctp", "!2H", "src", "dst"), >> + ("tcp_flags", "!H"), >> + ("dp_hash", "<I"), >> + ("recirc_id", "<I"), >> + ("mpls",), >> + ("ct_state", "<I"), >> + ("ct_zone", "<H"), >> + ("ct_mark", "<I"), >> + ("ct_label", "!16s"), >> + ("ct_tuple4", "!4s4s2HB", "src", "dst", "tp_src", "tp_dst", "proto"), >> + ("ct_tuple6", "!16s16sB2H", "src", "dst", "proto", "tp_src", "tp_dst"), >> + ("nsh",), >> + ] >> + for k, v in key_dict.items(): >> + s = "" >> + if k in skip: >> + continue >> + if decode and int.from_bytes(v, "big") == 0: >> + parseable[k] = "0" >> + continue >> + if decode and k in need_byte_swap: >> + v = int.from_bytes(v, "little").to_bytes(len(v), "big") >> + attr = -1 >> + found = False >> + for f in fields: >> + if k == f[0]: >> + attr = fields.index(f) >> + found = True >> + break >> + if not found: >> + raise KeyError("Invalid flow field '%s'" % k) >> + if decode and len(fields[attr]) > 1: >> + data = list( >> + struct.unpack( >> + fields[attr][1], v[: struct.calcsize(fields[attr][1])] >> + ) >> + ) >> + if k in ipv4addrs: >> + if data[0].count(0) < 4: >> + data[0] = str(IPv4Address(data[0])) >> + else: >> + data[0] = b"\x00" >> + if data[1].count(0) < 4: >> + data[1] = str(IPv4Address(data[1])) >> + else: >> + data[1] = b"\x00" >> + if k in ipv6addrs: >> + 
if data[0].count(0) < 16: >> + data[0] = str(IPv6Address(data[0])) >> + else: >> + data[0] = b"\x00" >> + if data[1].count(0) < len(data[1]): >> + data[1] = str(IPv6Address(data[1])) >> + else: >> + data[1] = b"\x00" >> + if k in macs.keys(): >> + for e in macs[k]: >> + if data[e].count(0) == 6: >> + mac_str = b"\x00" >> + else: >> + mac_str = ":".join(["%02x" % i for i in data[e]]) >> + data[e] = mac_str >> + if decode and len(fields[attr]) > 2: >> + field_dict = dict(zip(fields[attr][2:], data)) >> + s = ", ".join(k + "=" + str(v) for k, v in field_dict.items()) >> + elif decode and k != "eth_type": >> + s = str(data[0]) >> + field_dict = s >> + else: >> + if decode: >> + s = hex(data[0]) >> + field_dict = s >> + ret_str += k + "(" + s + "), " >> + parseable[k] = field_dict >> + ret_str = ret_str[:-2] >> + return (parseable, ret_str) >> + >> + >> +# >> +# handle_event() >> +# >> +def handle_event(ctx, data, size): >> + # Once we grab the event, we have three cases. >> + # 1. It's a revalidator probe and the reason is nonzero: A flow is expiring >> + # 2. It's a revalidator probe and the reason is zero: flow revalidated >> + # 3. It's a flow_put probe. >> + # >> + # We will ignore case 2, and report all others. 
>> + # >> + event = b["events"].event(data) >> + if event.probe == Event.OP_FLOW_PUT: >> + handle_flow_put(event) >> + elif ( >> + event.probe == Event.FLOW_RESULT >> + and event.result == RevalResult.UKEY_DELETE >> + ): >> + print_expiration(event) >> + >> + >> +def main(): >> + # >> + # Don't like these globals, but ctx passing does not work with the existing >> + # open_ring_buffer() API :( >> + # >> + global b >> + global args >> + >> + # >> + # Argument parsing >> + # >> + parser = argparse.ArgumentParser() >> + parser.add_argument( >> + "--buffer-page-count", >> + help="Number of BPF ring buffer pages, default 1024", >> + type=int, >> + default=1024, >> + metavar="NUMBER", >> + ) >> + parser.add_argument( >> + "-f", >> + "--flow-key-size", >> + help="Set maximum flow key size to capture, " >> + "default 128 - see notes", >> + type=buffer_size_type, >> + default=128, >> + metavar="[128-2048]", >> + ) >> + parser.add_argument( >> + "-k", >> + "--flow-keys", >> + help="Print flow keys as flow strings", >> + action="store_true", >> + ) >> + parser.add_argument( >> + "-l", >> + "--filter-flows", >> + metavar="FLOW_STRING", >> + help="Filter flows that match the specified " "ODP-like flow", >> + type=str, >> + default=None, >> + nargs="*", >> + ) >> + parser.add_argument( >> + "-P", >> + "--pahole", >> + metavar="PAHOLE", >> + help="Pahole executable to use, default pahole", >> + type=str, >> + default="pahole", >> + ) >> + parser.add_argument( >> + "-p", >> + "--pid", >> + metavar="VSWITCHD_PID", >> + help="ovs-vswitchd's PID", >> + type=int, >> + default=None, >> + ) >> + parser.add_argument( >> + "-D", >> + "--debug", >> + help="Enable eBPF debugging", >> + type=int, >> + const=0x3F, >> + default=0, >> + nargs="?", >> + ) >> + args = parser.parse_args() >> + >> + # >> + # Find the PID of the ovs-vswitchd daemon if not specified. 
>> + # >> + if args.pid is None: >> + for proc in psutil.process_iter(): >> + if "ovs-vswitchd" in proc.name(): >> + if args.pid is not None: >> + print( >> + "Error: Multiple ovs-vswitchd daemons running, " >> + "use the -p option!" >> + ) >> + sys.exit(-1) >> + >> + args.pid = proc.pid >> + # >> + # Error checking on input parameters >> + # >> + if args.pid is None: >> + print("ERROR: Failed to find ovs-vswitchd's PID!") >> + sys.exit(-1) >> + >> + # >> + # Attach the USDT probes >> + # >> + u = USDT(pid=int(args.pid)) >> + try: >> + u.enable_probe(probe="op_flow_put", fn_name="usdt__op_flow_put") >> + except USDTException as e: >> + print("Error attaching the dpif_netlink_operate__:op_flow_put probe.") >> + print(str(e)) >> + sys.exit(-1) >> + >> + try: >> + u.enable_probe(probe="flow_result", fn_name="usdt__flow_result") >> + u.enable_probe( >> + probe="flow_sweep_result", fn_name="usdt__flow_sweep_result" >> + ) >> + except USDTException as e: >> + print("Error attaching the revalidate:flow_result probe.") > > We should either use two try/except cases, or update the error test to > “...revalidate|revalidator_sweep__:flow_result probe.” I will test and see if we can pull the probe name from the USDTException and then print that instead. 
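For readers following along: splitting the attach into one try/except per probe, as suggested above, lets the error message name the exact probe that failed. A rough sketch of such a helper, assuming nothing beyond the quoted script (the helper name `attach_probes` is hypothetical; in the real script the context object is a bcc `USDT` instance and the exception is `USDTException`, approximated here with a bare `Exception`):

```python
def attach_probes(usdt_ctx, probes):
    """Attach each (probe, fn_name) pair separately so a failure can
    be reported against the exact probe that did not attach."""
    failed = []
    for probe, fn_name in probes:
        try:
            usdt_ctx.enable_probe(probe=probe, fn_name=fn_name)
        except Exception as err:  # bcc raises USDTException here.
            failed.append((probe, str(err)))
    return failed
```

The caller could then print one line per entry in the returned list before exiting, instead of guessing which of flow_result/flow_sweep_result failed to attach.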
>> + print(str(e)) >> + sys.exit(-1) >> + >> + # >> + # Attach the probes to the running process >> + # >> + source = bpf_src.replace( >> + "<BUFFER_PAGE_COUNT>", str(args.buffer_page_count) >> + ) >> + >> + source = source.replace( >> + "<OVS_INCLUDE_DEFINITIONS>", >> + get_ovs_definitions( >> + ["udpif_key", "ovs_u128", "dpif_flow_put"], >> + pid=args.pid, >> + pahole=args.pahole, >> + ), >> + ) >> + >> + if args.filter_flows is None: >> + filter_bool = 0 >> + >> + # Set the key size based on what the user wanted >> + source = source.replace("<MAX_KEY_VAL>", str(args.flow_key_size)) >> + else: >> + filter_bool = 1 >> + args.filter_flows = parse_flow_str(args.filter_flows[0]) >> + >> + # Run through the parser to make sure we only filter on fields we >> + # understand >> + parse_flow_dict(args.filter_flows, False) >> + >> + # This is hardcoded here because it doesn't make sense to shrink the >> + # size, since the flow key might be missing fields that are matched in >> + # the flow filter. >> + source = source.replace("<MAX_KEY_VAL>", "2048") >> + >> + source = source.replace("<FILTER_BOOL>", str(filter_bool)) >> + >> + source = source.replace( >> + "<EVENT_ENUM>", >> + "\n".join([f"{event.name} = {event.value}," for event in Event]), >> + ) >> + >> + b = BPF(text=source, usdt_contexts=[u], debug=args.debug) >> + >> + # >> + # Print header >> + # >> + print( >> + "{:<10} {:<18} {:<36} {:<17}".format( >> + "TID", "TIME", "UFID", "EVENT/REASON" >> + ) >> + ) >> + >> + # >> + # Dump out all events. 
>> + # >> + b["events"].open_ring_buffer(handle_event) >> + while 1: >> + try: >> + b.ring_buffer_poll() >> + except KeyboardInterrupt: >> + break >> + >> + dropcnt = b.get_table("dropcnt") >> + for k in dropcnt.keys(): >> + count = dropcnt.sum(k).value >> + if k.value == 0 and count > 0: >> + print( >> + "\n# WARNING: Not all flow operations were captured, {} were" >> + " dropped!\n# Increase the BPF ring buffer size " >> + "with the --buffer-page-count option.".format(count) >> + ) >> + >> + >> +# >> +# Start main() as the default entry point >> +# >> +if __name__ == "__main__": >> + main() >> -- >> 2.41.0
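Aside for anyone reproducing the script's output: the UFID column is produced by the script's format_ufid(), which renders the four 32-bit words of an ovs_u128 as a UUID-style string. A standalone sketch of that formatting, with the low 16 bits of each middle word masked with 0xFFFF:

```python
def format_ufid(ufid):
    """Render a (u32, u32, u32, u32) UFID tuple as a UUID-style string."""
    if ufid is None:
        return "ufid:none"
    return "{:08x}-{:04x}-{:04x}-{:04x}-{:04x}{:08x}".format(
        ufid[0],
        ufid[1] >> 16,
        ufid[1] & 0xFFFF,
        ufid[2] >> 16,
        ufid[2] & 0xFFFF,
        ufid[3],
    )
```

For example, `format_ufid((0x12345678, 0x9ABCDEF0, 0x0F0F0F0F, 0xDEADBEEF))` yields `"12345678-9abc-def0-0f0f-0f0fdeadbeef"`, matching the column width seen in the sample output.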
On 4 Mar 2024, at 16:46, Aaron Conole wrote: > Eelco Chaudron <echaudro@redhat.com> writes: > >> On 20 Feb 2024, at 22:47, Aaron Conole wrote: >> >>> From: Kevin Sprague <ksprague0711@gmail.com> >>> >>> During normal operations, it is useful to understand when a particular flow >>> gets removed from the system. This can be useful when debugging performance >>> issues tied to ofproto flow changes, trying to determine deployed traffic >>> patterns, or while debugging dynamic systems where ports come and go. >>> >>> Prior to this change, there was a lack of visibility around flow expiration. >>> The existing debugging infrastructure could tell us when a flow was added to >>> the datapath, but not when it was removed or why. >>> >>> This change introduces a USDT probe at the point where the revalidator >>> determines that the flow should be removed. Additionally, we track the >>> reason for the flow eviction and provide that information as well. With >>> this change, we can track the complete flow lifecycle for the netlink >>> datapath by hooking the upcall tracepoint in kernel, the flow put USDT, and >>> the revalidator USDT, letting us watch as flows are added and removed from >>> the kernel datapath. >>> >>> This change only enables this information via USDT probe, so it won't be >>> possible to access this information any other way (see: >>> Documentation/topics/usdt-probes.rst). >>> >>> Also included is a script (utilities/usdt-scripts/flow_reval_monitor.py) >>> which serves as a demonstration of how the new USDT probe might be used >>> going forward. >>> >>> Acked-by: Han Zhou <hzhou@ovn.org> >>> Signed-off-by: Kevin Sprague <ksprague0711@gmail.com> >>> Co-authored-by: Aaron Conole <aconole@redhat.com> >>> Signed-off-by: Aaron Conole <aconole@redhat.com> >> >> Thanks for doing the v9, some small comments remain below. 
>> >> Cheers, >> >> Eelco >> >>> --- >>> v8 -> v9: Reorganized the flow delete reasons enum >>> Updated flow_reval_monitor to use pahole to extract fields >>> Added the purge reason with a proper USDT point >>> Updated documentation >>> Dropped all the outstanding ACKs >>> >>> Documentation/topics/usdt-probes.rst | 43 + >>> ofproto/ofproto-dpif-upcall.c | 48 +- >>> utilities/automake.mk | 3 + >>> utilities/usdt-scripts/flow_reval_monitor.py | 997 +++++++++++++++++++ >>> 4 files changed, 1085 insertions(+), 6 deletions(-) >>> create mode 100755 utilities/usdt-scripts/flow_reval_monitor.py >>> >>> diff --git a/Documentation/topics/usdt-probes.rst b/Documentation/topics/usdt-probes.rst >>> index e527f43bab..015614a6b8 100644 >>> --- a/Documentation/topics/usdt-probes.rst >>> +++ b/Documentation/topics/usdt-probes.rst >>> @@ -214,8 +214,10 @@ Available probes in ``ovs_vswitchd``: >>> - dpif_recv:recv_upcall >>> - main:poll_block >>> - main:run_start >>> +- revalidate:flow_result >>> - revalidate_ukey\_\_:entry >>> - revalidate_ukey\_\_:exit >>> +- revalidator_sweep\_\_:flow_result >>> - udpif_revalidator:start_dump >>> - udpif_revalidator:sweep_done >>> >>> @@ -443,6 +445,47 @@ sweep phase was completed. >>> - ``utilities/usdt-scripts/reval_monitor.py`` >>> >>> >>> +probe revalidate:flow_result >>> +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ >>> + >>> +**Description**: >>> +This probe is triggered when the revalidator has executed on a particular >>> +flow key to make a determination whether to evict a flow, and the cause >>> +for eviction. The revalidator runs periodically, and this probe will only >>> +be triggered when a flow is flagged for revalidation. >>> + >>> +**Arguments**: >>> + >>> +- *arg0*: ``(enum reval_result) result`` >>> +- *arg1*: ``(enum flow_del_reason) reason`` >> >> nit: variable name changed, so should be del_reason. > > Good catch, I'll update. 
> >>> +- *arg2*: ``(struct udpif *) udpif`` >>> +- *arg3*: ``(struct udpif_key *) ukey`` >>> + >> >> I think you missed my previous comment on re-ordering the arguments to >> be more inline with existing probes, i.e.: >> >> + OVS_USDT_PROBE(revalidator_sweep__, flow_result, udpif, ukey, >> + result, del_reason); > > Guess so. I'll fix it. > >>> +**Script references**: >>> + >>> +- ``utilities/usdt-scripts/flow_reval_monitor.py`` >>> + >>> + >>> +probe revalidator_sweep\_\_:flow_result >>> +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ >>> + >>> +**Description**: >>> +This probe is placed in the path of the revalidator sweep, and is executed >>> +under the condition that a flow entry is in an unexpected state, or the >>> +flows were asked to be purged due to a user action. >>> + >>> +**Arguments**: >>> + >>> +- *arg0*: ``(enum reval_result) result`` >>> +- *arg1*: ``(enum flow_del_reason) reason`` >> >> nit: variable name changed, so should be del_reason. > > Okay. > >>> +- *arg2*: ``(struct udpif *) udpif`` >>> +- *arg3*: ``(struct udpif_key *) ukey`` >> >> See comments above on argument ordering. >> >>> + >>> +**Script references**: >>> + >>> +- ``utilities/usdt-scripts/flow_reval_monitor.py`` >>> + >>> + >>> Adding your own probes >>> ---------------------- >>> >>> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c >>> index b5cbeed878..fbc7858690 100644 >>> --- a/ofproto/ofproto-dpif-upcall.c >>> +++ b/ofproto/ofproto-dpif-upcall.c >>> @@ -269,6 +269,20 @@ enum ukey_state { >>> }; >>> #define N_UKEY_STATES (UKEY_DELETED + 1) >>> >>> +enum flow_del_reason { >>> + FDR_NONE = 0, /* No deletion reason for the flow. */ >>> + FDR_AVOID_CACHING, /* Flow deleted to avoid caching. */ >>> + FDR_BAD_ODP_FIT, /* The flow had a bad ODP flow fit. */ >>> + FDR_FLOW_IDLE, /* The flow went unused and was deleted. */ >>> + FDR_FLOW_LIMIT, /* All flows being killed. */ >>> + FDR_FLOW_WILDCARDED, /* The flow needed a narrower wildcard mask. 
*/ >>> + FDR_NO_OFPROTO, /* The flow didn't have an associated ofproto. */ >>> + FDR_PURGE, /* User action caused flows to be killed. */ >>> + FDR_TOO_EXPENSIVE, /* The flow was too expensive to revalidate. */ >>> + FDR_UPDATE_FAIL, /* Flow state transition was unexpected. */ >>> + FDR_XLATION_ERROR, /* There was an error translating the flow. */ >>> +}; >>> + >>> /* 'udpif_key's are responsible for tracking the little bit of state udpif >>> * needs to do flow expiration which can't be pulled directly from the >>> * datapath. They may be created by any handler or revalidator thread at any >>> @@ -2272,7 +2286,8 @@ populate_xcache(struct udpif *udpif, struct udpif_key *ukey, >>> static enum reval_result >>> revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, >>> uint16_t tcp_flags, struct ofpbuf *odp_actions, >>> - struct recirc_refs *recircs, struct xlate_cache *xcache) >>> + struct recirc_refs *recircs, struct xlate_cache *xcache, >>> + enum flow_del_reason *del_reason) >>> { >>> struct xlate_out *xoutp; >>> struct netflow *netflow; >>> @@ -2293,11 +2308,13 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, >>> netflow = NULL; >>> >>> if (xlate_ukey(udpif, ukey, tcp_flags, &ctx)) { >>> + *del_reason = FDR_XLATION_ERROR; >>> goto exit; >>> } >>> xoutp = &ctx.xout; >>> >>> if (xoutp->avoid_caching) { >>> + *del_reason = FDR_AVOID_CACHING; >>> goto exit; >>> } >>> >>> @@ -2311,6 +2328,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, >>> ofpbuf_clear(odp_actions); >>> >>> if (!ofproto) { >>> + *del_reason = FDR_NO_OFPROTO; >>> goto exit; >>> } >>> >>> @@ -2322,6 +2340,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, >>> if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, &dp_mask, &ctx.flow, >>> NULL) >>> == ODP_FIT_ERROR) { >>> + *del_reason = FDR_BAD_ODP_FIT; >>> goto exit; >>> } >>> >>> @@ -2331,6 +2350,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, 
>>> * down. Note that we do not know if the datapath has ignored any of the >>> * wildcarded bits, so we may be overly conservative here. */ >>> if (flow_wildcards_has_extra(&dp_mask, ctx.wc)) { >>> + *del_reason = FDR_FLOW_WILDCARDED; >>> goto exit; >>> } >>> >>> @@ -2400,7 +2420,7 @@ static enum reval_result >>> revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, >>> const struct dpif_flow_stats *stats, >>> struct ofpbuf *odp_actions, uint64_t reval_seq, >>> - struct recirc_refs *recircs) >>> + struct recirc_refs *recircs, enum flow_del_reason *del_reason) >>> OVS_REQUIRES(ukey->mutex) >>> { >>> bool need_revalidate = ukey->reval_seq != reval_seq; >>> @@ -2430,8 +2450,12 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, >>> xlate_cache_clear(ukey->xcache); >>> } >>> result = revalidate_ukey__(udpif, ukey, push.tcp_flags, >>> - odp_actions, recircs, ukey->xcache); >>> - } /* else delete; too expensive to revalidate */ >>> + odp_actions, recircs, ukey->xcache, >>> + del_reason); >>> + } else { >>> + /* delete; too expensive to revalidate */ >> >> nit: Maybe add a trailing dot? > > There wasn't one in the previous instance comment, so I preserved the > comment as-is. Since I'm respinning I'll add it. 
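The flow_del_reason values added in this series are what the monitor script turns into its EVENT/REASON column. A minimal sketch of that lookup (the strings here are illustrative rather than the script's exact FdrReasonStrings table, apart from "Idle flow timed out" which appears in the sample output; note the >= bound so an out-of-range code from a mismatched build falls back to a generic message instead of raising IndexError):

```python
# Reason strings indexed by the enum flow_del_reason values, in the
# order the enum declares them (FDR_NONE = 0, ...).
FDR_REASONS = [
    "No deletion reason",           # FDR_NONE
    "Cache avoidance flag set",     # FDR_AVOID_CACHING
    "Bad ODP flow fit",             # FDR_BAD_ODP_FIT
    "Idle flow timed out",          # FDR_FLOW_IDLE
    "Flow limit, killing all",      # FDR_FLOW_LIMIT
    "Mask too wide for datapath",   # FDR_FLOW_WILDCARDED
    "No associated ofproto",        # FDR_NO_OFPROTO
    "Purged by user action",        # FDR_PURGE
    "Too expensive to revalidate",  # FDR_TOO_EXPENSIVE
    "Unexpected state transition",  # FDR_UPDATE_FAIL
    "Flow translation error",       # FDR_XLATION_ERROR
]


def reason_to_string(reason):
    """Map a flow_del_reason code to a human-readable string."""
    if reason < 0 or reason >= len(FDR_REASONS):
        return "Unknown reason '{}'".format(reason)
    return FDR_REASONS[reason]
```

The bounds check matters because the script reads the raw enum value out of the USDT argument, and a newer ovs-vswitchd could emit codes the script does not know about yet.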
> >>> + *del_reason = FDR_TOO_EXPENSIVE; >>> + } >>> } else if (!push.n_packets || ukey->xcache >>> || !populate_xcache(udpif, ukey, push.tcp_flags)) { >>> result = UKEY_KEEP; >>> @@ -2831,6 +2855,7 @@ revalidate(struct revalidator *revalidator) >>> for (f = flows; f < &flows[n_dumped]; f++) { >>> long long int used = f->stats.used; >>> struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER; >>> + enum flow_del_reason del_reason = FDR_NONE; >>> struct dpif_flow_stats stats = f->stats; >>> enum reval_result result; >>> struct udpif_key *ukey; >>> @@ -2905,9 +2930,14 @@ revalidate(struct revalidator *revalidator) >>> } >>> if (kill_them_all || (used && used < now - max_idle)) { >>> result = UKEY_DELETE; >>> + if (kill_them_all) { >>> + del_reason = FDR_FLOW_LIMIT; >>> + } else { >>> + del_reason = FDR_FLOW_IDLE; >>> + } >> >> Maybe take the same approach as below: >> >> del_reason = kill_them_all ? FDR_FLOW_LIMIT : FDR_FLOW_IDLE; > > Okay. > >>> } else { >>> result = revalidate_ukey(udpif, ukey, &stats, &odp_actions, >>> - reval_seq, &recircs); >>> + reval_seq, &recircs, &del_reason); >>> } >>> ukey->dump_seq = dump_seq; >>> >>> @@ -2916,6 +2946,8 @@ revalidate(struct revalidator *revalidator) >>> udpif_update_flow_pps(udpif, ukey, f); >>> } >>> >>> + OVS_USDT_PROBE(revalidate, flow_result, result, del_reason, udpif, >>> + ukey); >>> if (result != UKEY_KEEP) { >>> /* Takes ownership of 'recircs'. 
*/ >>> reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs, >>> @@ -2968,6 +3000,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) >>> size_t n_ops = 0; >>> >>> CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) { >>> + enum flow_del_reason del_reason = FDR_NONE; >>> enum ukey_state ukey_state; >>> >>> /* Handler threads could be holding a ukey lock while it installs a >>> @@ -2986,6 +3019,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) >>> >>> if (purge || ukey_state == UKEY_INCONSISTENT) { >>> result = UKEY_DELETE; >>> + del_reason = purge ? FDR_PURGE : FDR_UPDATE_FAIL; >>> } else if (!seq_mismatch) { >>> result = UKEY_KEEP; >>> } else { >>> @@ -2993,13 +3027,15 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) >>> COVERAGE_INC(revalidate_missed_dp_flow); >>> memcpy(&stats, &ukey->stats, sizeof stats); >>> result = revalidate_ukey(udpif, ukey, &stats, &odp_actions, >>> - reval_seq, &recircs); >>> + reval_seq, &recircs, &del_reason); >>> } >>> if (result != UKEY_KEEP) { >>> /* Clears 'recircs' if filled by revalidate_ukey(). 
*/ >>> reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs, >>> &odp_actions); >>> } >>> + OVS_USDT_PROBE(revalidator_sweep__, flow_sweep_result, result, >>> + del_reason, udpif, ukey); >>> } >>> ovs_mutex_unlock(&ukey->mutex); >>> >>> diff --git a/utilities/automake.mk b/utilities/automake.mk >>> index 9a2114df40..146b8c37fb 100644 >>> --- a/utilities/automake.mk >>> +++ b/utilities/automake.mk >>> @@ -23,6 +23,7 @@ scripts_DATA += utilities/ovs-lib >>> usdt_SCRIPTS += \ >>> utilities/usdt-scripts/bridge_loop.bt \ >>> utilities/usdt-scripts/dpif_nl_exec_monitor.py \ >>> + utilities/usdt-scripts/flow_reval_monitor.py \ >>> utilities/usdt-scripts/kernel_delay.py \ >>> utilities/usdt-scripts/kernel_delay.rst \ >>> utilities/usdt-scripts/reval_monitor.py \ >>> @@ -72,6 +73,7 @@ EXTRA_DIST += \ >>> utilities/docker/debian/build-kernel-modules.sh \ >>> utilities/usdt-scripts/bridge_loop.bt \ >>> utilities/usdt-scripts/dpif_nl_exec_monitor.py \ >>> + utilities/usdt-scripts/flow_reval_monitor.py \ >>> utilities/usdt-scripts/kernel_delay.py \ >>> utilities/usdt-scripts/kernel_delay.rst \ >>> utilities/usdt-scripts/reval_monitor.py \ >>> @@ -146,6 +148,7 @@ FLAKE8_PYFILES += utilities/ovs-pcap.in \ >>> utilities/ovs-tcpdump.in \ >>> utilities/ovs-pipegen.py \ >>> utilities/usdt-scripts/dpif_nl_exec_monitor.py \ >>> + utilities/usdt-scripts/flow_reval_monitor.py \ >>> utilities/usdt-scripts/upcall_monitor.py \ >>> utilities/usdt-scripts/upcall_cost.py >>> >>> diff --git a/utilities/usdt-scripts/flow_reval_monitor.py >> b/utilities/usdt-scripts/flow_reval_monitor.py >>> new file mode 100755 >>> index 0000000000..e76e0b5995 >>> --- /dev/null >>> +++ b/utilities/usdt-scripts/flow_reval_monitor.py >>> @@ -0,0 +1,997 @@ >>> +#!/usr/bin/env python3 >>> +# >>> +# Copyright (c) 2022-2024 Redhat, Inc. >>> +# >>> +# Licensed under the Apache License, Version 2.0 (the "License"); >>> +# you may not use this file except in compliance with the License. 
>>> +# You may obtain a copy of the License at: >>> +# >>> +# http://www.apache.org/licenses/LICENSE-2.0 >>> +# >>> +# Unless required by applicable law or agreed to in writing, software >>> +# distributed under the License is distributed on an "AS IS" BASIS, >>> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >>> +# See the License for the specific language governing permissions and >>> +# limitations under the License. >>> +# >>> +# Script information: >>> +# ------------------- >>> +# flow_reval_monitor.py uses the dpif_netlink_operate:flow_put and >>> +# revalidator:flow_result USDT probes to monitor flow lifetimes and >>> +# expiration events. By default, this will show all flow_put and flow >>> +# expiration events, along with their reasons. This will look like so: >>> +# >>> +# TIME UFID EVENT/REASON >>> +# 101536.226986736 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow_put >>> +# 101536.227196214 ufid:d08472b6-110e-46cb-a9e4-00008f46838e flow_put >>> +# 101541.516610178 ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32 flow_put >>> +# 101541.516967303 ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a flow_put >>> +# 101551.688050747 ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a flow timed out >>> +# 101551.688077175 ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32 flow timed out >>> +# 101557.695391371 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow timed out >>> +# 101557.695408909 ufid:d08472b6-110e-46cb-a9e4-00008f46838e flow timed out >> >> The output above has changed (the reasons). Here is some new output: >> >> TID TIME UFID EVENT/REASON >> 71828 1549.119959874 39f0f28f-338d-4a77-81b3-0000d6c70b6b Insert (put) >> flow to ovs kernel module. >> 71828 1549.420877223 850db41c-47ff-42c0-b48f-0000e180d81c Insert (put) >> flow to ovs kernel module. >> 71828 1550.476923456 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Insert (put) >> flow to ovs kernel module. 
>> 71832 1559.650192299 850db41c-47ff-42c0-b48f-0000e180d81c Idle flow timed out >> 71832 1561.153332825 39f0f28f-338d-4a77-81b3-0000d6c70b6b Idle flow timed out >> 71832 1572.684316304 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Idle flow timed out >> 71828 1577.548886773 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Insert (put) >> flow to ovs kernel module. >> 71832 1587.720846962 5bacfca9-fe5f-43aa-97ac-00005c9dc3e3 Idle flow timed out >> >> Maybe you can shorten the UFID to fit it in 79 chars. > > It's difficult to do that. The UFID is quite large, and since this is a > debug tool rather than just status, I'd prefer to keep it this way. Sorry, I was not clear, I meant the comment here, not the actual implementation ;) >>> +# >>> +# flow key data can be printed using the --flow-keys option. This will >> Capital F for Flow. > Ack. > >>> +# print the equivalent datapath flow string. >>> +# >>> +# When filtering flows, the syntax is the same as used by >>> +# `ovs-appctl dpctl/add-flow`. >>> +# >>> +# The following options are available: >>> +# >>> +# usage: flow_reval_monitor.py [-h] [--buffer-page-count NUMBER] >>> +# [-f [128-2048]] [-k] [-l [FLOW_STRING ...]] >>> +# [-p VSWITCHD_PID] [-D [DEBUG]] >>> +# [-P PAHOLE] >>> +# >>> +# optional arguments: >>> +# -h, --help show this help message and exit >>> +# --buffer-page-count NUMBER >>> +# Number of BPF ring buffer pages, default 1024 >>> +# -f <128..2048>, --flow-key-size=<128..2048> >>> +# Set the size of the flow key, default 128 >>> +# -k, --flow-keys Print flow keys as flow strings >>> +# -l [FLOW_STRING ...], --filter-flows [FLOW_STRING ...] >>> +# Filter against flow keys that match the specified >>> +# ODP-like flow. 
This may not include all packet >>> +# fields >>> +# -p VSWITCHD_PID, --pid VSWITCHD_PID >>> +# ovs-vswitchd's PID >>> +# -P PAHOLE, --pahole PAHOLE >>> +# Pahole executable to use, default pahole >>> +# -D [DEBUG], --debug [DEBUG] >>> +# Enable eBPF debugging >> >> The text above is different than from the help text, or was this done on purpose? > > To be honest, I dislike putting this text here completely. Why keep two > copies of help? I'd rather delete this section if that's okay. > ACK, fine by me. Maybe just replace it with something along the lines of: “For a full list of available options, use the --help option.” >>> +# Examples: >>> +# >>> +# To use the script on a running ovs-vswitchd to see flow keys and expiration >>> +# events for flows with an ipv4 source of 192.168.10.10: >>> +# $ ./flow_reval_monitor.py --flow-keys --filter-flows \ >>> +# "ipv4(src=192.168.10.10)" >>> +# TIME UFID EVENT/REASON >>> +# 105082.457322742 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow_put >>> +# ufid:f76fc899-376d-466b-bc74-0000b933eb97 has the following flow information: >>> +# in_port(2), >>> +# eth(src=0e:04:47:fc:74:51, dst=da:dc:c5:69:05:d7), \ >>> +# eth_type(0x800), \ >>> +# ipv4(src=192.168.10.10, dst=192.168.10.30, proto=1, tos=0, ttl=64,[...]), >>> +# icmp(type=8, code=0) >>> +# 105092.635450202 ufid:f76fc899-376d-466b-bc74-0000b933eb97 Flow timed out >>> +# >>> +# Notes: >>> +# 1) No options are needed to attach when there is a single running instance >>> +# of ovs-vswitchd. >>> +# 2) If you're using the flow filtering option, it will only track flows that >>> +# have been upcalled since the script began running. >>> +# 3) When using the flow filtering option, the key size will likely need to >>> +# be expanded to match on all the fields in the message. 
The default is >>> +# kept small to keep the buffer copy sizes down when displaying >>> +# flows (-k), but is hardcoded to 2048 when an actual filter (-l) is >>> +# applied >>> +# 4) The flow filtering format is a simplified form of the ODP syntax, and >>> +# does not support masked matches, which means you will need to filter >>> +# on exact details. The fields present are dependent on how the >>> +# classifier and OFP rules form the ODP rules - not all fields may be >>> +# present in a particular flow. >>> +# 5) The flow_put filtering only happens for flows installed into the ovs >>> +# kernel module. This means flows taking the HW offload path (ie: tc), >>> +# or on DPDK side won't get matched. >>> + >>> +try: >>> + from bcc import BPF >>> + from bcc import USDT >>> + from bcc import USDTException >>> +except ModuleNotFoundError: >>> + print("ERROR: Can't find the BPF Compiler Collection Tools.") >>> + print("Please install them before running this script.") >>> + exit(1) >>> + >>> +from enum import IntEnum >>> +from ipaddress import IPv4Address, IPv6Address >>> +from pathlib import Path >>> + >>> +import argparse >>> +import psutil >>> +import re >>> +import struct >>> +import subprocess >>> +import sys >>> + >>> +# >>> +# eBPF source code >>> +# >>> +bpf_src = """ >>> +#include <linux/sched.h> >>> + >>> +#define MAX_KEY <MAX_KEY_VAL> >>> +#define FLOW_FILTER <FILTER_BOOL> >>> + >>> +enum probe { <EVENT_ENUM> }; >>> + >>> +<OVS_INCLUDE_DEFINITIONS> >>> + >>> +struct event_t { >>> + u64 ts; >>> + u32 pid; >>> + u32 result; >>> + u32 reason; >>> + u32 ufid[4]; >>> + u64 key_size; >>> + unsigned char key[MAX_KEY]; >>> + enum probe probe; >>> +}; >>> + >>> +BPF_HASH(watchlist, ovs_u128); >>> +BPF_RINGBUF_OUTPUT(events, <BUFFER_PAGE_COUNT>); >>> +BPF_TABLE("percpu_array", uint32_t, uint64_t, dropcnt, 1); >>> +BPF_TABLE("percpu_array", uint32_t, struct udpif_key, udpk, 1); >>> + >>> +static struct event_t *get_event(enum probe p) { >>> + struct event_t *event = 
events.ringbuf_reserve(sizeof(struct event_t)); >>> + >>> + if (!event) { >>> + dropcnt.increment(0); >>> + return NULL; >>> + } >>> + >>> + event->probe = p; >>> + event->ts = bpf_ktime_get_ns(); >>> + event->pid = bpf_get_current_pid_tgid(); >>> + >>> + return event; >>> +} >>> + >>> +static int emit_flow_result(struct udpif_key *ukey, ovs_u128 ufid, >>> + u32 result, u32 reason) { >>> + struct event_t *event = NULL; >>> + u64 *ufid_present = NULL; >>> + >>> + ufid_present = watchlist.lookup(&ufid); >>> + if (FLOW_FILTER && !ufid_present) { >>> + return 0; >>> + } >>> + >>> + event = get_event(FLOW_RESULT); >>> + if (!event) { >>> + /* If we can't reserve the space in the ring buffer, return 1. */ >>> + return 1; >>> + } >>> + >>> + event->result = result; >>> + event->reason = reason; >>> + bpf_probe_read(&event->ufid, sizeof ufid, &ufid); >>> + events.ringbuf_submit(event, 0); >>> + >>> + return 0; >>> +} >>> + >>> +int usdt__flow_result(struct pt_regs *ctx) { >>> + struct udpif_key *ukey = NULL; >>> + u32 reason = 0; >>> + u32 result = 0; >>> + ovs_u128 ufid; >>> + u32 zero = 0; >>> + >>> + ukey = udpk.lookup(&zero); >>> + if (!ukey) { >>> + return 1; >>> + } >>> + bpf_usdt_readarg_p(4, ctx, ukey, sizeof(struct udpif_key)); >>> + bpf_usdt_readarg(2, ctx, &reason); >>> + bpf_usdt_readarg(1, ctx, &result); >>> + ufid = ukey->ufid; >>> + >>> + return emit_flow_result(ukey, ufid, result, reason); >>> +} >>> + >>> +int usdt__flow_sweep_result(struct pt_regs *ctx) { >>> + struct udpif_key *ukey = NULL; >>> + u32 reason = 0; >>> + u32 result = 0; >>> + ovs_u128 ufid; >>> + u32 zero = 0; >>> + >>> + ukey = udpk.lookup(&zero); >>> + if (!ukey) { >>> + return 1; >>> + } >>> + bpf_usdt_readarg_p(4, ctx, ukey, sizeof(struct udpif_key)); >>> + bpf_usdt_readarg(2, ctx, &reason); >>> + bpf_usdt_readarg(1, ctx, &result); >>> + ufid = ukey->ufid; >>> + >>> + return emit_flow_result(ukey, ufid, result, reason); >>> +} >>> + >>> +int usdt__op_flow_put(struct pt_regs *ctx) { >>> 
+ struct dpif_flow_put put; >>> + ovs_u128 ufid; >>> + >>> + struct event_t *event = get_event(OP_FLOW_PUT); >>> + if (!event) { >>> + /* If we can't reserve the space in the ring buffer, return 1. */ >>> + return 1; >>> + } >>> + >>> + bpf_usdt_readarg_p(2, ctx, &put, sizeof put); >>> + bpf_probe_read(&event->ufid, sizeof event->ufid, put.ufid); >>> + bpf_probe_read(&ufid, sizeof ufid, &event->ufid); >>> + if (put.key_len > MAX_KEY) { >>> + put.key_len = MAX_KEY; >>> + } >>> + event->key_size = put.key_len; >>> + bpf_probe_read(&event->key, put.key_len, put.key); >>> + event->reason = 0; >>> + events.ringbuf_submit(event, 0); >>> + >>> + watchlist.increment(ufid); >>> + return 0; >>> +} >>> +""" >>> + >>> +Event = IntEnum("Event", ["OP_FLOW_PUT", "FLOW_RESULT"], start=0) >>> +RevalResult = IntEnum( >>> + "reval_result", >>> + [ >>> + "UKEY_KEEP", >>> + "UKEY_DELETE", >>> + "UKEY_MODIFY", >>> + ], >>> + start=0, >>> +) >>> +FdrReasons = IntEnum( >>> + "flow_del_reason", >>> + [ >>> + "FDR_NONE", >>> + "FDR_AVOID_CACHING", >>> + "FDR_BAD_ODP_FIT", >>> + "FDR_FLOW_IDLE", >>> + "FDR_FLOW_LIMIT", >>> + "FDR_FLOW_WILDCARDED", >>> + "FDR_NO_OFPROTO", >>> + "FDR_PURGE", >>> + "FDR_TOO_EXPENSIVE", >>> + "FDR_UPDATE_FAIL", >>> + "FDR_XLATION_ERROR", >>> + ], >>> + start=0, >>> +) >>> + >>> +FdrReasonStrings = [ >>> + "No deletion reason", >>> + "Cache avoidance flag set", >>> + "Bad ODP flow fit", >>> + "Idle flow timed out", >>> + "Kill all flows condition detected", >>> + "Mask too wide - need narrower match", >>> + "No matching ofproto rules", >>> + "Purged with user action", >>> + "Too expensive to revalidate", >>> + "Flow state inconsistent after updates", >>> + "Flow translation error", >>> +] >>> + >>> + >>> +# >>> +# run_program() >>> +# >>> +def run_program(command): >>> + try: >>> + process = subprocess.run( >>> + command, >>> + stdout=subprocess.PIPE, >>> + stderr=subprocess.STDOUT, >>> + encoding="utf8", >>> + check=True, >> >> I noticed you're adding "," to all 
final arguments in function parameter >> lists, and string lists. Is this some formatter cleaning this up, or a >> new style? > > This is actually how PEP8 wants it, and ``black`` will auto format this > way. So I run it. We need both black and flake8 to make sure we > catch all the formatting stuff, but that's what the latest python > development practice is doing (from what I can tell). > >> It does this when split over multiple lines, even for function parameters, which looks odd to me. > > Agreed, but see above. Guess I’m getting old, so it will be hard to get used to ;) >>> + ) >>> + >>> + except subprocess.CalledProcessError as perror: >>> + return perror.returncode, perror.stdout >>> + >>> + return 0, process.stdout >>> + >>> + >>> +# >>> +# get_ovs_definitions() >>> +# >>> +def get_ovs_definitions(objects, pahole="pahole", pid=None): >>> + if pid is None: >>> + raise ValueError("A valid pid value should be supplied!") >>> + >>> + if not isinstance(objects, list): >>> + objects = [objects] >>> + >>> + if len(objects) == 0: >>> + raise ValueError("Must supply at least one object!") >>> + >>> + vswitchd = Path(f"/proc/{pid}/exe").resolve() >>> + >>> + object_str = ",".join(objects) >>> + >>> + def run_pahole(debug_file): >>> + error, result = run_program( >>> + [pahole, "-C", object_str, "--compile", debug_file] >>> + ) >>> + >>> + if error: >>> + if f"pahole: {debug_file}: Invalid argument" not in result: >>> + print( >>> + "ERROR: Pahole failed to get ovs-vswitchd data " >>> + "structures!\n{}".format( >>> + re.sub( >>> + "^", " " * 7, result.rstrip(), flags=re.MULTILINE >>> + ) >>> + ) >>> + ) >>> + sys.exit(-1) >>> + >>> + return None >>> + >>> + if bool(re.search("pahole: type .* not found", result)): >>> + return None >>> + >>> + return result >>> + >>> + def run_readelf(bin_file): >>> + error, result = run_program( >>> + ["readelf", "-n", "--debug-dump=links", bin_file] >>> + ) >>> + >>> + if error: >>> + print( >>> + "ERROR: Failed 'readelf' on 
\"{}\"!\n{}".format( >>> + bin_file, re.sub("^", " " * 7, result, flags=re.MULTILINE) >>> + ) >>> + ) >>> + sys.exit(-1) >>> + >>> + return result >>> + >>> + def get_debug_file(bin_file): >>> + elf_result = run_readelf(bin_file) >>> + match = re.search("Build ID: ([0-9a-fA-F]+)", elf_result) >>> + if not match: >>> + print("ERROR: Can't find build ID to read debug symbols!") >>> + sys.exit(-1) >>> + >>> + dbg_file = "/usr/lib/debug/.build-id/{}/{}.debug".format( >>> + match.group(1)[:2], match.group(1)[2:] >>> + ) >>> + >>> + return dbg_file >>> + >>> + def get_from_shared_library(debug_file): >>> + ovs_libs = [ >>> + "libofproto", >>> + "libopenvswitch", >>> + "libovsdb", >>> + "libsflow", >>> + "libvtep", >>> + ] >>> + error, ldd_result = run_program(["ldd", debug_file]) >>> + >>> + if error: >>> + print( >>> + "ERROR: Failed 'ldd' on \"{}\"!\n{}".format( >>> + debug_file, >>> + re.sub("^", " " * 7, ldd_result, flags=re.MULTILINE), >>> + ) >>> + ) >>> + sys.exit(-1) >>> + >>> + for lib in ovs_libs: >>> + match = re.search( >>> + r"^\s*{}.* => (.*) \(.*\)$".format(lib), >>> + ldd_result, >>> + flags=re.MULTILINE, >>> + ) >>> + if match is None: >>> + continue >>> + >>> + result = run_pahole(match.group(1)) >>> + if result is None: >>> + result = run_pahole(get_debug_file(match.group(1))) >>> + >>> + if result: >>> + return result >>> + >>> + return None >>> + >>> + # >>> + # First try to find the debug data as part of the executable. >>> + # >>> + result = run_pahole(vswitchd) >>> + >>> + if result is None: >>> + print(f'INFO: Failed to find debug info in "{vswitchd}"!') >>> + >>> + # >>> + # Get additional .debug information if available. >>> + # >>> + dbg_file = get_debug_file(vswitchd) >>> + result = run_pahole(dbg_file) >>> + if result is None: >>> + print(f'INFO: Failed to find debug info in "{dbg_file}"!') >>> + >>> + # >>> + # Try to get information from shared libraries if used. 
>>> + # >>> + result = get_from_shared_library(vswitchd) >>> + >>> + if result is None: >>> + print(f"ERROR: Failed to find needed data structures through {pahole}") >>> + sys.exit(-1) >>> + >>> + # >>> + # We need an empty _Atomic definition to avoid compiler complaints. >>> + # >>> + result = "#define _Atomic\n" + result >>> + >>> + # >>> + # Remove the uint64_t definition as it conflicts with the kernel one. >>> + # >>> + result = re.sub("^typedef.*uint64_t;$", "", result, flags=re.MULTILINE) >>> + >>> + return result >>> + >>> + >>> +# >>> +# buffer_size_type() >>> +# >>> +def buffer_size_type(astr, min=64, max=2048): >>> + value = int(astr) >>> + if min <= value <= max: >>> + return value >>> + else: >>> + raise argparse.ArgumentTypeError( >>> + "value not in range {}-{}".format(min, max) >>> + ) >>> + >>> + >>> +# >>> +# format_ufid() >>> +# >>> +def format_ufid(ufid): >>> + if ufid is None: >>> + return "ufid:none" >>> + >>> + return "{:08x}-{:04x}-{:04x}-{:04x}-{:04x}{:08x}".format( >>> + ufid[0], >>> + ufid[1] >> 16, >>> + ufid[1] & 0xFFFF, >>> + ufid[2] >> 16, >>> + ufid[2] & 0xFFFF, >>> + ufid[3], >>> + ) >>> + >>> + >>> +# >>> +# find_and_delete_from_watchlist() >>> +# >>> +def find_and_delete_from_watchlist(event): >>> + for k, _ in b["watchlist"].items(): >>> + key_ufid = struct.unpack("=IIII", k) >>> + if key_ufid == tuple(event.ufid): >>> + key = (b["watchlist"].Key * 1)(k) >>> + b["watchlist"].items_delete_batch(key) >>> + break >>> + >>> + >>> +# >>> +# handle_flow_put() >>> +# >>> +def handle_flow_put(event): >>> + if args.flow_keys or args.filter_flows is not None: >>> + key = decode_key(bytes(event.key)[: event.key_size]) >>> + flow_dict, flow_str = parse_flow_dict(key) >>> + # For each attribute that we're watching. 
>>> + if args.filter_flows is not None: >>> + if not compare_flow_to_target(args.filter_flows, flow_dict): >>> + find_and_delete_from_watchlist(event) >>> + return >>> + >>> + print( >>> + "{:<10} {:<18.9f} {:<36} {}".format( >>> + event.pid, >>> + event.ts / 1000000000, >>> + format_ufid(event.ufid), >>> + "Insert (put) flow to ovs kernel module.", >>> + ) >>> + ) >>> + >>> + if args.flow_keys and len(flow_str): >>> + flow_str_fields = flow_str.split("), ") >>> + flow_str = " " >>> + curlen = 4 >>> + for field in flow_str_fields: >>> + if curlen + len(field) > 79: >>> + flow_str += "\n " >>> + curlen = 4 >>> + if field[-1] != ")": >>> + field += ")" >>> + flow_str += field + ", " >>> + curlen += len(field) + 2 >>> + >>> + print(" - It holds the following key information:") >>> + print(flow_str) >>> + >>> + >>> +# >>> +# compare_flow_to_target() >>> +# >>> +def compare_flow_to_target(target, flow): >>> + for key in target: >>> + if key not in flow: >>> + return False >>> + elif target[key] is True: >>> + continue >>> + elif target[key] == flow[key]: >>> + continue >>> + elif isinstance(target[key], dict) and isinstance(flow[key], dict): >>> + return compare_flow_to_target(target[key], flow[key]) >>> + else: >>> + return False >>> + return True >>> + >>> + >>> +# >>> +# parse_flow_str() >>> +# >>> +def parse_flow_str(flow_str): >>> + f_list = [i.strip(", ") for i in flow_str.split(")")] >>> + if f_list[-1] == "": >>> + f_list = f_list[:-1] >>> + flow_dict = {} >>> + for e in f_list: >>> + split_list = e.split("(") >>> + k = split_list[0] >>> + if len(split_list) == 1: >>> + flow_dict[k] = True >>> + elif split_list[1].count("=") == 0: >>> + flow_dict[k] = split_list[1] >>> + else: >>> + sub_dict = {} >>> + sublist = [i.strip() for i in split_list[1].split(",")] >>> + for subkey in sublist: >>> + brk = subkey.find("=") >>> + sub_dict[subkey[:brk]] = subkey[brk + 1 :] >>> + flow_dict[k] = sub_dict >>> + return flow_dict >>> + >>> + >>> +# >>> +# print_expiration() >>> 
+# >>> +def print_expiration(event): >>> + ufid_str = format_ufid(event.ufid) >>> + >>> + if event.reason >= len(FdrReasons): >>> + reason = f"Unknown reason '{event.reason}'" >>> + else: >>> + reason = FdrReasonStrings[event.reason] >>> + >>> + print( >>> + "{:<10} {:<18.9f} {:<36} {:<17}".format( >>> + event.pid, >>> + event.ts / 1000000000, >>> + ufid_str, >>> + reason, >>> + ) >>> + ) >>> + >>> + >>> +# >>> +# decode_key() >>> +# >>> +def decode_key(msg): >>> + bytes_left = len(msg) >>> + result = {} >>> + while bytes_left: >>> + if bytes_left < 4: >>> + break >>> + nla_len, nla_type = struct.unpack("=HH", msg[:4]) >>> + if nla_len < 4: >>> + break >>> + nla_data = msg[4:nla_len] >>> + if nla_len > bytes_left: >>> + nla_data = nla_data[: (bytes_left - 4)] >>> + break >>> + else: >>> + result[get_ovs_key_attr_str(nla_type)] = nla_data >>> + next_offset = (nla_len + 3) & (~3) >>> + msg = msg[next_offset:] >>> + bytes_left -= next_offset >>> + if bytes_left: >>> + print(f"INFO: Buffer truncated with {bytes_left} bytes left.") >>> + return result >>> + >>> + >>> +# >>> +# get_ovs_key_attr_str() >>> +# >>> +def get_ovs_key_attr_str(attr): >>> + ovs_key_attr = [ >>> + "OVS_KEY_ATTR_UNSPEC", >>> + "encap", >>> + "skb_priority", >>> + "in_port", >>> + "eth", >>> + "vlan", >>> + "eth_type", >>> + "ipv4", >>> + "ipv6", >>> + "tcp", >>> + "udp", >>> + "icmp", >>> + "icmpv6", >>> + "arp", >>> + "nd", >>> + "skb_mark", >>> + "tunnel", >>> + "sctp", >>> + "tcp_flags", >>> + "dp_hash", >>> + "recirc_id", >>> + "mpls", >>> + "ct_state", >>> + "ct_zone", >>> + "ct_mark", >>> + "ct_label", >>> + "ct_tuple4", >>> + "ct_tuple6", >>> + "nsh", >>> + ] >>> + >>> + if attr < 0 or attr >= len(ovs_key_attr): >>> + return "<UNKNOWN>: {}".format(attr) >>> + return ovs_key_attr[attr] >>> + >>> + >>> +# >>> +# parse_flow_dict() >>> +# >>> +def parse_flow_dict(key_dict, decode=True): >>> + ret_str = "" >>> + parseable = {} >>> + skip = ["nsh", "tunnel", "mpls", "vlan"] >>> + need_byte_swap = 
["ct_label"] >>> + ipv4addrs = ["ct_tuple4", "tunnel", "ipv4", "arp"] >>> + ipv6addrs = ["ipv6", "nd", "ct_tuple6"] >>> + macs = {"eth": [0, 1], "arp": [3, 4], "nd": [1, 2]} >>> + fields = [ >>> + ("OVS_KEY_ATTR_UNSPEC"), >>> + ("encap",), >>> + ("skb_priority", "<I"), >>> + ("in_port", "<I"), >>> + ("eth", "!6s6s", "src", "dst"), >>> + ("vlan",), >>> + ("eth_type", "!H"), >>> + ("ipv4", "!4s4s4B", "src", "dst", "proto", "tos", "ttl", "frag"), >>> + ( >>> + "ipv6", >>> + "!16s16s4s4B", >>> + "src", >>> + "dst", >>> + "label", >>> + "proto", >>> + "tclass", >>> + "hlimit", >>> + "frag", >>> + ), >>> + ("tcp", "!2H", "src", "dst"), >>> + ("udp", "!2H", "src", "dst"), >>> + ("icmp", "!2B", "type", "code"), >>> + ("icmpv6", "!2B", "type", "code"), >>> + ("arp", "!4s4sH6s6s", "sip", "tip", "op", "sha", "tha"), >>> + ("nd", "!16s6s6s", "target", "sll", "tll"), >>> + ("skb_mark", "<I"), >>> + ("tunnel",), >>> + ("sctp", "!2H", "src", "dst"), >>> + ("tcp_flags", "!H"), >>> + ("dp_hash", "<I"), >>> + ("recirc_id", "<I"), >>> + ("mpls",), >>> + ("ct_state", "<I"), >>> + ("ct_zone", "<H"), >>> + ("ct_mark", "<I"), >>> + ("ct_label", "!16s"), >>> + ("ct_tuple4", "!4s4s2HB", "src", "dst", "tp_src", "tp_dst", "proto"), >>> + ("ct_tuple6", "!16s16sB2H", "src", "dst", "proto", "tp_src", "tp_dst"), >>> + ("nsh",), >>> + ] >>> + for k, v in key_dict.items(): >>> + s = "" >>> + if k in skip: >>> + continue >>> + if decode and int.from_bytes(v, "big") == 0: >>> + parseable[k] = "0" >>> + continue >>> + if decode and k in need_byte_swap: >>> + v = int.from_bytes(v, "little").to_bytes(len(v), "big") >>> + attr = -1 >>> + found = False >>> + for f in fields: >>> + if k == f[0]: >>> + attr = fields.index(f) >>> + found = True >>> + break >>> + if not found: >>> + raise KeyError("Invalid flow field '%s'" % k) >>> + if decode and len(fields[attr]) > 1: >>> + data = list( >>> + struct.unpack( >>> + fields[attr][1], v[: struct.calcsize(fields[attr][1])] >>> + ) >>> + ) >>> + if k in 
ipv4addrs: >>> + if data[0].count(0) < 4: >>> + data[0] = str(IPv4Address(data[0])) >>> + else: >>> + data[0] = b"\x00" >>> + if data[1].count(0) < 4: >>> + data[1] = str(IPv4Address(data[1])) >>> + else: >>> + data[1] = b"\x00" >>> + if k in ipv6addrs: >>> + if data[0].count(0) < 16: >>> + data[0] = str(IPv6Address(data[0])) >>> + else: >>> + data[0] = b"\x00" >>> + if data[1].count(0) < len(data[1]): >>> + data[1] = str(IPv6Address(data[1])) >>> + else: >>> + data[1] = b"\x00" >>> + if k in macs.keys(): >>> + for e in macs[k]: >>> + if data[e].count(0) == 6: >>> + mac_str = b"\x00" >>> + else: >>> + mac_str = ":".join(["%02x" % i for i in data[e]]) >>> + data[e] = mac_str >>> + if decode and len(fields[attr]) > 2: >>> + field_dict = dict(zip(fields[attr][2:], data)) >>> + s = ", ".join(k + "=" + str(v) for k, v in field_dict.items()) >>> + elif decode and k != "eth_type": >>> + s = str(data[0]) >>> + field_dict = s >>> + else: >>> + if decode: >>> + s = hex(data[0]) >>> + field_dict = s >>> + ret_str += k + "(" + s + "), " >>> + parseable[k] = field_dict >>> + ret_str = ret_str[:-2] >>> + return (parseable, ret_str) >>> + >>> + >>> +# >>> +# handle_event() >>> +# >>> +def handle_event(ctx, data, size): >>> + # Once we grab the event, we have three cases. >>> + # 1. It's a revalidator probe and the reason is nonzero: A flow is expiring >>> + # 2. It's a revalidator probe and the reason is zero: flow revalidated >>> + # 3. It's a flow_put probe. >>> + # >>> + # We will ignore case 2, and report all others. 
>>> + # >>> + event = b["events"].event(data) >>> + if event.probe == Event.OP_FLOW_PUT: >>> + handle_flow_put(event) >>> + elif ( >>> + event.probe == Event.FLOW_RESULT >>> + and event.result == RevalResult.UKEY_DELETE >>> + ): >>> + print_expiration(event) >>> + >>> + >>> +def main(): >>> + # >>> + # Don't like these globals, but ctx passing does not work with the existing >>> + # open_ring_buffer() API :( >>> + # >>> + global b >>> + global args >>> + >>> + # >>> + # Argument parsing >>> + # >>> + parser = argparse.ArgumentParser() >>> + parser.add_argument( >>> + "--buffer-page-count", >>> + help="Number of BPF ring buffer pages, default 1024", >>> + type=int, >>> + default=1024, >>> + metavar="NUMBER", >>> + ) >>> + parser.add_argument( >>> + "-f", >>> + "--flow-key-size", >>> + help="Set maximum flow key size to capture, " >>> + "default 128 - see notes", >>> + type=buffer_size_type, >>> + default=128, >>> + metavar="[128-2048]", >>> + ) >>> + parser.add_argument( >>> + "-k", >>> + "--flow-keys", >>> + help="Print flow keys as flow strings", >>> + action="store_true", >>> + ) >>> + parser.add_argument( >>> + "-l", >>> + "--filter-flows", >>> + metavar="FLOW_STRING", >>> + help="Filter flows that match the specified " "ODP-like flow", >>> + type=str, >>> + default=None, >>> + nargs="*", >>> + ) >>> + parser.add_argument( >>> + "-P", >>> + "--pahole", >>> + metavar="PAHOLE", >>> + help="Pahole executable to use, default pahole", >>> + type=str, >>> + default="pahole", >>> + ) >>> + parser.add_argument( >>> + "-p", >>> + "--pid", >>> + metavar="VSWITCHD_PID", >>> + help="ovs-vswitchd's PID", >>> + type=int, >>> + default=None, >>> + ) >>> + parser.add_argument( >>> + "-D", >>> + "--debug", >>> + help="Enable eBPF debugging", >>> + type=int, >>> + const=0x3F, >>> + default=0, >>> + nargs="?", >>> + ) >>> + args = parser.parse_args() >>> + >>> + # >>> + # Find the PID of the ovs-vswitchd daemon if not specified. 
>>> + # >>> + if args.pid is None: >>> + for proc in psutil.process_iter(): >>> + if "ovs-vswitchd" in proc.name(): >>> + if args.pid is not None: >>> + print( >>> + "Error: Multiple ovs-vswitchd daemons running, " >>> + "use the -p option!" >>> + ) >>> + sys.exit(-1) >>> + >>> + args.pid = proc.pid >>> + # >>> + # Error checking on input parameters >>> + # >>> + if args.pid is None: >>> + print("ERROR: Failed to find ovs-vswitchd's PID!") >>> + sys.exit(-1) >>> + >>> + # >>> + # Attach the USDT probes >>> + # >>> + u = USDT(pid=int(args.pid)) >>> + try: >>> + u.enable_probe(probe="op_flow_put", fn_name="usdt__op_flow_put") >>> + except USDTException as e: >>> + print("Error attaching the dpif_netlink_operate__:op_flow_put probe.") >>> + print(str(e)) >>> + sys.exit(-1) >>> + >>> + try: >>> + u.enable_probe(probe="flow_result", fn_name="usdt__flow_result") >>> + u.enable_probe( >>> + probe="flow_sweep_result", fn_name="usdt__flow_sweep_result" >>> + ) >>> + except USDTException as e: >>> + print("Error attaching the revalidate:flow_result probe.") >> >> We should either use two try/except cases, or update the error test to >> “...revalidate|revalidator_sweep__:flow_result probe.” > > I will test and see if we can pull the probe name from the USDTException > and then print that instead. ACK, yes that would be nice, guess then all three probes can go into one try/except clause. 
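The idea discussed above — attaching all the probes in one try/except and reporting the failing probe by name instead of hard-coding the probe name in the error text — could be sketched like this. The `attach_probes` helper and its message wording are illustrative only, not part of the patch:

```python
def attach_probes(usdt, probes):
    # Enable each (probe, fn_name) pair on the USDT context.  On failure,
    # re-raise with the name of the probe that could not be attached, so a
    # single try/except around all probes still produces a precise error.
    for probe, fn_name in probes:
        try:
            usdt.enable_probe(probe=probe, fn_name=fn_name)
        except Exception as e:  # USDTException in the real script.
            raise RuntimeError(
                "Error attaching the {} probe: {}".format(probe, e)
            )


# Hypothetical usage, mirroring the three probes attached above:
# attach_probes(u, [("op_flow_put", "usdt__op_flow_put"),
#                   ("flow_result", "usdt__flow_result"),
#                   ("flow_sweep_result", "usdt__flow_sweep_result")])
```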
>>> + print(str(e)) >>> + sys.exit(-1) >>> + >>> + # >>> + # Attach the probes to the running process >>> + # >>> + source = bpf_src.replace( >>> + "<BUFFER_PAGE_COUNT>", str(args.buffer_page_count) >>> + ) >>> + >>> + source = source.replace( >>> + "<OVS_INCLUDE_DEFINITIONS>", >>> + get_ovs_definitions( >>> + ["udpif_key", "ovs_u128", "dpif_flow_put"], >>> + pid=args.pid, >>> + pahole=args.pahole, >>> + ), >>> + ) >>> + >>> + if args.filter_flows is None: >>> + filter_bool = 0 >>> + >>> + # Set the key size based on what the user wanted >>> + source = source.replace("<MAX_KEY_VAL>", str(args.flow_key_size)) >>> + else: >>> + filter_bool = 1 >>> + args.filter_flows = parse_flow_str(args.filter_flows[0]) >>> + >>> + # Run through the parser to make sure we only filter on fields we >>> + # understand >>> + parse_flow_dict(args.filter_flows, False) >>> + >>> + # This is hardcoded here because it doesn't make sense to shrink the >>> + # size, since the flow key might be missing fields that are matched in >>> + # the flow filter. >>> + source = source.replace("<MAX_KEY_VAL>", "2048") >>> + >>> + source = source.replace("<FILTER_BOOL>", str(filter_bool)) >>> + >>> + source = source.replace( >>> + "<EVENT_ENUM>", >>> + "\n".join([f"{event.name} = {event.value}," for event in Event]), >>> + ) >>> + >>> + b = BPF(text=source, usdt_contexts=[u], debug=args.debug) >>> + >>> + # >>> + # Print header >>> + # >>> + print( >>> + "{:<10} {:<18} {:<36} {:<17}".format( >>> + "TID", "TIME", "UFID", "EVENT/REASON" >>> + ) >>> + ) >>> + >>> + # >>> + # Dump out all events. 
>>> + # >>> + b["events"].open_ring_buffer(handle_event) >>> + while 1: >>> + try: >>> + b.ring_buffer_poll() >>> + except KeyboardInterrupt: >>> + break >>> + >>> + dropcnt = b.get_table("dropcnt") >>> + for k in dropcnt.keys(): >>> + count = dropcnt.sum(k).value >>> + if k.value == 0 and count > 0: >>> + print( >>> + "\n# WARNING: Not all flow operations were captured, {} were" >>> + " dropped!\n# Increase the BPF ring buffer size " >>> + "with the --buffer-page-count option.".format(count) >>> + ) >>> + >>> + >>> +# >>> +# Start main() as the default entry point >>> +# >>> +if __name__ == "__main__": >>> + main() >>> -- >>> 2.41.0
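On the `FdrReasons`/`FdrReasonStrings` pair in the quoted script: keeping an enum and a parallel list in lockstep by position is fragile, since reordering one without the other silently prints the wrong description. A refactoring sketch (not part of the patch) would key the strings by the enum member instead:

```python
from enum import IntEnum

FdrReasons = IntEnum(
    "flow_del_reason",
    ["FDR_NONE", "FDR_AVOID_CACHING", "FDR_BAD_ODP_FIT", "FDR_FLOW_IDLE",
     "FDR_FLOW_LIMIT", "FDR_FLOW_WILDCARDED", "FDR_NO_OFPROTO", "FDR_PURGE",
     "FDR_TOO_EXPENSIVE", "FDR_UPDATE_FAIL", "FDR_XLATION_ERROR"],
    start=0,
)

# Descriptions keyed by enum member, so order in this dict is irrelevant.
FdrReasonStrings = {
    FdrReasons.FDR_NONE: "No deletion reason",
    FdrReasons.FDR_AVOID_CACHING: "Cache avoidance flag set",
    FdrReasons.FDR_BAD_ODP_FIT: "Bad ODP flow fit",
    FdrReasons.FDR_FLOW_IDLE: "Idle flow timed out",
    FdrReasons.FDR_FLOW_LIMIT: "Kill all flows condition detected",
    FdrReasons.FDR_FLOW_WILDCARDED: "Mask too wide - need narrower match",
    FdrReasons.FDR_NO_OFPROTO: "No matching ofproto rules",
    FdrReasons.FDR_PURGE: "Purged with user action",
    FdrReasons.FDR_TOO_EXPENSIVE: "Too expensive to revalidate",
    FdrReasons.FDR_UPDATE_FAIL: "Flow state inconsistent after updates",
    FdrReasons.FDR_XLATION_ERROR: "Flow translation error",
}


def reason_to_string(reason):
    # Out-of-range values coming from the BPF side fall back to a
    # diagnostic string instead of raising IndexError.
    try:
        return FdrReasonStrings[FdrReasons(reason)]
    except ValueError:
        return "Unknown reason '{}'".format(reason)
```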
diff --git a/Documentation/topics/usdt-probes.rst b/Documentation/topics/usdt-probes.rst index e527f43bab..015614a6b8 100644 --- a/Documentation/topics/usdt-probes.rst +++ b/Documentation/topics/usdt-probes.rst @@ -214,8 +214,10 @@ Available probes in ``ovs_vswitchd``: - dpif_recv:recv_upcall - main:poll_block - main:run_start +- revalidate:flow_result - revalidate_ukey\_\_:entry - revalidate_ukey\_\_:exit +- revalidator_sweep\_\_:flow_result - udpif_revalidator:start_dump - udpif_revalidator:sweep_done @@ -443,6 +445,47 @@ sweep phase was completed. - ``utilities/usdt-scripts/reval_monitor.py`` +probe revalidate:flow_result +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Description**: +This probe is triggered when the revalidator has executed on a particular +flow key to make a determination whether to evict a flow, and the cause +for eviction. The revalidator runs periodically, and this probe will only +be triggered when a flow is flagged for revalidation. + +**Arguments**: + +- *arg0*: ``(enum reval_result) result`` +- *arg1*: ``(enum flow_del_reason) reason`` +- *arg2*: ``(struct udpif *) udpif`` +- *arg3*: ``(struct udpif_key *) ukey`` + +**Script references**: + +- ``utilities/usdt-scripts/flow_reval_monitor.py`` + + +probe revalidator_sweep\_\_:flow_result +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Description**: +This probe is placed in the path of the revalidator sweep, and is executed +under the condition that a flow entry is in an unexpected state, or the +flows were asked to be purged due to a user action. 
+ +**Arguments**: + +- *arg0*: ``(enum reval_result) result`` +- *arg1*: ``(enum flow_del_reason) reason`` +- *arg2*: ``(struct udpif *) udpif`` +- *arg3*: ``(struct udpif_key *) ukey`` + +**Script references**: + +- ``utilities/usdt-scripts/flow_reval_monitor.py`` + + Adding your own probes ---------------------- diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c index b5cbeed878..fbc7858690 100644 --- a/ofproto/ofproto-dpif-upcall.c +++ b/ofproto/ofproto-dpif-upcall.c @@ -269,6 +269,20 @@ enum ukey_state { }; #define N_UKEY_STATES (UKEY_DELETED + 1) +enum flow_del_reason { + FDR_NONE = 0, /* No deletion reason for the flow. */ + FDR_AVOID_CACHING, /* Flow deleted to avoid caching. */ + FDR_BAD_ODP_FIT, /* The flow had a bad ODP flow fit. */ + FDR_FLOW_IDLE, /* The flow went unused and was deleted. */ + FDR_FLOW_LIMIT, /* All flows being killed. */ + FDR_FLOW_WILDCARDED, /* The flow needed a narrower wildcard mask. */ + FDR_NO_OFPROTO, /* The flow didn't have an associated ofproto. */ + FDR_PURGE, /* User action caused flows to be killed. */ + FDR_TOO_EXPENSIVE, /* The flow was too expensive to revalidate. */ + FDR_UPDATE_FAIL, /* Flow state transition was unexpected. */ + FDR_XLATION_ERROR, /* There was an error translating the flow. */ +}; + /* 'udpif_key's are responsible for tracking the little bit of state udpif * needs to do flow expiration which can't be pulled directly from the * datapath. 
They may be created by any handler or revalidator thread at any @@ -2272,7 +2286,8 @@ populate_xcache(struct udpif *udpif, struct udpif_key *ukey, static enum reval_result revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, uint16_t tcp_flags, struct ofpbuf *odp_actions, - struct recirc_refs *recircs, struct xlate_cache *xcache) + struct recirc_refs *recircs, struct xlate_cache *xcache, + enum flow_del_reason *del_reason) { struct xlate_out *xoutp; struct netflow *netflow; @@ -2293,11 +2308,13 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, netflow = NULL; if (xlate_ukey(udpif, ukey, tcp_flags, &ctx)) { + *del_reason = FDR_XLATION_ERROR; goto exit; } xoutp = &ctx.xout; if (xoutp->avoid_caching) { + *del_reason = FDR_AVOID_CACHING; goto exit; } @@ -2311,6 +2328,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, ofpbuf_clear(odp_actions); if (!ofproto) { + *del_reason = FDR_NO_OFPROTO; goto exit; } @@ -2322,6 +2340,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, &dp_mask, &ctx.flow, NULL) == ODP_FIT_ERROR) { + *del_reason = FDR_BAD_ODP_FIT; goto exit; } @@ -2331,6 +2350,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey, * down. Note that we do not know if the datapath has ignored any of the * wildcarded bits, so we may be overly conservative here. 
*/ if (flow_wildcards_has_extra(&dp_mask, ctx.wc)) { + *del_reason = FDR_FLOW_WILDCARDED; goto exit; } @@ -2400,7 +2420,7 @@ static enum reval_result revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, const struct dpif_flow_stats *stats, struct ofpbuf *odp_actions, uint64_t reval_seq, - struct recirc_refs *recircs) + struct recirc_refs *recircs, enum flow_del_reason *del_reason) OVS_REQUIRES(ukey->mutex) { bool need_revalidate = ukey->reval_seq != reval_seq; @@ -2430,8 +2450,12 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, xlate_cache_clear(ukey->xcache); } result = revalidate_ukey__(udpif, ukey, push.tcp_flags, - odp_actions, recircs, ukey->xcache); - } /* else delete; too expensive to revalidate */ + odp_actions, recircs, ukey->xcache, + del_reason); + } else { + /* delete; too expensive to revalidate */ + *del_reason = FDR_TOO_EXPENSIVE; + } } else if (!push.n_packets || ukey->xcache || !populate_xcache(udpif, ukey, push.tcp_flags)) { result = UKEY_KEEP; @@ -2831,6 +2855,7 @@ revalidate(struct revalidator *revalidator) for (f = flows; f < &flows[n_dumped]; f++) { long long int used = f->stats.used; struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER; + enum flow_del_reason del_reason = FDR_NONE; struct dpif_flow_stats stats = f->stats; enum reval_result result; struct udpif_key *ukey; @@ -2905,9 +2930,14 @@ revalidate(struct revalidator *revalidator) } if (kill_them_all || (used && used < now - max_idle)) { result = UKEY_DELETE; + if (kill_them_all) { + del_reason = FDR_FLOW_LIMIT; + } else { + del_reason = FDR_FLOW_IDLE; + } } else { result = revalidate_ukey(udpif, ukey, &stats, &odp_actions, - reval_seq, &recircs); + reval_seq, &recircs, &del_reason); } ukey->dump_seq = dump_seq; @@ -2916,6 +2946,8 @@ revalidate(struct revalidator *revalidator) udpif_update_flow_pps(udpif, ukey, f); } + OVS_USDT_PROBE(revalidate, flow_result, result, del_reason, udpif, + ukey); if (result != UKEY_KEEP) { /* Takes ownership of 'recircs'. 
*/ reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs, @@ -2968,6 +3000,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) size_t n_ops = 0; CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) { + enum flow_del_reason del_reason = FDR_NONE; enum ukey_state ukey_state; /* Handler threads could be holding a ukey lock while it installs a @@ -2986,6 +3019,7 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) if (purge || ukey_state == UKEY_INCONSISTENT) { result = UKEY_DELETE; + del_reason = purge ? FDR_PURGE : FDR_UPDATE_FAIL; } else if (!seq_mismatch) { result = UKEY_KEEP; } else { @@ -2993,13 +3027,15 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge) COVERAGE_INC(revalidate_missed_dp_flow); memcpy(&stats, &ukey->stats, sizeof stats); result = revalidate_ukey(udpif, ukey, &stats, &odp_actions, - reval_seq, &recircs); + reval_seq, &recircs, &del_reason); } if (result != UKEY_KEEP) { /* Clears 'recircs' if filled by revalidate_ukey(). */ reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs, &odp_actions); } + OVS_USDT_PROBE(revalidator_sweep__, flow_sweep_result, result, + del_reason, udpif, ukey); } ovs_mutex_unlock(&ukey->mutex); diff --git a/utilities/automake.mk b/utilities/automake.mk index 9a2114df40..146b8c37fb 100644 --- a/utilities/automake.mk +++ b/utilities/automake.mk @@ -23,6 +23,7 @@ scripts_DATA += utilities/ovs-lib usdt_SCRIPTS += \ utilities/usdt-scripts/bridge_loop.bt \ utilities/usdt-scripts/dpif_nl_exec_monitor.py \ + utilities/usdt-scripts/flow_reval_monitor.py \ utilities/usdt-scripts/kernel_delay.py \ utilities/usdt-scripts/kernel_delay.rst \ utilities/usdt-scripts/reval_monitor.py \ @@ -72,6 +73,7 @@ EXTRA_DIST += \ utilities/docker/debian/build-kernel-modules.sh \ utilities/usdt-scripts/bridge_loop.bt \ utilities/usdt-scripts/dpif_nl_exec_monitor.py \ + utilities/usdt-scripts/flow_reval_monitor.py \ utilities/usdt-scripts/kernel_delay.py \ utilities/usdt-scripts/kernel_delay.rst \ 
utilities/usdt-scripts/reval_monitor.py \ @@ -146,6 +148,7 @@ FLAKE8_PYFILES += utilities/ovs-pcap.in \ utilities/ovs-tcpdump.in \ utilities/ovs-pipegen.py \ utilities/usdt-scripts/dpif_nl_exec_monitor.py \ + utilities/usdt-scripts/flow_reval_monitor.py \ utilities/usdt-scripts/upcall_monitor.py \ utilities/usdt-scripts/upcall_cost.py diff --git a/utilities/usdt-scripts/flow_reval_monitor.py b/utilities/usdt-scripts/flow_reval_monitor.py new file mode 100755 index 0000000000..e76e0b5995 --- /dev/null +++ b/utilities/usdt-scripts/flow_reval_monitor.py @@ -0,0 +1,997 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2022-2024 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Script information: +# ------------------- +# flow_reval_monitor.py uses the dpif_netlink_operate:flow_put and +# revalidator:flow_result USDT probes to monitor flow lifetimes and +# expiration events. By default, this will show all flow_put and flow +# expiration events, along with their reasons.
This will look like so: +# +# TIME UFID EVENT/REASON +# 101536.226986736 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow_put +# 101536.227196214 ufid:d08472b6-110e-46cb-a9e4-00008f46838e flow_put +# 101541.516610178 ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32 flow_put +# 101541.516967303 ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a flow_put +# 101551.688050747 ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a flow timed out +# 101551.688077175 ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32 flow timed out +# 101557.695391371 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow timed out +# 101557.695408909 ufid:d08472b6-110e-46cb-a9e4-00008f46838e flow timed out +# +# flow key data can be printed using the --flow-keys option. This will +# print the equivalent datapath flow string. +# +# When filtering flows, the syntax is the same as used by +# `ovs-appctl dpctl/add-flow`. +# +# The following options are available: +# +# usage: flow_reval_monitor.py [-h] [--buffer-page-count NUMBER] +# [-f [128-2048]] [-k] [-l [FLOW_STRING ...]] +# [-p VSWITCHD_PID] [-D [DEBUG]] +# [-P PAHOLE] +# +# optional arguments: +# -h, --help show this help message and exit +# --buffer-page-count NUMBER +# Number of BPF ring buffer pages, default 1024 +# -f <128..2048>, --flow-key-size=<128..2048> +# Set the size of the flow key, default 128 +# -k, --flow-keys Print flow keys as flow strings +# -l [FLOW_STRING ...], --filter-flows [FLOW_STRING ...] +# Filter against flow keys that match the specified +# ODP-like flow. 
This may not include all packet +# fields +# -p VSWITCHD_PID, --pid VSWITCHD_PID +# ovs-vswitchd's PID +# -P PAHOLE, --pahole PAHOLE +# Pahole executable to use, default pahole +# -D [DEBUG], --debug [DEBUG] +# Enable eBPF debugging +# +# Examples: +# +# To use the script on a running ovs-vswitchd to see flow keys and expiration +# events for flows with an ipv4 source of 192.168.10.10: +# $ ./flow_reval_monitor.py --flow-keys --filter-flows \ +# "ipv4(src=192.168.10.10)" +# TIME UFID EVENT/REASON +# 105082.457322742 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow_put +# ufid:f76fc899-376d-466b-bc74-0000b933eb97 has the following flow information: +# in_port(2), +# eth(src=0e:04:47:fc:74:51, dst=da:dc:c5:69:05:d7), \ +# eth_type(0x800), \ +# ipv4(src=192.168.10.10, dst=192.168.10.30, proto=1, tos=0, ttl=64,[...]), +# icmp(type=8, code=0) +# 105092.635450202 ufid:f76fc899-376d-466b-bc74-0000b933eb97 Flow timed out +# +# Notes: +# 1) No options are needed to attach when there is a single running instance +# of ovs-vswitchd. +# 2) If you're using the flow filtering option, it will only track flows that +# have been upcalled since the script began running. +# 3) When using the flow filtering option, the key size will likely need to +# be expanded to match on all the fields in the message. The default is +# kept small to keep the buffer copy sizes down when displaying +# flows (-k), but is hardcoded to 2048 when an actual filter (-l) is +# applied +# 4) The flow filtering format is a simplified form of the ODP syntax, and +# does not support masked matches, which means you will need to filter +# on exact details. The fields present are dependent on how the +# classifier and OFP rules form the ODP rules - not all fields may be +# present in a particular flow. +# 5) The flow_put filtering only happens for flows installed into the ovs +# kernel module. This means flows taking the HW offload path (ie: tc), +# or on DPDK side won't get matched. 
+ +try: + from bcc import BPF + from bcc import USDT + from bcc import USDTException +except ModuleNotFoundError: + print("ERROR: Can't find the BPF Compiler Collection Tools.") + print("Please install them before running this script.") + exit(1) + +from enum import IntEnum +from ipaddress import IPv4Address, IPv6Address +from pathlib import Path + +import argparse +import psutil +import re +import struct +import subprocess +import sys + +# +# eBPF source code +# +bpf_src = """ +#include <linux/sched.h> + +#define MAX_KEY <MAX_KEY_VAL> +#define FLOW_FILTER <FILTER_BOOL> + +enum probe { <EVENT_ENUM> }; + +<OVS_INCLUDE_DEFINITIONS> + +struct event_t { + u64 ts; + u32 pid; + u32 result; + u32 reason; + u32 ufid[4]; + u64 key_size; + unsigned char key[MAX_KEY]; + enum probe probe; +}; + +BPF_HASH(watchlist, ovs_u128); +BPF_RINGBUF_OUTPUT(events, <BUFFER_PAGE_COUNT>); +BPF_TABLE("percpu_array", uint32_t, uint64_t, dropcnt, 1); +BPF_TABLE("percpu_array", uint32_t, struct udpif_key, udpk, 1); + +static struct event_t *get_event(enum probe p) { + struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t)); + + if (!event) { + dropcnt.increment(0); + return NULL; + } + + event->probe = p; + event->ts = bpf_ktime_get_ns(); + event->pid = bpf_get_current_pid_tgid(); + + return event; +} + +static int emit_flow_result(struct udpif_key *ukey, ovs_u128 ufid, + u32 result, u32 reason) { + struct event_t *event = NULL; + u64 *ufid_present = NULL; + + ufid_present = watchlist.lookup(&ufid); + if (FLOW_FILTER && !ufid_present) { + return 0; + } + + event = get_event(FLOW_RESULT); + if (!event) { + /* If we can't reserve the space in the ring buffer, return 1. 
*/ + return 1; + } + + event->result = result; + event->reason = reason; + bpf_probe_read(&event->ufid, sizeof ufid, &ufid); + events.ringbuf_submit(event, 0); + + return 0; +} + +int usdt__flow_result(struct pt_regs *ctx) { + struct udpif_key *ukey = NULL; + u32 reason = 0; + u32 result = 0; + ovs_u128 ufid; + u32 zero = 0; + + ukey = udpk.lookup(&zero); + if (!ukey) { + return 1; + } + bpf_usdt_readarg_p(4, ctx, ukey, sizeof(struct udpif_key)); + bpf_usdt_readarg(2, ctx, &reason); + bpf_usdt_readarg(1, ctx, &result); + ufid = ukey->ufid; + + return emit_flow_result(ukey, ufid, result, reason); +} + +int usdt__flow_sweep_result(struct pt_regs *ctx) { + struct udpif_key *ukey = NULL; + u32 reason = 0; + u32 result = 0; + ovs_u128 ufid; + u32 zero = 0; + + ukey = udpk.lookup(&zero); + if (!ukey) { + return 1; + } + bpf_usdt_readarg_p(4, ctx, ukey, sizeof(struct udpif_key)); + bpf_usdt_readarg(2, ctx, &reason); + bpf_usdt_readarg(1, ctx, &result); + ufid = ukey->ufid; + + return emit_flow_result(ukey, ufid, result, reason); +} + +int usdt__op_flow_put(struct pt_regs *ctx) { + struct dpif_flow_put put; + ovs_u128 ufid; + + struct event_t *event = get_event(OP_FLOW_PUT); + if (!event) { + /* If we can't reserve the space in the ring buffer, return 1. 
*/ + return 1; + } + + bpf_usdt_readarg_p(2, ctx, &put, sizeof put); + bpf_probe_read(&event->ufid, sizeof event->ufid, put.ufid); + bpf_probe_read(&ufid, sizeof ufid, &event->ufid); + if (put.key_len > MAX_KEY) { + put.key_len = MAX_KEY; + } + event->key_size = put.key_len; + bpf_probe_read(&event->key, put.key_len, put.key); + event->reason = 0; + events.ringbuf_submit(event, 0); + + watchlist.increment(ufid); + return 0; +} +""" + +Event = IntEnum("Event", ["OP_FLOW_PUT", "FLOW_RESULT"], start=0) +RevalResult = IntEnum( + "reval_result", + [ + "UKEY_KEEP", + "UKEY_DELETE", + "UKEY_MODIFY", + ], + start=0, +) +FdrReasons = IntEnum( + "flow_del_reason", + [ + "FDR_NONE", + "FDR_AVOID_CACHING", + "FDR_BAD_ODP_FIT", + "FDR_FLOW_IDLE", + "FDR_FLOW_LIMIT", + "FDR_FLOW_WILDCARDED", + "FDR_NO_OFPROTO", + "FDR_PURGE", + "FDR_TOO_EXPENSIVE", + "FDR_UPDATE_FAIL", + "FDR_XLATION_ERROR", + ], + start=0, +) + +# Note: these descriptions must stay in the same order as the +# flow_del_reason enum above; FDR_PURGE precedes FDR_TOO_EXPENSIVE. +FdrReasonStrings = [ + "No deletion reason", + "Cache avoidance flag set", + "Bad ODP flow fit", + "Idle flow timed out", + "Kill all flows condition detected", + "Mask too wide - need narrower match", + "No matching ofproto rules", + "Purged with user action", + "Too expensive to revalidate", + "Flow state inconsistent after updates", + "Flow translation error", +] + + +# +# run_program() +# +def run_program(command): + try: + process = subprocess.run( + command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + encoding="utf8", + check=True, + ) + + except subprocess.CalledProcessError as perror: + return perror.returncode, perror.stdout + + return 0, process.stdout + + +# +# get_ovs_definitions() +# +def get_ovs_definitions(objects, pahole="pahole", pid=None): + if pid is None: + raise ValueError("A valid pid value should be supplied!") + + if not isinstance(objects, list): + objects = [objects] + + if len(objects) == 0: + raise ValueError("Must supply at least one object!") + + vswitchd = Path(f"/proc/{pid}/exe").resolve() + + object_str = ",".join(objects) +
+ def run_pahole(debug_file): + error, result = run_program( + [pahole, "-C", object_str, "--compile", debug_file] + ) + + if error: + if f"pahole: {debug_file}: Invalid argument" not in result: + print( + "ERROR: Pahole failed to get ovs-vswitchd data " + "structures!\n{}".format( + re.sub( + "^", " " * 7, result.rstrip(), flags=re.MULTILINE + ) + ) + ) + sys.exit(-1) + + return None + + if bool(re.search("pahole: type .* not found", result)): + return None + + return result + + def run_readelf(bin_file): + error, result = run_program( + ["readelf", "-n", "--debug-dump=links", bin_file] + ) + + if error: + print( + "ERROR: Failed 'readelf' on \"{}\"!\n{}".format( + bin_file, re.sub("^", " " * 7, result, flags=re.MULTILINE) + ) + ) + sys.exit(-1) + + return result + + def get_debug_file(bin_file): + elf_result = run_readelf(bin_file) + match = re.search("Build ID: ([0-9a-fA-F]+)", elf_result) + if not match: + print("ERROR: Can't find build ID to read debug symbols!") + sys.exit(-1) + + dbg_file = "/usr/lib/debug/.build-id/{}/{}.debug".format( + match.group(1)[:2], match.group(1)[2:] + ) + + return dbg_file + + def get_from_shared_library(debug_file): + ovs_libs = [ + "libofproto", + "libopenvswitch", + "libovsdb", + "libsflow", + "libvtep", + ] + error, ldd_result = run_program(["ldd", debug_file]) + + if error: + print( + "ERROR: Failed 'ldd' on \"{}\"!\n{}".format( + debug_file, + re.sub("^", " " * 7, ldd_result, flags=re.MULTILINE), + ) + ) + sys.exit(-1) + + for lib in ovs_libs: + match = re.search( + r"^\s*{}.* => (.*) \(.*\)$".format(lib), + ldd_result, + flags=re.MULTILINE, + ) + if match is None: + continue + + result = run_pahole(match.group(1)) + if result is None: + result = run_pahole(get_debug_file(match.group(1))) + + if result: + return result + + return None + + # + # First try to find the debug data as part of the executable. 
+ # + result = run_pahole(vswitchd) + + if result is None: + print(f'INFO: Failed to find debug info in "{vswitchd}"!') + + # + # Get additional .debug information if available. + # + dbg_file = get_debug_file(vswitchd) + result = run_pahole(dbg_file) + if result is None: + print(f'INFO: Failed to find debug info in "{dbg_file}"!') + + # + # Try to get information from shared libraries if used. + # + result = get_from_shared_library(vswitchd) + + if result is None: + print(f"ERROR: Failed to find needed data structures through {pahole}") + sys.exit(-1) + + # + # We need an empty _Atomic definition to avoid compiler complaints. + # + result = "#define _Atomic\n" + result + + # + # Remove the uint64_t definition as it conflicts with the kernel one. + # + result = re.sub("^typedef.*uint64_t;$", "", result, flags=re.MULTILINE) + + return result + + +# +# buffer_size_type() +# +def buffer_size_type(astr, min=128, max=2048): + value = int(astr) + if min <= value <= max: + return value + else: + raise argparse.ArgumentTypeError( + "value not in range {}-{}".format(min, max) + ) + + +# +# format_ufid() +# +def format_ufid(ufid): + if ufid is None: + return "ufid:none" + + return "{:08x}-{:04x}-{:04x}-{:04x}-{:04x}{:08x}".format( + ufid[0], + ufid[1] >> 16, + ufid[1] & 0xFFFF, + ufid[2] >> 16, + ufid[2] & 0xFFFF, + ufid[3], + ) + + +# +# find_and_delete_from_watchlist() +# +def find_and_delete_from_watchlist(event): + for k, _ in b["watchlist"].items(): + key_ufid = struct.unpack("=IIII", k) + if key_ufid == tuple(event.ufid): + key = (b["watchlist"].Key * 1)(k) + b["watchlist"].items_delete_batch(key) + break + + +# +# handle_flow_put() +# +def handle_flow_put(event): + if args.flow_keys or args.filter_flows is not None: + key = decode_key(bytes(event.key)[: event.key_size]) + flow_dict, flow_str = parse_flow_dict(key) + # For each attribute that we're watching.
+ if args.filter_flows is not None: + if not compare_flow_to_target(args.filter_flows, flow_dict): + find_and_delete_from_watchlist(event) + return + + print( + "{:<10} {:<18.9f} {:<36} {}".format( + event.pid, + event.ts / 1000000000, + format_ufid(event.ufid), + "Insert (put) flow to ovs kernel module.", + ) + ) + + if args.flow_keys and len(flow_str): + flow_str_fields = flow_str.split("), ") + flow_str = " " + curlen = 4 + for field in flow_str_fields: + if curlen + len(field) > 79: + flow_str += "\n " + curlen = 4 + if field[-1] != ")": + field += ")" + flow_str += field + ", " + curlen += len(field) + 2 + + print(" - It holds the following key information:") + print(flow_str) + + +# +# compare_flow_to_target() +# +def compare_flow_to_target(target, flow): + for key in target: + if key not in flow: + return False + elif target[key] is True: + continue + elif target[key] == flow[key]: + continue + elif isinstance(target[key], dict) and isinstance(flow[key], dict): + if not compare_flow_to_target(target[key], flow[key]): + return False + else: + return False + return True + + +# +# parse_flow_str() +# +def parse_flow_str(flow_str): + f_list = [i.strip(", ") for i in flow_str.split(")")] + if f_list[-1] == "": + f_list = f_list[:-1] + flow_dict = {} + for e in f_list: + split_list = e.split("(") + k = split_list[0] + if len(split_list) == 1: + flow_dict[k] = True + elif split_list[1].count("=") == 0: + flow_dict[k] = split_list[1] + else: + sub_dict = {} + sublist = [i.strip() for i in split_list[1].split(",")] + for subkey in sublist: + brk = subkey.find("=") + sub_dict[subkey[:brk]] = subkey[brk + 1 :] + flow_dict[k] = sub_dict + return flow_dict + + +# +# print_expiration() +# +def print_expiration(event): + ufid_str = format_ufid(event.ufid) + + if event.reason >= len(FdrReasons): + reason = f"Unknown reason '{event.reason}'" + else: + reason = FdrReasonStrings[event.reason] + + print( + "{:<10} {:<18.9f} {:<36} {:<17}".format( + event.pid, + event.ts / 1000000000, +
ufid_str, + reason, + ) + ) + + +# +# decode_key() +# +def decode_key(msg): + bytes_left = len(msg) + result = {} + while bytes_left: + if bytes_left < 4: + break + nla_len, nla_type = struct.unpack("=HH", msg[:4]) + if nla_len < 4: + break + nla_data = msg[4:nla_len] + if nla_len > bytes_left: + nla_data = nla_data[: (bytes_left - 4)] + break + else: + result[get_ovs_key_attr_str(nla_type)] = nla_data + next_offset = (nla_len + 3) & (~3) + msg = msg[next_offset:] + bytes_left -= next_offset + if bytes_left: + print(f"INFO: Buffer truncated with {bytes_left} bytes left.") + return result + + +# +# get_ovs_key_attr_str() +# +def get_ovs_key_attr_str(attr): + ovs_key_attr = [ + "OVS_KEY_ATTR_UNSPEC", + "encap", + "skb_priority", + "in_port", + "eth", + "vlan", + "eth_type", + "ipv4", + "ipv6", + "tcp", + "udp", + "icmp", + "icmpv6", + "arp", + "nd", + "skb_mark", + "tunnel", + "sctp", + "tcp_flags", + "dp_hash", + "recirc_id", + "mpls", + "ct_state", + "ct_zone", + "ct_mark", + "ct_label", + "ct_tuple4", + "ct_tuple6", + "nsh", + ] + + if attr < 0 or attr >= len(ovs_key_attr): + return "<UNKNOWN>: {}".format(attr) + return ovs_key_attr[attr] + + +# +# parse_flow_dict() +# +def parse_flow_dict(key_dict, decode=True): + ret_str = "" + parseable = {} + skip = ["nsh", "tunnel", "mpls", "vlan"] + need_byte_swap = ["ct_label"] + ipv4addrs = ["ct_tuple4", "tunnel", "ipv4", "arp"] + ipv6addrs = ["ipv6", "nd", "ct_tuple6"] + macs = {"eth": [0, 1], "arp": [3, 4], "nd": [1, 2]} + fields = [ + ("OVS_KEY_ATTR_UNSPEC"), + ("encap",), + ("skb_priority", "<I"), + ("in_port", "<I"), + ("eth", "!6s6s", "src", "dst"), + ("vlan",), + ("eth_type", "!H"), + ("ipv4", "!4s4s4B", "src", "dst", "proto", "tos", "ttl", "frag"), + ( + "ipv6", + "!16s16s4s4B", + "src", + "dst", + "label", + "proto", + "tclass", + "hlimit", + "frag", + ), + ("tcp", "!2H", "src", "dst"), + ("udp", "!2H", "src", "dst"), + ("icmp", "!2B", "type", "code"), + ("icmpv6", "!2B", "type", "code"), + ("arp", "!4s4sH6s6s",
"sip", "tip", "op", "sha", "tha"), + ("nd", "!16s6s6s", "target", "sll", "tll"), + ("skb_mark", "<I"), + ("tunnel",), + ("sctp", "!2H", "src", "dst"), + ("tcp_flags", "!H"), + ("dp_hash", "<I"), + ("recirc_id", "<I"), + ("mpls",), + ("ct_state", "<I"), + ("ct_zone", "<H"), + ("ct_mark", "<I"), + ("ct_label", "!16s"), + ("ct_tuple4", "!4s4s2HB", "src", "dst", "tp_src", "tp_dst", "proto"), + ("ct_tuple6", "!16s16sB2H", "src", "dst", "proto", "tp_src", "tp_dst"), + ("nsh",), + ] + for k, v in key_dict.items(): + s = "" + if k in skip: + continue + if decode and int.from_bytes(v, "big") == 0: + parseable[k] = "0" + continue + if decode and k in need_byte_swap: + v = int.from_bytes(v, "little").to_bytes(len(v), "big") + attr = -1 + found = False + for f in fields: + if k == f[0]: + attr = fields.index(f) + found = True + break + if not found: + raise KeyError("Invalid flow field '%s'" % k) + if decode and len(fields[attr]) > 1: + data = list( + struct.unpack( + fields[attr][1], v[: struct.calcsize(fields[attr][1])] + ) + ) + if k in ipv4addrs: + if data[0].count(0) < 4: + data[0] = str(IPv4Address(data[0])) + else: + data[0] = b"\x00" + if data[1].count(0) < 4: + data[1] = str(IPv4Address(data[1])) + else: + data[1] = b"\x00" + if k in ipv6addrs: + if data[0].count(0) < 16: + data[0] = str(IPv6Address(data[0])) + else: + data[0] = b"\x00" + if data[1].count(0) < len(data[1]): + data[1] = str(IPv6Address(data[1])) + else: + data[1] = b"\x00" + if k in macs.keys(): + for e in macs[k]: + if data[e].count(0) == 6: + mac_str = b"\x00" + else: + mac_str = ":".join(["%02x" % i for i in data[e]]) + data[e] = mac_str + if decode and len(fields[attr]) > 2: + field_dict = dict(zip(fields[attr][2:], data)) + s = ", ".join(k + "=" + str(v) for k, v in field_dict.items()) + elif decode and k != "eth_type": + s = str(data[0]) + field_dict = s + else: + if decode: + s = hex(data[0]) + field_dict = s + ret_str += k + "(" + s + "), " + parseable[k] = field_dict + ret_str = ret_str[:-2] + 
return (parseable, ret_str) + + +# +# handle_event() +# +def handle_event(ctx, data, size): + # Once we grab the event, we have three cases. + # 1. It's a revalidator probe and the reason is nonzero: A flow is expiring + # 2. It's a revalidator probe and the reason is zero: flow revalidated + # 3. It's a flow_put probe. + # + # We will ignore case 2, and report all others. + # + event = b["events"].event(data) + if event.probe == Event.OP_FLOW_PUT: + handle_flow_put(event) + elif ( + event.probe == Event.FLOW_RESULT + and event.result == RevalResult.UKEY_DELETE + ): + print_expiration(event) + + +def main(): + # + # Don't like these globals, but ctx passing does not work with the existing + # open_ring_buffer() API :( + # + global b + global args + + # + # Argument parsing + # + parser = argparse.ArgumentParser() + parser.add_argument( + "--buffer-page-count", + help="Number of BPF ring buffer pages, default 1024", + type=int, + default=1024, + metavar="NUMBER", + ) + parser.add_argument( + "-f", + "--flow-key-size", + help="Set maximum flow key size to capture, " + "default 128 - see notes", + type=buffer_size_type, + default=128, + metavar="[128-2048]", + ) + parser.add_argument( + "-k", + "--flow-keys", + help="Print flow keys as flow strings", + action="store_true", + ) + parser.add_argument( + "-l", + "--filter-flows", + metavar="FLOW_STRING", + help="Filter flows that match the specified " "ODP-like flow", + type=str, + default=None, + nargs="*", + ) + parser.add_argument( + "-P", + "--pahole", + metavar="PAHOLE", + help="Pahole executable to use, default pahole", + type=str, + default="pahole", + ) + parser.add_argument( + "-p", + "--pid", + metavar="VSWITCHD_PID", + help="ovs-vswitchd's PID", + type=int, + default=None, + ) + parser.add_argument( + "-D", + "--debug", + help="Enable eBPF debugging", + type=int, + const=0x3F, + default=0, + nargs="?", + ) + args = parser.parse_args() + + # + # Find the PID of the ovs-vswitchd daemon if not specified. 
+ # + if args.pid is None: + for proc in psutil.process_iter(): + if "ovs-vswitchd" in proc.name(): + if args.pid is not None: + print( + "Error: Multiple ovs-vswitchd daemons running, " + "use the -p option!" + ) + sys.exit(-1) + + args.pid = proc.pid + # + # Error checking on input parameters + # + if args.pid is None: + print("ERROR: Failed to find ovs-vswitchd's PID!") + sys.exit(-1) + + # + # Attach the USDT probes + # + u = USDT(pid=int(args.pid)) + try: + u.enable_probe(probe="op_flow_put", fn_name="usdt__op_flow_put") + except USDTException as e: + print("Error attaching the dpif_netlink_operate__:op_flow_put probe.") + print(str(e)) + sys.exit(-1) + + try: + u.enable_probe(probe="flow_result", fn_name="usdt__flow_result") + u.enable_probe( + probe="flow_sweep_result", fn_name="usdt__flow_sweep_result" + ) + except USDTException as e: + print("Error attaching the revalidate:flow_result probe.") + print(str(e)) + sys.exit(-1) + + # + # Attach the probes to the running process + # + source = bpf_src.replace( + "<BUFFER_PAGE_COUNT>", str(args.buffer_page_count) + ) + + source = source.replace( + "<OVS_INCLUDE_DEFINITIONS>", + get_ovs_definitions( + ["udpif_key", "ovs_u128", "dpif_flow_put"], + pid=args.pid, + pahole=args.pahole, + ), + ) + + if args.filter_flows is None: + filter_bool = 0 + + # Set the key size based on what the user wanted + source = source.replace("<MAX_KEY_VAL>", str(args.flow_key_size)) + else: + filter_bool = 1 + args.filter_flows = parse_flow_str(args.filter_flows[0]) + + # Run through the parser to make sure we only filter on fields we + # understand + parse_flow_dict(args.filter_flows, False) + + # This is hardcoded here because it doesn't make sense to shrink the + # size, since the flow key might be missing fields that are matched in + # the flow filter. 
+ source = source.replace("<MAX_KEY_VAL>", "2048") + + source = source.replace("<FILTER_BOOL>", str(filter_bool)) + + source = source.replace( + "<EVENT_ENUM>", + "\n".join([f"{event.name} = {event.value}," for event in Event]), + ) + + b = BPF(text=source, usdt_contexts=[u], debug=args.debug) + + # + # Print header + # + print( + "{:<10} {:<18} {:<36} {:<17}".format( + "TID", "TIME", "UFID", "EVENT/REASON" + ) + ) + + # + # Dump out all events. + # + b["events"].open_ring_buffer(handle_event) + while 1: + try: + b.ring_buffer_poll() + except KeyboardInterrupt: + break + + dropcnt = b.get_table("dropcnt") + for k in dropcnt.keys(): + count = dropcnt.sum(k).value + if k.value == 0 and count > 0: + print( + "\n# WARNING: Not all flow operations were captured, {} were" + " dropped!\n# Increase the BPF ring buffer size " + "with the --buffer-page-count option.".format(count) + ) + + +# +# Start main() as the default entry point +# +if __name__ == "__main__": + main()
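A note on the reason-string bookkeeping above: the patch keeps `FdrReasons` (an `IntEnum` mirroring the C `flow_del_reason` enum) and a parallel `FdrReasonStrings` list whose entries must line up with the enum values by position. A sketch of one way to make that pairing self-checking is below — keying the descriptions by enum member instead of by list index, so a reordered or extended enum cannot silently shift strings. This is not part of the patch; `FDR_DESCRIPTIONS` and `reason_to_string` are illustrative names only.

```python
from enum import IntEnum

# Mirrors the flow_del_reason enum added by the patch; values must track
# the C-side enum order exactly.
FdrReasons = IntEnum(
    "flow_del_reason",
    [
        "FDR_NONE",
        "FDR_AVOID_CACHING",
        "FDR_BAD_ODP_FIT",
        "FDR_FLOW_IDLE",
        "FDR_FLOW_LIMIT",
        "FDR_FLOW_WILDCARDED",
        "FDR_NO_OFPROTO",
        "FDR_PURGE",
        "FDR_TOO_EXPENSIVE",
        "FDR_UPDATE_FAIL",
        "FDR_XLATION_ERROR",
    ],
    start=0,
)

# Keyed by enum member rather than a parallel list, so a reordered enum
# cannot pair the wrong description with a reason value.
FDR_DESCRIPTIONS = {
    FdrReasons.FDR_NONE: "No deletion reason",
    FdrReasons.FDR_AVOID_CACHING: "Cache avoidance flag set",
    FdrReasons.FDR_BAD_ODP_FIT: "Bad ODP flow fit",
    FdrReasons.FDR_FLOW_IDLE: "Idle flow timed out",
    FdrReasons.FDR_FLOW_LIMIT: "Kill all flows condition detected",
    FdrReasons.FDR_FLOW_WILDCARDED: "Mask too wide - need narrower match",
    FdrReasons.FDR_NO_OFPROTO: "No matching ofproto rules",
    FdrReasons.FDR_PURGE: "Purged with user action",
    FdrReasons.FDR_TOO_EXPENSIVE: "Too expensive to revalidate",
    FdrReasons.FDR_UPDATE_FAIL: "Flow state inconsistent after updates",
    FdrReasons.FDR_XLATION_ERROR: "Flow translation error",
}


def reason_to_string(reason):
    """Translate the raw u32 read from the USDT probe argument into a
    human-readable description, tolerating out-of-range values."""
    try:
        return FDR_DESCRIPTIONS[FdrReasons(reason)]
    except ValueError:
        return f"Unknown reason '{reason}'"
```

With this shape, an out-of-range value from a newer ovs-vswitchd degrades to an "Unknown reason" string instead of raising `IndexError`, and adding a new enum member without a description fails loudly at dict-lookup time rather than printing a neighboring reason's text.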