diff mbox series

[ovs-dev,RFC] jsonrpc: make jsonrpc input_buffer size parametric

Message ID 5c845cd1f3824b110ac7c1ebe977c7aa3d830a38.1572963965.git.lorenzo.bianconi@redhat.com
State Accepted
Headers show
Series [ovs-dev,RFC] jsonrpc: make jsonrpc input_buffer size parametric | expand

Commit Message

Lorenzo Bianconi Nov. 5, 2019, 2:27 p.m. UTC
Allow jsonrpc clients (e.g. ovn-controller) to specify jsonrpc input
buffer size in order to reduce overhead when downloading huge db size
since current value is 512B. The user can specify rpc buffer size using
ovsdb_idl_set_remote routine passing requested value

Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>
---
 lib/jsonrpc.c         | 23 ++++++++++++++++-------
 lib/jsonrpc.h         |  5 +++--
 lib/netdev-dummy.c    |  5 +++--
 lib/ovsdb-idl.c       | 13 +++++++++----
 lib/ovsdb-idl.h       |  2 +-
 lib/stream-fd.c       |  4 ++--
 lib/stream-fd.h       |  2 +-
 lib/stream-provider.h |  5 +++--
 lib/stream-ssl.c      | 11 ++++++-----
 lib/stream-tcp.c      | 12 +++++++-----
 lib/stream-unix.c     |  6 +++---
 lib/stream-windows.c  |  5 +++--
 lib/stream.c          | 12 +++++++-----
 lib/stream.h          |  4 ++--
 lib/unixctl.c         |  3 ++-
 lib/vconn-stream.c    |  2 +-
 ovsdb/ovsdb-client.c  |  4 ++--
 tests/test-jsonrpc.c  |  4 ++--
 tests/test-ovsdb.c    |  2 +-
 tests/test-stream.c   |  2 +-
 utilities/ovs-vsctl.c |  2 +-
 21 files changed, 76 insertions(+), 52 deletions(-)

Comments

Ben Pfaff Nov. 5, 2019, 5:34 p.m. UTC | #1
On Tue, Nov 05, 2019 at 04:27:51PM +0200, Lorenzo Bianconi wrote:
> Allow jsonrpc clients (e.g. ovn-controller) to specify jsonrpc input
> buffer size in order to reduce overhead when downloading huge db size
> since current value is 512B. The user can specify rpc buffer size using
> ovsdb_idl_set_remote routine passing requested value
> 
> Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>

Hmm, I thought that we had decided to just try a 4096-byte buffer to
start.  That would be a much smaller patch.  The marginal benefits of
buffers larger than that probably decline a lot since the processing
cost of 4096 bytes of JSON is probably a lot more than the overhead of a
system call.  I imagine that any further benefit is probably from being
able to process more JSON per trip through the main loop.  Those same
benefits could also be obtained by increasing the 'for' loop limit in
jsonrpc_recv() from 50 to some larger number.
Lorenzo Bianconi Nov. 5, 2019, 5:42 p.m. UTC | #2
>
> On Tue, Nov 05, 2019 at 04:27:51PM +0200, Lorenzo Bianconi wrote:
> > Allow jsonrpc clients (e.g. ovn-controller) to specify jsonrpc input
> > buffer size in order to reduce overhead when downloading huge db size
> > since current value is 512B. The user can specify rpc buffer size using
> > ovsdb_idl_set_remote routine passing requested value
> >
> > Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>
>
> Hmm, I thought that we had decided to just try a 4096-byte buffer to
> start.  That would be a much smaller patch.  The marginal benefits of
> buffers larger than that probably decline a lot since the processing
> cost of 4096 bytes of JSON is probably a lot more than the overhead of a
> system call.  I imagine that any further benefit is probably from being
> able to process more JSON per trip through the main loop.  Those same
> benefits could also be obtained by increasing the 'for' loop limit in
> jsonrpc_recv() from 50 to some larger number.

Hi Ben,

thanks for the review. I tried to get a PoC (this is why I sent it as
RFC) but since the IDL/reconnect code is pretty spread
I ended up with a large patch. I agree we can just set the bufsize to 4096.
If it is ok for everybody I will post a patch to set the buffer size
to one page.

