@@ -4146,7 +4146,9 @@ static void
ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow)
{
if (lflow) {
- hmap_remove(lflows, &lflow->hmap_node);
+ if (lflows) {
+ hmap_remove(lflows, &lflow->hmap_node);
+ }
free(lflow->match);
free(lflow->actions);
free(lflow->stage_hint);
@@ -11071,6 +11073,120 @@ build_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
}
}
+
+struct sbrec_result {
+ struct ovs_list list_node;
+ const struct sbrec_logical_flow *sbflow;
+ struct ovn_lflow *lflow;
+ ssize_t lflow_hash;
+};
+
+struct reconcile_info {
+ struct northd_context *ctx;
+ struct hmap *lflows;
+ struct hmap *datapaths;
+ struct ovs_list results;
+};
+
+struct lflow_reconciliation_pool {
+ struct worker_pool *pool;
+};
+
+static void *reconciliation_thread(void *arg) {
+ struct worker_control *control = (struct worker_control *) arg;
+ struct lflow_reconciliation_pool *workload;
+ struct reconcile_info *ri;
+ struct sbrec_result *res;
+
+ while (!seize_fire()) {
+ sem_wait(&control->fire);
+ workload = (struct lflow_reconciliation_pool *) control->workload;
+ ri = (struct reconcile_info *) control->data;
+ if (ri && workload) {
+ /* Push changes to the Logical_Flow table to database. */
+ const struct sbrec_logical_flow *sbflow;
+ SBREC_LOGICAL_FLOW_PARALLEL_FOR_EACH(sbflow, ri->ctx->ovnsb_idl, control->id, workload->pool->size) {
+ struct ovn_datapath *od
+ = ovn_datapath_from_sbrec(ri->datapaths, sbflow->logical_datapath);
+ res = xmalloc(sizeof(struct sbrec_result));
+
+ if (!od || ovn_datapath_is_stale(od)) {
+ res->sbflow = sbflow;
+ res->lflow = NULL;
+ ovs_list_push_back(&ri->results, &res->list_node);
+ continue;
+ }
+
+ enum ovn_datapath_type dp_type = od->nbs ? DP_SWITCH : DP_ROUTER;
+ enum ovn_pipeline pipeline
+ = !strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT;
+ struct ovn_lflow *lflow = ovn_lflow_find(
+ ri->lflows, od, ovn_stage_build(dp_type, pipeline, sbflow->table_id),
+ sbflow->priority, sbflow->match, sbflow->actions, sbflow->hash);
+ if (lflow) {
+ res->lflow = lflow;
+ res->sbflow = sbflow;
+ res->lflow_hash = lflow->hmap_node.hash;
+ } else {
+ res->sbflow = sbflow;
+ res->lflow = NULL;
+ }
+ ovs_list_push_back(&ri->results, &res->list_node);
+ }
+ atomic_store_relaxed(&control->finished, true);
+ atomic_thread_fence(memory_order_release);
+ }
+ sem_post(control->done);
+ }
+ return NULL;
+}
+
+static struct lflow_reconciliation_pool *reconcile_pool = NULL;
+
+static void init_reconciliation_pool(void) {
+
+ int index;
+
+ if (!reconcile_pool) {
+ reconcile_pool =
+ xmalloc(sizeof(struct lflow_reconciliation_pool));
+ reconcile_pool->pool =
+ add_worker_pool(reconciliation_thread);
+
+ for (index = 0; index < reconcile_pool->pool->size; index++) {
+ reconcile_pool->pool->controls[index].workload =
+ reconcile_pool;
+ }
+ }
+}
+
+/* Removes 'node' from 'hmap' if present. Does not shrink the hash table; call
+ * hmap_shrink() directly if desired.
+ * Returns true if the node was found and removed, false otherwise.
+ * It needs both a node and a hash in order to function even if the node
+ * has already been freed.
+ */
+static bool
+hmap_safe_remove(struct hmap *hmap, struct hmap_node *node, size_t hash)
+{
+ struct hmap_node **bucket = &hmap->buckets[hash & hmap->mask];
+
+ if (!node) {
+ return false;
+ }
+
+ while ((*bucket) && (*bucket != node)) {
+ bucket = &(*bucket)->next;
+ }
+ if (*bucket) {
+ *bucket = node->next;
+ hmap->n--;
+ return true;
+ }
+ return false;
+}
+
+#define RECONCILE_CUTOFF 1
static ssize_t max_seen_lflow_size = 128;
@@ -11084,6 +11200,7 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
struct hmap *lbs)
{
struct hmap lflows;
+ const struct sbrec_logical_flow *sbflow;
fast_hmap_size_for(&lflows, max_seen_lflow_size);
@@ -11096,27 +11213,70 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
}
/* Push changes to the Logical_Flow table to database. */
- const struct sbrec_logical_flow *sbflow, *next_sbflow;
- SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {
- struct ovn_datapath *od
- = ovn_datapath_from_sbrec(datapaths, sbflow->logical_datapath);
- if (!od || ovn_datapath_is_stale(od)) {
- sbrec_logical_flow_delete(sbflow);
- continue;
+ if (hmap_count(&lflows) < RECONCILE_CUTOFF) {
+ /* Push changes to the Logical_Flow table to database. */
+ const struct sbrec_logical_flow *next_sbflow;
+ SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {
+ struct ovn_datapath *od
+ = ovn_datapath_from_sbrec(datapaths, sbflow->logical_datapath);
+ if (!od || ovn_datapath_is_stale(od)) {
+ sbrec_logical_flow_delete(sbflow);
+ continue;
+ }
+
+ enum ovn_datapath_type dp_type = od->nbs ? DP_SWITCH : DP_ROUTER;
+ enum ovn_pipeline pipeline
+ = !strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT;
+ struct ovn_lflow *lflow = ovn_lflow_find(
+ &lflows, od, ovn_stage_build(dp_type, pipeline, sbflow->table_id),
+ sbflow->priority, sbflow->match, sbflow->actions, sbflow->hash);
+ if (lflow) {
+ ovn_lflow_destroy(&lflows, lflow);
+ } else {
+ sbrec_logical_flow_delete(sbflow);
+ }
}
+ } else {
+ struct reconcile_info *ri;
+ struct ovs_list *combined_result = NULL;
+ struct ovs_list **results = NULL;
+ int index;
+ init_reconciliation_pool();
- enum ovn_datapath_type dp_type = od->nbs ? DP_SWITCH : DP_ROUTER;
- enum ovn_pipeline pipeline
- = !strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT;
- struct ovn_lflow *lflow = ovn_lflow_find(
- &lflows, od, ovn_stage_build(dp_type, pipeline, sbflow->table_id),
- sbflow->priority, sbflow->match, sbflow->actions, sbflow->hash);
- if (lflow) {
- ovn_lflow_destroy(&lflows, lflow);
- } else {
- sbrec_logical_flow_delete(sbflow);
+ ri = xmalloc(sizeof(struct reconcile_info) *
+ reconcile_pool->pool->size);
+ results = xmalloc(sizeof(struct ovs_list *) *
+ reconcile_pool->pool->size);
+
+ for (index = 0;
+ index < reconcile_pool->pool->size; index++) {
+
+ ri[index].lflows = &lflows;
+ ri[index].datapaths = datapaths;
+ ri[index].ctx = ctx;
+ ovs_list_init(&ri[index].results);
+ results[index] = &ri[index].results;
+ reconcile_pool->pool->controls[index].data = &ri[index];
}
+
+ run_pool_list(
+ reconcile_pool->pool,
+ &combined_result,
+ results);
+
+ struct sbrec_result *res;
+ LIST_FOR_EACH_POP (res, list_node, combined_result) {
+ if (hmap_safe_remove(&lflows, &res->lflow->hmap_node, res->lflow_hash)) {
+ ovn_lflow_destroy(NULL, res->lflow);
+ } else {
+ sbrec_logical_flow_delete(res->sbflow);
+ }
+ free(res);
+ }
+ free(results);
+ free(ri);
+
}
struct ovn_lflow *lflow, *next_lflow;
HMAP_FOR_EACH_SAFE (lflow, next_lflow, hmap_node, &lflows) {