diff mbox series

[ovs-dev,3/5] raft: Set threshold on backlog for raft connections.

Message ID 20201026014257.215501-4-i.maximets@ovn.org
State Superseded
Headers show
Series Mitigate RAFT memory consumption issues. | expand

Commit Message

Ilya Maximets Oct. 26, 2020, 1:42 a.m. UTC
RAFT messages could be fairly big.  If something abnormal happens to
one of the servers in a cluster it may not be able to process all the
incoming messages in a timely manner.  This results in jsonrpc backlog
growth on the sender's side.  For example if follower gets many new
clients at once that it needs to serve, or it decides to take a
snapshot in a period of high number of database changes.
If backlog grows large enough it becomes harder and harder for follower
to process incoming raft messages, it sends outdated replies and
starts receiving snapshots and the whole raft log from the leader.
Sometimes backlog grows too high (60GB in this example):

      jsonrpc|INFO|excessive sending backlog, jsonrpc: ssl:<ip>,
                   num of msgs: 15370, backlog: 61731060773.

In this case OS might actually decide to kill the sender to free some
memory.  Anyway, It could take a lot of time for such a server to catch
up with the rest of the cluster if it has so much data to receive and
process.

Introducing backlog thresholds for jsonrpc connections.
If sending backlog will exceed particular values (500 messages or
4GB in size), connection will be dropped and re-created.  This will
allow to drop all the current backlog and start over increasing
chances of cluster recovery.

Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=1888829
Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
---
 NEWS          |  2 ++
 lib/jsonrpc.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++-
 lib/jsonrpc.h |  6 ++++++
 ovsdb/raft.c  |  5 +++++
 4 files changed, 69 insertions(+), 1 deletion(-)

Comments