Regards,
Lorenzo
Ben Pfaff Nov. 5, 2019, 6:39 p.m. UTC | #3
On Tue, Nov 05, 2019 at 07:42:51PM +0200, Lorenzo Bianconi wrote:
> >
> > On Tue, Nov 05, 2019 at 04:27:51PM +0200, Lorenzo Bianconi wrote:
> > > Allow jsonrpc clients (e.g. ovn-controller) to specify jsonrpc input
> > > buffer size in order to reduce overhead when downloading huge db size
> > > since current value is 512B. The user can specify rpc buffer size using
> > > ovsdb_idl_set_remote routine passing requested value
> > >
> > > Signed-off-by: Lorenzo Bianconi <lorenzo.bianconi@redhat.com>
> >
> > Hmm, I thought that we had decided to just try a 4096-byte buffer to
> > start.  That would be a much smaller patch.  The marginal benefits of
> > buffers larger than that probably decline a lot since the processing
> > cost of 4096 bytes of JSON is probably a lot more than the overhead of a
> > system call.  I imagine that any further benefit is probably from being
> > able to process more JSON per trip through the main loop.  Those same
> > benefits could also be obtained by increasing the 'for' loop limit in
> > jsonrpc_recv() from 50 to some larger number.
> 
> Hi Ben,
> 
> thanks for the review. I tried to get a PoC (this is why I sent it as
> RFC) but since the IDL/reconnect code is pretty spread
> I ended up with a large patch. I agree we can just set the bufsize to 4096.
> If it is ok for everybody I will post a patch to set the buffer size
> to one page.

I suspect that will be fine.
diff mbox series

Patch

diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index b9619b822..7e9aa62ae 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -33,6 +33,7 @@ 
 #include "svec.h"
 #include "timeval.h"
 #include "openvswitch/vlog.h"
+#include "stream-provider.h"
 
 VLOG_DEFINE_THIS_MODULE(jsonrpc);
 
