@@ -460,25 +460,47 @@ struct dp_offload_thread_item {
};
struct dp_offload_thread {
- struct mpsc_queue queue;
- atomic_uint64_t enqueued_item;
- struct cmap megaflow_to_mark;
- struct cmap mark_to_flow;
- struct mov_avg_cma cma;
- struct mov_avg_ema ema;
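+    /* Pad the per-thread state to a cache line: with several offload
+     * threads running concurrently, this avoids false sharing between
+     * their queues and counters. */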
+ PADDED_MEMBERS(CACHE_LINE_SIZE,
+ struct mpsc_queue queue;
+ atomic_uint64_t enqueued_item;
+ struct cmap megaflow_to_mark;
+ struct cmap mark_to_flow;
+ struct mov_avg_cma cma;
+ struct mov_avg_ema ema;
+ );
};
+static struct dp_offload_thread *dp_offload_threads;
+static void *dp_netdev_flow_offload_main(void *arg);
-static struct dp_offload_thread dp_offload_thread = {
- .queue = MPSC_QUEUE_INITIALIZER(&dp_offload_thread.queue),
- .megaflow_to_mark = CMAP_INITIALIZER,
- .mark_to_flow = CMAP_INITIALIZER,
- .enqueued_item = ATOMIC_VAR_INIT(0),
- .cma = MOV_AVG_CMA_INITIALIZER,
- .ema = MOV_AVG_EMA_INITIALIZER(100),
-};
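+/* Lazily initialize the offload thread pool: allocate one
+ * dp_offload_thread per configured offload thread and start them.
+ * Safe to call from multiple threads; only the first call does work. */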
+static void
+dp_netdev_offload_init(void)
+{
+ static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
+ unsigned int nb_offload_thread = netdev_offload_thread_nb();
+ unsigned int tid;
+
+ if (!ovsthread_once_start(&once)) {
+ return;
+ }
+
+ dp_offload_threads = xcalloc(nb_offload_thread,
+ sizeof *dp_offload_threads);
-static struct ovsthread_once offload_thread_once
- = OVSTHREAD_ONCE_INITIALIZER;
+ for (tid = 0; tid < nb_offload_thread; tid++) {
+ struct dp_offload_thread *thread;
+
+ thread = &dp_offload_threads[tid];
+ mpsc_queue_init(&thread->queue);
+ cmap_init(&thread->megaflow_to_mark);
+ cmap_init(&thread->mark_to_flow);
+ atomic_init(&thread->enqueued_item, 0);
+ mov_avg_cma_init(&thread->cma);
+ mov_avg_ema_init(&thread->ema, 100);
+ ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, thread);
+ }
+
+ ovsthread_once_done(&once);
+}
#define XPS_TIMEOUT 500000LL /* In microseconds. */
@@ -2478,11 +2500,12 @@ megaflow_to_mark_associate(const ovs_u128 *mega_ufid, uint32_t mark)
{
size_t hash = dp_netdev_flow_hash(mega_ufid);
struct megaflow_to_mark_data *data = xzalloc(sizeof(*data));
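+    /* The megaflow-to-mark maps are partitioned per offload thread; this
+     * function runs in offload thread context, so the thread id selects
+     * the map owned by the current thread. */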
+ unsigned int tid = netdev_offload_thread_id();
data->mega_ufid = *mega_ufid;
data->mark = mark;
- cmap_insert(&dp_offload_thread.megaflow_to_mark,
+ cmap_insert(&dp_offload_threads[tid].megaflow_to_mark,
CONST_CAST(struct cmap_node *, &data->node), hash);
}
@@ -2492,11 +2515,12 @@ megaflow_to_mark_disassociate(const ovs_u128 *mega_ufid)
{
size_t hash = dp_netdev_flow_hash(mega_ufid);
struct megaflow_to_mark_data *data;
+ unsigned int tid = netdev_offload_thread_id();
CMAP_FOR_EACH_WITH_HASH (data, node, hash,
- &dp_offload_thread.megaflow_to_mark) {
+ &dp_offload_threads[tid].megaflow_to_mark) {
if (ovs_u128_equals(*mega_ufid, data->mega_ufid)) {
- cmap_remove(&dp_offload_thread.megaflow_to_mark,
+ cmap_remove(&dp_offload_threads[tid].megaflow_to_mark,
CONST_CAST(struct cmap_node *, &data->node), hash);
ovsrcu_postpone(free, data);
return;
@@ -2512,9 +2536,10 @@ megaflow_to_mark_find(const ovs_u128 *mega_ufid)
{
size_t hash = dp_netdev_flow_hash(mega_ufid);
struct megaflow_to_mark_data *data;
+ unsigned int tid = netdev_offload_thread_id();
CMAP_FOR_EACH_WITH_HASH (data, node, hash,
- &dp_offload_thread.megaflow_to_mark) {
+ &dp_offload_threads[tid].megaflow_to_mark) {
if (ovs_u128_equals(*mega_ufid, data->mega_ufid)) {
return data->mark;
}
@@ -2529,9 +2554,10 @@ megaflow_to_mark_find(const ovs_u128 *mega_ufid)
static void
mark_to_flow_associate(const uint32_t mark, struct dp_netdev_flow *flow)
{
+ unsigned int tid = netdev_offload_thread_id();
dp_netdev_flow_ref(flow);
- cmap_insert(&dp_offload_thread.mark_to_flow,
+ cmap_insert(&dp_offload_threads[tid].mark_to_flow,
CONST_CAST(struct cmap_node *, &flow->mark_node),
hash_int(mark, 0));
flow->mark = mark;
@@ -2543,10 +2569,11 @@ mark_to_flow_associate(const uint32_t mark, struct dp_netdev_flow *flow)
static bool
flow_mark_has_no_ref(uint32_t mark)
{
+ unsigned int tid = netdev_offload_thread_id();
struct dp_netdev_flow *flow;
CMAP_FOR_EACH_WITH_HASH (flow, mark_node, hash_int(mark, 0),
- &dp_offload_thread.mark_to_flow) {
+ &dp_offload_threads[tid].mark_to_flow) {
if (flow->mark == mark) {
return false;
}
@@ -2562,6 +2589,7 @@ mark_to_flow_disassociate(struct dp_netdev_pmd_thread *pmd,
const char *dpif_type_str = dpif_normalize_type(pmd->dp->class->type);
struct cmap_node *mark_node = CONST_CAST(struct cmap_node *,
&flow->mark_node);
+ unsigned int tid = netdev_offload_thread_id();
uint32_t mark = flow->mark;
int ret = 0;
@@ -2571,7 +2599,8 @@ mark_to_flow_disassociate(struct dp_netdev_pmd_thread *pmd,
return EINVAL;
}
- cmap_remove(&dp_offload_thread.mark_to_flow, mark_node, hash_int(mark, 0));
+ cmap_remove(&dp_offload_threads[tid].mark_to_flow,
+ mark_node, hash_int(mark, 0));
flow->mark = INVALID_FLOW_MARK;
/*
@@ -2607,10 +2636,18 @@ static void
flow_mark_flush(struct dp_netdev_pmd_thread *pmd)
{
struct dp_netdev_flow *flow;
+ unsigned int tid;
- CMAP_FOR_EACH (flow, mark_node, &dp_offload_thread.mark_to_flow) {
- if (flow->pmd_id == pmd->core_id) {
- queue_netdev_flow_del(pmd, flow);
+ if (dp_offload_threads == NULL) {
+ return;
+ }
+
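+    /* The mark-to-flow maps are per offload thread: scan each of them to
+     * flush every flow installed by this PMD. */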
+ for (tid = 0; tid < netdev_offload_thread_nb(); tid++) {
+ CMAP_FOR_EACH (flow, mark_node,
+ &dp_offload_threads[tid].mark_to_flow) {
+ if (flow->pmd_id == pmd->core_id) {
+ queue_netdev_flow_del(pmd, flow);
+ }
}
}
}
@@ -2620,12 +2657,21 @@ mark_to_flow_find(const struct dp_netdev_pmd_thread *pmd,
const uint32_t mark)
{
struct dp_netdev_flow *flow;
+ unsigned int tid;
+ size_t hash;
- CMAP_FOR_EACH_WITH_HASH (flow, mark_node, hash_int(mark, 0),
- &dp_offload_thread.mark_to_flow) {
- if (flow->mark == mark && flow->pmd_id == pmd->core_id &&
- flow->dead == false) {
- return flow;
+ if (dp_offload_threads == NULL) {
+ return NULL;
+ }
+
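+    /* A mark can live in any offload thread's map: scan them all until
+     * the matching flow is found. */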
+ hash = hash_int(mark, 0);
+ for (tid = 0; tid < netdev_offload_thread_nb(); tid++) {
+ CMAP_FOR_EACH_WITH_HASH (flow, mark_node, hash,
+ &dp_offload_threads[tid].mark_to_flow) {
+ if (flow->mark == mark && flow->pmd_id == pmd->core_id &&
+ flow->dead == false) {
+ return flow;
+ }
}
}
@@ -2690,10 +2736,25 @@ dp_netdev_free_offload(struct dp_offload_thread_item *offload)
}
static void
-dp_netdev_append_offload(struct dp_offload_thread_item *offload)
+dp_netdev_append_offload(struct dp_offload_thread_item *offload,
+ unsigned int tid)
+{
+ dp_netdev_offload_init();
+
+ mpsc_queue_insert(&dp_offload_threads[tid].queue, &offload->node);
+ atomic_count_inc64(&dp_offload_threads[tid].enqueued_item);
+}
+
+static void
+dp_netdev_offload_flow_enqueue(struct dp_offload_thread_item *item)
{
- mpsc_queue_insert(&dp_offload_thread.queue, &offload->node);
- atomic_count_inc64(&dp_offload_thread.enqueued_item);
+ struct dp_offload_flow_item *flow_offload = &item->data->flow;
+ unsigned int tid;
+
+ ovs_assert(item->type == DP_OFFLOAD_FLOW);
+
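+    /* Map the flow's mega ufid to an offload thread: every operation on
+     * a given megaflow goes to the same thread, keeping the offload
+     * requests for that flow ordered. */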
+ tid = netdev_offload_ufid_to_thread_id(flow_offload->flow->mega_ufid);
+ dp_netdev_append_offload(item, tid);
}
static int
@@ -2831,8 +2892,8 @@ dp_offload_flush(struct dp_offload_thread_item *item)
ovs_barrier_block(flush->barrier);
- /* Allow the other thread to take again the port lock, before
- * continuing offload operations in this thread.
+    /* Allow the initiator thread to retake the port lock
+     * before continuing offload operations in this thread.
*/
ovs_barrier_block(flush->barrier);
}
@@ -2842,8 +2903,9 @@ dp_offload_flush(struct dp_offload_thread_item *item)
#define DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US (10 * 1000) /* 10 ms */
static void *
-dp_netdev_flow_offload_main(void *data OVS_UNUSED)
+dp_netdev_flow_offload_main(void *arg)
{
+ struct dp_offload_thread *ofl_thread = arg;
struct dp_offload_thread_item *offload;
struct mpsc_queue_node *node;
struct mpsc_queue *queue;
@@ -2852,7 +2914,7 @@ dp_netdev_flow_offload_main(void *data OVS_UNUSED)
long long int now;
uint64_t backoff;
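+    /* Each offload thread consumes from its own MPSC queue; the threads
+     * enqueueing offload requests are the producers. */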
- queue = &dp_offload_thread.queue;
+ queue = &ofl_thread->queue;
mpsc_queue_acquire(queue);
while (true) {
@@ -2867,7 +2929,7 @@ dp_netdev_flow_offload_main(void *data OVS_UNUSED)
next_rcu = time_usec() + DP_NETDEV_OFFLOAD_QUIESCE_INTERVAL_US;
MPSC_QUEUE_FOR_EACH_POP (node, queue) {
offload = CONTAINER_OF(node, struct dp_offload_thread_item, node);
- atomic_count_dec64(&dp_offload_thread.enqueued_item);
+ atomic_count_dec64(&ofl_thread->enqueued_item);
switch (offload->type) {
case DP_OFFLOAD_FLOW:
@@ -2883,8 +2945,8 @@ dp_netdev_flow_offload_main(void *data OVS_UNUSED)
now = time_usec();
latency_us = now - offload->timestamp;
- mov_avg_cma_update(&dp_offload_thread.cma, latency_us);
- mov_avg_ema_update(&dp_offload_thread.ema, latency_us);
+ mov_avg_cma_update(&ofl_thread->cma, latency_us);
+ mov_avg_ema_update(&ofl_thread->ema, latency_us);
dp_netdev_free_offload(offload);
@@ -2908,16 +2970,10 @@ queue_netdev_flow_del(struct dp_netdev_pmd_thread *pmd,
{
struct dp_offload_thread_item *offload;
- if (ovsthread_once_start(&offload_thread_once)) {
- mpsc_queue_init(&dp_offload_thread.queue);
- ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
- ovsthread_once_done(&offload_thread_once);
- }
-
offload = dp_netdev_alloc_flow_offload(pmd, flow,
DP_NETDEV_FLOW_OFFLOAD_OP_DEL);
offload->timestamp = pmd->ctx.now;
- dp_netdev_append_offload(offload);
+ dp_netdev_offload_flow_enqueue(offload);
}
static void
@@ -2933,12 +2989,6 @@ queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
return;
}
- if (ovsthread_once_start(&offload_thread_once)) {
- mpsc_queue_init(&dp_offload_thread.queue);
- ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
- ovsthread_once_done(&offload_thread_once);
- }
-
if (flow->mark != INVALID_FLOW_MARK) {
op = DP_NETDEV_FLOW_OFFLOAD_OP_MOD;
} else {
@@ -2952,7 +3002,7 @@ queue_netdev_flow_put(struct dp_netdev_pmd_thread *pmd,
flow_offload->actions_len = actions_len;
item->timestamp = pmd->ctx.now;
- dp_netdev_append_offload(item);
+ dp_netdev_offload_flow_enqueue(item);
}
static void
@@ -2981,25 +3031,24 @@ dp_netdev_offload_flush_enqueue(struct dp_netdev *dp,
struct netdev *netdev,
struct ovs_barrier *barrier)
{
- struct dp_offload_thread_item *item;
- struct dp_offload_flush_item *flush;
+ unsigned int tid;
+ long long int now_us = time_usec();
- if (ovsthread_once_start(&offload_thread_once)) {
- mpsc_queue_init(&dp_offload_thread.queue);
- ovs_thread_create("hw_offload", dp_netdev_flow_offload_main, NULL);
- ovsthread_once_done(&offload_thread_once);
- }
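+    /* Broadcast the flush request to every offload thread; each one
+     * synchronizes with the initiator on the shared barrier. */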
+ for (tid = 0; tid < netdev_offload_thread_nb(); tid++) {
+ struct dp_offload_thread_item *item;
+ struct dp_offload_flush_item *flush;
- item = xmalloc(sizeof *item + sizeof *flush);
- item->type = DP_OFFLOAD_FLUSH;
- item->timestamp = time_usec();
+ item = xmalloc(sizeof *item + sizeof *flush);
+ item->type = DP_OFFLOAD_FLUSH;
+ item->timestamp = now_us;
- flush = &item->data->flush;
- flush->dp = dp;
- flush->netdev = netdev;
- flush->barrier = barrier;
+ flush = &item->data->flush;
+ flush->dp = dp;
+ flush->netdev = netdev;
+ flush->barrier = barrier;
- dp_netdev_append_offload(item);
+ dp_netdev_append_offload(item, tid);
+ }
}
/* Blocking call that will wait on the offload thread to
@@ -3018,13 +3067,17 @@ dp_netdev_offload_flush(struct dp_netdev *dp,
struct dp_netdev_port *port)
OVS_REQ_WRLOCK(dp->port_rwlock)
{
- /* The flush mutex only serves to protect the static memory barrier.
+    /* The flush mutex protects the static barrier from concurrent
+     * access, and serializes flush requests to the offload threads.
+ *
* The memory barrier needs to go beyond the function scope as
- * the other thread can resume from blocking after this function
+ * the other threads can resume from blocking after this function
* already finished.
- * As the barrier is made static, then it will be shared by
- * calls to this function, and it needs to be protected from
- * concurrent use.
+ *
+     * Additionally, because the flush operation is blocking, it would
+     * deadlock if the offload threads ended up blocked on different
+     * barriers. Only a single flush request is allowed in the offload
+     * queues at a time.
*/
static struct ovs_mutex flush_mutex = OVS_MUTEX_INITIALIZER;
static struct ovs_barrier barrier OVS_GUARDED_BY(flush_mutex);
@@ -3037,8 +3090,8 @@ dp_netdev_offload_flush(struct dp_netdev *dp,
ovs_rwlock_unlock(&dp->port_rwlock);
ovs_mutex_lock(&flush_mutex);
- /* This thread and the offload thread. */
- ovs_barrier_init(&barrier, 2);
+ /* This thread and the offload threads. */
+ ovs_barrier_init(&barrier, 1 + netdev_offload_thread_nb());
netdev = netdev_ref(port->netdev);
dp_netdev_offload_flush_enqueue(dp, netdev, &barrier);
@@ -3046,7 +3099,7 @@ dp_netdev_offload_flush(struct dp_netdev *dp,
netdev_close(netdev);
/* Take back the datapath port lock before allowing the offload
- * thread to proceed further. The port deletion must complete first,
+ * threads to proceed further. The port deletion must complete first,
* to ensure no further offloads are inserted after the flush.
*
* Some offload provider (e.g. DPDK) keeps a netdev reference with
@@ -4433,60 +4486,99 @@ dpif_netdev_offload_stats_get(struct dpif *dpif,
DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_MEAN,
DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_STDDEV,
};
- const char *names[] = {
+ struct {
+ const char *name;
+ uint64_t total;
+ } hwol_stats[] = {
[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED] =
- " Enqueued offloads",
+ { " Enqueued offloads", 0 },
[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED] =
- " Inserted offloads",
+ { " Inserted offloads", 0 },
[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN] =
- " Cumulative Average latency (us)",
+ { " Cumulative Average latency (us)", 0 },
[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_STDDEV] =
- " Cumulative Latency stddev (us)",
+ { " Cumulative Latency stddev (us)", 0 },
[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_MEAN] =
- " Exponential Average latency (us)",
+ { " Exponential Average latency (us)", 0 },
[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_STDDEV] =
- " Exponential Latency stddev (us)",
+ { " Exponential Latency stddev (us)", 0 },
};
struct dp_netdev *dp = get_dp_netdev(dpif);
struct dp_netdev_port *port;
- uint64_t nb_offloads;
+ unsigned int nb_thread;
+ uint64_t *port_nb_offloads;
+ uint64_t *nb_offloads;
+ unsigned int tid;
size_t i;
if (!netdev_is_flow_api_enabled()) {
return EINVAL;
}
- stats->size = ARRAY_SIZE(names);
+ nb_thread = netdev_offload_thread_nb();
+    /* One set of counters per thread, plus one set for the totals. */
+ stats->size = ARRAY_SIZE(hwol_stats) * (nb_thread + 1);
stats->counters = xcalloc(stats->size, sizeof *stats->counters);
- nb_offloads = 0;
+ nb_offloads = xcalloc(nb_thread, sizeof *nb_offloads);
+ port_nb_offloads = xcalloc(nb_thread, sizeof *port_nb_offloads);
ovs_rwlock_rdlock(&dp->port_rwlock);
HMAP_FOR_EACH (port, node, &dp->ports) {
- uint64_t port_nb_offloads = 0;
-
+ memset(port_nb_offloads, 0, nb_thread * sizeof *port_nb_offloads);
/* Do not abort on read error from a port, just report 0. */
- if (!netdev_flow_get_n_flows(port->netdev, &port_nb_offloads)) {
- nb_offloads += port_nb_offloads;
+ if (!netdev_flow_get_n_flows(port->netdev, port_nb_offloads)) {
+ for (i = 0; i < nb_thread; i++) {
+ nb_offloads[i] += port_nb_offloads[i];
+ }
}
}
ovs_rwlock_unlock(&dp->port_rwlock);
- atomic_read_relaxed(&dp_offload_thread.enqueued_item,
- &stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED].value);
- stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED].value = nb_offloads;
- stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN].value =
- mov_avg_cma(&dp_offload_thread.cma);
- stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_STDDEV].value =
- mov_avg_cma_std_dev(&dp_offload_thread.cma);
- stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_MEAN].value =
- mov_avg_ema(&dp_offload_thread.ema);
- stats->counters[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_STDDEV].value =
- mov_avg_ema_std_dev(&dp_offload_thread.ema);
-
- for (i = 0; i < ARRAY_SIZE(names); i++) {
+ free(port_nb_offloads);
+
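+    /* Counter layout: the first ARRAY_SIZE(hwol_stats) slots hold the
+     * totals; the slots for thread 'tid' start at
+     * (tid + 1) * ARRAY_SIZE(hwol_stats). */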
+ for (tid = 0; tid < nb_thread; tid++) {
+ uint64_t counts[ARRAY_SIZE(hwol_stats)];
+ size_t idx = ((tid + 1) * ARRAY_SIZE(hwol_stats));
+
+ memset(counts, 0, sizeof counts);
+ counts[DP_NETDEV_HW_OFFLOADS_STATS_INSERTED] = nb_offloads[tid];
+ if (dp_offload_threads != NULL) {
+ atomic_read_relaxed(&dp_offload_threads[tid].enqueued_item,
+ &counts[DP_NETDEV_HW_OFFLOADS_STATS_ENQUEUED]);
+
+ counts[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN] =
+ mov_avg_cma(&dp_offload_threads[tid].cma);
+ counts[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_STDDEV] =
+ mov_avg_cma_std_dev(&dp_offload_threads[tid].cma);
+
+ counts[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_MEAN] =
+ mov_avg_ema(&dp_offload_threads[tid].ema);
+ counts[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_STDDEV] =
+ mov_avg_ema_std_dev(&dp_offload_threads[tid].ema);
+ }
+
+ for (i = 0; i < ARRAY_SIZE(hwol_stats); i++) {
+ snprintf(stats->counters[idx + i].name,
+ sizeof(stats->counters[idx + i].name),
+ " [%3u] %s", tid, hwol_stats[i].name);
+ stats->counters[idx + i].value = counts[i];
+ hwol_stats[i].total += counts[i];
+ }
+ }
+
+ free(nb_offloads);
+
+    /* Average the per-thread latency averages to get the aggregates. */
+ hwol_stats[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_MEAN].total /= nb_thread;
+ hwol_stats[DP_NETDEV_HW_OFFLOADS_STATS_LAT_CMA_STDDEV].total /= nb_thread;
+ hwol_stats[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_MEAN].total /= nb_thread;
+ hwol_stats[DP_NETDEV_HW_OFFLOADS_STATS_LAT_EMA_STDDEV].total /= nb_thread;
+
+ for (i = 0; i < ARRAY_SIZE(hwol_stats); i++) {
snprintf(stats->counters[i].name, sizeof(stats->counters[i].name),
- "%s", names[i]);
+ " Total %s", hwol_stats[i].name);
+ stats->counters[i].value = hwol_stats[i].total;
}
return 0;
@@ -59,6 +59,7 @@ struct ufid_to_rte_flow_data {
bool actions_offloaded;
struct dpif_flow_stats stats;
struct ovs_mutex lock;
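+    /* Id of the offload thread that created this entry; used to
+     * partition the flush work among offload threads. */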
+ unsigned int creation_tid;
bool dead;
};
@@ -235,6 +236,7 @@ ufid_to_rte_flow_associate(struct netdev *netdev, const ovs_u128 *ufid,
data->netdev = netdev_ref(netdev);
data->rte_flow = rte_flow;
data->actions_offloaded = actions_offloaded;
+ data->creation_tid = netdev_offload_thread_id();
ovs_mutex_init(&data->lock);
cmap_insert(map, CONST_CAST(struct cmap_node *, &data->node), hash);
@@ -1772,13 +1774,16 @@ netdev_offload_dpdk_flow_flush(struct netdev *netdev)
{
struct cmap *map = offload_data_map(netdev);
struct ufid_to_rte_flow_data *data;
+ unsigned int tid = netdev_offload_thread_id();
if (!map) {
return -1;
}
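+    /* The ufid-to-rte_flow map is shared by all offload threads; each
+     * thread destroys only the flows it created, so that together they
+     * flush the whole map. */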
CMAP_FOR_EACH (data, node, map) {
- netdev_offload_dpdk_flow_destroy(data);
+ if (data->creation_tid == tid) {
+ netdev_offload_dpdk_flow_destroy(data);
+ }
}
return 0;