Dumitru Ceara Oct. 28, 2020, 10:49 a.m. UTC | #1
On 10/26/20 2:42 AM, Ilya Maximets wrote:
> RAFT messages could be fairly big.  If something abnormal happens to
> one of the servers in a cluster it may not be able to process all the
> incoming messages in a timely manner.  This results in jsonrpc backlog
> growth on the sender's side.  For example if follower gets many new
> clients at once that it needs to serve, or it decides to take a
> snapshot in a period of high number of database changes.
> If backlog grows large enough it becomes harder and harder for follower
> to process incoming raft messages, it sends outdated replies and
> starts receiving snapshots and the whole raft log from the leader.
> Sometimes backlog grows too high (60GB in this example):
> 
>       jsonrpc|INFO|excessive sending backlog, jsonrpc: ssl:<ip>,
>                    num of msgs: 15370, backlog: 61731060773.
> 
> In this case OS might actually decide to kill the sender to free some
> memory.  Anyway, It could take a lot of time for such a server to catch
> up with the rest of the cluster if it has so much data to receive and
> process.
> 
> Introducing backlog thresholds for jsonrpc connections.
> If sending backlog will exceed particular values (500 messages or
> 4GB in size), connection will be dropped and re-created.  This will
> allow to drop all the current backlog and start over increasing
> chances of cluster recovery.
> 
> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=1888829
> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
> ---
>  NEWS          |  2 ++
>  lib/jsonrpc.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++-
>  lib/jsonrpc.h |  6 ++++++
>  ovsdb/raft.c  |  5 +++++
>  4 files changed, 69 insertions(+), 1 deletion(-)
> 
> diff --git a/NEWS b/NEWS
> index 2860a8e9c..ebdf8758b 100644
> --- a/NEWS
> +++ b/NEWS
> @@ -6,6 +6,8 @@ Post-v2.14.0
>       * New unixctl command 'ovsdb-server/memory-trim-on-compaction on|off'.
>         If turned on, ovsdb-server will try to reclaim all the unused memory
>         after every DB compaction back to OS.  Disabled by default.
> +     * Maximum backlog on RAFT connections limited to 500 messages or 4GB.
> +       Once threshold reached, connection is dropped (and re-established).
>     - DPDK:
>       * Removed support for vhost-user dequeue zero-copy.
>     - The environment variable OVS_UNBOUND_CONF, if set, is now used
> diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
> index ecbc939fe..435824844 100644
> --- a/lib/jsonrpc.c
> +++ b/lib/jsonrpc.c
> @@ -50,6 +50,10 @@ struct jsonrpc {
>      struct ovs_list output;     /* Contains "struct ofpbuf"s. */
>      size_t output_count;        /* Number of elements in "output". */
>      size_t backlog;
> +
> +    /* Limits. */
> +    size_t max_output;          /* 'output_count' disconnection threshold. */
> +    size_t max_backlog;         /* 'backlog' disconnection threshold. */
>  };
>  
>  /* Rate limit for error messages. */
> @@ -178,6 +182,17 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
>      return rpc->status ? 0 : rpc->backlog;
>  }
>  
> +/* Sets thresholds for send backlog.  If send backlog contains more than
> + * 'max_n_msgs' messages or larger than 'max_backlog_bytes' bytes, connection
> + * will be dropped. */
> +void
> +jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
> +                              size_t max_n_msgs, size_t max_backlog_bytes)
> +{
> +    rpc->max_output = max_n_msgs;
> +    rpc->max_backlog = max_backlog_bytes;
> +}
> +
>  /* Returns the number of bytes that have been received on 'rpc''s underlying
>   * stream.  (The value wraps around if it exceeds UINT_MAX.) */
>  unsigned int
> @@ -261,9 +276,26 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
>      rpc->backlog += length;
>  
>      if (rpc->output_count >= 50) {
> -        VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
> +        static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5);
> +        bool disconnect = false;
> +
> +        VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of"
>                       " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
>                       rpc->output_count, rpc->backlog);
> +        if (rpc->max_output && rpc->output_count > rpc->max_output) {
> +            disconnect = true;
> +            VLOG_WARN("sending backlog exceeded maximum number of messages (%"
> +                      PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.",
> +                      rpc->output_count, rpc->max_output, rpc->name);
> +        } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) {
> +            disconnect = true;
> +            VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %"
> +                      PRIuSIZE" bytes), disconnecting, jsonrpc: %s.",
> +                      rpc->backlog, rpc->max_backlog, rpc->name);
> +        }
> +        if (disconnect) {
> +            jsonrpc_error(rpc, E2BIG);
> +        }
>      }
>  
>      if (rpc->backlog == length) {
> @@ -787,6 +819,10 @@ struct jsonrpc_session {
>      int last_error;
>      unsigned int seqno;
>      uint8_t dscp;
> +
> +    /* Limits for jsonrpc. */
> +    size_t max_n_msgs;
> +    size_t max_backlog_bytes;

Hi Ilya,

If I'm not wrong, we end up using these uninitialized in non-clustered mode.

>  };
>  
>  static void
> @@ -970,6 +1006,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
>              }
>              reconnect_connected(s->reconnect, time_msec());
>              s->rpc = jsonrpc_open(stream);
> +            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
> +                                                  s->max_backlog_bytes);

Indent is a bit odd here.

However, I don't see anything in the coding guidelines about this and it does
seem more readable to do it your way.  So I don't know :)

Thanks,
Dumitru