@@ -43,7 +44,7 @@  struct jsonrpc {
 
     /* Input. */
     struct byteq input;
-    uint8_t input_buffer[512];
+    uint8_t *input_buffer;
     struct json_parser *parser;
 
     /* Output. */
@@ -62,9 +63,11 @@  static void jsonrpc_error(struct jsonrpc *, int error);
 /* This is just the same as stream_open() except that it uses the default
  * JSONRPC port if none is specified. */
 int
-jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp)
+jsonrpc_stream_open(const char *name, struct stream **streamp,
+                    uint8_t dscp, int bufsize)
 {
-    return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp);
+    return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp,
+                                         bufsize);
 }
 
 /* This is just the same as pstream_open() except that it uses the default
@@ -80,14 +83,16 @@  jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
 struct jsonrpc *
 jsonrpc_open(struct stream *stream)
 {
+    int bufsize = stream->bufsize > 0 ? stream->bufsize : 512;
     struct jsonrpc *rpc;
 
     ovs_assert(stream != NULL);
 
     rpc = xzalloc(sizeof *rpc);
+    rpc->input_buffer = xzalloc(bufsize);
     rpc->name = xstrdup(stream_get_name(stream));
     rpc->stream = stream;
-    byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
+    byteq_init(&rpc->input, rpc->input_buffer, bufsize);
     ovs_list_init(&rpc->output);
 
     return rpc;
@@ -101,6 +106,7 @@  jsonrpc_close(struct jsonrpc *rpc)
     if (rpc) {
         jsonrpc_cleanup(rpc);
         free(rpc->name);
+        free(rpc->input_buffer);
         free(rpc);
     }
 }
@@ -787,6 +793,7 @@  struct jsonrpc_session {
     int last_error;
     unsigned int seqno;
     uint8_t dscp;
+    int bufsize;
 };
 
 static void
@@ -814,11 +821,12 @@  struct jsonrpc_session *
 jsonrpc_session_open(const char *name, bool retry)
 {
     const struct svec remotes = { .names = (char **) &name, .n = 1 };
-    return jsonrpc_session_open_multiple(&remotes, retry);
+    return jsonrpc_session_open_multiple(&remotes, retry, 512);
 }
 
 struct jsonrpc_session *
-jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
+jsonrpc_session_open_multiple(const struct svec *remotes, bool retry,
+                              int bufsize)
 {
     struct jsonrpc_session *s;
 
@@ -839,6 +847,7 @@  jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
     s->seqno = 0;
     s->dscp = 0;
     s->last_error = 0;
+    s->bufsize = bufsize;
 
     const char *name = reconnect_get_name(s->reconnect);
     if (!pstream_verify_name(name)) {
@@ -931,7 +940,7 @@  jsonrpc_session_connect(struct jsonrpc_session *s)
 
     jsonrpc_session_disconnect(s);
     if (!reconnect_is_passive(s->reconnect)) {
-        error = jsonrpc_stream_open(name, &s->stream, s->dscp);
+        error = jsonrpc_stream_open(name, &s->stream, s->dscp, s->bufsize);
         if (!error) {
             reconnect_connecting(s->reconnect, time_msec());
         } else {
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index a44114e8d..2c77732c2 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -40,7 +40,8 @@  struct svec;
 #define OVSDB_OLD_PORT 6632
 #define OVSDB_PORT 6640
 
-int jsonrpc_stream_open(const char *name, struct stream **, uint8_t dscp);
+int jsonrpc_stream_open(const char *name, struct stream **, uint8_t dscp,
+                        int bufsize);
 int jsonrpc_pstream_open(const char *name, struct pstream **, uint8_t dscp);
 
 struct jsonrpc *jsonrpc_open(struct stream *);
@@ -105,7 +106,7 @@  char *jsonrpc_msg_to_string(const struct jsonrpc_msg *);
 
 struct jsonrpc_session *jsonrpc_session_open(const char *name, bool retry);
 struct jsonrpc_session *jsonrpc_session_open_multiple(const struct svec *,
-                                                      bool retry);
+                                                      bool retry, int bufsize);
 struct jsonrpc_session *jsonrpc_session_open_unreliably(struct jsonrpc *,
                                                         uint8_t);
 void jsonrpc_session_close(struct jsonrpc_session *);
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 71df29184..8a6cfd691 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -425,7 +425,7 @@  dummy_packet_conn_set_config(struct dummy_packet_conn *conn,
         conn->rconn.reconnect = reconnect;
         conn->type = ACTIVE;
 
-        error = stream_open(stream, &active_stream, DSCP_DEFAULT);
+        error = stream_open(stream, &active_stream, DSCP_DEFAULT, 512);
         conn->rconn.rstream = dummy_packet_stream_create(active_stream);
 
         switch (error) {
@@ -505,7 +505,8 @@  OVS_REQUIRES(dev->mutex)
                 error = stream_connect(rconn->rstream->stream);
             } else {
                 error = stream_open(reconnect_get_name(rconn->reconnect),
-                                    &rconn->rstream->stream, DSCP_DEFAULT);
+                                    &rconn->rstream->stream, DSCP_DEFAULT,
+                                    512);
             }
 
             switch (error) {
diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
index 190143f36..34ecca687 100644
--- a/lib/ovsdb-idl.c
+++ b/lib/ovsdb-idl.c
@@ -253,6 +253,7 @@  struct ovsdb_idl {
      * state machine must restart.  */
     struct jsonrpc_session *session; /* Connection to the server. */
     char *remote;                    /* 'session' remote name. */
+    int bufsize;
     enum ovsdb_idl_state state;      /* Current session state. */
     unsigned int state_seqno;        /* See above. */
     struct json *request_id;         /* JSON ID for request awaiting reply. */
