From patchwork Tue May 12 07:41:13 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Ivanov X-Patchwork-Id: 1288269 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (sender SPF authorized) smtp.mailfrom=openvswitch.org (client-ip=140.211.166.133; helo=hemlock.osuosl.org; envelope-from=ovs-dev-bounces@openvswitch.org; receiver=) Authentication-Results: ozlabs.org; dmarc=none (p=none dis=none) header.from=cambridgegreys.com Received: from hemlock.osuosl.org (smtp2.osuosl.org [140.211.166.133]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 49LqVw5FxBz9sRY for ; Tue, 12 May 2020 17:41:28 +1000 (AEST) Received: from localhost (localhost [127.0.0.1]) by hemlock.osuosl.org (Postfix) with ESMTP id 17E61885E4; Tue, 12 May 2020 07:41:27 +0000 (UTC) X-Virus-Scanned: amavisd-new at osuosl.org Received: from hemlock.osuosl.org ([127.0.0.1]) by localhost (.osuosl.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id n1JzfU57+oVt; Tue, 12 May 2020 07:41:26 +0000 (UTC) Received: from lists.linuxfoundation.org (lf-lists.osuosl.org [140.211.9.56]) by hemlock.osuosl.org (Postfix) with ESMTP id 31323883E2; Tue, 12 May 2020 07:41:26 +0000 (UTC) Received: from lf-lists.osuosl.org (localhost [127.0.0.1]) by lists.linuxfoundation.org (Postfix) with ESMTP id 0197DC088A; Tue, 12 May 2020 07:41:26 +0000 (UTC) X-Original-To: dev@openvswitch.org Delivered-To: ovs-dev@lists.linuxfoundation.org Received: from hemlock.osuosl.org (smtp2.osuosl.org [140.211.166.133]) by lists.linuxfoundation.org (Postfix) with ESMTP id E46EAC016F for ; Tue, 12 May 2020 07:41:24 +0000 (UTC) Received: from localhost (localhost [127.0.0.1]) by hemlock.osuosl.org (Postfix) with ESMTP id DDF4C882BE for ; Tue, 12 May 2020 07:41:24 +0000 (UTC) X-Virus-Scanned: amavisd-new at osuosl.org Received: from hemlock.osuosl.org ([127.0.0.1]) by localhost (.osuosl.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id fJHo2owuLajT for ; Tue, 12 May 2020 07:41:23 +0000 (UTC) X-Greylist: from auto-whitelisted by SQLgrey-1.7.6 Received: from www.kot-begemot.co.uk (ivanoab7.miniserver.com [37.128.132.42]) by hemlock.osuosl.org (Postfix) with ESMTPS id 9953D88166 for ; Tue, 12 May 2020 07:41:23 +0000 (UTC) Received: from tun252.jain.kot-begemot.co.uk ([192.168.18.6] helo=jain.kot-begemot.co.uk) by www.kot-begemot.co.uk with esmtps (TLS1.3:ECDHE_RSA_AES_256_GCM_SHA384:256) (Exim 4.92) (envelope-from ) id 1jYPXU-0003zB-Db; Tue, 12 May 2020 07:41:20 +0000 Received: from jain.kot-begemot.co.uk ([192.168.3.3]) by jain.kot-begemot.co.uk with esmtp (Exim 4.92) (envelope-from ) id 1jYPXR-0001uB-BE; Tue, 12 May 2020 08:41:19 +0100 From: anton.ivanov@cambridgegreys.com To: dev@openvswitch.org Date: Tue, 12 May 2020 08:41:13 +0100 Message-Id: <20200512074114.6985-1-anton.ivanov@cambridgegreys.com> X-Mailer: git-send-email 2.20.1 MIME-Version: 1.0 X-Clacks-Overhead: GNU Terry Pratchett Cc: Anton Ivanov Subject: [ovs-dev] [PATCH 1/2] Make ByteQ safe for simultaneous producer/consumer X-BeenThere: ovs-dev@openvswitch.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: ovs-dev-bounces@openvswitch.org Sender: "dev" From: Anton Ivanov A ByteQ with unlocked head and tail is unsafe for simultaneous consume/produce. If simultaneous use is desired, these either need to be locked or there needs to be a third atomic or lock guarded variable "used". An atomic "used" allows the producer to enqueue safely because it "owns" the head and even if the consumer changes the head it will only increase the space available versus the value in "used". Once the data has been written and the enqueued should be made visible it fenced and the used is updated. Similar for "consumer" - it can safely consume now as it "owns" tail and never reads beyond tail + used (wrapped around as needed). Signed-off-by: Anton Ivanov --- lib/byteq.c | 17 ++++++++++++++++- lib/byteq.h | 2 ++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/lib/byteq.c b/lib/byteq.c index 3f865cf9e..da40c2530 100644 --- a/lib/byteq.c +++ b/lib/byteq.c @@ -19,6 +19,7 @@ #include #include #include "util.h" +#include "ovs-atomic.h" /* Initializes 'q' as an empty byteq that uses the 'size' bytes of 'buffer' to * store data. 'size' must be a power of 2. @@ -32,13 +33,16 @@ byteq_init(struct byteq *q, uint8_t *buffer, size_t size) q->buffer = buffer; q->size = size; q->head = q->tail = 0; + q->used = ATOMIC_VAR_INIT(0); } /* Returns the number of bytes current queued in 'q'. */ int byteq_used(const struct byteq *q) { - return q->head - q->tail; + int retval; + atomic_read_relaxed(&q->used, &retval); + return retval; } /* Returns the number of bytes that can be added to 'q' without overflow. */ @@ -68,9 +72,11 @@ byteq_is_full(const struct byteq *q) void byteq_put(struct byteq *q, uint8_t c) { + int discard; ovs_assert(!byteq_is_full(q)); *byteq_head(q) = c; q->head++; + atomic_add(&q->used, 1, &discard); } /* Adds the 'n' bytes in 'p' at the head of 'q', which must have at least 'n' @@ -79,6 +85,7 @@ void byteq_putn(struct byteq *q, const void *p_, size_t n) { const uint8_t *p = p_; + int discard; ovs_assert(byteq_avail(q) >= n); while (n > 0) { size_t chunk = MIN(n, byteq_headroom(q)); @@ -86,6 +93,7 @@ byteq_putn(struct byteq *q, const void *p_, size_t n) byteq_advance_head(q, chunk); p += chunk; n -= chunk; + atomic_add(&q->used, chunk, &discard); } } @@ -103,9 +111,11 @@ uint8_t byteq_get(struct byteq *q) { uint8_t c; + int discard; ovs_assert(!byteq_is_empty(q)); c = *byteq_tail(q); q->tail++; + atomic_sub(&q->used, 1, &discard); return c; } @@ -168,8 +178,10 @@ byteq_tail(const struct byteq *q) void byteq_advance_tail(struct byteq *q, unsigned int n) { + int discard; ovs_assert(byteq_tailroom(q) >= n); q->tail += n; + atomic_sub_relaxed(&q->used, n, &discard); } /* Returns the byte after the last in-use byte of 'q', the point at which new @@ -195,6 +207,9 @@ byteq_headroom(const struct byteq *q) void byteq_advance_head(struct byteq *q, unsigned int n) { + int discard; ovs_assert(byteq_headroom(q) >= n); q->head += n; + atomic_thread_fence(memory_order_release); + atomic_add_relaxed(&q->used, n, &discard); } diff --git a/lib/byteq.h b/lib/byteq.h index d73e3684e..e829efab0 100644 --- a/lib/byteq.h +++ b/lib/byteq.h @@ -19,6 +19,7 @@ #include #include #include +#include "ovs-atomic.h" /* General-purpose circular queue of bytes. */ struct byteq { @@ -26,6 +27,7 @@ struct byteq { unsigned int size; /* Number of bytes allocated for 'buffer'. */ unsigned int head; /* Head of queue. */ unsigned int tail; /* Chases the head. */ + atomic_int used; }; void byteq_init(struct byteq *, uint8_t *buffer, size_t size); From patchwork Tue May 12 07:41:14 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Anton Ivanov X-Patchwork-Id: 1288270 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Authentication-Results: ozlabs.org; spf=pass (sender SPF authorized) smtp.mailfrom=openvswitch.org (client-ip=140.211.166.136; helo=silver.osuosl.org; envelope-from=ovs-dev-bounces@openvswitch.org; receiver=) Authentication-Results: ozlabs.org; dmarc=none (p=none dis=none) header.from=cambridgegreys.com Received: from silver.osuosl.org (smtp3.osuosl.org [140.211.166.136]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by ozlabs.org (Postfix) with ESMTPS id 49LqW36fSCz9sRf for ; Tue, 12 May 2020 17:41:35 +1000 (AEST) Received: from localhost (localhost [127.0.0.1]) by silver.osuosl.org (Postfix) with ESMTP id 507A725877; Tue, 12 May 2020 07:41:34 +0000 (UTC) X-Virus-Scanned: amavisd-new at osuosl.org Received: from silver.osuosl.org ([127.0.0.1]) by localhost (.osuosl.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id jwXBNaem0se8; Tue, 12 May 2020 07:41:30 +0000 (UTC) Received: from lists.linuxfoundation.org (lf-lists.osuosl.org [140.211.9.56]) by silver.osuosl.org (Postfix) with ESMTP id 0EEA025729; Tue, 12 May 2020 07:41:30 +0000 (UTC) Received: from lf-lists.osuosl.org (localhost [127.0.0.1]) by lists.linuxfoundation.org (Postfix) with ESMTP id DB03CC0893; Tue, 12 May 2020 07:41:29 +0000 (UTC) X-Original-To: dev@openvswitch.org Delivered-To: ovs-dev@lists.linuxfoundation.org Received: from hemlock.osuosl.org (smtp2.osuosl.org [140.211.166.133]) by lists.linuxfoundation.org (Postfix) with ESMTP id 1DEF7C016F for ; Tue, 12 May 2020 07:41:29 +0000 (UTC) Received: from localhost (localhost [127.0.0.1]) by hemlock.osuosl.org (Postfix) with ESMTP id 000A188871 for ; Tue, 12 May 2020 07:41:28 +0000 (UTC) X-Virus-Scanned: amavisd-new at osuosl.org Received: from hemlock.osuosl.org ([127.0.0.1]) by localhost (.osuosl.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id jBAWCzgBTXfw for ; Tue, 12 May 2020 07:41:26 +0000 (UTC) X-Greylist: from auto-whitelisted by SQLgrey-1.7.6 Received: from www.kot-begemot.co.uk (ivanoab7.miniserver.com [37.128.132.42]) by hemlock.osuosl.org (Postfix) with ESMTPS id EBA80882BE for ; Tue, 12 May 2020 07:41:25 +0000 (UTC) Received: from tun252.jain.kot-begemot.co.uk ([192.168.18.6] helo=jain.kot-begemot.co.uk) by www.kot-begemot.co.uk with esmtps (TLS1.3:ECDHE_RSA_AES_256_GCM_SHA384:256) (Exim 4.92) (envelope-from ) id 1jYPXX-0003zG-Mz; Tue, 12 May 2020 07:41:24 +0000 Received: from jain.kot-begemot.co.uk ([192.168.3.3]) by jain.kot-begemot.co.uk with esmtp (Exim 4.92) (envelope-from ) id 1jYPXT-0001uB-5A; Tue, 12 May 2020 08:41:21 +0100 From: anton.ivanov@cambridgegreys.com To: dev@openvswitch.org Date: Tue, 12 May 2020 08:41:14 +0100 Message-Id: <20200512074114.6985-2-anton.ivanov@cambridgegreys.com> X-Mailer: git-send-email 2.20.1 In-Reply-To: <20200512074114.6985-1-anton.ivanov@cambridgegreys.com> References: <20200512074114.6985-1-anton.ivanov@cambridgegreys.com> MIME-Version: 1.0 X-Clacks-Overhead: GNU Terry Pratchett Cc: Anton Ivanov Subject: [ovs-dev] [PATCH 2/2] Introduce async IO in JSONRPC X-BeenThere: ovs-dev@openvswitch.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: ovs-dev-bounces@openvswitch.org Sender: "dev" From: Anton Ivanov 1. Pull out buffering and send/receive ops from json rpc 2. Make the SSL send zero copy (it was creating an ofpbuf out of an existing ofpbuf data without necessity). 3. Add vector IO to stream-fd to make flushing more efficient. Also makes queueing for stream-fd and stream-ssl roughly identical. 4. Unify backlog management 5. Make use of the full capacity of the incoming buffer and not only when it is empty. 6. Allow for IO to be run in worker threads 7. Various minor fixes to enable async io - make rx errors visible to tx and vice versa. Make activity tracking for reconnect async friendly, etc. 8. Enable Async IO in ovsdb Signed-off-by: Anton Ivanov --- lib/async-io.c | 530 ++++++++++++++++++++++++++++++++++++++++++ lib/async-io.h | 86 +++++++ lib/automake.mk | 2 + lib/jsonrpc.c | 151 +++++------- lib/stream-fd.c | 82 +++++++ lib/stream-provider.h | 34 ++- lib/stream-ssl.c | 64 ++++- lib/stream-tcp.c | 2 + lib/stream-unix.c | 2 + lib/stream-windows.c | 2 + ovsdb/ovsdb-server.c | 2 + 11 files changed, 864 insertions(+), 93 deletions(-) create mode 100644 lib/async-io.c create mode 100644 lib/async-io.h diff --git a/lib/async-io.c b/lib/async-io.c new file mode 100644 index 000000000..16d568a50 --- /dev/null +++ b/lib/async-io.c @@ -0,0 +1,530 @@ +/* + * Copyright (c) 2020 Red Hat Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "stream-provider.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include "coverage.h" +#include "fatal-signal.h" +#include "flow.h" +#include "jsonrpc.h" +#include "openflow/nicira-ext.h" +#include "openflow/openflow.h" +#include "openvswitch/dynamic-string.h" +#include "openvswitch/ofp-print.h" +#include "openvswitch/ofpbuf.h" +#include "openvswitch/vlog.h" +#include "ovs-thread.h" +#include "ovs-atomic.h" +#include "packets.h" +#include "openvswitch/poll-loop.h" +#include "random.h" +#include "socket-util.h" +#include "util.h" +#include "timeval.h" +#include "async-io.h" +#include "ovs-numa.h" + +VLOG_DEFINE_THIS_MODULE(async_io); + +static bool allow_async_io = false; + +static bool async_io_setup = false; +static bool kill_async_io = false; + +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER; + +static struct ovs_list io_pools = OVS_LIST_INITIALIZER(&io_pools); + +static int pool_size; + +static struct async_io_pool *io_pool = NULL; + +static int do_async_recv(struct async_data *data); +static int do_stream_flush(struct async_data *data); + +static inline bool not_in_error(struct async_data *data) { + int rx_error, tx_error; + + if (!data->valid) { + return false; + } + + atomic_read_relaxed(&data->rx_error, &rx_error); + atomic_read_relaxed(&data->tx_error, &tx_error); + + return ( + ((rx_error > 0) || (rx_error == -EAGAIN)) && + ((tx_error >= 0) || (tx_error == -EAGAIN)) + ); +} + +static inline bool in_error(struct async_data *data) { + return ! not_in_error(data); +} + + +static void *default_async_io_helper(void *arg) { + struct async_io_control *io_control = + (struct async_io_control *) arg; + struct async_data *data; + int retval; + + do { + ovs_mutex_lock(&io_control->mutex); + latch_poll(&io_control->async_latch); + LIST_FOR_EACH (data, list_node, &io_control->work_items) { + long backlog, oldbacklog; + ovs_mutex_lock(&data->mutex); + retval = -EAGAIN; + if (not_in_error(data)) { + /* + * We stop reading if the input queue is full + */ + if (byteq_headroom(&data->input) != 0) { + retval = do_async_recv(data); + } else { + poll_timer_wait(1); + retval = 0; + } + } + if (not_in_error(data) && (retval > 0 || retval == -EAGAIN)) { + stream_recv_wait(data->stream); + } + atomic_read_relaxed(&data->backlog, &oldbacklog); + if (not_in_error(data)) { + stream_run(data->stream); + do_stream_flush(data); + } + atomic_read_relaxed(&data->backlog, &backlog); + if (not_in_error(data)) { + if (backlog) { + /* upper layers will refuse to process rx + * until the tx is clear, so no point + * notifying them + */ + stream_send_wait(data->stream); + } else { + /* There is no backlog, so the rpc layer will + * actually pay attention to our notifications + * We issue a notification for both pending + * input and what is the equivalent of + * "IO Completion" + */ + if (!byteq_is_empty(&data->input) || oldbacklog) { + latch_set(&data->rx_notify); + } + } + } + if (data->valid && in_error(data)) { + /* make sure that the other thread(s) notice any errors. + * this should not be an else because errors may have + * changed inside the ifs above. + */ + latch_set(&data->rx_notify); + data->valid = false; + } + if (not_in_error(data)) { + stream_run_wait(data->stream); + } + ovs_mutex_unlock(&data->mutex); + } + ovs_mutex_unlock(&io_control->mutex); + latch_wait(&io_control->async_latch); + poll_block(); + } while (!kill_async_io); + return arg; +} + +static void async_io_hook(void *aux OVS_UNUSED) { + int i; + static struct async_io_pool *pool; + kill_async_io = true; + LIST_FOR_EACH (pool, list_node, &io_pools) { + for (i = 0; i < pool->size ; i++) { + latch_set(&pool->controls[i].async_latch); + latch_destroy(&pool->controls[i].async_latch); + } + } +} + +static void setup_async_io(void) { + int cores, nodes; + + nodes = ovs_numa_get_n_numas(); + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) { + nodes = 1; + } + cores = ovs_numa_get_n_cores(); + if (cores == OVS_CORE_UNSPEC || cores <= 0) { + pool_size = 4; + } else { + pool_size = cores / nodes; + } + fatal_signal_add_hook(async_io_hook, NULL, NULL, true); + async_io_setup = true; +} + +struct async_io_pool *add_pool(void *(*start)(void *)){ + + struct async_io_pool *new_pool = NULL; + struct async_io_control *io_control; + int i; + + ovs_mutex_lock(&init_mutex); + + if (!async_io_setup) { + setup_async_io(); + } + + new_pool = xmalloc(sizeof(struct async_io_pool)); + new_pool->size = pool_size; /* we may make this more dynamic later */ + + ovs_list_push_back(&io_pools, &new_pool->list_node); + + new_pool->controls = + xmalloc(sizeof(struct async_io_control) * new_pool->size); + for (i = 0; i < new_pool->size; i++) { + io_control = &new_pool->controls[i]; + latch_init(&io_control->async_latch); + ovs_mutex_init(&io_control->mutex); + ovs_list_init(&io_control->work_items); + } + for (i = 0; i < pool_size; i++) { + ovs_thread_create("async io helper", start, &new_pool->controls[i]); + } + ovs_mutex_unlock(&init_mutex); + return new_pool; +} + +void +async_init_data(struct async_data *data, struct stream *stream) +{ + struct async_io_control *target_control; + unsigned int buffer_size; + + data->stream = stream; +#ifdef __linux__ + buffer_size = getpagesize(); + if (!is_pow2(buffer_size)) { + buffer_size = ASYNC_BUFFER_SIZE; + } +#else + buffer_size = ASYNC_BUFFER_SIZE; +#endif +#if (_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600) + /* try to allocate a buffer_size as aligned, that by default is one page + * if that fails, fall back to normal memory allocation. + */ + if (posix_memalign( + (void **) &data->input_buffer, buffer_size, buffer_size)) { + data->input_buffer = xmalloc(buffer_size); + } +#else + data->input_buffer = xmalloc(buffer_size); +#endif + byteq_init(&data->input, data->input_buffer, buffer_size); + ovs_list_init(&data->output); + data->output_count = 0; + data->rx_error = ATOMIC_VAR_INIT(-EAGAIN); + data->tx_error = ATOMIC_VAR_INIT(0); + data->active = ATOMIC_VAR_INIT(false); + data->backlog = ATOMIC_VAR_INIT(0); + ovs_mutex_init(&data->mutex); + data->async_mode = allow_async_io; + data->valid = true; + if (data->async_mode) { + if (!io_pool) { + io_pool = add_pool(default_async_io_helper); + } + data->async_id = random_uint32(); + target_control = &io_pool->controls[data->async_id % io_pool->size]; + /* these are just fd pairs, no need to play with pointers, we + * can pass them around + */ + data->tx_run_notify = target_control->async_latch; + latch_init(&data->rx_notify); + ovs_mutex_lock(&target_control->mutex); + ovs_list_push_back(&target_control->work_items, &data->list_node); + ovs_mutex_unlock(&target_control->mutex); + latch_set(&target_control->async_latch); + } +} + +void +async_stream_enable(struct async_data *data) +{ + data->async_mode = allow_async_io; +} + +void +async_stream_disable(struct async_data *data) +{ + struct async_io_control *target_control; + bool needs_wake = false; + + + if (data->async_mode) { + if (not_in_error(data) && (async_get_backlog(data) > 0)) { + needs_wake = true; + latch_poll(&data->rx_notify); + latch_wait(&data->rx_notify); + latch_set(&data->tx_run_notify); + /* limit this to 50ms - should be enough for + * a single flush and we will not get stuck here + * waiting for a send to complete + */ + poll_timer_wait(50); + poll_block(); + } + if (needs_wake) { + /* we have lost all poll-wait info because we block()-ed + * locally, we need to force the upper layers to rerun so + * that they reinstate the correct waits + */ + poll_immediate_wake(); + } + target_control = &io_pool->controls[data->async_id % io_pool->size]; + ovs_mutex_lock(&target_control->mutex); + ovs_list_remove(&data->list_node); + ovs_mutex_unlock(&target_control->mutex); + data->async_mode = false; + latch_destroy(&data->rx_notify); + } + if (data->input_buffer) { + free(data->input_buffer); + data->input_buffer = NULL; + } +} + +void +async_cleanup_data(struct async_data *data) +{ + if (async_get_backlog(data)) { + ofpbuf_list_delete(&data->output); + } + atomic_store_relaxed(&data->backlog, 0); + data->output_count = 0; +} + +/* Routines intended for async IO */ + +long async_stream_enqueue(struct async_data *data, struct ofpbuf *buf) { + long retval = -EAGAIN; + long discard; + + ovs_mutex_lock(&data->mutex); + if (buf) { + ovs_list_push_back(&data->output, &buf->list_node); + data->output_count ++; + atomic_add_relaxed(&data->backlog, buf->size, &discard); + atomic_thread_fence(memory_order_release); + } + atomic_read_relaxed(&data->backlog, &retval); + ovs_mutex_unlock(&data->mutex); + return retval; +} + +static int do_stream_flush(struct async_data *data) { + struct ofpbuf *buf; + int count = 0; + bool stamp = false; + int retval = -stream_connect(data->stream); + long discard; + + if (!retval) { + while (!ovs_list_is_empty(&data->output) && count < 10) { + buf = ofpbuf_from_list(data->output.next); + if (data->stream->class->enqueue) { + ovs_list_remove(&buf->list_node); + retval = (data->stream->class->enqueue)(data->stream, buf); + if (retval > 0) { + data->output_count--; + } else { + ovs_list_push_front(&data->output, &buf->list_node); + } + } else { + retval = stream_send(data->stream, buf->data, buf->size); + if (retval > 0) { + stamp = true; + atomic_sub_relaxed(&data->backlog, retval, &discard); + ofpbuf_pull(buf, retval); + if (!buf->size) { + /* stream now owns buf */ + ovs_list_remove(&buf->list_node); + data->output_count--; + ofpbuf_delete(buf); + } + } + } + if (retval <= 0) { + break; + } + count++; + } + if (data->stream->class->flush && (retval >= 0 || retval == -EAGAIN)) { + (data->stream->class->flush)(data->stream, &retval); + if (retval > 0) { + stamp = true; + atomic_sub_relaxed(&data->backlog, retval, &discard); + } + } + if (stamp) { + atomic_store_relaxed(&data->active, true); + } + } + atomic_store_relaxed(&data->tx_error, retval); + return retval; +} + +int async_stream_flush(struct async_data *data) { + int retval; + + if (data->async_mode) { + atomic_read_relaxed(&data->tx_error, &retval); + if (retval >= 0) { + retval = -EAGAIN; /* fake a busy so that upper layers do not + * retry, we will flush the backlog in the + * background + */ + } + if (async_get_backlog(data)) { + latch_set(&data->tx_run_notify); + } + } else { + retval = do_stream_flush(data); + } + return retval; +} + +static int do_async_recv(struct async_data *data) { + size_t chunk; + int retval; + + atomic_read_relaxed(&data->rx_error, &retval); + if (retval > 0 || retval == -EAGAIN) { + chunk = byteq_headroom(&data->input); + if (chunk > 0) { + retval = stream_recv( + data->stream, byteq_head(&data->input), chunk); + if (retval > 0) { + byteq_advance_head(&data->input, retval); + } + } + } + if (retval > 0 || retval == -EAGAIN) { + retval = byteq_used(&data->input); + if (retval == 0) { + retval = -EAGAIN; + } + } + atomic_store_relaxed(&data->rx_error, retval); + return retval; +} + + +int async_stream_recv(struct async_data *data) { + int retval = -EAGAIN; + + if (data->async_mode) { + atomic_read_relaxed(&data->rx_error, &retval); + /* clear RX notifications */ + latch_poll(&data->rx_notify); + /* fake a retval from byteq usage */ + if (retval > 0 || retval == -EAGAIN) { + retval = byteq_used(&data->input); + if (retval == 0) { + retval = -EAGAIN; + } + } + } else { + retval = do_async_recv(data); + } + return retval; +} + +void async_stream_run(struct async_data *data) { + if (!data->async_mode) { + stream_run(data->stream); + } else { + latch_set(&data->tx_run_notify); + } + } + +void async_io_kick(struct async_data *data) { + if (data->async_mode) { + latch_set(&data->tx_run_notify); + } +} + +void async_recv_wait(struct async_data *data) { + if (data->async_mode) { + latch_poll(&data->rx_notify); + latch_wait(&data->rx_notify); + } else { + stream_recv_wait(data->stream); + } +} + +void async_io_enable(void) { + allow_async_io = true; +} + +/* Accessors for JSON RPC */ + +struct byteq *async_get_input(struct async_data *data) { + return &data->input; +} +struct stream *async_get_stream(struct async_data *data) { + return data->stream; +} + +bool async_output_is_empty(struct async_data *data) { + bool retval; + ovs_mutex_lock(&data->mutex); + /* backlog tracks backlog across the full stack all the + * way to the actual send. It is the source of truth + * if we have output or not so anybody asking if we + * have output should be told if we have backlog + * instead. + */ + retval = (data->backlog == 0); + ovs_mutex_unlock(&data->mutex); + return retval; +} + +long async_get_backlog(struct async_data *data) { + long retval; + /* This is used only by the unixctl connection + * so not worth it to convert backlog to atomics + */ + atomic_read_relaxed(&data->backlog, &retval); + return retval; +} + +bool async_get_active(struct async_data *data) { + bool test = true; + return atomic_compare_exchange_weak(&data->active, &test, false); +} + + diff --git a/lib/async-io.h b/lib/async-io.h new file mode 100644 index 000000000..dea070ee6 --- /dev/null +++ b/lib/async-io.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2020 Red Hat, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ASYNC_IO_H +#define ASYNC_IO_H 1 + +#include +#include +#include +#include +#include "openvswitch/types.h" +#include "openvswitch/ofpbuf.h" +#include "socket-util.h" +#include "ovs-atomic.h" +#include "ovs-thread.h" +#include "latch.h" +#include "byteq.h" +#include "util.h" + +#define ASYNC_BUFFER_SIZE (4096) + +struct stream; + +struct async_data { + struct stream *stream; + struct ovs_list output; + struct ovs_list list_node; + long backlog; + size_t output_count; + atomic_bool active; + atomic_int rx_error, tx_error; + uint32_t async_id; + struct latch rx_notify, tx_run_notify; + struct ovs_mutex mutex; + bool async_mode, valid; + struct byteq input; + uint8_t *input_buffer; +}; + +struct async_io_control { + struct latch async_latch; + struct ovs_list work_items; + struct ovs_mutex mutex; +}; + +struct async_io_pool { + struct ovs_list list_node; + struct async_io_control *controls; + int size; +}; + +struct async_io_pool *add_pool(void *(*start)(void *)); + +long async_stream_enqueue(struct async_data *, struct ofpbuf *buf); +int async_stream_flush(struct async_data *); +int async_stream_recv(struct async_data *); +struct byteq *async_get_input(struct async_data *); +struct stream *async_get_stream(struct async_data *); +bool async_output_is_empty(struct async_data *); +long async_get_backlog(struct async_data *); +bool async_get_active(struct async_data *); + +void async_stream_enable(struct async_data *); +void async_stream_disable(struct async_data *); + +void async_init_data(struct async_data *, struct stream *); +void async_cleanup_data(struct async_data *); +void async_stream_run(struct async_data *data); +void async_io_kick(struct async_data *data); +void async_recv_wait(struct async_data *data); +void async_io_enable(void); + +#endif /* async-io.h */ diff --git a/lib/automake.mk b/lib/automake.mk index 86940ccd2..6f7870f26 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -24,6 +24,8 @@ lib_libopenvswitch_la_SOURCES = \ lib/aes128.c \ lib/aes128.h \ lib/async-append.h \ + lib/async-io.h \ + lib/async-io.c \ lib/backtrace.c \ lib/backtrace.h \ lib/bfd.c \ diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index ed748dbde..f831bc2dd 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -30,28 +30,23 @@ #include "openvswitch/poll-loop.h" #include "reconnect.h" #include "stream.h" +#include "stream-provider.h" #include "svec.h" #include "timeval.h" +#include "async-io.h" #include "openvswitch/vlog.h" VLOG_DEFINE_THIS_MODULE(jsonrpc); struct jsonrpc { - struct stream *stream; char *name; int status; - - /* Input. */ - struct byteq input; - uint8_t input_buffer[4096]; struct json_parser *parser; - - /* Output. */ - struct ovs_list output; /* Contains "struct ofpbuf"s. */ - size_t output_count; /* Number of elements in "output". */ - size_t backlog; + struct async_data data; }; +#define MIN_IDLE_TIME 10 + /* Rate limit for error messages. */ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); @@ -59,6 +54,11 @@ static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *); static void jsonrpc_cleanup(struct jsonrpc *); static void jsonrpc_error(struct jsonrpc *, int error); +static inline struct async_data *adata(struct jsonrpc *rpc) { + return &rpc->data; +} + + /* This is just the same as stream_open() except that it uses the default * JSONRPC port if none is specified. */ int @@ -86,10 +86,8 @@ jsonrpc_open(struct stream *stream) rpc = xzalloc(sizeof *rpc); rpc->name = xstrdup(stream_get_name(stream)); - rpc->stream = stream; - byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer); - ovs_list_init(&rpc->output); - + async_init_data(adata(rpc), stream); + async_stream_enable(adata(rpc)); return rpc; } @@ -109,33 +107,22 @@ jsonrpc_close(struct jsonrpc *rpc) void jsonrpc_run(struct jsonrpc *rpc) { + int retval; if (rpc->status) { return; } - stream_run(rpc->stream); - while (!ovs_list_is_empty(&rpc->output)) { - struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next); - int retval; - - retval = stream_send(rpc->stream, buf->data, buf->size); - if (retval >= 0) { - rpc->backlog -= retval; - ofpbuf_pull(buf, retval); - if (!buf->size) { - ovs_list_remove(&buf->list_node); - rpc->output_count--; - ofpbuf_delete(buf); - } - } else { + async_stream_run(adata(rpc)); + do { + retval = async_stream_flush(&rpc->data); + if (retval < 0) { if (retval != -EAGAIN) { VLOG_WARN_RL(&rl, "%s: send error: %s", rpc->name, ovs_strerror(-retval)); jsonrpc_error(rpc, -retval); } - break; } - } + } while (retval > 0); } /* Arranges for the poll loop to wake up when 'rpc' needs to perform @@ -144,9 +131,13 @@ void jsonrpc_wait(struct jsonrpc *rpc) { if (!rpc->status) { - stream_run_wait(rpc->stream); - if (!ovs_list_is_empty(&rpc->output)) { - stream_send_wait(rpc->stream); + if (adata(rpc)->async_mode) { + async_recv_wait(adata(rpc)); + } else { + stream_run_wait(rpc->data.stream); + if (!async_output_is_empty(adata(rpc))) { + stream_send_wait(async_get_stream(adata(rpc))); + } } } } @@ -175,7 +166,7 @@ jsonrpc_get_status(const struct jsonrpc *rpc) size_t jsonrpc_get_backlog(const struct jsonrpc *rpc) { - return rpc->status ? 0 : rpc->backlog; + return rpc->status ? 0 : async_get_backlog(adata((struct jsonrpc *) rpc)); } /* Returns the number of bytes that have been received on 'rpc''s underlying @@ -183,7 +174,7 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc) unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *rpc) { - return rpc->input.head; + return async_get_input(adata((struct jsonrpc *) rpc))->head; } /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for @@ -234,13 +225,13 @@ jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title, * buffered in 'rpc'.) * * Always takes ownership of 'msg', regardless of success. */ + int jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) { struct ofpbuf *buf; struct json *json; struct ds ds = DS_EMPTY_INITIALIZER; - size_t length; if (rpc->status) { jsonrpc_msg_destroy(msg); @@ -251,24 +242,13 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) json = jsonrpc_msg_to_json(msg); json_to_ds(json, 0, &ds); - length = ds.length; json_destroy(json); buf = xmalloc(sizeof *buf); ofpbuf_use_ds(buf, &ds); - ovs_list_push_back(&rpc->output, &buf->list_node); - rpc->output_count++; - rpc->backlog += length; - - if (rpc->output_count >= 50) { - VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of" - " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name, - rpc->output_count, rpc->backlog); - } + async_stream_enqueue(adata(rpc), buf); - if (rpc->backlog == length) { - jsonrpc_run(rpc); - } + jsonrpc_run(rpc); return rpc->status; } @@ -291,7 +271,7 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) int jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) { - int i; + int i, retval; *msgp = NULL; if (rpc->status) { @@ -302,36 +282,32 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) size_t n, used; /* Fill our input buffer if it's empty. */ - if (byteq_is_empty(&rpc->input)) { - size_t chunk; - int retval; - - chunk = byteq_headroom(&rpc->input); - retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk); - if (retval < 0) { - if (retval == -EAGAIN) { - return EAGAIN; - } else { - VLOG_WARN_RL(&rl, "%s: receive error: %s", - rpc->name, ovs_strerror(-retval)); - jsonrpc_error(rpc, -retval); - return rpc->status; - } - } else if (retval == 0) { - jsonrpc_error(rpc, EOF); - return EOF; + retval = async_stream_recv(adata(rpc)); + if (retval < 0) { + if (retval == -EAGAIN) { + return EAGAIN; + } else { + VLOG_WARN_RL(&rl, "%s: receive error: %s", + rpc->name, ovs_strerror(-retval)); + jsonrpc_error(rpc, -retval); + return rpc->status; } - byteq_advance_head(&rpc->input, retval); + } else if (retval == 0) { + jsonrpc_error(rpc, EOF); + return EOF; } /* We have some input. Feed it into the JSON parser. */ if (!rpc->parser) { rpc->parser = json_parser_create(0); } - n = byteq_tailroom(&rpc->input); + n = byteq_tailroom(async_get_input(adata(rpc))); + if (n == 0) { + break; + } used = json_parser_feed(rpc->parser, - (char *) byteq_tail(&rpc->input), n); - byteq_advance_tail(&rpc->input, used); + (char *) byteq_tail(async_get_input(adata(rpc))), n); + byteq_advance_tail(async_get_input(adata(rpc)), used); /* If we have complete JSON, attempt to parse it as JSON-RPC. */ if (json_parser_is_done(rpc->parser)) { @@ -341,7 +317,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) } if (rpc->status) { - const struct byteq *q = &rpc->input; + const struct byteq *q = async_get_input(adata(rpc)); if (q->head <= q->size) { stream_report_content(q->buffer, q->head, STREAM_JSONRPC, &this_module, rpc->name); @@ -359,10 +335,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) void jsonrpc_recv_wait(struct jsonrpc *rpc) { - if (rpc->status || !byteq_is_empty(&rpc->input)) { + if (rpc->status || !byteq_is_empty(async_get_input(adata(rpc)))) { poll_immediate_wake_at(rpc->name); } else { - stream_recv_wait(rpc->stream); + async_recv_wait(adata(rpc)); } } @@ -385,7 +361,7 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg) for (;;) { jsonrpc_run(rpc); - if (ovs_list_is_empty(&rpc->output) || rpc->status) { + if (async_output_is_empty(adata(rpc)) || rpc->status) { return rpc->status; } jsonrpc_wait(rpc); @@ -495,15 +471,14 @@ jsonrpc_error(struct jsonrpc *rpc, int error) static void jsonrpc_cleanup(struct jsonrpc *rpc) { - stream_close(rpc->stream); - rpc->stream = NULL; + async_stream_disable(adata(rpc)); + stream_close(rpc->data.stream); + rpc->data.stream = NULL; json_parser_abort(rpc->parser); rpc->parser = NULL; - ofpbuf_list_delete(&rpc->output); - rpc->backlog = 0; - rpc->output_count = 0; + async_cleanup_data(adata(rpc)); } static struct jsonrpc_msg * @@ -977,12 +952,14 @@ jsonrpc_session_run(struct jsonrpc_session *s) } if (s->rpc) { - size_t backlog; int error; + bool active = async_get_active(adata(s->rpc)); - backlog = jsonrpc_get_backlog(s->rpc); jsonrpc_run(s->rpc); - if (jsonrpc_get_backlog(s->rpc) < backlog) { + + active |= async_get_active(adata(s->rpc)); + + if (active) { /* Data previously caught in a queue was successfully sent (or * there's an error, which we'll catch below.) * @@ -1076,8 +1053,8 @@ jsonrpc_session_get_name(const struct jsonrpc_session *s) const char * jsonrpc_session_get_id(const struct jsonrpc_session *s) { - if (s->rpc && s->rpc->stream) { - return stream_get_peer_id(s->rpc->stream); + if (s->rpc && async_get_stream(adata(s->rpc))) { + return stream_get_peer_id(adata(s->rpc)->stream); } else { return NULL; } diff --git a/lib/stream-fd.c b/lib/stream-fd.c index 46ee7ae27..747d543cf 100644 --- a/lib/stream-fd.c +++ b/lib/stream-fd.c @@ -30,6 +30,7 @@ #include "stream-provider.h" #include "stream.h" #include "openvswitch/vlog.h" +#include "openvswitch/list.h" VLOG_DEFINE_THIS_MODULE(stream_fd); @@ -40,6 +41,8 @@ struct stream_fd struct stream stream; int fd; int fd_type; + struct ovs_list output; + int queue_depth; }; static const struct stream_class stream_fd_class; @@ -67,6 +70,8 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type, stream_init(&s->stream, &stream_fd_class, connect_status, name); s->fd = fd; s->fd_type = fd_type; + s->queue_depth = 0; + ovs_list_init(&s->output); *streamp = &s->stream; return 0; } @@ -83,6 +88,7 @@ fd_close(struct stream *stream) { struct stream_fd *s = stream_fd_cast(stream); closesocket(s->fd); + ofpbuf_list_delete(&s->output); free(s); } @@ -111,6 +117,11 @@ fd_recv(struct stream *stream, void *buffer, size_t n) if (error == WSAEWOULDBLOCK) { error = EAGAIN; } +#endif +#ifdef __linux__ + if (error == ENOBUFS) { + error = EAGAIN; + } #endif if (error != EAGAIN) { VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error)); @@ -162,6 +173,75 @@ fd_wait(struct stream *stream, enum stream_wait_type wait) } } +static int +fd_enqueue(struct stream *stream, struct ofpbuf *buf) +{ + struct stream_fd *sfd = stream_fd_cast(stream); + ovs_list_push_back(&sfd->output, &buf->list_node); + sfd->queue_depth ++; + return buf->size; +} + +static bool +fd_flush(struct stream *stream, int *retval) +{ + struct stream_fd *sfd = stream_fd_cast(stream); + int old_q_depth; + + if (sfd->queue_depth == 0) { + * retval = -EAGAIN; + return true; + } else { + int sent, i = 0; + struct msghdr msg; + struct ofpbuf *buf; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = xmalloc(sizeof(struct iovec) * sfd->queue_depth); + msg.msg_iovlen = sfd->queue_depth; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + LIST_FOR_EACH (buf, list_node, &sfd->output) { + msg.msg_iov[i].iov_base = buf->data; + msg.msg_iov[i].iov_len = buf->size; + i++; + } + + sent = sendmsg(sfd->fd, &msg, 0); + + free(msg.msg_iov); + + if (sent > 0) { + * retval = sent; + old_q_depth = sfd->queue_depth; + for (i = 0; i < old_q_depth ; i++) { + buf = ofpbuf_from_list(sfd->output.next); + if (buf->size > sent) { + ofpbuf_pull(buf, sent); + sent = 0; + } else { + sent -= buf->size; + sfd->queue_depth --; + ovs_list_remove(&buf->list_node); + ofpbuf_delete(buf); + } + if (sent == 0) { + break; + } + } + return true; + } else { + *retval = -sock_errno(); + return false; + } + } +} + + + static const struct stream_class stream_fd_class = { "fd", /* name */ false, /* needs_probes */ @@ -173,6 +253,8 @@ static const struct stream_class stream_fd_class = { NULL, /* run */ NULL, /* run_wait */ fd_wait, /* wait */ + fd_enqueue, /* enqueue */ + fd_flush, /* flush */ }; /* Passive file descriptor stream. */ diff --git a/lib/stream-provider.h b/lib/stream-provider.h index 75f4f059b..b5161bd04 100644 --- a/lib/stream-provider.h +++ b/lib/stream-provider.h @@ -18,9 +18,13 @@ #define STREAM_PROVIDER_H 1 #include +#include +#include "openvswitch/list.h" +#include "openvswitch/ofpbuf.h" +#include "openvswitch/thread.h" #include "stream.h" - -/* Active stream connection. */ +#include "byteq.h" +#include "latch.h" /* Active stream connection. * @@ -124,6 +128,31 @@ struct stream_class { /* Arranges for the poll loop to wake up when 'stream' is ready to take an * action of the given 'type'. */ void (*wait)(struct stream *stream, enum stream_wait_type type); + /* Enqueues an ofpbuf and surrenders its ownership to the + * stream + * + * - If successful - stream now owns the buffer, returns + * backlog size + * + * - On error, negative value, buffer is not claimed by + * the stream. + * + * The enqueue function must not block. If no bytes can be immediately + * accepted for transmission, it should return -EAGAIN immediately. */ + int (*enqueue)(struct stream *stream, struct ofpbuf *buf); + /* Flushes any stream buffers + * + * - If successful returns true and retval contains the backlog size + * + * - If partially successful (EAGAIN), returns false and retval is + * a positive backlog size + * + * - If unsuccessful, returns false and retval contains a negative + * error value + * + * The flush function must not block. If buffers cannot be flushed + * completely it should return "partial success" immediately. */ + bool (*flush)(struct stream *stream, int *retval); }; /* Passive listener for incoming stream connections. @@ -184,6 +213,7 @@ struct pstream_class { /* Arranges for the poll loop to wake up when a connection is ready to be * accepted on 'pstream'. */ void (*wait)(struct pstream *pstream); + }; /* Active and passive stream classes. */ diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c index 078fcbc3a..0046e383e 100644 --- a/lib/stream-ssl.c +++ b/lib/stream-ssl.c @@ -85,6 +85,8 @@ struct ssl_stream SSL *ssl; struct ofpbuf *txbuf; unsigned int session_nr; + int last_enqueued; + long backlog_to_report; /* rx_want and tx_want record the result of the last call to SSL_read() * and SSL_write(), respectively: @@ -304,6 +306,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type, sslv->rx_want = sslv->tx_want = SSL_NOTHING; sslv->session_nr = next_session_nr++; sslv->n_head = 0; + sslv->last_enqueued = 0; + sslv->backlog_to_report = 0; if (VLOG_IS_DBG_ENABLED()) { SSL_set_msg_callback(ssl, ssl_protocol_cb); @@ -784,8 +788,59 @@ ssl_run(struct stream *stream) { struct ssl_stream *sslv = ssl_stream_cast(stream); - if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) { - ssl_clear_txbuf(sslv); + if (sslv->txbuf) { + if (ssl_do_tx(stream) != EAGAIN) { + sslv->backlog_to_report += sslv->last_enqueued; + ssl_clear_txbuf(sslv); + } + } +} + +static int +ssl_enqueue(struct stream *stream, struct ofpbuf *buf) +{ + int n = buf->size; + struct ssl_stream *sslv = ssl_stream_cast(stream); + if (sslv->txbuf) { + return -EAGAIN; + } + sslv->txbuf = buf; + sslv->last_enqueued = n; + return n; +} + +static bool +ssl_flush(struct stream *stream, int *retval) +{ + struct ssl_stream *sslv = ssl_stream_cast(stream); + + if (!sslv->txbuf) { + if (sslv->backlog_to_report) { + * retval = sslv->backlog_to_report; + sslv->backlog_to_report = 0; + } else { + * retval = -EAGAIN; + } + return true; + } else { + int error; + + error = ssl_do_tx(stream); + switch (error) { + case 0: + ssl_clear_txbuf(sslv); + * retval = sslv->backlog_to_report + sslv->last_enqueued; + sslv->backlog_to_report = 0; + sslv->last_enqueued = 0; + return true; + case EAGAIN: + * retval = 0; + return false; + default: + ssl_clear_txbuf(sslv); + * retval = -error; + return false; + } } } @@ -840,8 +895,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait) /* We have room in our tx queue. */ poll_immediate_wake(); } else { - /* stream_run_wait() will do the right thing; don't bother with - * redundancy. */ + poll_fd_wait(sslv->fd, POLLOUT); } break; @@ -861,6 +915,8 @@ const struct stream_class ssl_stream_class = { ssl_run, /* run */ ssl_run_wait, /* run_wait */ ssl_wait, /* wait */ + ssl_enqueue, /* send_buf */ + ssl_flush, }; /* Passive SSL. */ diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c index e8dc2bfaa..63632e989 100644 --- a/lib/stream-tcp.c +++ b/lib/stream-tcp.c @@ -73,6 +73,8 @@ const struct stream_class tcp_stream_class = { NULL, /* run */ NULL, /* run_wait */ NULL, /* wait */ + NULL, /* enqueue */ + NULL, /* flush */ }; /* Passive TCP. */ diff --git a/lib/stream-unix.c b/lib/stream-unix.c index d265efb83..4fd7573f2 100644 --- a/lib/stream-unix.c +++ b/lib/stream-unix.c @@ -73,6 +73,8 @@ const struct stream_class unix_stream_class = { NULL, /* run */ NULL, /* run_wait */ NULL, /* wait */ + NULL, /* enqueue */ + NULL, /* flush */ }; /* Passive UNIX socket. */ diff --git a/lib/stream-windows.c b/lib/stream-windows.c index 5c4c55e5d..cb8c2d3c3 100644 --- a/lib/stream-windows.c +++ b/lib/stream-windows.c @@ -374,6 +374,8 @@ const struct stream_class windows_stream_class = { NULL, /* run */ NULL, /* run_wait */ windows_wait, /* wait */ + NULL, /* enqueue */ + NULL, /* flush */ }; struct pwindows_pstream diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c index d416f1b60..83450beaa 100644 --- a/ovsdb/ovsdb-server.c +++ b/ovsdb/ovsdb-server.c @@ -59,6 +59,7 @@ #include "perf-counter.h" #include "ovsdb-util.h" #include "openvswitch/vlog.h" +#include "async-io.h" VLOG_DEFINE_THIS_MODULE(ovsdb_server); @@ -398,6 +399,7 @@ main(int argc, char *argv[]) } daemonize_complete(); + async_io_enable(); if (!run_command) { /* ovsdb-server is usually a long-running process, in which case it