>              s->seqno++;
>          } else if (error != EAGAIN) {
>              reconnect_listen_error(s->reconnect, time_msec(), error);
> @@ -1010,6 +1048,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
>          if (!error) {
>              reconnect_connected(s->reconnect, time_msec());
>              s->rpc = jsonrpc_open(s->stream);
> +            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
> +                                                  s->max_backlog_bytes);
>              s->stream = NULL;
>              s->seqno++;
>          } else if (error != EAGAIN) {
> @@ -1250,3 +1290,18 @@ jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
>          jsonrpc_session_force_reconnect(s);
>      }
>  }
> +
> +/* Sets thresholds for send backlog.  If send backlog contains more than
> + * 'max_n_msgs' messages or larger than 'max_backlog_bytes' bytes, connection
> + * will be closed (then reconnected, if that feature is enabled). */
> +void
> +jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s,
> +                                      size_t max_n_msgs,
> +                                      size_t max_backlog_bytes)
> +{
> +    s->max_n_msgs = max_n_msgs;
> +    s->max_backlog_bytes = max_backlog_bytes;
> +    if (s->rpc) {
> +        jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes);
> +    }
> +}
> diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
> index a44114e8d..d75d66b86 100644
> --- a/lib/jsonrpc.h
> +++ b/lib/jsonrpc.h
> @@ -51,6 +51,9 @@ void jsonrpc_wait(struct jsonrpc *);
>  
>  int jsonrpc_get_status(const struct jsonrpc *);
>  size_t jsonrpc_get_backlog(const struct jsonrpc *);
> +void jsonrpc_set_backlog_threshold(struct jsonrpc *, size_t max_n_msgs,
> +                                                     size_t max_backlog_bytes);
> +
>  unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
>  const char *jsonrpc_get_name(const struct jsonrpc *);
>  
> @@ -140,6 +143,9 @@ void jsonrpc_session_set_probe_interval(struct jsonrpc_session *,
>                                          int probe_interval);
>  void jsonrpc_session_set_dscp(struct jsonrpc_session *,
>                                uint8_t dscp);
> +void jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *,
> +                                           size_t max_n_msgs,
> +                                           size_t max_backlog_bytes);
>  const char *jsonrpc_session_get_id(const struct jsonrpc_session *);
>  
>  #endif /* jsonrpc.h */
> diff --git a/ovsdb/raft.c b/ovsdb/raft.c
> index ac85c6b67..92a099896 100644
> --- a/ovsdb/raft.c
> +++ b/ovsdb/raft.c
> @@ -925,6 +925,9 @@ raft_reset_ping_timer(struct raft *raft)
>      raft->ping_timeout = time_msec() + raft->election_timer / 3;
>  }
>  
> +#define RAFT_MAX_BACKLOG_N_MSGS    500
> +#define RAFT_MAX_BACKLOG_BYTES     UINT32_MAX
> +
>  static void
>  raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
>                const struct uuid *sid, bool incoming)
> @@ -940,6 +943,8 @@ raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
>      conn->incoming = incoming;
>      conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
>      jsonrpc_session_set_probe_interval(js, 0);
> +    jsonrpc_session_set_backlog_threshold(js, RAFT_MAX_BACKLOG_N_MSGS,
> +                                              RAFT_MAX_BACKLOG_BYTES);
>  }
>  
>  /* Starts the local server in an existing Raft cluster, using the local copy of
>
Ilya Maximets Nov. 3, 2020, 3:51 p.m. UTC | #2
On 10/28/20 11:49 AM, Dumitru Ceara wrote:
> On 10/26/20 2:42 AM, Ilya Maximets wrote:
>> RAFT messages could be fairly big.  If something abnormal happens to
>> one of the servers in a cluster it may not be able to process all the
>> incoming messages in a timely manner.  This results in jsonrpc backlog
>> growth on the sender's side.  For example if follower gets many new
>> clients at once that it needs to serve, or it decides to take a
>> snapshot in a period of high number of database changes.
>> If backlog grows large enough it becomes harder and harder for follower
>> to process incoming raft messages, it sends outdated replies and
>> starts receiving snapshots and the whole raft log from the leader.
>> Sometimes backlog grows too high (60GB in this example):
>>
>>       jsonrpc|INFO|excessive sending backlog, jsonrpc: ssl:<ip>,
>>                    num of msgs: 15370, backlog: 61731060773.
>>
>> In this case OS might actually decide to kill the sender to free some
>> memory.  Anyway, It could take a lot of time for such a server to catch
>> up with the rest of the cluster if it has so much data to receive and
>> process.
>>
>> Introducing backlog thresholds for jsonrpc connections.
>> If sending backlog will exceed particular values (500 messages or
>> 4GB in size), connection will be dropped and re-created.  This will
>> allow to drop all the current backlog and start over increasing
>> chances of cluster recovery.
>>
>> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=1888829
>> Signed-off-by: Ilya Maximets <i.maximets@ovn.org>
>> ---
>>  NEWS          |  2 ++
>>  lib/jsonrpc.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++-
>>  lib/jsonrpc.h |  6 ++++++
>>  ovsdb/raft.c  |  5 +++++
>>  4 files changed, 69 insertions(+), 1 deletion(-)
>>
>> diff --git a/NEWS b/NEWS
>> index 2860a8e9c..ebdf8758b 100644
>> --- a/NEWS
>> +++ b/NEWS
>> @@ -6,6 +6,8 @@ Post-v2.14.0
>>       * New unixctl command 'ovsdb-server/memory-trim-on-compaction on|off'.
>>         If turned on, ovsdb-server will try to reclaim all the unused memory
>>         after every DB compaction back to OS.  Disabled by default.
>> +     * Maximum backlog on RAFT connections limited to 500 messages or 4GB.
>> +       Once threshold reached, connection is dropped (and re-established).
>>     - DPDK:
>>       * Removed support for vhost-user dequeue zero-copy.
>>     - The environment variable OVS_UNBOUND_CONF, if set, is now used
>> diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
>> index ecbc939fe..435824844 100644
>> --- a/lib/jsonrpc.c
>> +++ b/lib/jsonrpc.c
>> @@ -50,6 +50,10 @@ struct jsonrpc {
>>      struct ovs_list output;     /* Contains "struct ofpbuf"s. */
>>      size_t output_count;        /* Number of elements in "output". */
>>      size_t backlog;
>> +
>> +    /* Limits. */
>> +    size_t max_output;          /* 'output_count' disconnection threshold. */
>> +    size_t max_backlog;         /* 'backlog' disconnection threshold. */
>>  };
>>  
>>  /* Rate limit for error messages. */
>> @@ -178,6 +182,17 @@ jsonrpc_get_backlog(const struct jsonrpc *rpc)
>>      return rpc->status ? 0 : rpc->backlog;
>>  }
>>  
>> +/* Sets thresholds for send backlog.  If send backlog contains more than
>> + * 'max_n_msgs' messages or larger than 'max_backlog_bytes' bytes, connection
>> + * will be dropped. */
>> +void
>> +jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
>> +                              size_t max_n_msgs, size_t max_backlog_bytes)
>> +{
>> +    rpc->max_output = max_n_msgs;
>> +    rpc->max_backlog = max_backlog_bytes;
>> +}
>> +
>>  /* Returns the number of bytes that have been received on 'rpc''s underlying
>>   * stream.  (The value wraps around if it exceeds UINT_MAX.) */
>>  unsigned int
>> @@ -261,9 +276,26 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
>>      rpc->backlog += length;
>>  
>>      if (rpc->output_count >= 50) {
>> -        VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
>> +        static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5);
>> +        bool disconnect = false;
>> +
>> +        VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of"
>>                       " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
>>                       rpc->output_count, rpc->backlog);
>> +        if (rpc->max_output && rpc->output_count > rpc->max_output) {
>> +            disconnect = true;
>> +            VLOG_WARN("sending backlog exceeded maximum number of messages (%"
>> +                      PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.",
>> +                      rpc->output_count, rpc->max_output, rpc->name);
>> +        } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) {
>> +            disconnect = true;
>> +            VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %"
>> +                      PRIuSIZE" bytes), disconnecting, jsonrpc: %s.",
>> +                      rpc->backlog, rpc->max_backlog, rpc->name);
>> +        }
>> +        if (disconnect) {
>> +            jsonrpc_error(rpc, E2BIG);
>> +        }
>>      }
>>  
>>      if (rpc->backlog == length) {
>> @@ -787,6 +819,10 @@ struct jsonrpc_session {
>>      int last_error;
>>      unsigned int seqno;
>>      uint8_t dscp;
>> +
>> +    /* Limits for jsonrpc. */
>> +    size_t max_n_msgs;
>> +    size_t max_backlog_bytes;
> 
> Hi Ilya,
> 
> If I'm not wrong, we end up using these uninitialized in non-clustered mode.

Good catch!  I missed that we're using xmalloc and not xzalloc for sessions.
Anyway, there is another point that we need to clean up whatever limits
configured on jsonrpc when it becomes subordinate of jsonrpc_session.  So,
I'll add jsonrpc_session_set_backlog_threshold(s, 0, 0) to both variants of
jsornrpc_session_open*().

I'll send v2 shortly.

> 
>>  };
>>  
>>  static void
>> @@ -970,6 +1006,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
>>              }
>>              reconnect_connected(s->reconnect, time_msec());
>>              s->rpc = jsonrpc_open(stream);
>> +            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
>> +                                                  s->max_backlog_bytes);
> 
> Indent is a bit odd here.
> 
> However, I don't see anything in the coding guidelines about this and it does
> seem more readable to do it your way.  So I don't know :)