@@ -455,7 +456,7 @@  ovsdb_idl_create(const char *remote, const struct ovsdb_idl_class *class,
 {
     struct ovsdb_idl *idl = ovsdb_idl_create_unconnected(
         class, monitor_everything_by_default);
-    ovsdb_idl_set_remote(idl, remote, retry);
+    ovsdb_idl_set_remote(idl, remote, retry, 512);
     return idl;
 }
 
@@ -511,11 +512,13 @@  ovsdb_idl_create_unconnected(const struct ovsdb_idl_class *class,
  * If 'retry' is true, the connection to the remote will automatically retry
  * when it fails.  If 'retry' is false, the connection is one-time. */
 void
-ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote, bool retry)
+ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote, bool retry,
+                     int bufsize)
 {
     if (idl
         && ((remote != NULL) != (idl->remote != NULL)
-            || (remote && idl->remote && strcmp(remote, idl->remote)))) {
+            || (remote && idl->remote && strcmp(remote, idl->remote))
+            || bufsize != idl->bufsize)) {
         ovs_assert(!idl->data.txn);
 
         /* Close the old session, if any. */
@@ -534,8 +537,10 @@  ovsdb_idl_set_remote(struct ovsdb_idl *idl, const char *remote, bool retry)
             if (idl->shuffle_remotes) {
                 svec_shuffle(&remotes);
             }
-            idl->session = jsonrpc_session_open_multiple(&remotes, retry);
+            idl->session = jsonrpc_session_open_multiple(&remotes, retry,
+                                                         bufsize);
             svec_destroy(&remotes);
+            idl->bufsize = bufsize;
 
             idl->state_seqno = UINT_MAX;
 
diff --git a/lib/ovsdb-idl.h b/lib/ovsdb-idl.h
index 9f12ce320..1d40c7779 100644
--- a/lib/ovsdb-idl.h
+++ b/lib/ovsdb-idl.h
@@ -62,7 +62,7 @@  struct ovsdb_idl *ovsdb_idl_create(const char *remote,
                                    bool retry);
 struct ovsdb_idl *ovsdb_idl_create_unconnected(
     const struct ovsdb_idl_class *, bool monitor_everything_by_default);
-void ovsdb_idl_set_remote(struct ovsdb_idl *, const char *, bool);
+void ovsdb_idl_set_remote(struct ovsdb_idl *, const char *, bool, int);
 void ovsdb_idl_set_shuffle_remotes(struct ovsdb_idl *, bool);
 void ovsdb_idl_destroy(struct ovsdb_idl *);
 
diff --git a/lib/stream-fd.c b/lib/stream-fd.c
index 46ee7ae27..4034ac4e8 100644
--- a/lib/stream-fd.c
+++ b/lib/stream-fd.c
@@ -59,12 +59,12 @@  static void maybe_unlink_and_free(char *path);
  * implementation never fails.) */
 int
 new_fd_stream(char *name, int fd, int connect_status, int fd_type,
-              struct stream **streamp)
+              struct stream **streamp, int bufsize)
 {
     struct stream_fd *s;
 
     s = xmalloc(sizeof *s);
-    stream_init(&s->stream, &stream_fd_class, connect_status, name);
+    stream_init(&s->stream, &stream_fd_class, connect_status, name, bufsize);
     s->fd = fd;
     s->fd_type = fd_type;
     *streamp = &s->stream;
diff --git a/lib/stream-fd.h b/lib/stream-fd.h
index 24639900b..3e60ab10c 100644
--- a/lib/stream-fd.h
+++ b/lib/stream-fd.h
@@ -29,7 +29,7 @@  struct pstream;
 struct sockaddr_storage;
 
 int new_fd_stream(char *name, int fd, int connect_status,
-                  int fd_type, struct stream **streamp);
+                  int fd_type, struct stream **streamp, int bufsize);
 int new_fd_pstream(char *name, int fd,
                    int (*accept_cb)(int fd, const struct sockaddr_storage *ss,
                                     size_t ss_len, struct stream **),
diff --git a/lib/stream-provider.h b/lib/stream-provider.h
index 75f4f059b..e59b72cb2 100644
--- a/lib/stream-provider.h
+++ b/lib/stream-provider.h
@@ -31,10 +31,11 @@  struct stream {
     int error;
     char *name;
     char *peer_id;
+    int bufsize;
 };
 
 void stream_init(struct stream *, const struct stream_class *,
-                 int connect_status, char *name);
+                 int connect_status, char *name, int bufsize);
 static inline void stream_assert_class(const struct stream *stream,
                                        const struct stream_class *class)
 {
@@ -66,7 +67,7 @@  struct stream_class {
      * EAGAIN (not EINPROGRESS, as returned by the connect system call) and
      * continue the connection in the background. */
     int (*open)(const char *name, char *suffix, struct stream **streamp,
-                uint8_t dscp);
+                uint8_t dscp, int bufsize);
 
     /* Closes 'stream' and frees associated memory. */
     void (*close)(struct stream *stream);
diff --git a/lib/stream-ssl.c b/lib/stream-ssl.c
index 078fcbc3a..a3bc09f4c 100644
--- a/lib/stream-ssl.c
+++ b/lib/stream-ssl.c
@@ -233,7 +233,7 @@  want_to_poll_events(int want)
  * ownership of 'server_name'. */
 static int
 new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
-               enum ssl_state state, struct stream **streamp)
+               enum ssl_state state, struct stream **streamp, int bufsize)
 {
     struct ssl_stream *sslv;
     SSL *ssl = NULL;
@@ -295,7 +295,7 @@  new_ssl_stream(char *name, char *server_name, int fd, enum session_type type,
 
     /* Create and return the ssl_stream. */
     sslv = xmalloc(sizeof *sslv);
-    stream_init(&sslv->stream, &ssl_stream_class, EAGAIN, name);
+    stream_init(&sslv->stream, &ssl_stream_class, EAGAIN, name, bufsize);
     sslv->state = state;
     sslv->type = type;
     sslv->fd = fd;
@@ -356,7 +356,8 @@  get_server_name(const char *suffix_)
 }
 
 static int
-ssl_open(const char *name, char *suffix, struct stream **streamp, uint8_t dscp)
+ssl_open(const char *name, char *suffix, struct stream **streamp,
+         uint8_t dscp, int bufsize)
 {
     int error, fd;
 
@@ -370,7 +371,7 @@  ssl_open(const char *name, char *suffix, struct stream **streamp, uint8_t dscp)
     if (fd >= 0) {
         int state = error ? STATE_TCP_CONNECTING : STATE_SSL_CONNECTING;
         return new_ssl_stream(xstrdup(name), get_server_name(suffix),
-                              fd, CLIENT, state, streamp);
+                              fd, CLIENT, state, streamp, bufsize);
     } else {
         VLOG_ERR("%s: connect: %s", name, ovs_strerror(error));
         return error;
@@ -958,7 +959,7 @@  pssl_accept(struct pstream *pstream, struct stream **new_streamp)
     ss_format_address(&ss, &name);
     ds_put_format(&name, ":%"PRIu16, ss_get_port(&ss));
     return new_ssl_stream(ds_steal_cstr(&name), NULL, new_fd, SERVER,
-                          STATE_SSL_CONNECTING, new_streamp);
+                          STATE_SSL_CONNECTING, new_streamp, 512);
 }
 
 static void
