@@ -33,6 +33,7 @@
#include "svec.h"
#include "timeval.h"
#include "openvswitch/vlog.h"
+#include "stream-provider.h"
VLOG_DEFINE_THIS_MODULE(jsonrpc);
@@ -43,7 +44,7 @@ struct jsonrpc {
/* Input. */
struct byteq input;
- uint8_t input_buffer[512];
+ uint8_t *input_buffer;
struct json_parser *parser;
/* Output. */
@@ -62,9 +63,11 @@ static void jsonrpc_error(struct jsonrpc *, int error);
/* This is just the same as stream_open() except that it uses the default
* JSONRPC port if none is specified. */
int
-jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp)
+jsonrpc_stream_open(const char *name, struct stream **streamp,
+ uint8_t dscp, int bufsize)
{
- return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp);
+ return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp,
+ bufsize);
}
/* This is just the same as pstream_open() except that it uses the default
@@ -80,14 +83,16 @@ jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
struct jsonrpc *
jsonrpc_open(struct stream *stream)
{
+ int bufsize = stream->bufsize > 0 ? stream->bufsize : 512;
struct jsonrpc *rpc;
ovs_assert(stream != NULL);
rpc = xzalloc(sizeof *rpc);
+ rpc->input_buffer = xzalloc(bufsize);
rpc->name = xstrdup(stream_get_name(stream));
rpc->stream = stream;
- byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
+ byteq_init(&rpc->input, rpc->input_buffer, bufsize);
ovs_list_init(&rpc->output);
return rpc;
@@ -101,6 +106,7 @@ jsonrpc_close(struct jsonrpc *rpc)
if (rpc) {
jsonrpc_cleanup(rpc);
free(rpc->name);
+ free(rpc->input_buffer);
free(rpc);
}
}
@@ -787,6 +793,7 @@ struct jsonrpc_session {
int last_error;
unsigned int seqno;
uint8_t dscp;
+ int bufsize;
};
static void
@@ -814,11 +821,12 @@ struct jsonrpc_session *
jsonrpc_session_open(const char *name, bool retry)
{
const struct svec remotes = { .names = (char **) &name, .n = 1 };
- return jsonrpc_session_open_multiple(&remotes, retry);
+ return jsonrpc_session_open_multiple(&remotes, retry, 512);
}
struct jsonrpc_session *
-jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
+jsonrpc_session_open_multiple(const struct svec *remotes, bool retry,
+ int bufsize)
{
struct jsonrpc_session *s;
@@ -839,6 +847,7 @@ jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
s->seqno = 0;
s->dscp = 0;
s->last_error = 0;
+ s->bufsize = bufsize;
const char *name = reconnect_get_name(s->reconnect);
if (!pstream_verify_name(name)) {
@@ -931,7 +940,7 @@ jsonrpc_session_connect(struct jsonrpc_session *s)
jsonrpc_session_disconnect(s);
if (!reconnect_is_passive(s->reconnect)) {
- error = jsonrpc_stream_open(name, &s->stream, s->dscp);
+ error = jsonrpc_stream_open(name, &s->stream, s->dscp, s->bufsize);
if (!error) {
reconnect_connecting(s->reconnect, time_msec());
} else {
@@ -40,7 +40,8 @@ struct svec;
#define OVSDB_OLD_PORT 6632
#define OVSDB_PORT 6640
-int jsonrpc_stream_open(const char *name, struct stream **, uint8_t dscp);
+int jsonrpc_stream_open(const char *name, struct stream **, uint8_t dscp,
+ int bufsize);
int jsonrpc_pstream_open(const char *name, struct pstream **, uint8_t dscp);
struct jsonrpc *jsonrpc_open(struct stream *);
@@ -105,7 +106,7 @@ char *jsonrpc_msg_to_string(const struct jsonrpc_msg *);
struct jsonrpc_session *jsonrpc_session_open(const char *name, bool retry);
struct jsonrpc_session *jsonrpc_session_open_multiple(const struct svec *,
- bool retry);
+ bool retry, int bufsize);
struct jsonrpc_session *jsonrpc_session_open_unreliably(struct jsonrpc *,
uint8_t);
void jsonrpc_session_close(struct jsonrpc_session *);
@@ -425,7 +425,7 @@ dummy_packet_conn_set_config(struct dummy_packet_conn *conn,
conn->rconn.reconnect = reconnect;
conn->type = ACTIVE;
- error = stream_open(stream, &active_stream, DSCP_DEFAULT);
+ error = stream_open(stream, &active_stream, DSCP_DEFAULT, 512);
conn->rconn.rstream = dummy_packet_stream_create(active_stream);
switch (error) {
@@ -505,7 +505,8 @@ OVS_REQUIRES(dev->mutex)
error = stream_connect(rconn->rstream->stream);
} else {
error = stream_open(reconnect_get_name(rconn->reconnect),
- &rconn->rstream->stream, DSCP_DEFAULT);
+ &rconn->rstream->stream, DSCP_DEFAULT,
+ 512);
}
switch (error) {
@@ -253,6 +253,7 @@ struct ovsdb_idl {
* state machine must restart. */
struct jsonrpc_session *session; /* Connection to the server. */
char *remote; /* 'session' remote name. */
+ int bufsize;
enum ovsdb_idl_state state; /* Current session state. */
unsigned int state_seqno; /* See above. */
struct json *request_id; /* JSON ID for request awaiting reply. */
@@ -455,7 +456,7 @@ ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
{
struct ovsdb_idl *idl = ovsdb_idl_create_unconnected(
class, monitor_everything_by_default);
- ovsdb_idl_set_remote(idl, remote, retry);
+ ovsdb_idl_set_remote(idl, remote, retry, 512);
return idl;
}
@@ -511,11 +512,13 @@ ovsdb_idl_create_unconnected(const struct ovsdb_idl_class *class,
* If 'retry' is true, the connection to the remote will automatically retry
* when it fails. If 'retry' is false, the connection is one-time. */
void
-ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote, bool retry)
+ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote, bool retry,
+ int bufsize)
{
if (idl
&& ((remote != NULL) != (idl->remote != NULL)
- || (remote && idl->remote && strcmp(remote, idl->remote)))) {
+ || (remote && idl->remote && strcmp(remote, idl->remote))
+ || bufsize != idl->bufsize)) {
ovs_assert(!idl->data.txn);
/* Close the old session, if any. */
@@ -534,8 +537,10 @@ ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote, bool retry)
if (idl->shuffle_remotes) {
svec_shuffle(&remotes);
}
- idl->session = jsonrpc_session_open_multiple(&remotes, retry);
+ idl->session = jsonrpc_session_open_multiple(&remotes, retry,
+ bufsize);
svec_destroy(&remotes);
+ idl->bufsize = bufsize;
idl->state_seqno = UINT_MAX;
@@ -62,7 +62,7 @@ struct ovsdb_idl *ovsdb_idl_create(const char *remote,
bool retry);
struct ovsdb_idl *ovsdb_idl_create_unconnected(
const struct ovsdb_idl_class *, bool monitor_everything_by_default);
-void ovsdb_idl_set_remote(struct ovsdb_idl *, const char *, bool);
+void ovsdb_idl_set_remote(struct ovsdb_idl *, const char *, bool, int);
void ovsdb_idl_set_shuffle_remotes(struct ovsdb_idl *, bool);
void ovsdb_idl_destroy(struct ovsdb_idl *);
@@ -59,12 +59,12 @@ static void maybe_unlink_and_free(char *path);
* implementation never fails.) */
int
new_fd_stream(char *name, int fd, int connect_status, int fd_type,
- struct stream **streamp)
+ struct stream **streamp, int bufsize)
{
struct stream_fd *s;
s = xmalloc(sizeof *s);
- stream_init(&s->stream, &stream_fd_class, connect_status, name);
+ stream_init(&s->stream, &stream_fd_class, connect_status, name, bufsize);
s->fd = fd;
s->fd_type = fd_type;
*streamp = &s->stream;
@@ -29,7 +29,7 @@ struct pstream;
struct sockaddr_storage;
int new_fd_stream(char *name, int fd, int connect_status,
- int fd_type, struct stream **streamp);
+ int fd_type, struct stream **streamp, int bufsize);
int new_fd_pstream(char *name, int fd,
int (*accept_cb)(int fd, const struct sockaddr_storage *ss,
size_t ss_len, struct stream **),
@@ -31,10 +31,11 @@ struct stream {
int error;
char *name;
char *peer_id;
+ int bufsize;
};
void stream_init(struct stream *, const struct stream_class *,
- int connect_status, char *name);
+ int connect_status, char *name, int bufsize);
static inline void stream_assert_class(const struct stream *stream,
const struct stream_class *class)
{
@@ -66,7 +67,7 @@ struct stream_class {
* EAGAIN (not EINPROGRESS, as returned by the connect system call) and
* continue the connection in the background. */
int (*open)(const char *name, char *suffix, struct stream **streamp,
- uint8_t dscp);
+ uint8_t dscp, int bufsize);
/* Closes 'stream' and frees associated memory. */
void (*close)(struct stream *stream);
@@ -233,7 +233,7 @@ want_to_poll_events(int want)
* ownership of 'server_name'. */
static int
new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
- enum ssl_state state, struct stream **streamp)
+ enum ssl_state state, struct stream **streamp, int bufsize)
{
struct ssl_stream *sslv;
SSL *ssl = NULL;
@@ -295,7 +295,7 @@ new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
/* Create and return the ssl_stream. */
sslv = xmalloc(sizeof *sslv);
- stream_init(&sslv->stream, &ssl_stream_class, EAGAIN, name);
+ stream_init(&sslv->stream, &ssl_stream_class, EAGAIN, name, bufsize);
sslv->state = state;
sslv->type = type;
sslv->fd = fd;
@@ -356,7 +356,8 @@ get_server_name(const char *suffix_)
}
static int
-ssl_open(const char *name, char *suffix, struct stream **streamp, uint8_t dscp)
+ssl_open(const char *name, char *suffix, struct stream **streamp,
+ uint8_t dscp, int bufsize)
{
int error, fd;
@@ -370,7 +371,7 @@ ssl_open(const char *name, char *suffix, struct stream **streamp, uint8_t dscp)
if (fd >= 0) {
int state = error ? STATE_TCP_CONNECTING : STATE_SSL_CONNECTING;
return new_ssl_stream(xstrdup(name), get_server_name(suffix),
- fd, CLIENT, state, streamp);
+ fd, CLIENT, state, streamp, bufsize);
} else {
VLOG_ERR("%s: connect: %s", name, ovs_strerror(error));
return error;
@@ -958,7 +959,7 @@ pssl_accept(struct pstream *pstream, struct stream **new_streamp)
ss_format_address(&ss, &name);
ds_put_format(&name, ":%"PRIu16, ss_get_port(&ss));
return new_ssl_stream(ds_steal_cstr(&name), NULL, new_fd, SERVER,
- STATE_SSL_CONNECTING, new_streamp);
+ STATE_SSL_CONNECTING, new_streamp, 512);
}
static void
@@ -39,23 +39,25 @@ VLOG_DEFINE_THIS_MODULE(stream_tcp);
/* Takes ownership of 'name'. */
static int
-new_tcp_stream(char *name, int fd, int connect_status, struct stream **streamp)
+new_tcp_stream(char *name, int fd, int connect_status, struct stream **streamp,
+ int bufsize)
{
if (connect_status == 0) {
setsockopt_tcp_nodelay(fd);
}
- return new_fd_stream(name, fd, connect_status, AF_INET, streamp);
+ return new_fd_stream(name, fd, connect_status, AF_INET, streamp, bufsize);
}
static int
-tcp_open(const char *name, char *suffix, struct stream **streamp, uint8_t dscp)
+tcp_open(const char *name, char *suffix, struct stream **streamp,
+ uint8_t dscp, int bufsize)
{
int fd, error;
error = inet_open_active(SOCK_STREAM, suffix, -1, NULL, &fd, dscp);
if (fd >= 0) {
- return new_tcp_stream(xstrdup(name), fd, error, streamp);
+ return new_tcp_stream(xstrdup(name), fd, error, streamp, bufsize);
} else {
VLOG_ERR("%s: connect: %s", name, ovs_strerror(error));
return error;
@@ -126,7 +128,7 @@ ptcp_accept(int fd, const struct sockaddr_storage *ss,
ss_format_address(ss, &name);
ds_put_format(&name, ":%"PRIu16, ss_get_port(ss));
- return new_tcp_stream(ds_steal_cstr(&name), fd, 0, streamp);
+ return new_tcp_stream(ds_steal_cstr(&name), fd, 0, streamp, 512);
}
const struct pstream_class ptcp_pstream_class = {
@@ -42,7 +42,7 @@ VLOG_DEFINE_THIS_MODULE(stream_unix);
static int
unix_open(const char *name, char *suffix, struct stream **streamp,
- uint8_t dscp OVS_UNUSED)
+ uint8_t dscp OVS_UNUSED, int bufsize)
{
char *connect_path;
int fd;
@@ -59,7 +59,7 @@ unix_open(const char *name, char *suffix, struct stream **streamp,
free(connect_path);
return new_fd_stream(xstrdup(name), fd, check_connection_completion(fd),
- AF_UNIX, streamp);
+ AF_UNIX, streamp, bufsize);
}
const struct stream_class unix_stream_class = {
@@ -124,7 +124,7 @@ punix_accept(int fd, const struct sockaddr_storage *ss, size_t ss_len,
static atomic_count next_idx = ATOMIC_COUNT_INIT(0);
bound_name = xasprintf("unix#%u", atomic_count_inc(&next_idx));
}
- return new_fd_stream(bound_name, fd, 0, AF_UNIX, streamp);
+ return new_fd_stream(bound_name, fd, 0, AF_UNIX, streamp, 512);
}
const struct pstream_class punix_pstream_class = {
@@ -161,7 +161,7 @@ windows_open(const char *name, char *suffix, struct stream **streamp,
return ENOENT;
}
struct windows_stream *s = xmalloc(sizeof *s);
- stream_init(&s->stream, &windows_stream_class, 0, xstrdup(name));
+ stream_init(&s->stream, &windows_stream_class, 0, xstrdup(name), 512);
s->pipe_path = connect_path;
s->fd = npipe;
/* This is an active stream. */
@@ -555,7 +555,8 @@ pwindows_accept(struct pstream *pstream, struct stream **new_streamp)
/* Give the handle p->fd to the new created active stream and specify it
* was created by an active stream. */
struct windows_stream *p_temp = xmalloc(sizeof *p_temp);
- stream_init(&p_temp->stream, &windows_stream_class, 0, xstrdup("unix"));
+ stream_init(&p_temp->stream, &windows_stream_class, 0, xstrdup("unix"),
+ 512);
p_temp->fd = p->fd;
/* Specify it was created by a passive stream. */
p_temp->server = true;
@@ -208,7 +208,8 @@ stream_verify_name(const char *name)
* stores a pointer to the new connection in '*streamp', otherwise a null
* pointer. */
int
-stream_open(const char *name, struct stream **streamp, uint8_t dscp)
+stream_open(const char *name, struct stream **streamp, uint8_t dscp,
+ int bufsize)
{
const struct stream_class *class;
struct stream *stream;
@@ -225,7 +226,7 @@ stream_open(const char *name, struct stream **streamp, uint8_t dscp)
/* Call class's "open" function. */
suffix_copy = xstrdup(strchr(name, ':') + 1);
- error = class->open(name, suffix_copy, &stream, dscp);
+ error = class->open(name, suffix_copy, &stream, dscp, bufsize);
free(suffix_copy);
if (error) {
goto error;
@@ -650,7 +651,7 @@ pstream_get_bound_port(const struct pstream *pstream)
* Takes ownership of 'name'. */
void
stream_init(struct stream *stream, const struct stream_class *class,
- int connect_status, char *name)
+ int connect_status, char *name, int bufsize)
{
memset(stream, 0, sizeof *stream);
stream->class = class;
@@ -659,6 +660,7 @@ stream_init(struct stream *stream, const struct stream_class *class,
: SCS_DISCONNECTED);
stream->error = connect_status;
stream->name = name;
+ stream->bufsize = bufsize;
ovs_assert(stream->state != SCS_CONNECTING || class->connect);
}
@@ -701,7 +703,7 @@ int
stream_open_with_default_port(const char *name_,
uint16_t default_port,
struct stream **streamp,
- uint8_t dscp)
+ uint8_t dscp, int bufsize)
{
char *name;
int error;
@@ -721,7 +723,7 @@ stream_open_with_default_port(const char *name_,
} else {
name = xstrdup(name_);
}
- error = stream_open(name, streamp, dscp);
+ error = stream_open(name, streamp, dscp, bufsize);
free(name);
return error;
@@ -33,7 +33,7 @@ void stream_usage(const char *name, bool active, bool passive, bool bootstrap);
/* Bidirectional byte streams. */
int stream_verify_name(const char *name);
-int stream_open(const char *name, struct stream **, uint8_t dscp);
+int stream_open(const char *name, struct stream **, uint8_t dscp, int bufsize);
int stream_open_block(int error, long long int timeout, struct stream **);
void stream_close(struct stream *);
const char *stream_get_name(const struct stream *);
@@ -72,7 +72,7 @@ ovs_be16 pstream_get_bound_port(const struct pstream *);
int stream_open_with_default_port(const char *name,
uint16_t default_port,
struct stream **,
- uint8_t dscp);
+ uint8_t dscp, int bufsize);
int pstream_open_with_default_port(const char *name,
uint16_t default_port,
struct pstream **,
@@ -459,7 +459,8 @@ unixctl_client_create(const char *path, struct jsonrpc **client)
*client = NULL;
- error = stream_open_block(stream_open(unix_path, &stream, DSCP_DEFAULT),
+ error = stream_open_block(stream_open(unix_path, &stream,
+ DSCP_DEFAULT, 512),
-1, &stream);
free(unix_path);
free(abs_path);
@@ -78,7 +78,7 @@ vconn_stream_open(const char *name, uint32_t allowed_versions,
struct stream *stream;
int error;
- error = stream_open_with_default_port(name, OFP_PORT, &stream, dscp);
+ error = stream_open_with_default_port(name, OFP_PORT, &stream, dscp, 512);
if (!error) {
error = stream_connect(stream);
if (!error || error == EAGAIN) {
@@ -192,7 +192,7 @@ open_rpc(int min_args, enum args_needed need,
* the first one that is up and hosts the database we want (if any) in an
* acceptable state. */
struct jsonrpc_session *js = jsonrpc_session_open_multiple(
- &remotes, false);
+ &remotes, false, 512);
svec_destroy(&remotes);
unsigned int seqno = 0;
@@ -505,7 +505,7 @@ open_jsonrpc(const char *server)
int error;
error = stream_open_block(jsonrpc_stream_open(server, &stream,
- DSCP_DEFAULT), -1, &stream);
+ DSCP_DEFAULT, 512), -1, &stream);
if (error == EAFNOSUPPORT) {
struct pstream *pstream;
@@ -272,7 +272,7 @@ do_request(struct ovs_cmdl_context *ctx)
}
error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
- DSCP_DEFAULT), -1, &stream);
+ DSCP_DEFAULT, 512), -1, &stream);
if (error) {
ovs_fatal(error, "could not open \"%s\"", ctx->argv[1]);
}
@@ -312,7 +312,7 @@ do_notify(struct ovs_cmdl_context *ctx)
}
error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
- DSCP_DEFAULT), -1, &stream);
+ DSCP_DEFAULT, 512), -1, &stream);
if (error) {
ovs_fatal(error, "could not open \"%s\"", ctx->argv[1]);
}
@@ -2417,7 +2417,7 @@ do_idl(struct ovs_cmdl_context *ctx)
struct stream *stream;
error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
- DSCP_DEFAULT), -1, &stream);
+ DSCP_DEFAULT, 512), -1, &stream);
if (error) {
ovs_fatal(error, "failed to connect to \"%s\"", ctx->argv[1]);
}
@@ -36,7 +36,7 @@ main(int argc, char *argv[])
ovs_fatal(0, "usage: %s REMOTE", argv[0]);
}
- error = stream_open_block(stream_open(argv[1], &stream, DSCP_DEFAULT),
+ error = stream_open_block(stream_open(argv[1], &stream, DSCP_DEFAULT, 512),
10000, &stream);
if (error) {
VLOG_ERR("stream_open_block(%s) failure: %s",
@@ -178,7 +178,7 @@ main(int argc, char *argv[])
/* Initialize IDL. */
idl = the_idl = ovsdb_idl_create_unconnected(&ovsrec_idl_class, false);
ovsdb_idl_set_shuffle_remotes(idl, shuffle_remotes);
- ovsdb_idl_set_remote(idl, db, retry);
+ ovsdb_idl_set_remote(idl, db, retry, 512);
ovsdb_idl_set_leader_only(idl, leader_only);
run_prerequisites(commands, n_commands, idl);
Allow jsonrpc clients (e.g. ovn-controller) to specify jsonrpc input buffer size in order to reduce overhead when downloading huge db size since current value is 512B. The user can specify rpc buffer size using ovsdb_idl_set_remote routine passing requested value Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com> --- lib/jsonrpc.c | 23 ++++++++++++++++------- lib/jsonrpc.h | 5 +++-- lib/netdev-dummy.c | 5 +++-- lib/ovsdb-idl.c | 13 +++++++++---- lib/ovsdb-idl.h | 2 +- lib/stream-fd.c | 4 ++-- lib/stream-fd.h | 2 +- lib/stream-provider.h | 5 +++-- lib/stream-ssl.c | 11 ++++++----- lib/stream-tcp.c | 12 +++++++----- lib/stream-unix.c | 6 +++--- lib/stream-windows.c | 5 +++-- lib/stream.c | 12 +++++++----- lib/stream.h | 4 ++-- lib/unixctl.c | 3 ++- lib/vconn-stream.c | 2 +- ovsdb/ovsdb-client.c | 4 ++-- tests/test-jsonrpc.c | 4 ++-- tests/test-ovsdb.c | 2 +- tests/test-stream.c | 2 +- utilities/ovs-vsctl.c | 2 +- 21 files changed, 76 insertions(+), 52 deletions(-)