It just looks better, IMHO. :)

> 
> Thanks,
> Dumitru
> 
>>              s->seqno++;
>>          } else if (error != EAGAIN) {
>>              reconnect_listen_error(s->reconnect, time_msec(), error);
>> @@ -1010,6 +1048,8 @@ jsonrpc_session_run(struct jsonrpc_session *s)
>>          if (!error) {
>>              reconnect_connected(s->reconnect, time_msec());
>>              s->rpc = jsonrpc_open(s->stream);
>> +            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
>> +                                                  s->max_backlog_bytes);
>>              s->stream = NULL;
>>              s->seqno++;
>>          } else if (error != EAGAIN) {
>> @@ -1250,3 +1290,18 @@ jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
>>          jsonrpc_session_force_reconnect(s);
>>      }
>>  }
>> +
>> +/* Sets thresholds for send backlog.  If send backlog contains more than
>> + * 'max_n_msgs' messages or larger than 'max_backlog_bytes' bytes, connection
>> + * will be closed (then reconnected, if that feature is enabled). */
>> +void
>> +jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s,
>> +                                      size_t max_n_msgs,
>> +                                      size_t max_backlog_bytes)
>> +{
>> +    s->max_n_msgs = max_n_msgs;
>> +    s->max_backlog_bytes = max_backlog_bytes;
>> +    if (s->rpc) {
>> +        jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes);
>> +    }
>> +}
>> diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
>> index a44114e8d..d75d66b86 100644
>> --- a/lib/jsonrpc.h
>> +++ b/lib/jsonrpc.h
>> @@ -51,6 +51,9 @@ void jsonrpc_wait(struct jsonrpc *);
>>  
>>  int jsonrpc_get_status(const struct jsonrpc *);
>>  size_t jsonrpc_get_backlog(const struct jsonrpc *);
>> +void jsonrpc_set_backlog_threshold(struct jsonrpc *, size_t max_n_msgs,
>> +                                                     size_t max_backlog_bytes);
>> +
>>  unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
>>  const char *jsonrpc_get_name(const struct jsonrpc *);
>>  
>> @@ -140,6 +143,9 @@ void jsonrpc_session_set_probe_interval(struct jsonrpc_session *,
>>                                          int probe_interval);
>>  void jsonrpc_session_set_dscp(struct jsonrpc_session *,
>>                                uint8_t dscp);
>> +void jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *,
>> +                                           size_t max_n_msgs,
>> +                                           size_t max_backlog_bytes);
>>  const char *jsonrpc_session_get_id(const struct jsonrpc_session *);
>>  
>>  #endif /* jsonrpc.h */
>> diff --git a/ovsdb/raft.c b/ovsdb/raft.c
>> index ac85c6b67..92a099896 100644
>> --- a/ovsdb/raft.c
>> +++ b/ovsdb/raft.c
>> @@ -925,6 +925,9 @@ raft_reset_ping_timer(struct raft *raft)
>>      raft->ping_timeout = time_msec() + raft->election_timer / 3;
>>  }
>>  
>> +#define RAFT_MAX_BACKLOG_N_MSGS    500
>> +#define RAFT_MAX_BACKLOG_BYTES     UINT32_MAX
>> +
>>  static void
>>  raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
>>                const struct uuid *sid, bool incoming)
>> @@ -940,6 +943,8 @@ raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
>>      conn->incoming = incoming;
>>      conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
>>      jsonrpc_session_set_probe_interval(js, 0);
>> +    jsonrpc_session_set_backlog_threshold(js, RAFT_MAX_BACKLOG_N_MSGS,
>> +                                              RAFT_MAX_BACKLOG_BYTES);
>>  }
>>  
>>  /* Starts the local server in an existing Raft cluster, using the local copy of
>>
>
diff mbox series

Patch

diff --git a/NEWS b/NEWS
index 2860a8e9c..ebdf8758b 100644
--- a/NEWS
+++ b/NEWS
@@ -6,6 +6,8 @@  Post-v2.14.0
      * New unixctl command 'ovsdb-server/memory-trim-on-compaction on|off'.
        If turned on, ovsdb-server will try to reclaim all the unused memory
        after every DB compaction back to OS.  Disabled by default.
+     * Maximum backlog on RAFT connections limited to 500 messages or 4GB.
+       Once threshold reached, connection is dropped (and re-established).
    - DPDK:
      * Removed support for vhost-user dequeue zero-copy.
    - The environment variable OVS_UNBOUND_CONF, if set, is now used
diff --git a/lib/jsonrpc.c b/lib/jsonrpc.c
index ecbc939fe..435824844 100644
--- a/lib/jsonrpc.c
+++ b/lib/jsonrpc.c
@@ -50,6 +50,10 @@  struct jsonrpc {
     struct ovs_list output;     /* Contains "struct ofpbuf"s. */
     size_t output_count;        /* Number of elements in "output". */
     size_t backlog;