diff --git a/lib/stream-tcp.c b/lib/stream-tcp.c
index e8dc2bfaa..d4fe5f4c7 100644
--- a/lib/stream-tcp.c
+++ b/lib/stream-tcp.c
@@ -39,23 +39,25 @@  VLOG_DEFINE_THIS_MODULE(stream_tcp);
 
 /* Takes ownership of 'name'. */
 static int
-new_tcp_stream(char *name, int fd, int connect_status, struct stream **streamp)
+new_tcp_stream(char *name, int fd, int connect_status, struct stream **streamp,
+               int bufsize)
 {
     if (connect_status == 0) {
         setsockopt_tcp_nodelay(fd);
     }
 
-    return new_fd_stream(name, fd, connect_status, AF_INET, streamp);
+    return new_fd_stream(name, fd, connect_status, AF_INET, streamp, bufsize);
 }
 
 static int
-tcp_open(const char *name, char *suffix, struct stream **streamp, uint8_t dscp)
+tcp_open(const char *name, char *suffix, struct stream **streamp,
+         uint8_t dscp, int bufsize)
 {
     int fd, error;
 
     error = inet_open_active(SOCK_STREAM, suffix, -1, NULL, &fd, dscp);
     if (fd >= 0) {
-        return new_tcp_stream(xstrdup(name), fd, error, streamp);
+        return new_tcp_stream(xstrdup(name), fd, error, streamp, bufsize);
     } else {
         VLOG_ERR("%s: connect: %s", name, ovs_strerror(error));
         return error;
@@ -126,7 +128,7 @@  ptcp_accept(int fd, const struct sockaddr_storage *ss,
     ss_format_address(ss, &name);
     ds_put_format(&name, ":%"PRIu16, ss_get_port(ss));
 
-    return new_tcp_stream(ds_steal_cstr(&name), fd, 0, streamp);
+    return new_tcp_stream(ds_steal_cstr(&name), fd, 0, streamp, 512);
 }
 
 const struct pstream_class ptcp_pstream_class = {
diff --git a/lib/stream-unix.c b/lib/stream-unix.c
index d265efb83..548791a6f 100644
--- a/lib/stream-unix.c
+++ b/lib/stream-unix.c
@@ -42,7 +42,7 @@  VLOG_DEFINE_THIS_MODULE(stream_unix);
 
 static int
 unix_open(const char *name, char *suffix, struct stream **streamp,
-          uint8_t dscp OVS_UNUSED)
+          uint8_t dscp OVS_UNUSED, int bufsize)
 {
     char *connect_path;
     int fd;
@@ -59,7 +59,7 @@  unix_open(const char *name, char *suffix, struct stream **streamp,
 
     free(connect_path);
     return new_fd_stream(xstrdup(name), fd, check_connection_completion(fd),
-                         AF_UNIX, streamp);
+                         AF_UNIX, streamp, bufsize);
 }
 
 const struct stream_class unix_stream_class = {
@@ -124,7 +124,7 @@  punix_accept(int fd, const struct sockaddr_storage *ss, size_t ss_len,
         static atomic_count next_idx = ATOMIC_COUNT_INIT(0);
         bound_name = xasprintf("unix#%u", atomic_count_inc(&next_idx));
     }
-    return new_fd_stream(bound_name, fd, 0, AF_UNIX, streamp);
+    return new_fd_stream(bound_name, fd, 0, AF_UNIX, streamp, 512);
 }
 
 const struct pstream_class punix_pstream_class = {
diff --git a/lib/stream-windows.c b/lib/stream-windows.c
index 34bc610b6..65acbc041 100644
--- a/lib/stream-windows.c
+++ b/lib/stream-windows.c
@@ -161,7 +161,7 @@  windows_open(const char *name, char *suffix, struct stream **streamp,
         return ENOENT;
     }
     struct windows_stream *s = xmalloc(sizeof *s);
-    stream_init(&s->stream, &windows_stream_class, 0, xstrdup(name));
+    stream_init(&s->stream, &windows_stream_class, 0, xstrdup(name), 512);
     s->pipe_path = connect_path;
     s->fd = npipe;
     /* This is an active stream. */
@@ -555,7 +555,8 @@  pwindows_accept(struct pstream *pstream, struct stream **new_streamp)
     /* Give the handle p->fd to the new created active stream and specify it
      * was created by an active stream. */
     struct windows_stream *p_temp = xmalloc(sizeof *p_temp);
-    stream_init(&p_temp->stream, &windows_stream_class, 0, xstrdup("unix"));
+    stream_init(&p_temp->stream, &windows_stream_class, 0, xstrdup("unix"),
+                512);
     p_temp->fd = p->fd;
     /* Specify it was created by a passive stream. */
     p_temp->server = true;
diff --git a/lib/stream.c b/lib/stream.c
index e246b3773..37c0ac430 100644
--- a/lib/stream.c
+++ b/lib/stream.c
@@ -208,7 +208,8 @@  stream_verify_name(const char *name)
  * stores a pointer to the new connection in '*streamp', otherwise a null
  * pointer.  */
 int
-stream_open(const char *name, struct stream **streamp, uint8_t dscp)
+stream_open(const char *name, struct stream **streamp, uint8_t dscp,
+            int bufsize)
 {
     const struct stream_class *class;
     struct stream *stream;
@@ -225,7 +226,7 @@  stream_open(const char *name, struct stream **streamp, uint8_t dscp)
 
     /* Call class's "open" function. */
     suffix_copy = xstrdup(strchr(name, ':') + 1);
-    error = class->open(name, suffix_copy, &stream, dscp);
+    error = class->open(name, suffix_copy, &stream, dscp, bufsize);
     free(suffix_copy);
     if (error) {
         goto error;
@@ -650,7 +651,7 @@  pstream_get_bound_port(const struct pstream *pstream)
  * Takes ownership of 'name'. */
 void
 stream_init(struct stream *stream, const struct stream_class *class,
-            int connect_status, char *name)
+            int connect_status, char *name, int bufsize)
 {
     memset(stream, 0, sizeof *stream);
     stream->class = class;
@@ -659,6 +660,7 @@  stream_init(struct stream *stream, const struct stream_class *class,
                     : SCS_DISCONNECTED);
     stream->error = connect_status;
     stream->name = name;
+    stream->bufsize = bufsize;
     ovs_assert(stream->state != SCS_CONNECTING || class->connect);
 }
 
@@ -701,7 +703,7 @@  int
 stream_open_with_default_port(const char *name_,
                               uint16_t default_port,
                               struct stream **streamp,
-                              uint8_t dscp)
+                              uint8_t dscp, int bufsize)
 {
     char *name;
     int error;
@@ -721,7 +723,7 @@  stream_open_with_default_port(const char *name_,
     } else {
         name = xstrdup(name_);
     }
-    error = stream_open(name, streamp, dscp);
+    error = stream_open(name, streamp, dscp, bufsize);
     free(name);
 
     return error;
diff --git a/lib/stream.h b/lib/stream.h
index 77bffa498..049d187df 100644
--- a/lib/stream.h
+++ b/lib/stream.h
@@ -33,7 +33,7 @@  void stream_usage(const char *name, bool active, bool passive, bool bootstrap);
 
 /* Bidirectional byte streams. */
 int stream_verify_name(const char *name);
-int stream_open(const char *name, struct stream **, uint8_t dscp);
+int stream_open(const char *name, struct stream **, uint8_t dscp, int bufsize);
 int stream_open_block(int error, long long int timeout, struct stream **);
 void stream_close(struct stream *);
 const char *stream_get_name(const struct stream *);
@@ -72,7 +72,7 @@  ovs_be16 pstream_get_bound_port(const struct pstream *);
 int stream_open_with_default_port(const char *name,
                                   uint16_t default_port,
                                   struct stream **,
-                                  uint8_t dscp);
+                                  uint8_t dscp, int bufsize);
 int pstream_open_with_default_port(const char *name,
                                    uint16_t default_port,
                                    struct pstream **,
diff --git a/lib/unixctl.c b/lib/unixctl.c
index c216de3d0..bcd587845 100644
--- a/lib/unixctl.c
+++ b/lib/unixctl.c
@@ -459,7 +459,8 @@  unixctl_client_create(const char *path, struct jsonrpc **client)
 
     *client = NULL;
 
-    error = stream_open_block(stream_open(unix_path, &stream, DSCP_DEFAULT),
+    error = stream_open_block(stream_open(unix_path, &stream,
+                                          DSCP_DEFAULT, 512),
                               -1, &stream);
     free(unix_path);
     free(abs_path);
diff --git a/lib/vconn-stream.c b/lib/vconn-stream.c
index 0794ecc05..ea3e183d6 100644
--- a/lib/vconn-stream.c
+++ b/lib/vconn-stream.c
@@ -78,7 +78,7 @@  vconn_stream_open(const char *name, uint32_t allowed_versions,
     struct stream *stream;
     int error;
 
-    error = stream_open_with_default_port(name, OFP_PORT, &stream, dscp);
+    error = stream_open_with_default_port(name, OFP_PORT, &stream, dscp, 512);
     if (!error) {
         error = stream_connect(stream);
         if (!error || error == EAGAIN) {
diff --git a/ovsdb/ovsdb-client.c b/ovsdb/ovsdb-client.c
index 72756eb1f..344685449 100644
--- a/ovsdb/ovsdb-client.c
+++ b/ovsdb/ovsdb-client.c
@@ -192,7 +192,7 @@  open_rpc(int min_args, enum args_needed need,
      * the first one that is up and hosts the database we want (if any) in an
      * acceptable state. */
     struct jsonrpc_session *js = jsonrpc_session_open_multiple(
-        &remotes, false);
+        &remotes, false, 512);
     svec_destroy(&remotes);
 
     unsigned int seqno = 0;
@@ -505,7 +505,7 @@  open_jsonrpc(const char *server)
     int error;
 
     error = stream_open_block(jsonrpc_stream_open(server, &stream,
-                              DSCP_DEFAULT), -1, &stream);
+                              DSCP_DEFAULT, 512), -1, &stream);
     if (error == EAFNOSUPPORT) {
         struct pstream *pstream;
 
diff --git a/tests/test-jsonrpc.c b/tests/test-jsonrpc.c
index 04e941b14..e05b01139 100644
--- a/tests/test-jsonrpc.c
+++ b/tests/test-jsonrpc.c
@@ -272,7 +272,7 @@  do_request(struct ovs_cmdl_context *ctx)
     }
 
     error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
-                              DSCP_DEFAULT), -1, &stream);
+                              DSCP_DEFAULT, 512), -1, &stream);
     if (error) {
         ovs_fatal(error, "could not open \"%s\"", ctx->argv[1]);
     }
@@ -312,7 +312,7 @@  do_notify(struct ovs_cmdl_context *ctx)
     }
 
     error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
-                              DSCP_DEFAULT), -1, &stream);
+                              DSCP_DEFAULT, 512), -1, &stream);
     if (error) {
         ovs_fatal(error, "could not open \"%s\"", ctx->argv[1]);
     }
