Message ID | 20200512074114.6985-2-anton.ivanov@cambridgegreys.com |
---|---|
State | Superseded |
Headers | show |
Series | [ovs-dev,1/2] Make ByteQ safe for simultaneous producer/consumer | expand |
Hi Anton, One general note about this submission is that I see some coding guideline violations. One concerns function definitions. The guidelines say to write these with the return type on its own line, the function name and parameters on the next line, and the opening brace on a line by itself, i.e. `return_type` / `func_name(params)` / `{` ... `}`. But some of your functions are written as `return_type func_name(params) {` ... `}`. Notice the return type and the opening brace on the same line as the function name. Another one I see concerns spacing around prefix and postfix operators. For instance, I see pointer dereferencing being done as `* pointer` instead of `*pointer`. In general, I'd suggest having a look at the guidelines. Unfortunately, checkpatch.py doesn't catch these issues. I have another observation inline below: On 5/12/20 3:41 AM, anton.ivanov@cambridgegreys.com wrote: > From: Anton Ivanov <anton.ivanov@cambridgegreys.com> > > 1. Pull out buffering and send/receive ops from json rpc > > 2. Make the SSL send zero copy (it was creating an ofpbuf > out of an existing ofpbuf data without necessity). > > 3. Add vector IO to stream-fd to make flushing more > efficient. Also makes queueing for stream-fd and stream-ssl > roughly identical. > > 4. Unify backlog management > > 5. Make use of the full capacity of the incoming buffer and > not only when it is empty. > > 6. Allow for IO to be run in worker threads > > 7. Various minor fixes to enable async io - make rx errors > visible to tx and vice versa. Make activity tracking for > reconnect async friendly, etc. > > 8. 
Enable Async IO in ovsdb > > Signed-off-by: Anton Ivanov <anton.ivanov@cambridgegreys.com> > --- > lib/async-io.c | 530 ++++++++++++++++++++++++++++++++++++++++++ > lib/async-io.h | 86 +++++++ > lib/automake.mk | 2 + > lib/jsonrpc.c | 151 +++++------- > lib/stream-fd.c | 82 +++++++ > lib/stream-provider.h | 34 ++- > lib/stream-ssl.c | 64 ++++- > lib/stream-tcp.c | 2 + > lib/stream-unix.c | 2 + > lib/stream-windows.c | 2 + > ovsdb/ovsdb-server.c | 2 + > 11 files changed, 864 insertions(+), 93 deletions(-) > create mode 100644 lib/async-io.c > create mode 100644 lib/async-io.h > > diff --git a/lib/async-io.c b/lib/async-io.c > new file mode 100644 > index 000000000..16d568a50 > --- /dev/null > +++ b/lib/async-io.c > @@ -0,0 +1,530 @@ > +/* > + * Copyright (c) 2020 Red Hat Inc > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at: > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > + > +#include <config.h> > +#include "stream-provider.h" > +#include <errno.h> > +#include <unistd.h> > +#include <inttypes.h> > +#include <sys/types.h> > +#include <netinet/in.h> > +#include <poll.h> > +#include <stdlib.h> > +#include <string.h> > +#include "coverage.h" > +#include "fatal-signal.h" > +#include "flow.h" > +#include "jsonrpc.h" > +#include "openflow/nicira-ext.h" > +#include "openflow/openflow.h" > +#include "openvswitch/dynamic-string.h" > +#include "openvswitch/ofp-print.h" > +#include "openvswitch/ofpbuf.h" > +#include "openvswitch/vlog.h" > +#include "ovs-thread.h" > +#include "ovs-atomic.h" > +#include "packets.h" > +#include "openvswitch/poll-loop.h" > +#include "random.h" > +#include "socket-util.h" > +#include "util.h" > +#include "timeval.h" > +#include "async-io.h" > +#include "ovs-numa.h" > + > +VLOG_DEFINE_THIS_MODULE(async_io); > + > +static bool allow_async_io = false; > + > +static bool async_io_setup = false; > +static bool kill_async_io = false; > + > +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER; > + > +static struct ovs_list io_pools = OVS_LIST_INITIALIZER(&io_pools); > + > +static int pool_size; > + > +static struct async_io_pool *io_pool = NULL; > + > +static int do_async_recv(struct async_data *data); > +static int do_stream_flush(struct async_data *data); > + > +static inline bool not_in_error(struct async_data *data) { > + int rx_error, tx_error; > + > + if (!data->valid) { > + return false; > + } > + > + atomic_read_relaxed(&data->rx_error, &rx_error); > + atomic_read_relaxed(&data->tx_error, &tx_error); > + > + return ( > + ((rx_error > 0) || (rx_error == -EAGAIN)) && > + ((tx_error >= 0) || (tx_error == -EAGAIN)) > + ); > +} > + > +static inline bool in_error(struct async_data *data) { > + return ! 
not_in_error(data); > +} > + > + > +static void *default_async_io_helper(void *arg) { > + struct async_io_control *io_control = > + (struct async_io_control *) arg; > + struct async_data *data; > + int retval; > + > + do { > + ovs_mutex_lock(&io_control->mutex); > + latch_poll(&io_control->async_latch); > + LIST_FOR_EACH (data, list_node, &io_control->work_items) { > + long backlog, oldbacklog; > + ovs_mutex_lock(&data->mutex); > + retval = -EAGAIN; > + if (not_in_error(data)) { > + /* > + * We stop reading if the input queue is full > + */ > + if (byteq_headroom(&data->input) != 0) { > + retval = do_async_recv(data); > + } else { > + poll_timer_wait(1); > + retval = 0; > + } > + } > + if (not_in_error(data) && (retval > 0 || retval == -EAGAIN)) { > + stream_recv_wait(data->stream); > + } > + atomic_read_relaxed(&data->backlog, &oldbacklog); > + if (not_in_error(data)) { > + stream_run(data->stream); > + do_stream_flush(data); > + } > + atomic_read_relaxed(&data->backlog, &backlog); > + if (not_in_error(data)) { > + if (backlog) { > + /* upper layers will refuse to process rx > + * until the tx is clear, so no point > + * notifying them > + */ > + stream_send_wait(data->stream); > + } else { > + /* There is no backlog, so the rpc layer will > + * actually pay attention to our notifications > + * We issue a notification for both pending > + * input and what is the equivalent of > + * "IO Completion" > + */ > + if (!byteq_is_empty(&data->input) || oldbacklog) { > + latch_set(&data->rx_notify); > + } > + } > + } > + if (data->valid && in_error(data)) { > + /* make sure that the other thread(s) notice any errors. > + * this should not be an else because errors may have > + * changed inside the ifs above. 
> + */ > + latch_set(&data->rx_notify); > + data->valid = false; > + } > + if (not_in_error(data)) { > + stream_run_wait(data->stream); > + } > + ovs_mutex_unlock(&data->mutex); > + } > + ovs_mutex_unlock(&io_control->mutex); > + latch_wait(&io_control->async_latch); > + poll_block(); > + } while (!kill_async_io); > + return arg; > +} > + > +static void async_io_hook(void *aux OVS_UNUSED) { > + int i; > + static struct async_io_pool *pool; > + kill_async_io = true; > + LIST_FOR_EACH (pool, list_node, &io_pools) { > + for (i = 0; i < pool->size ; i++) { > + latch_set(&pool->controls[i].async_latch); > + latch_destroy(&pool->controls[i].async_latch); > + } > + } > +} > + > +static void setup_async_io(void) { > + int cores, nodes; > + > + nodes = ovs_numa_get_n_numas(); > + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) { > + nodes = 1; > + } > + cores = ovs_numa_get_n_cores(); > + if (cores == OVS_CORE_UNSPEC || cores <= 0) { > + pool_size = 4; > + } else { > + pool_size = cores / nodes; > + } > + fatal_signal_add_hook(async_io_hook, NULL, NULL, true); > + async_io_setup = true; > +} > + > +struct async_io_pool *add_pool(void *(*start)(void *)){ > + > + struct async_io_pool *new_pool = NULL; > + struct async_io_control *io_control; > + int i; > + > + ovs_mutex_lock(&init_mutex); > + > + if (!async_io_setup) { > + setup_async_io(); > + } > + > + new_pool = xmalloc(sizeof(struct async_io_pool)); > + new_pool->size = pool_size; /* we may make this more dynamic later */ > + > + ovs_list_push_back(&io_pools, &new_pool->list_node); > + > + new_pool->controls = > + xmalloc(sizeof(struct async_io_control) * new_pool->size); > + for (i = 0; i < new_pool->size; i++) { > + io_control = &new_pool->controls[i]; > + latch_init(&io_control->async_latch); > + ovs_mutex_init(&io_control->mutex); > + ovs_list_init(&io_control->work_items); > + } > + for (i = 0; i < pool_size; i++) { > + ovs_thread_create("async io helper", start, &new_pool->controls[i]); > + } > + 
ovs_mutex_unlock(&init_mutex); > + return new_pool; > +} > + > +void > +async_init_data(struct async_data *data, struct stream *stream) > +{ > + struct async_io_control *target_control; > + unsigned int buffer_size; > + > + data->stream = stream; > +#ifdef __linux__ > + buffer_size = getpagesize(); > + if (!is_pow2(buffer_size)) { > + buffer_size = ASYNC_BUFFER_SIZE; > + } > +#else > + buffer_size = ASYNC_BUFFER_SIZE; > +#endif > +#if (_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600) > + /* try to allocate a buffer_size as aligned, that by default is one page > + * if that fails, fall back to normal memory allocation. > + */ > + if (posix_memalign( > + (void **) &data->input_buffer, buffer_size, buffer_size)) { > + data->input_buffer = xmalloc(buffer_size); > + } > +#else > + data->input_buffer = xmalloc(buffer_size); > +#endif > + byteq_init(&data->input, data->input_buffer, buffer_size); > + ovs_list_init(&data->output); > + data->output_count = 0; > + data->rx_error = ATOMIC_VAR_INIT(-EAGAIN); > + data->tx_error = ATOMIC_VAR_INIT(0); > + data->active = ATOMIC_VAR_INIT(false); > + data->backlog = ATOMIC_VAR_INIT(0); > + ovs_mutex_init(&data->mutex); > + data->async_mode = allow_async_io; > + data->valid = true; > + if (data->async_mode) { > + if (!io_pool) { > + io_pool = add_pool(default_async_io_helper); > + } There is a theoretical race condition here since the read of io_pool is not mutex-protected. Multiple threads could potentially call add_pool() here. The result would be a memory leak of an async_io_pool and a thread leak of pool_size threads. 
> + data->async_id = random_uint32(); > + target_control = &io_pool->controls[data->async_id % io_pool->size]; > + /* these are just fd pairs, no need to play with pointers, we > + * can pass them around > + */ > + data->tx_run_notify = target_control->async_latch; > + latch_init(&data->rx_notify); > + ovs_mutex_lock(&target_control->mutex); > + ovs_list_push_back(&target_control->work_items, &data->list_node); > + ovs_mutex_unlock(&target_control->mutex); > + latch_set(&target_control->async_latch); > + } > +} > + > +void > +async_stream_enable(struct async_data *data) > +{ > + data->async_mode = allow_async_io; > +} > + > +void > +async_stream_disable(struct async_data *data) > +{ > + struct async_io_control *target_control; > + bool needs_wake = false; > + > + > + if (data->async_mode) { > + if (not_in_error(data) && (async_get_backlog(data) > 0)) { > + needs_wake = true; > + latch_poll(&data->rx_notify); > + latch_wait(&data->rx_notify); > + latch_set(&data->tx_run_notify); > + /* limit this to 50ms - should be enough for > + * a single flush and we will not get stuck here > + * waiting for a send to complete > + */ > + poll_timer_wait(50); > + poll_block(); > + } > + if (needs_wake) { > + /* we have lost all poll-wait info because we block()-ed > + * locally, we need to force the upper layers to rerun so > + * that they reinstate the correct waits > + */ > + poll_immediate_wake(); > + } > + target_control = &io_pool->controls[data->async_id % io_pool->size]; > + ovs_mutex_lock(&target_control->mutex); > + ovs_list_remove(&data->list_node); > + ovs_mutex_unlock(&target_control->mutex); > + data->async_mode = false; > + latch_destroy(&data->rx_notify); > + } > + if (data->input_buffer) { > + free(data->input_buffer); > + data->input_buffer = NULL; > + } > +} > + > +void > +async_cleanup_data(struct async_data *data) > +{ > + if (async_get_backlog(data)) { > + ofpbuf_list_delete(&data->output); > + } > + atomic_store_relaxed(&data->backlog, 0); > + 
data->output_count = 0; > +} > + > +/* Routines intended for async IO */ > + > +long async_stream_enqueue(struct async_data *data, struct ofpbuf *buf) { > + long retval = -EAGAIN; > + long discard; > + > + ovs_mutex_lock(&data->mutex); > + if (buf) { > + ovs_list_push_back(&data->output, &buf->list_node); > + data->output_count ++; > + atomic_add_relaxed(&data->backlog, buf->size, &discard); > + atomic_thread_fence(memory_order_release); > + } > + atomic_read_relaxed(&data->backlog, &retval); > + ovs_mutex_unlock(&data->mutex); > + return retval; > +} > + > +static int do_stream_flush(struct async_data *data) { > + struct ofpbuf *buf; > + int count = 0; > + bool stamp = false; > + int retval = -stream_connect(data->stream); > + long discard; > + > + if (!retval) { > + while (!ovs_list_is_empty(&data->output) && count < 10) { > + buf = ofpbuf_from_list(data->output.next); > + if (data->stream->class->enqueue) { > + ovs_list_remove(&buf->list_node); > + retval = (data->stream->class->enqueue)(data->stream, buf); > + if (retval > 0) { > + data->output_count--; > + } else { > + ovs_list_push_front(&data->output, &buf->list_node); > + } > + } else { > + retval = stream_send(data->stream, buf->data, buf->size); > + if (retval > 0) { > + stamp = true; > + atomic_sub_relaxed(&data->backlog, retval, &discard); > + ofpbuf_pull(buf, retval); > + if (!buf->size) { > + /* stream now owns buf */ > + ovs_list_remove(&buf->list_node); > + data->output_count--; > + ofpbuf_delete(buf); > + } > + } > + } > + if (retval <= 0) { > + break; > + } > + count++; > + } > + if (data->stream->class->flush && (retval >= 0 || retval == -EAGAIN)) { > + (data->stream->class->flush)(data->stream, &retval); > + if (retval > 0) { > + stamp = true; > + atomic_sub_relaxed(&data->backlog, retval, &discard); > + } > + } > + if (stamp) { > + atomic_store_relaxed(&data->active, true); > + } > + } > + atomic_store_relaxed(&data->tx_error, retval); > + return retval; > +} > + > +int 
async_stream_flush(struct async_data *data) { > + int retval; > + > + if (data->async_mode) { > + atomic_read_relaxed(&data->tx_error, &retval); > + if (retval >= 0) { > + retval = -EAGAIN; /* fake a busy so that upper layers do not > + * retry, we will flush the backlog in the > + * background > + */ > + } > + if (async_get_backlog(data)) { > + latch_set(&data->tx_run_notify); > + } > + } else { > + retval = do_stream_flush(data); > + } > + return retval; > +} > + > +static int do_async_recv(struct async_data *data) { > + size_t chunk; > + int retval; > + > + atomic_read_relaxed(&data->rx_error, &retval); > + if (retval > 0 || retval == -EAGAIN) { > + chunk = byteq_headroom(&data->input); > + if (chunk > 0) { > + retval = stream_recv( > + data->stream, byteq_head(&data->input), chunk); > + if (retval > 0) { > + byteq_advance_head(&data->input, retval); > + } > + } > + } > + if (retval > 0 || retval == -EAGAIN) { > + retval = byteq_used(&data->input); > + if (retval == 0) { > + retval = -EAGAIN; > + } > + } > + atomic_store_relaxed(&data->rx_error, retval); > + return retval; > +} > + > + > +int async_stream_recv(struct async_data *data) { > + int retval = -EAGAIN; > + > + if (data->async_mode) { > + atomic_read_relaxed(&data->rx_error, &retval); > + /* clear RX notifications */ > + latch_poll(&data->rx_notify); > + /* fake a retval from byteq usage */ > + if (retval > 0 || retval == -EAGAIN) { > + retval = byteq_used(&data->input); > + if (retval == 0) { > + retval = -EAGAIN; > + } > + } > + } else { > + retval = do_async_recv(data); > + } > + return retval; > +} > + > +void async_stream_run(struct async_data *data) { > + if (!data->async_mode) { > + stream_run(data->stream); > + } else { > + latch_set(&data->tx_run_notify); > + } > + } > + > +void async_io_kick(struct async_data *data) { > + if (data->async_mode) { > + latch_set(&data->tx_run_notify); > + } > +} > + > +void async_recv_wait(struct async_data *data) { > + if (data->async_mode) { > + 
latch_poll(&data->rx_notify); > + latch_wait(&data->rx_notify); > + } else { > + stream_recv_wait(data->stream); > + } > +} > + > +void async_io_enable(void) { > + allow_async_io = true; > +} > + > +/* Accessors for JSON RPC */ > + > +struct byteq *async_get_input(struct async_data *data) { > + return &data->input; > +} > +struct stream *async_get_stream(struct async_data *data) { > + return data->stream; > +} > + > +bool async_output_is_empty(struct async_data *data) { > + bool retval; > + ovs_mutex_lock(&data->mutex); > + /* backlog tracks backlog across the full stack all the > + * way to the actual send. It is the source of truth > + * if we have output or not so anybody asking if we > + * have output should be told if we have backlog > + * instead. > + */ > + retval = (data->backlog == 0); > + ovs_mutex_unlock(&data->mutex); > + return retval; > +} > + > +long async_get_backlog(struct async_data *data) { > + long retval; > + /* This is used only by the unixctl connection > + * so not worth it to convert backlog to atomics > + */ > + atomic_read_relaxed(&data->backlog, &retval); > + return retval; > +} > + > +bool async_get_active(struct async_data *data) { > + bool test = true; > + return atomic_compare_exchange_weak(&data->active, &test, false); > +} > + > + > diff --git a/lib/async-io.h b/lib/async-io.h > new file mode 100644 > index 000000000..dea070ee6 > --- /dev/null > +++ b/lib/async-io.h > @@ -0,0 +1,86 @@ > +/* > + * Copyright (c) 2020 Red Hat, Inc > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at: > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
> + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +#ifndef ASYNC_IO_H > +#define ASYNC_IO_H 1 > + > +#include <stdbool.h> > +#include <stddef.h> > +#include <stdint.h> > +#include <sys/types.h> > +#include "openvswitch/types.h" > +#include "openvswitch/ofpbuf.h" > +#include "socket-util.h" > +#include "ovs-atomic.h" > +#include "ovs-thread.h" > +#include "latch.h" > +#include "byteq.h" > +#include "util.h" > + > +#define ASYNC_BUFFER_SIZE (4096) > + > +struct stream; > + > +struct async_data { > + struct stream *stream; > + struct ovs_list output; > + struct ovs_list list_node; > + long backlog; > + size_t output_count; > + atomic_bool active; > + atomic_int rx_error, tx_error; > + uint32_t async_id; > + struct latch rx_notify, tx_run_notify; > + struct ovs_mutex mutex; > + bool async_mode, valid; > + struct byteq input; > + uint8_t *input_buffer; > +}; > + > +struct async_io_control { > + struct latch async_latch; > + struct ovs_list work_items; > + struct ovs_mutex mutex; > +}; > + > +struct async_io_pool { > + struct ovs_list list_node; > + struct async_io_control *controls; > + int size; > +}; > + > +struct async_io_pool *add_pool(void *(*start)(void *)); > + > +long async_stream_enqueue(struct async_data *, struct ofpbuf *buf); > +int async_stream_flush(struct async_data *); > +int async_stream_recv(struct async_data *); > +struct byteq *async_get_input(struct async_data *); > +struct stream *async_get_stream(struct async_data *); > +bool async_output_is_empty(struct async_data *); > +long async_get_backlog(struct async_data *); > +bool async_get_active(struct async_data *); > + > +void async_stream_enable(struct async_data *); > +void async_stream_disable(struct async_data *); > + > +void async_init_data(struct async_data *, struct stream *); > +void async_cleanup_data(struct async_data *); > +void async_stream_run(struct async_data *data); > +void async_io_kick(struct async_data 
*data); > +void async_recv_wait(struct async_data *data); > +void async_io_enable(void); > + > +#endif /* async-io.h */ > diff --git a/lib/automake.mk b/lib/automake.mk > index 86940ccd2..6f7870f26 100644 > --- a/lib/automake.mk > +++ b/lib/automake.mk > @@ -24,6 +24,8 @@ lib_libopenvswitch_la_SOURCES = \ > lib/aes128.c \ > lib/aes128.h \ > lib/async-append.h \ > + lib/async-io.h \ > + lib/async-io.c \ > lib/backtrace.c \ > lib/backtrace.h \ > lib/bfd.c \ > diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c > index ed748dbde..f831bc2dd 100644 > --- a/lib/jsonrpc.c > +++ b/lib/jsonrpc.c > @@ -30,28 +30,23 @@ > #include "openvswitch/poll-loop.h" > #include "reconnect.h" > #include "stream.h" > +#include "stream-provider.h" > #include "svec.h" > #include "timeval.h" > +#include "async-io.h" > #include "openvswitch/vlog.h" > > VLOG_DEFINE_THIS_MODULE(jsonrpc); > > struct jsonrpc { > - struct stream *stream; > char *name; > int status; > - > - /* Input. */ > - struct byteq input; > - uint8_t input_buffer[4096]; > struct json_parser *parser; > - > - /* Output. */ > - struct ovs_list output; /* Contains "struct ofpbuf"s. */ > - size_t output_count; /* Number of elements in "output". */ > - size_t backlog; > + struct async_data data; > }; > > +#define MIN_IDLE_TIME 10 > + > /* Rate limit for error messages. */ > static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); > > @@ -59,6 +54,11 @@ static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *); > static void jsonrpc_cleanup(struct jsonrpc *); > static void jsonrpc_error(struct jsonrpc *, int error); > > +static inline struct async_data *adata(struct jsonrpc *rpc) { > + return &rpc->data; > +} > + > + > /* This is just the same as stream_open() except that it uses the default > * JSONRPC port if none is specified. 
*/ > int > @@ -86,10 +86,8 @@ jsonrpc_open(struct stream *stream) > > rpc = xzalloc(sizeof *rpc); > rpc->name = xstrdup(stream_get_name(stream)); > - rpc->stream = stream; > - byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer); > - ovs_list_init(&rpc->output); > - > + async_init_data(adata(rpc), stream); > + async_stream_enable(adata(rpc)); > return rpc; > } > > @@ -109,33 +107,22 @@ jsonrpc_close(struct jsonrpc *rpc) > void > jsonrpc_run(struct jsonrpc *rpc) > { > + int retval; > if (rpc->status) { > return; > } > > - stream_run(rpc->stream); > - while (!ovs_list_is_empty(&rpc->output)) { > - struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next); > - int retval; > - > - retval = stream_send(rpc->stream, buf->data, buf->size); > - if (retval >= 0) { > - rpc->backlog -= retval; > - ofpbuf_pull(buf, retval); > - if (!buf->size) { > - ovs_list_remove(&buf->list_node); > - rpc->output_count--; > - ofpbuf_delete(buf); > - } > - } else { > + async_stream_run(adata(rpc)); > + do { > + retval = async_stream_flush(&rpc->data); > + if (retval < 0) { > if (retval != -EAGAIN) { > VLOG_WARN_RL(&rl, "%s: send error: %s", > rpc->name, ovs_strerror(-retval)); > jsonrpc_error(rpc, -retval); > } > - break; > } > - } > + } while (retval > 0); > } > > /* Arranges for the poll loop to wake up when 'rpc' needs to perform > @@ -144,9 +131,13 @@ void > jsonrpc_wait(struct jsonrpc *rpc) > { > if (!rpc->status) { > - stream_run_wait(rpc->stream); > - if (!ovs_list_is_empty(&rpc->output)) { > - stream_send_wait(rpc->stream); > + if (adata(rpc)->async_mode) { > + async_recv_wait(adata(rpc)); > + } else { > + stream_run_wait(rpc->data.stream); > + if (!async_output_is_empty(adata(rpc))) { > + stream_send_wait(async_get_stream(adata(rpc))); > + } > } > } > } > @@ -175,7 +166,7 @@ jsonrpc_get_status(const struct jsonrpc *rpc) > size_t > jsonrpc_get_backlog(const struct jsonrpc *rpc) > { > - return rpc->status ? 0 : rpc->backlog; > + return rpc->status ? 
0 : async_get_backlog(adata((struct jsonrpc *) rpc)); > } > > /* Returns the number of bytes that have been received on 'rpc''s underlying > @@ -183,7 +174,7 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc) > unsigned int > jsonrpc_get_received_bytes(const struct jsonrpc *rpc) > { > - return rpc->input.head; > + return async_get_input(adata((struct jsonrpc *) rpc))->head; > } > > /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for > @@ -234,13 +225,13 @@ jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title, > * buffered in 'rpc'.) > * > * Always takes ownership of 'msg', regardless of success. */ > + > int > jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) > { > struct ofpbuf *buf; > struct json *json; > struct ds ds = DS_EMPTY_INITIALIZER; > - size_t length; > > if (rpc->status) { > jsonrpc_msg_destroy(msg); > @@ -251,24 +242,13 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) > > json = jsonrpc_msg_to_json(msg); > json_to_ds(json, 0, &ds); > - length = ds.length; > json_destroy(json); > > buf = xmalloc(sizeof *buf); > ofpbuf_use_ds(buf, &ds); > - ovs_list_push_back(&rpc->output, &buf->list_node); > - rpc->output_count++; > - rpc->backlog += length; > - > - if (rpc->output_count >= 50) { > - VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of" > - " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name, > - rpc->output_count, rpc->backlog); > - } > + async_stream_enqueue(adata(rpc), buf); > > - if (rpc->backlog == length) { > - jsonrpc_run(rpc); > - } > + jsonrpc_run(rpc); > return rpc->status; > } > > @@ -291,7 +271,7 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) > int > jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) > { > - int i; > + int i, retval; > > *msgp = NULL; > if (rpc->status) { > @@ -302,36 +282,32 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) > size_t n, used; > > /* Fill our input buffer if it's empty. 
*/ > - if (byteq_is_empty(&rpc->input)) { > - size_t chunk; > - int retval; > - > - chunk = byteq_headroom(&rpc->input); > - retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk); > - if (retval < 0) { > - if (retval == -EAGAIN) { > - return EAGAIN; > - } else { > - VLOG_WARN_RL(&rl, "%s: receive error: %s", > - rpc->name, ovs_strerror(-retval)); > - jsonrpc_error(rpc, -retval); > - return rpc->status; > - } > - } else if (retval == 0) { > - jsonrpc_error(rpc, EOF); > - return EOF; > + retval = async_stream_recv(adata(rpc)); > + if (retval < 0) { > + if (retval == -EAGAIN) { > + return EAGAIN; > + } else { > + VLOG_WARN_RL(&rl, "%s: receive error: %s", > + rpc->name, ovs_strerror(-retval)); > + jsonrpc_error(rpc, -retval); > + return rpc->status; > } > - byteq_advance_head(&rpc->input, retval); > + } else if (retval == 0) { > + jsonrpc_error(rpc, EOF); > + return EOF; > } > > /* We have some input. Feed it into the JSON parser. */ > if (!rpc->parser) { > rpc->parser = json_parser_create(0); > } > - n = byteq_tailroom(&rpc->input); > + n = byteq_tailroom(async_get_input(adata(rpc))); > + if (n == 0) { > + break; > + } > used = json_parser_feed(rpc->parser, > - (char *) byteq_tail(&rpc->input), n); > - byteq_advance_tail(&rpc->input, used); > + (char *) byteq_tail(async_get_input(adata(rpc))), n); > + byteq_advance_tail(async_get_input(adata(rpc)), used); > > /* If we have complete JSON, attempt to parse it as JSON-RPC. 
*/ > if (json_parser_is_done(rpc->parser)) { > @@ -341,7 +317,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) > } > > if (rpc->status) { > - const struct byteq *q = &rpc->input; > + const struct byteq *q = async_get_input(adata(rpc)); > if (q->head <= q->size) { > stream_report_content(q->buffer, q->head, STREAM_JSONRPC, > &this_module, rpc->name); > @@ -359,10 +335,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) > void > jsonrpc_recv_wait(struct jsonrpc *rpc) > { > - if (rpc->status || !byteq_is_empty(&rpc->input)) { > + if (rpc->status || !byteq_is_empty(async_get_input(adata(rpc)))) { > poll_immediate_wake_at(rpc->name); > } else { > - stream_recv_wait(rpc->stream); > + async_recv_wait(adata(rpc)); > } > } > > @@ -385,7 +361,7 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg) > > for (;;) { > jsonrpc_run(rpc); > - if (ovs_list_is_empty(&rpc->output) || rpc->status) { > + if (async_output_is_empty(adata(rpc)) || rpc->status) { > return rpc->status; > } > jsonrpc_wait(rpc); > @@ -495,15 +471,14 @@ jsonrpc_error(struct jsonrpc *rpc, int error) > static void > jsonrpc_cleanup(struct jsonrpc *rpc) > { > - stream_close(rpc->stream); > - rpc->stream = NULL; > + async_stream_disable(adata(rpc)); > + stream_close(rpc->data.stream); > + rpc->data.stream = NULL; > > json_parser_abort(rpc->parser); > rpc->parser = NULL; > > - ofpbuf_list_delete(&rpc->output); > - rpc->backlog = 0; > - rpc->output_count = 0; > + async_cleanup_data(adata(rpc)); > } > > static struct jsonrpc_msg * > @@ -977,12 +952,14 @@ jsonrpc_session_run(struct jsonrpc_session *s) > } > > if (s->rpc) { > - size_t backlog; > int error; > + bool active = async_get_active(adata(s->rpc)); > > - backlog = jsonrpc_get_backlog(s->rpc); > jsonrpc_run(s->rpc); > - if (jsonrpc_get_backlog(s->rpc) < backlog) { > + > + active |= async_get_active(adata(s->rpc)); > + > + if (active) { > /* Data previously caught in a queue was successfully sent (or > * there's an 
error, which we'll catch below.) > * > @@ -1076,8 +1053,8 @@ jsonrpc_session_get_name(const struct jsonrpc_session *s) > const char * > jsonrpc_session_get_id(const struct jsonrpc_session *s) > { > - if (s->rpc && s->rpc->stream) { > - return stream_get_peer_id(s->rpc->stream); > + if (s->rpc && async_get_stream(adata(s->rpc))) { > + return stream_get_peer_id(adata(s->rpc)->stream); > } else { > return NULL; > } > diff --git a/lib/stream-fd.c b/lib/stream-fd.c > index 46ee7ae27..747d543cf 100644 > --- a/lib/stream-fd.c > +++ b/lib/stream-fd.c > @@ -30,6 +30,7 @@ > #include "stream-provider.h" > #include "stream.h" > #include "openvswitch/vlog.h" > +#include "openvswitch/list.h" > > VLOG_DEFINE_THIS_MODULE(stream_fd); > > @@ -40,6 +41,8 @@ struct stream_fd > struct stream stream; > int fd; > int fd_type; > + struct ovs_list output; > + int queue_depth; > }; > > static const struct stream_class stream_fd_class; > @@ -67,6 +70,8 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type, > stream_init(&s->stream, &stream_fd_class, connect_status, name); > s->fd = fd; > s->fd_type = fd_type; > + s->queue_depth = 0; > + ovs_list_init(&s->output); > *streamp = &s->stream; > return 0; > } > @@ -83,6 +88,7 @@ fd_close(struct stream *stream) > { > struct stream_fd *s = stream_fd_cast(stream); > closesocket(s->fd); > + ofpbuf_list_delete(&s->output); > free(s); > } > > @@ -111,6 +117,11 @@ fd_recv(struct stream *stream, void *buffer, size_t n) > if (error == WSAEWOULDBLOCK) { > error = EAGAIN; > } > +#endif > +#ifdef __linux__ > + if (error == ENOBUFS) { > + error = EAGAIN; > + } > #endif > if (error != EAGAIN) { > VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error)); > @@ -162,6 +173,75 @@ fd_wait(struct stream *stream, enum stream_wait_type wait) > } > } > > +static int > +fd_enqueue(struct stream *stream, struct ofpbuf *buf) > +{ > + struct stream_fd *sfd = stream_fd_cast(stream); > + ovs_list_push_back(&sfd->output, &buf->list_node); > + sfd->queue_depth ++; > + 
return buf->size; > +} > + > +static bool > +fd_flush(struct stream *stream, int *retval) > +{ > + struct stream_fd *sfd = stream_fd_cast(stream); > + int old_q_depth; > + > + if (sfd->queue_depth == 0) { > + * retval = -EAGAIN; > + return true; > + } else { > + int sent, i = 0; > + struct msghdr msg; > + struct ofpbuf *buf; > + > + msg.msg_name = NULL; > + msg.msg_namelen = 0; > + msg.msg_iov = xmalloc(sizeof(struct iovec) * sfd->queue_depth); > + msg.msg_iovlen = sfd->queue_depth; > + msg.msg_control = NULL; > + msg.msg_controllen = 0; > + msg.msg_flags = 0; > + > + LIST_FOR_EACH (buf, list_node, &sfd->output) { > + msg.msg_iov[i].iov_base = buf->data; > + msg.msg_iov[i].iov_len = buf->size; > + i++; > + } > + > + sent = sendmsg(sfd->fd, &msg, 0); > + > + free(msg.msg_iov); > + > + if (sent > 0) { > + * retval = sent; > + old_q_depth = sfd->queue_depth; > + for (i = 0; i < old_q_depth ; i++) { > + buf = ofpbuf_from_list(sfd->output.next); > + if (buf->size > sent) { > + ofpbuf_pull(buf, sent); > + sent = 0; > + } else { > + sent -= buf->size; > + sfd->queue_depth --; > + ovs_list_remove(&buf->list_node); > + ofpbuf_delete(buf); > + } > + if (sent == 0) { > + break; > + } > + } > + return true; > + } else { > + *retval = -sock_errno(); > + return false; > + } > + } > +} > + > + > + > static const struct stream_class stream_fd_class = { > "fd", /* name */ > false, /* needs_probes */ > @@ -173,6 +253,8 @@ static const struct stream_class stream_fd_class = { > NULL, /* run */ > NULL, /* run_wait */ > fd_wait, /* wait */ > + fd_enqueue, /* enqueue */ > + fd_flush, /* flush */ > }; > > /* Passive file descriptor stream. 
*/ > diff --git a/lib/stream-provider.h b/lib/stream-provider.h > index 75f4f059b..b5161bd04 100644 > --- a/lib/stream-provider.h > +++ b/lib/stream-provider.h > @@ -18,9 +18,13 @@ > #define STREAM_PROVIDER_H 1 > > #include <sys/types.h> > +#include <poll.h> > +#include "openvswitch/list.h" > +#include "openvswitch/ofpbuf.h" > +#include "openvswitch/thread.h" > #include "stream.h" > - > -/* Active stream connection. */ > +#include "byteq.h" > +#include "latch.h" > > /* Active stream connection. > * > @@ -124,6 +128,31 @@ struct stream_class { > /* Arranges for the poll loop to wake up when 'stream' is ready to take an > * action of the given 'type'. */ > void (*wait)(struct stream *stream, enum stream_wait_type type); > + /* Enqueues an ofpbuf and surrenders its ownership to the > + * stream > + * > + * - If successful - stream now owns the buffer, returns > + * backlog size > + * > + * - On error, negative value, buffer is not claimed by > + * the stream. > + * > + * The enqueue function must not block. If no bytes can be immediately > + * accepted for transmission, it should return -EAGAIN immediately. */ > + int (*enqueue)(struct stream *stream, struct ofpbuf *buf); > + /* Flushes any stream buffers > + * > + * - If successful returns true and retval contains the backlog size > + * > + * - If partially successful (EAGAIN), returns false and retval is > + * a positive backlog size > + * > + * - If unsuccessful, returns false and retval contains a negative > + * error value > + * > + * The flush function must not block. If buffers cannot be flushed > + * completely it should return "partial success" immediately. */ > + bool (*flush)(struct stream *stream, int *retval); > }; > > /* Passive listener for incoming stream connections. > @@ -184,6 +213,7 @@ struct pstream_class { > /* Arranges for the poll loop to wake up when a connection is ready to be > * accepted on 'pstream'. 
*/ > void (*wait)(struct pstream *pstream); > + > }; > > /* Active and passive stream classes. */ > diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c > index 078fcbc3a..0046e383e 100644 > --- a/lib/stream-ssl.c > +++ b/lib/stream-ssl.c > @@ -85,6 +85,8 @@ struct ssl_stream > SSL *ssl; > struct ofpbuf *txbuf; > unsigned int session_nr; > + int last_enqueued; > + long backlog_to_report; > > /* rx_want and tx_want record the result of the last call to SSL_read() > * and SSL_write(), respectively: > @@ -304,6 +306,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type, > sslv->rx_want = sslv->tx_want = SSL_NOTHING; > sslv->session_nr = next_session_nr++; > sslv->n_head = 0; > + sslv->last_enqueued = 0; > + sslv->backlog_to_report = 0; > > if (VLOG_IS_DBG_ENABLED()) { > SSL_set_msg_callback(ssl, ssl_protocol_cb); > @@ -784,8 +788,59 @@ ssl_run(struct stream *stream) > { > struct ssl_stream *sslv = ssl_stream_cast(stream); > > - if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) { > - ssl_clear_txbuf(sslv); > + if (sslv->txbuf) { > + if (ssl_do_tx(stream) != EAGAIN) { > + sslv->backlog_to_report += sslv->last_enqueued; > + ssl_clear_txbuf(sslv); > + } > + } > +} > + > +static int > +ssl_enqueue(struct stream *stream, struct ofpbuf *buf) > +{ > + int n = buf->size; > + struct ssl_stream *sslv = ssl_stream_cast(stream); > + if (sslv->txbuf) { > + return -EAGAIN; > + } > + sslv->txbuf = buf; > + sslv->last_enqueued = n; > + return n; > +} > + > +static bool > +ssl_flush(struct stream *stream, int *retval) > +{ > + struct ssl_stream *sslv = ssl_stream_cast(stream); > + > + if (!sslv->txbuf) { > + if (sslv->backlog_to_report) { > + * retval = sslv->backlog_to_report; > + sslv->backlog_to_report = 0; > + } else { > + * retval = -EAGAIN; > + } > + return true; > + } else { > + int error; > + > + error = ssl_do_tx(stream); > + switch (error) { > + case 0: > + ssl_clear_txbuf(sslv); > + * retval = sslv->backlog_to_report + sslv->last_enqueued; > + 
sslv->backlog_to_report = 0; > + sslv->last_enqueued = 0; > + return true; > + case EAGAIN: > + * retval = 0; > + return false; > + default: > + ssl_clear_txbuf(sslv); > + * retval = -error; > + return false; > + } > } > } > > @@ -840,8 +895,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait) > /* We have room in our tx queue. */ > poll_immediate_wake(); > } else { > - /* stream_run_wait() will do the right thing; don't bother with > - * redundancy. */ > + poll_fd_wait(sslv->fd, POLLOUT); > } > break; > > @@ -861,6 +915,8 @@ const struct stream_class ssl_stream_class = { > ssl_run, /* run */ > ssl_run_wait, /* run_wait */ > ssl_wait, /* wait */ > + ssl_enqueue, /* send_buf */ The comment should surely be "/* enqueue */", correct? > + ssl_flush, > }; > > /* Passive SSL. */ > diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c > index e8dc2bfaa..63632e989 100644 > --- a/lib/stream-tcp.c > +++ b/lib/stream-tcp.c > @@ -73,6 +73,8 @@ const struct stream_class tcp_stream_class = { > NULL, /* run */ > NULL, /* run_wait */ > NULL, /* wait */ > + NULL, /* enqueue */ > + NULL, /* flush */ > }; > > /* Passive TCP. */ > diff --git a/lib/stream-unix.c b/lib/stream-unix.c > index d265efb83..4fd7573f2 100644 > --- a/lib/stream-unix.c > +++ b/lib/stream-unix.c > @@ -73,6 +73,8 @@ const struct stream_class unix_stream_class = { > NULL, /* run */ > NULL, /* run_wait */ > NULL, /* wait */ > + NULL, /* enqueue */ > + NULL, /* flush */ > }; > > /* Passive UNIX socket. 
*/ > diff --git a/lib/stream-windows.c b/lib/stream-windows.c > index 5c4c55e5d..cb8c2d3c3 100644 > --- a/lib/stream-windows.c > +++ b/lib/stream-windows.c > @@ -374,6 +374,8 @@ const struct stream_class windows_stream_class = { > NULL, /* run */ > NULL, /* run_wait */ > windows_wait, /* wait */ > + NULL, /* enqueue */ > + NULL, /* flush */ > }; > > struct pwindows_pstream > diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c > index d416f1b60..83450beaa 100644 > --- a/ovsdb/ovsdb-server.c > +++ b/ovsdb/ovsdb-server.c > @@ -59,6 +59,7 @@ > #include "perf-counter.h" > #include "ovsdb-util.h" > #include "openvswitch/vlog.h" > +#include "async-io.h" > > VLOG_DEFINE_THIS_MODULE(ovsdb_server); > > @@ -398,6 +399,7 @@ main(int argc, char *argv[]) > } > > daemonize_complete(); > + async_io_enable(); > > if (!run_command) { > /* ovsdb-server is usually a long-running process, in which case it >
diff --git a/lib/async-io.c b/lib/async-io.c new file mode 100644 index 000000000..16d568a50 --- /dev/null +++ b/lib/async-io.c @@ -0,0 +1,530 @@ +/* + * Copyright (c) 2020 Red Hat Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include "stream-provider.h" +#include <errno.h> +#include <unistd.h> +#include <inttypes.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <poll.h> +#include <stdlib.h> +#include <string.h> +#include "coverage.h" +#include "fatal-signal.h" +#include "flow.h" +#include "jsonrpc.h" +#include "openflow/nicira-ext.h" +#include "openflow/openflow.h" +#include "openvswitch/dynamic-string.h" +#include "openvswitch/ofp-print.h" +#include "openvswitch/ofpbuf.h" +#include "openvswitch/vlog.h" +#include "ovs-thread.h" +#include "ovs-atomic.h" +#include "packets.h" +#include "openvswitch/poll-loop.h" +#include "random.h" +#include "socket-util.h" +#include "util.h" +#include "timeval.h" +#include "async-io.h" +#include "ovs-numa.h" + +VLOG_DEFINE_THIS_MODULE(async_io); + +static bool allow_async_io = false; + +static bool async_io_setup = false; +static bool kill_async_io = false; + +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER; + +static struct ovs_list io_pools = OVS_LIST_INITIALIZER(&io_pools); + +static int pool_size; + +static struct async_io_pool *io_pool = NULL; + +static int do_async_recv(struct async_data *data); +static int do_stream_flush(struct async_data *data); + +static 
inline bool not_in_error(struct async_data *data) { + int rx_error, tx_error; + + if (!data->valid) { + return false; + } + + atomic_read_relaxed(&data->rx_error, &rx_error); + atomic_read_relaxed(&data->tx_error, &tx_error); + + return ( + ((rx_error > 0) || (rx_error == -EAGAIN)) && + ((tx_error >= 0) || (tx_error == -EAGAIN)) + ); +} + +static inline bool in_error(struct async_data *data) { + return ! not_in_error(data); +} + + +static void *default_async_io_helper(void *arg) { + struct async_io_control *io_control = + (struct async_io_control *) arg; + struct async_data *data; + int retval; + + do { + ovs_mutex_lock(&io_control->mutex); + latch_poll(&io_control->async_latch); + LIST_FOR_EACH (data, list_node, &io_control->work_items) { + long backlog, oldbacklog; + ovs_mutex_lock(&data->mutex); + retval = -EAGAIN; + if (not_in_error(data)) { + /* + * We stop reading if the input queue is full + */ + if (byteq_headroom(&data->input) != 0) { + retval = do_async_recv(data); + } else { + poll_timer_wait(1); + retval = 0; + } + } + if (not_in_error(data) && (retval > 0 || retval == -EAGAIN)) { + stream_recv_wait(data->stream); + } + atomic_read_relaxed(&data->backlog, &oldbacklog); + if (not_in_error(data)) { + stream_run(data->stream); + do_stream_flush(data); + } + atomic_read_relaxed(&data->backlog, &backlog); + if (not_in_error(data)) { + if (backlog) { + /* upper layers will refuse to process rx + * until the tx is clear, so no point + * notifying them + */ + stream_send_wait(data->stream); + } else { + /* There is no backlog, so the rpc layer will + * actually pay attention to our notifications + * We issue a notification for both pending + * input and what is the equivalent of + * "IO Completion" + */ + if (!byteq_is_empty(&data->input) || oldbacklog) { + latch_set(&data->rx_notify); + } + } + } + if (data->valid && in_error(data)) { + /* make sure that the other thread(s) notice any errors. 
+ * this should not be an else because errors may have + * changed inside the ifs above. + */ + latch_set(&data->rx_notify); + data->valid = false; + } + if (not_in_error(data)) { + stream_run_wait(data->stream); + } + ovs_mutex_unlock(&data->mutex); + } + ovs_mutex_unlock(&io_control->mutex); + latch_wait(&io_control->async_latch); + poll_block(); + } while (!kill_async_io); + return arg; +} + +static void async_io_hook(void *aux OVS_UNUSED) { + int i; + static struct async_io_pool *pool; + kill_async_io = true; + LIST_FOR_EACH (pool, list_node, &io_pools) { + for (i = 0; i < pool->size ; i++) { + latch_set(&pool->controls[i].async_latch); + latch_destroy(&pool->controls[i].async_latch); + } + } +} + +static void setup_async_io(void) { + int cores, nodes; + + nodes = ovs_numa_get_n_numas(); + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) { + nodes = 1; + } + cores = ovs_numa_get_n_cores(); + if (cores == OVS_CORE_UNSPEC || cores <= 0) { + pool_size = 4; + } else { + pool_size = cores / nodes; + } + fatal_signal_add_hook(async_io_hook, NULL, NULL, true); + async_io_setup = true; +} + +struct async_io_pool *add_pool(void *(*start)(void *)){ + + struct async_io_pool *new_pool = NULL; + struct async_io_control *io_control; + int i; + + ovs_mutex_lock(&init_mutex); + + if (!async_io_setup) { + setup_async_io(); + } + + new_pool = xmalloc(sizeof(struct async_io_pool)); + new_pool->size = pool_size; /* we may make this more dynamic later */ + + ovs_list_push_back(&io_pools, &new_pool->list_node); + + new_pool->controls = + xmalloc(sizeof(struct async_io_control) * new_pool->size); + for (i = 0; i < new_pool->size; i++) { + io_control = &new_pool->controls[i]; + latch_init(&io_control->async_latch); + ovs_mutex_init(&io_control->mutex); + ovs_list_init(&io_control->work_items); + } + for (i = 0; i < pool_size; i++) { + ovs_thread_create("async io helper", start, &new_pool->controls[i]); + } + ovs_mutex_unlock(&init_mutex); + return new_pool; +} + +void 
+async_init_data(struct async_data *data, struct stream *stream) +{ + struct async_io_control *target_control; + unsigned int buffer_size; + + data->stream = stream; +#ifdef __linux__ + buffer_size = getpagesize(); + if (!is_pow2(buffer_size)) { + buffer_size = ASYNC_BUFFER_SIZE; + } +#else + buffer_size = ASYNC_BUFFER_SIZE; +#endif +#if (_POSIX_C_SOURCE >= 200112L || _XOPEN_SOURCE >= 600) + /* try to allocate a buffer_size as aligned, that by default is one page + * if that fails, fall back to normal memory allocation. + */ + if (posix_memalign( + (void **) &data->input_buffer, buffer_size, buffer_size)) { + data->input_buffer = xmalloc(buffer_size); + } +#else + data->input_buffer = xmalloc(buffer_size); +#endif + byteq_init(&data->input, data->input_buffer, buffer_size); + ovs_list_init(&data->output); + data->output_count = 0; + data->rx_error = ATOMIC_VAR_INIT(-EAGAIN); + data->tx_error = ATOMIC_VAR_INIT(0); + data->active = ATOMIC_VAR_INIT(false); + data->backlog = ATOMIC_VAR_INIT(0); + ovs_mutex_init(&data->mutex); + data->async_mode = allow_async_io; + data->valid = true; + if (data->async_mode) { + if (!io_pool) { + io_pool = add_pool(default_async_io_helper); + } + data->async_id = random_uint32(); + target_control = &io_pool->controls[data->async_id % io_pool->size]; + /* these are just fd pairs, no need to play with pointers, we + * can pass them around + */ + data->tx_run_notify = target_control->async_latch; + latch_init(&data->rx_notify); + ovs_mutex_lock(&target_control->mutex); + ovs_list_push_back(&target_control->work_items, &data->list_node); + ovs_mutex_unlock(&target_control->mutex); + latch_set(&target_control->async_latch); + } +} + +void +async_stream_enable(struct async_data *data) +{ + data->async_mode = allow_async_io; +} + +void +async_stream_disable(struct async_data *data) +{ + struct async_io_control *target_control; + bool needs_wake = false; + + + if (data->async_mode) { + if (not_in_error(data) && (async_get_backlog(data) > 0)) { 
+ needs_wake = true; + latch_poll(&data->rx_notify); + latch_wait(&data->rx_notify); + latch_set(&data->tx_run_notify); + /* limit this to 50ms - should be enough for + * a single flush and we will not get stuck here + * waiting for a send to complete + */ + poll_timer_wait(50); + poll_block(); + } + if (needs_wake) { + /* we have lost all poll-wait info because we block()-ed + * locally, we need to force the upper layers to rerun so + * that they reinstate the correct waits + */ + poll_immediate_wake(); + } + target_control = &io_pool->controls[data->async_id % io_pool->size]; + ovs_mutex_lock(&target_control->mutex); + ovs_list_remove(&data->list_node); + ovs_mutex_unlock(&target_control->mutex); + data->async_mode = false; + latch_destroy(&data->rx_notify); + } + if (data->input_buffer) { + free(data->input_buffer); + data->input_buffer = NULL; + } +} + +void +async_cleanup_data(struct async_data *data) +{ + if (async_get_backlog(data)) { + ofpbuf_list_delete(&data->output); + } + atomic_store_relaxed(&data->backlog, 0); + data->output_count = 0; +} + +/* Routines intended for async IO */ + +long async_stream_enqueue(struct async_data *data, struct ofpbuf *buf) { + long retval = -EAGAIN; + long discard; + + ovs_mutex_lock(&data->mutex); + if (buf) { + ovs_list_push_back(&data->output, &buf->list_node); + data->output_count ++; + atomic_add_relaxed(&data->backlog, buf->size, &discard); + atomic_thread_fence(memory_order_release); + } + atomic_read_relaxed(&data->backlog, &retval); + ovs_mutex_unlock(&data->mutex); + return retval; +} + +static int do_stream_flush(struct async_data *data) { + struct ofpbuf *buf; + int count = 0; + bool stamp = false; + int retval = -stream_connect(data->stream); + long discard; + + if (!retval) { + while (!ovs_list_is_empty(&data->output) && count < 10) { + buf = ofpbuf_from_list(data->output.next); + if (data->stream->class->enqueue) { + ovs_list_remove(&buf->list_node); + retval = (data->stream->class->enqueue)(data->stream, 
buf); + if (retval > 0) { + data->output_count--; + } else { + ovs_list_push_front(&data->output, &buf->list_node); + } + } else { + retval = stream_send(data->stream, buf->data, buf->size); + if (retval > 0) { + stamp = true; + atomic_sub_relaxed(&data->backlog, retval, &discard); + ofpbuf_pull(buf, retval); + if (!buf->size) { + /* stream now owns buf */ + ovs_list_remove(&buf->list_node); + data->output_count--; + ofpbuf_delete(buf); + } + } + } + if (retval <= 0) { + break; + } + count++; + } + if (data->stream->class->flush && (retval >= 0 || retval == -EAGAIN)) { + (data->stream->class->flush)(data->stream, &retval); + if (retval > 0) { + stamp = true; + atomic_sub_relaxed(&data->backlog, retval, &discard); + } + } + if (stamp) { + atomic_store_relaxed(&data->active, true); + } + } + atomic_store_relaxed(&data->tx_error, retval); + return retval; +} + +int async_stream_flush(struct async_data *data) { + int retval; + + if (data->async_mode) { + atomic_read_relaxed(&data->tx_error, &retval); + if (retval >= 0) { + retval = -EAGAIN; /* fake a busy so that upper layers do not + * retry, we will flush the backlog in the + * background + */ + } + if (async_get_backlog(data)) { + latch_set(&data->tx_run_notify); + } + } else { + retval = do_stream_flush(data); + } + return retval; +} + +static int do_async_recv(struct async_data *data) { + size_t chunk; + int retval; + + atomic_read_relaxed(&data->rx_error, &retval); + if (retval > 0 || retval == -EAGAIN) { + chunk = byteq_headroom(&data->input); + if (chunk > 0) { + retval = stream_recv( + data->stream, byteq_head(&data->input), chunk); + if (retval > 0) { + byteq_advance_head(&data->input, retval); + } + } + } + if (retval > 0 || retval == -EAGAIN) { + retval = byteq_used(&data->input); + if (retval == 0) { + retval = -EAGAIN; + } + } + atomic_store_relaxed(&data->rx_error, retval); + return retval; +} + + +int async_stream_recv(struct async_data *data) { + int retval = -EAGAIN; + + if (data->async_mode) { + 
atomic_read_relaxed(&data->rx_error, &retval); + /* clear RX notifications */ + latch_poll(&data->rx_notify); + /* fake a retval from byteq usage */ + if (retval > 0 || retval == -EAGAIN) { + retval = byteq_used(&data->input); + if (retval == 0) { + retval = -EAGAIN; + } + } + } else { + retval = do_async_recv(data); + } + return retval; +} + +void async_stream_run(struct async_data *data) { + if (!data->async_mode) { + stream_run(data->stream); + } else { + latch_set(&data->tx_run_notify); + } + } + +void async_io_kick(struct async_data *data) { + if (data->async_mode) { + latch_set(&data->tx_run_notify); + } +} + +void async_recv_wait(struct async_data *data) { + if (data->async_mode) { + latch_poll(&data->rx_notify); + latch_wait(&data->rx_notify); + } else { + stream_recv_wait(data->stream); + } +} + +void async_io_enable(void) { + allow_async_io = true; +} + +/* Accessors for JSON RPC */ + +struct byteq *async_get_input(struct async_data *data) { + return &data->input; +} +struct stream *async_get_stream(struct async_data *data) { + return data->stream; +} + +bool async_output_is_empty(struct async_data *data) { + bool retval; + ovs_mutex_lock(&data->mutex); + /* backlog tracks backlog across the full stack all the + * way to the actual send. It is the source of truth + * if we have output or not so anybody asking if we + * have output should be told if we have backlog + * instead. 
+ */ + retval = (data->backlog == 0); + ovs_mutex_unlock(&data->mutex); + return retval; +} + +long async_get_backlog(struct async_data *data) { + long retval; + /* This is used only by the unixctl connection + * so not worth it to convert backlog to atomics + */ + atomic_read_relaxed(&data->backlog, &retval); + return retval; +} + +bool async_get_active(struct async_data *data) { + bool test = true; + return atomic_compare_exchange_weak(&data->active, &test, false); +} + + diff --git a/lib/async-io.h b/lib/async-io.h new file mode 100644 index 000000000..dea070ee6 --- /dev/null +++ b/lib/async-io.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2020 Red Hat, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef ASYNC_IO_H +#define ASYNC_IO_H 1 + +#include <stdbool.h> +#include <stddef.h> +#include <stdint.h> +#include <sys/types.h> +#include "openvswitch/types.h" +#include "openvswitch/ofpbuf.h" +#include "socket-util.h" +#include "ovs-atomic.h" +#include "ovs-thread.h" +#include "latch.h" +#include "byteq.h" +#include "util.h" + +#define ASYNC_BUFFER_SIZE (4096) + +struct stream; + +struct async_data { + struct stream *stream; + struct ovs_list output; + struct ovs_list list_node; + long backlog; + size_t output_count; + atomic_bool active; + atomic_int rx_error, tx_error; + uint32_t async_id; + struct latch rx_notify, tx_run_notify; + struct ovs_mutex mutex; + bool async_mode, valid; + struct byteq input; + uint8_t *input_buffer; +}; + +struct async_io_control { + struct latch async_latch; + struct ovs_list work_items; + struct ovs_mutex mutex; +}; + +struct async_io_pool { + struct ovs_list list_node; + struct async_io_control *controls; + int size; +}; + +struct async_io_pool *add_pool(void *(*start)(void *)); + +long async_stream_enqueue(struct async_data *, struct ofpbuf *buf); +int async_stream_flush(struct async_data *); +int async_stream_recv(struct async_data *); +struct byteq *async_get_input(struct async_data *); +struct stream *async_get_stream(struct async_data *); +bool async_output_is_empty(struct async_data *); +long async_get_backlog(struct async_data *); +bool async_get_active(struct async_data *); + +void async_stream_enable(struct async_data *); +void async_stream_disable(struct async_data *); + +void async_init_data(struct async_data *, struct stream *); +void async_cleanup_data(struct async_data *); +void async_stream_run(struct async_data *data); +void async_io_kick(struct async_data *data); +void async_recv_wait(struct async_data *data); +void async_io_enable(void); + +#endif /* async-io.h */ diff --git a/lib/automake.mk b/lib/automake.mk index 86940ccd2..6f7870f26 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -24,6 +24,8 
@@ lib_libopenvswitch_la_SOURCES = \ lib/aes128.c \ lib/aes128.h \ lib/async-append.h \ + lib/async-io.h \ + lib/async-io.c \ lib/backtrace.c \ lib/backtrace.h \ lib/bfd.c \ diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c index ed748dbde..f831bc2dd 100644 --- a/lib/jsonrpc.c +++ b/lib/jsonrpc.c @@ -30,28 +30,23 @@ #include "openvswitch/poll-loop.h" #include "reconnect.h" #include "stream.h" +#include "stream-provider.h" #include "svec.h" #include "timeval.h" +#include "async-io.h" #include "openvswitch/vlog.h" VLOG_DEFINE_THIS_MODULE(jsonrpc); struct jsonrpc { - struct stream *stream; char *name; int status; - - /* Input. */ - struct byteq input; - uint8_t input_buffer[4096]; struct json_parser *parser; - - /* Output. */ - struct ovs_list output; /* Contains "struct ofpbuf"s. */ - size_t output_count; /* Number of elements in "output". */ - size_t backlog; + struct async_data data; }; +#define MIN_IDLE_TIME 10 + /* Rate limit for error messages. */ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); @@ -59,6 +54,11 @@ static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *); static void jsonrpc_cleanup(struct jsonrpc *); static void jsonrpc_error(struct jsonrpc *, int error); +static inline struct async_data *adata(struct jsonrpc *rpc) { + return &rpc->data; +} + + /* This is just the same as stream_open() except that it uses the default * JSONRPC port if none is specified. 
*/ int @@ -86,10 +86,8 @@ jsonrpc_open(struct stream *stream) rpc = xzalloc(sizeof *rpc); rpc->name = xstrdup(stream_get_name(stream)); - rpc->stream = stream; - byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer); - ovs_list_init(&rpc->output); - + async_init_data(adata(rpc), stream); + async_stream_enable(adata(rpc)); return rpc; } @@ -109,33 +107,22 @@ jsonrpc_close(struct jsonrpc *rpc) void jsonrpc_run(struct jsonrpc *rpc) { + int retval; if (rpc->status) { return; } - stream_run(rpc->stream); - while (!ovs_list_is_empty(&rpc->output)) { - struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next); - int retval; - - retval = stream_send(rpc->stream, buf->data, buf->size); - if (retval >= 0) { - rpc->backlog -= retval; - ofpbuf_pull(buf, retval); - if (!buf->size) { - ovs_list_remove(&buf->list_node); - rpc->output_count--; - ofpbuf_delete(buf); - } - } else { + async_stream_run(adata(rpc)); + do { + retval = async_stream_flush(&rpc->data); + if (retval < 0) { if (retval != -EAGAIN) { VLOG_WARN_RL(&rl, "%s: send error: %s", rpc->name, ovs_strerror(-retval)); jsonrpc_error(rpc, -retval); } - break; } - } + } while (retval > 0); } /* Arranges for the poll loop to wake up when 'rpc' needs to perform @@ -144,9 +131,13 @@ void jsonrpc_wait(struct jsonrpc *rpc) { if (!rpc->status) { - stream_run_wait(rpc->stream); - if (!ovs_list_is_empty(&rpc->output)) { - stream_send_wait(rpc->stream); + if (adata(rpc)->async_mode) { + async_recv_wait(adata(rpc)); + } else { + stream_run_wait(rpc->data.stream); + if (!async_output_is_empty(adata(rpc))) { + stream_send_wait(async_get_stream(adata(rpc))); + } } } } @@ -175,7 +166,7 @@ jsonrpc_get_status(const struct jsonrpc *rpc) size_t jsonrpc_get_backlog(const struct jsonrpc *rpc) { - return rpc->status ? 0 : rpc->backlog; + return rpc->status ? 
0 : async_get_backlog(adata((struct jsonrpc *) rpc)); } /* Returns the number of bytes that have been received on 'rpc''s underlying @@ -183,7 +174,7 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc) unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *rpc) { - return rpc->input.head; + return async_get_input(adata((struct jsonrpc *) rpc))->head; } /* Returns 'rpc''s name, that is, the name returned by stream_get_name() for @@ -234,13 +225,13 @@ jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title, * buffered in 'rpc'.) * * Always takes ownership of 'msg', regardless of success. */ + int jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) { struct ofpbuf *buf; struct json *json; struct ds ds = DS_EMPTY_INITIALIZER; - size_t length; if (rpc->status) { jsonrpc_msg_destroy(msg); @@ -251,24 +242,13 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) json = jsonrpc_msg_to_json(msg); json_to_ds(json, 0, &ds); - length = ds.length; json_destroy(json); buf = xmalloc(sizeof *buf); ofpbuf_use_ds(buf, &ds); - ovs_list_push_back(&rpc->output, &buf->list_node); - rpc->output_count++; - rpc->backlog += length; - - if (rpc->output_count >= 50) { - VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of" - " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name, - rpc->output_count, rpc->backlog); - } + async_stream_enqueue(adata(rpc), buf); - if (rpc->backlog == length) { - jsonrpc_run(rpc); - } + jsonrpc_run(rpc); return rpc->status; } @@ -291,7 +271,7 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg) int jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) { - int i; + int i, retval; *msgp = NULL; if (rpc->status) { @@ -302,36 +282,32 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) size_t n, used; /* Fill our input buffer if it's empty. 
*/ - if (byteq_is_empty(&rpc->input)) { - size_t chunk; - int retval; - - chunk = byteq_headroom(&rpc->input); - retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk); - if (retval < 0) { - if (retval == -EAGAIN) { - return EAGAIN; - } else { - VLOG_WARN_RL(&rl, "%s: receive error: %s", - rpc->name, ovs_strerror(-retval)); - jsonrpc_error(rpc, -retval); - return rpc->status; - } - } else if (retval == 0) { - jsonrpc_error(rpc, EOF); - return EOF; + retval = async_stream_recv(adata(rpc)); + if (retval < 0) { + if (retval == -EAGAIN) { + return EAGAIN; + } else { + VLOG_WARN_RL(&rl, "%s: receive error: %s", + rpc->name, ovs_strerror(-retval)); + jsonrpc_error(rpc, -retval); + return rpc->status; } - byteq_advance_head(&rpc->input, retval); + } else if (retval == 0) { + jsonrpc_error(rpc, EOF); + return EOF; } /* We have some input. Feed it into the JSON parser. */ if (!rpc->parser) { rpc->parser = json_parser_create(0); } - n = byteq_tailroom(&rpc->input); + n = byteq_tailroom(async_get_input(adata(rpc))); + if (n == 0) { + break; + } used = json_parser_feed(rpc->parser, - (char *) byteq_tail(&rpc->input), n); - byteq_advance_tail(&rpc->input, used); + (char *) byteq_tail(async_get_input(adata(rpc))), n); + byteq_advance_tail(async_get_input(adata(rpc)), used); /* If we have complete JSON, attempt to parse it as JSON-RPC. 
*/ if (json_parser_is_done(rpc->parser)) { @@ -341,7 +317,7 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) } if (rpc->status) { - const struct byteq *q = &rpc->input; + const struct byteq *q = async_get_input(adata(rpc)); if (q->head <= q->size) { stream_report_content(q->buffer, q->head, STREAM_JSONRPC, &this_module, rpc->name); @@ -359,10 +335,10 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp) void jsonrpc_recv_wait(struct jsonrpc *rpc) { - if (rpc->status || !byteq_is_empty(&rpc->input)) { + if (rpc->status || !byteq_is_empty(async_get_input(adata(rpc)))) { poll_immediate_wake_at(rpc->name); } else { - stream_recv_wait(rpc->stream); + async_recv_wait(adata(rpc)); } } @@ -385,7 +361,7 @@ jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg) for (;;) { jsonrpc_run(rpc); - if (ovs_list_is_empty(&rpc->output) || rpc->status) { + if (async_output_is_empty(adata(rpc)) || rpc->status) { return rpc->status; } jsonrpc_wait(rpc); @@ -495,15 +471,14 @@ jsonrpc_error(struct jsonrpc *rpc, int error) static void jsonrpc_cleanup(struct jsonrpc *rpc) { - stream_close(rpc->stream); - rpc->stream = NULL; + async_stream_disable(adata(rpc)); + stream_close(rpc->data.stream); + rpc->data.stream = NULL; json_parser_abort(rpc->parser); rpc->parser = NULL; - ofpbuf_list_delete(&rpc->output); - rpc->backlog = 0; - rpc->output_count = 0; + async_cleanup_data(adata(rpc)); } static struct jsonrpc_msg * @@ -977,12 +952,14 @@ jsonrpc_session_run(struct jsonrpc_session *s) } if (s->rpc) { - size_t backlog; int error; + bool active = async_get_active(adata(s->rpc)); - backlog = jsonrpc_get_backlog(s->rpc); jsonrpc_run(s->rpc); - if (jsonrpc_get_backlog(s->rpc) < backlog) { + + active |= async_get_active(adata(s->rpc)); + + if (active) { /* Data previously caught in a queue was successfully sent (or * there's an error, which we'll catch below.) 
* @@ -1076,8 +1053,8 @@ jsonrpc_session_get_name(const struct jsonrpc_session *s)
 const char *
 jsonrpc_session_get_id(const struct jsonrpc_session *s)
 {
-    if (s->rpc && s->rpc->stream) {
-        return stream_get_peer_id(s->rpc->stream);
+    if (s->rpc && async_get_stream(adata(s->rpc))) {
+        return stream_get_peer_id(adata(s->rpc)->stream);
     } else {
         return NULL;
     }
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 46ee7ae27..747d543cf 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -30,6 +30,7 @@
 #include "stream-provider.h"
 #include "stream.h"
 #include "openvswitch/vlog.h"
+#include "openvswitch/list.h"
 
 VLOG_DEFINE_THIS_MODULE(stream_fd);
 
@@ -40,6 +41,8 @@ struct stream_fd
     struct stream stream;
     int fd;
     int fd_type;
+    struct ovs_list output;
+    int queue_depth;
 };
 
 static const struct stream_class stream_fd_class;
@@ -67,6 +70,8 @@ new_fd_stream(char *name, int fd, int connect_status, int fd_type,
     stream_init(&s->stream, &stream_fd_class, connect_status, name);
     s->fd = fd;
     s->fd_type = fd_type;
+    s->queue_depth = 0;
+    ovs_list_init(&s->output);
     *streamp = &s->stream;
     return 0;
 }
@@ -83,6 +88,7 @@ fd_close(struct stream *stream)
 {
     struct stream_fd *s = stream_fd_cast(stream);
     closesocket(s->fd);
+    ofpbuf_list_delete(&s->output);
     free(s);
 }
 
@@ -111,6 +117,11 @@ fd_recv(struct stream *stream, void *buffer, size_t n)
         if (error == WSAEWOULDBLOCK) {
             error = EAGAIN;
         }
+#endif
+#ifdef __linux__
+        if (error == ENOBUFS) {
+            error = EAGAIN;
+        }
 #endif
         if (error != EAGAIN) {
             VLOG_DBG_RL(&rl, "recv: %s", sock_strerror(error));
@@ -162,6 +173,79 @@ fd_wait(struct stream *stream, enum stream_wait_type wait)
     }
 }
 
+#ifndef _WIN32
+static int
+fd_enqueue(struct stream *stream, struct ofpbuf *buf)
+{
+    struct stream_fd *sfd = stream_fd_cast(stream);
+
+    ovs_list_push_back(&sfd->output, &buf->list_node);
+    sfd->queue_depth++;
+    return buf->size;
+}
+
+/* Maximum number of queued buffers gathered into one sendmsg() call.
+ * sendmsg() fails with EMSGSIZE when msg_iovlen exceeds IOV_MAX, so gathering
+ * the entire queue would fail permanently once the backlog grew that deep. */
+#if defined(IOV_MAX) && IOV_MAX < 64
+#define FD_FLUSH_MAX_IOV IOV_MAX
+#else
+#define FD_FLUSH_MAX_IOV 64
+#endif
+
+static bool
+fd_flush(struct stream *stream, int *retval)
+{
+    struct stream_fd *sfd = stream_fd_cast(stream);
+    struct iovec iov[FD_FLUSH_MAX_IOV];
+    struct msghdr msg = { .msg_iov = iov };
+    struct ofpbuf *buf, *next;
+    int n_iov = 0;
+    int sent;
+
+    if (!sfd->queue_depth) {
+        *retval = -EAGAIN;
+        return true;
+    }
+
+    /* Gather up to FD_FLUSH_MAX_IOV buffers from the head of the queue. */
+    LIST_FOR_EACH (buf, list_node, &sfd->output) {
+        if (n_iov >= FD_FLUSH_MAX_IOV) {
+            break;
+        }
+        iov[n_iov].iov_base = buf->data;
+        iov[n_iov].iov_len = buf->size;
+        n_iov++;
+    }
+    msg.msg_iovlen = n_iov;
+
+    sent = sendmsg(sfd->fd, &msg, 0);
+    if (sent < 0) {
+        int error = sock_errno();
+
+        /* A full socket buffer is partial success under the 'flush'
+         * contract, not an error. */
+        *retval = error == EAGAIN ? 0 : -error;
+        return false;
+    }
+
+    /* Free every fully transmitted buffer (empty ones included) and trim
+     * the first partially transmitted one. */
+    *retval = sent;
+    LIST_FOR_EACH_SAFE (buf, next, list_node, &sfd->output) {
+        if (buf->size > sent) {
+            ofpbuf_pull(buf, sent);
+            break;
+        }
+        sent -= buf->size;
+        sfd->queue_depth--;
+        ovs_list_remove(&buf->list_node);
+        ofpbuf_delete(buf);
+    }
+    return true;
+}
+#endif /* !_WIN32 */
+
 static const struct stream_class stream_fd_class = {
     "fd",                       /* name */
     false,                      /* needs_probes */
@@ -173,6 +257,13 @@ static const struct stream_class stream_fd_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     fd_wait,                    /* wait */
+#ifndef _WIN32
+    fd_enqueue,                 /* enqueue */
+    fd_flush,                   /* flush */
+#else
+    NULL,                       /* enqueue */
+    NULL,                       /* flush */
+#endif
 };
 
 /* Passive file descriptor stream. */
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 75f4f059b..b5161bd04 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -18,9 +18,13 @@
 #define STREAM_PROVIDER_H 1
 
 #include <sys/types.h>
+#include <poll.h>
+#include "openvswitch/list.h"
+#include "openvswitch/ofpbuf.h"
+#include "openvswitch/thread.h"
 #include "stream.h"
-
-/* Active stream connection. */
+#include "byteq.h"
+#include "latch.h"
 
 /* Active stream connection.
  *
@@ -124,6 +128,34 @@ struct stream_class {
     /* Arranges for the poll loop to wake up when 'stream' is ready to take an
      * action of the given 'type'. */
     void (*wait)(struct stream *stream, enum stream_wait_type type);
+
+    /* Enqueues 'buf' for transmission and surrenders its ownership to the
+     * stream.
+     *
+     * - On success, the stream owns 'buf' and the return value is the
+     *   backlog size.
+     *
+     * - On error, returns a negative errno value and 'buf' is not claimed
+     *   by the stream.
+     *
+     * This function must not block.  If no bytes can be immediately
+     * accepted for transmission, it returns -EAGAIN immediately. */
+    int (*enqueue)(struct stream *stream, struct ofpbuf *buf);
+
+    /* Flushes data queued with 'enqueue'.
+     *
+     * - If successful, returns true and '*retval' contains the backlog size.
+     *   If there was nothing to flush or report, '*retval' is -EAGAIN.
+     *
+     * - If partially successful (EAGAIN), returns false and '*retval' is a
+     *   nonnegative backlog size.
+     *
+     * - If unsuccessful, returns false and '*retval' contains a negative
+     *   errno value.
+     *
+     * This function must not block.  If the buffers cannot be flushed
+     * completely, it returns "partial success" immediately. */
+    bool (*flush)(struct stream *stream, int *retval);
 };
 
 /* Passive listener for incoming stream connections.
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 078fcbc3a..0046e383e 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -85,6 +85,8 @@ struct ssl_stream
     SSL *ssl;
     struct ofpbuf *txbuf;
     unsigned int session_nr;
+    int last_enqueued;
+    long backlog_to_report;
 
     /* rx_want and tx_want record the result of the last call to SSL_read()
      * and SSL_write(), respectively:
@@ -304,6 +306,8 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
     sslv->rx_want = sslv->tx_want = SSL_NOTHING;
     sslv->session_nr = next_session_nr++;
     sslv->n_head = 0;
+    sslv->last_enqueued = 0;
+    sslv->backlog_to_report = 0;
 
     if (VLOG_IS_DBG_ENABLED()) {
         SSL_set_msg_callback(ssl, ssl_protocol_cb);
@@ -784,8 +788,59 @@ ssl_run(struct stream *stream)
 {
     struct ssl_stream *sslv = ssl_stream_cast(stream);
 
-    if (sslv->txbuf && ssl_do_tx(stream) != EAGAIN) {
-        ssl_clear_txbuf(sslv);
+    if (sslv->txbuf) {
+        if (ssl_do_tx(stream) != EAGAIN) {
+            sslv->backlog_to_report += sslv->last_enqueued;
+            ssl_clear_txbuf(sslv);
+        }
+    }
+}
+
+static int
+ssl_enqueue(struct stream *stream, struct ofpbuf *buf)
+{
+    int n = buf->size;
+    struct ssl_stream *sslv = ssl_stream_cast(stream);
+    if (sslv->txbuf) {
+        return -EAGAIN;
+    }
+    sslv->txbuf = buf;
+    sslv->last_enqueued = n;
+    return n;
+}
+
+static bool
+ssl_flush(struct stream *stream, int *retval)
+{
+    struct ssl_stream *sslv = ssl_stream_cast(stream);
+
+    if (!sslv->txbuf) {
+        if (sslv->backlog_to_report) {
+            *retval = sslv->backlog_to_report;
+            sslv->backlog_to_report = 0;
+        } else {
+            *retval = -EAGAIN;
+        }
+        return true;
+    } else {
+        int error;
+
+        error = ssl_do_tx(stream);
+        switch (error) {
+        case 0:
+            ssl_clear_txbuf(sslv);
+            *retval = sslv->backlog_to_report + sslv->last_enqueued;
+            sslv->backlog_to_report = 0;
+            sslv->last_enqueued = 0;
+            return true;
+        case EAGAIN:
+            *retval = 0;
+            return false;
+        default:
+            ssl_clear_txbuf(sslv);
+            *retval = -error;
+            return false;
+        }
     }
 }
 
@@ -840,8 +895,7 @@ ssl_wait(struct stream *stream, enum stream_wait_type wait)
             /* We have room in our tx queue. */
             poll_immediate_wake();
         } else {
-            /* stream_run_wait() will do the right thing; don't bother with
-             * redundancy. */
+            poll_fd_wait(sslv->fd, POLLOUT);
         }
         break;
 
@@ -861,6 +915,8 @@ const struct stream_class ssl_stream_class = {
     ssl_run,                    /* run */
     ssl_run_wait,               /* run_wait */
     ssl_wait,                   /* wait */
+    ssl_enqueue,                /* enqueue */
+    ssl_flush,                  /* flush */
 };
 
 /* Passive SSL. */
diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
index e8dc2bfaa..63632e989 100644
--- a/lib/stream-tcp.c
+++ b/lib/stream-tcp.c
@@ -73,6 +73,8 @@ const struct stream_class tcp_stream_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     NULL,                       /* wait */
+    NULL,                       /* enqueue */
+    NULL,                       /* flush */
 };
 
 /* Passive TCP. */
diff --git a/lib/stream-unix.c b/lib/stream-unix.c
index d265efb83..4fd7573f2 100644
--- a/lib/stream-unix.c
+++ b/lib/stream-unix.c
@@ -73,6 +73,8 @@ const struct stream_class unix_stream_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     NULL,                       /* wait */
+    NULL,                       /* enqueue */
+    NULL,                       /* flush */
 };
 
 /* Passive UNIX socket. */
diff --git a/lib/stream-windows.c b/lib/stream-windows.c
index 5c4c55e5d..cb8c2d3c3 100644
--- a/lib/stream-windows.c
+++ b/lib/stream-windows.c
@@ -374,6 +374,8 @@ const struct stream_class windows_stream_class = {
     NULL,                       /* run */
     NULL,                       /* run_wait */
     windows_wait,               /* wait */
+    NULL,                       /* enqueue */
+    NULL,                       /* flush */
 };
 
 struct pwindows_pstream
diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
index d416f1b60..83450beaa 100644
--- a/ovsdb/ovsdb-server.c
+++ b/ovsdb/ovsdb-server.c
@@ -59,6 +59,7 @@
 #include "perf-counter.h"
 #include "ovsdb-util.h"
 #include "openvswitch/vlog.h"
+#include "async-io.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_server);
 
@@ -398,6 +399,7 @@ main(int argc, char *argv[])
     }
 
     daemonize_complete();
+    async_io_enable();
 
     if (!run_command) {
         /* ovsdb-server is usually a long-running process, in which case it