+
+    /* Limits. */
+    size_t max_output;          /* 'output_count' disconnection threshold. */
+    size_t max_backlog;         /* 'backlog' disconnection threshold. */
 };
 
 /* Rate limit for error messages. */
@@ -178,6 +182,17 @@  jsonrpc_get_backlog(const struct jsonrpc *rpc)
     return rpc->status ? 0 : rpc->backlog;
 }
 
+/* Sets thresholds for send backlog.  If send backlog contains more than
+ * 'max_n_msgs' messages or larger than 'max_backlog_bytes' bytes, connection
+ * will be dropped. */
+void
+jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
+                              size_t max_n_msgs, size_t max_backlog_bytes)
+{
+    rpc->max_output = max_n_msgs;
+    rpc->max_backlog = max_backlog_bytes;
+}
+
 /* Returns the number of bytes that have been received on 'rpc''s underlying
  * stream.  (The value wraps around if it exceeds UINT_MAX.) */
 unsigned int
@@ -261,9 +276,26 @@  jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
     rpc->backlog += length;
 
     if (rpc->output_count >= 50) {
-        VLOG_INFO_RL(&rl, "excessive sending backlog, jsonrpc: %s, num of"
+        static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5);
+        bool disconnect = false;
+
+        VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of"
                      " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
                      rpc->output_count, rpc->backlog);