diff --git a/tests/test-ovsdb.c b/tests/test-ovsdb.c
index b1a4be36b..a84a477d4 100644
--- a/tests/test-ovsdb.c
+++ b/tests/test-ovsdb.c
@@ -2417,7 +2417,7 @@  do_idl(struct ovs_cmdl_context *ctx)
         struct stream *stream;
 
         error = stream_open_block(jsonrpc_stream_open(ctx->argv[1], &stream,
-                                  DSCP_DEFAULT), -1, &stream);
+                                  DSCP_DEFAULT, 512), -1, &stream);
         if (error) {
             ovs_fatal(error, "failed to connect to \"%s\"", ctx->argv[1]);
         }
diff --git a/tests/test-stream.c b/tests/test-stream.c
index 4af44200e..a35fd9efb 100644
--- a/tests/test-stream.c
+++ b/tests/test-stream.c
@@ -36,7 +36,7 @@  main(int argc, char *argv[])
         ovs_fatal(0, "usage: %s REMOTE", argv[0]);
     }
 
-    error = stream_open_block(stream_open(argv[1], &stream, DSCP_DEFAULT),
+    error = stream_open_block(stream_open(argv[1], &stream, DSCP_DEFAULT, 512),
                               10000, &stream);
     if (error) {
         VLOG_ERR("stream_open_block(%s) failure: %s",
diff --git a/utilities/ovs-vsctl.c b/utilities/ovs-vsctl.c
index 7232471e6..9e32cad34 100644
--- a/utilities/ovs-vsctl.c
+++ b/utilities/ovs-vsctl.c
@@ -178,7 +178,7 @@  main(int argc, char *argv[])
     /* Initialize IDL. */
     idl = the_idl = ovsdb_idl_create_unconnected(&ovsrec_idl_class, false);
     ovsdb_idl_set_shuffle_remotes(idl, shuffle_remotes);
-    ovsdb_idl_set_remote(idl, db, retry);
+    ovsdb_idl_set_remote(idl, db, retry, 512);
     ovsdb_idl_set_leader_only(idl, leader_only);
     run_prerequisites(commands, n_commands, idl);