+        if (rpc->max_output && rpc->output_count > rpc->max_output) {
+            disconnect = true;
+            VLOG_WARN("sending backlog exceeded maximum number of messages (%"
+                      PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.",
+                      rpc->output_count, rpc->max_output, rpc->name);
+        } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) {
+            disconnect = true;
+            VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %"
+                      PRIuSIZE" bytes), disconnecting, jsonrpc: %s.",
+                      rpc->backlog, rpc->max_backlog, rpc->name);
+        }
+        if (disconnect) {
+            jsonrpc_error(rpc, E2BIG);
+        }
     }
 
     if (rpc->backlog == length) {
@@ -787,6 +819,10 @@  struct jsonrpc_session {
     int last_error;
     unsigned int seqno;
     uint8_t dscp;
+
+    /* Limits for jsonrpc. */
+    size_t max_n_msgs;
+    size_t max_backlog_bytes;
 };
 
 static void
@@ -970,6 +1006,8 @@  jsonrpc_session_run(struct jsonrpc_session *s)
             }
             reconnect_connected(s->reconnect, time_msec());
             s->rpc = jsonrpc_open(stream);
+            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+                                                  s->max_backlog_bytes);
             s->seqno++;
         } else if (error != EAGAIN) {
             reconnect_listen_error(s->reconnect, time_msec(), error);
@@ -1010,6 +1048,8 @@  jsonrpc_session_run(struct jsonrpc_session *s)
         if (!error) {
             reconnect_connected(s->reconnect, time_msec());
             s->rpc = jsonrpc_open(s->stream);
+            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
+                                                  s->max_backlog_bytes);
             s->stream = NULL;
             s->seqno++;
         } else if (error != EAGAIN) {
@@ -1250,3 +1290,18 @@  jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
         jsonrpc_session_force_reconnect(s);
     }
 }
+
+/* Sets thresholds for send backlog.  If send backlog contains more than
+ * 'max_n_msgs' messages or larger than 'max_backlog_bytes' bytes, connection
+ * will be closed (then reconnected, if that feature is enabled). */
+void
+jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s,
+                                      size_t max_n_msgs,
+                                      size_t max_backlog_bytes)
+{
+    s->max_n_msgs = max_n_msgs;
+    s->max_backlog_bytes = max_backlog_bytes;
+    if (s->rpc) {
+        jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes);
+    }
+}
diff --git a/lib/jsonrpc.h b/lib/jsonrpc.h
index a44114e8d..d75d66b86 100644
--- a/lib/jsonrpc.h
+++ b/lib/jsonrpc.h
@@ -51,6 +51,9 @@  void jsonrpc_wait(struct jsonrpc *);
 
 int jsonrpc_get_status(const struct jsonrpc *);
 size_t jsonrpc_get_backlog(const struct jsonrpc *);
+void jsonrpc_set_backlog_threshold(struct jsonrpc *, size_t max_n_msgs,
+                                                     size_t max_backlog_bytes);
+
 unsigned int jsonrpc_get_received_bytes(const struct jsonrpc *);
 const char *jsonrpc_get_name(const struct jsonrpc *);
 
@@ -140,6 +143,9 @@  void jsonrpc_session_set_probe_interval(struct jsonrpc_session *,
                                         int probe_interval);
 void jsonrpc_session_set_dscp(struct jsonrpc_session *,
                               uint8_t dscp);
+void jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *,
+                                           size_t max_n_msgs,
+                                           size_t max_backlog_bytes);
 const char *jsonrpc_session_get_id(const struct jsonrpc_session *);
 
 #endif /* jsonrpc.h */
diff --git a/ovsdb/raft.c b/ovsdb/raft.c
index ac85c6b67..92a099896 100644
--- a/ovsdb/raft.c
+++ b/ovsdb/raft.c
@@ -925,6 +925,9 @@  raft_reset_ping_timer(struct raft *raft)
     raft->ping_timeout = time_msec() + raft->election_timer / 3;
 }
 
+#define RAFT_MAX_BACKLOG_N_MSGS    500
+#define RAFT_MAX_BACKLOG_BYTES     UINT32_MAX
+
 static void
 raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
               const struct uuid *sid, bool incoming)
@@ -940,6 +943,8 @@  raft_add_conn(struct raft *raft, struct jsonrpc_session *js,
     conn->incoming = incoming;
     conn->js_seqno = jsonrpc_session_get_seqno(conn->js);
     jsonrpc_session_set_probe_interval(js, 0);
+    jsonrpc_session_set_backlog_threshold(js, RAFT_MAX_BACKLOG_N_MSGS,
+                                              RAFT_MAX_BACKLOG_BYTES);
 }
 
 /* Starts the local server in an existing Raft cluster, using the local copy of