@@ -309,6 +309,96 @@ jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
return rpc->status;
}
+/* Receives a message chunk from 'rpc' and feeds it into the JSON parser.
+ *
+ * If successful, returns 0.
+ *
+ * Otherwise, it returns:
+ *
+ * - EAGAIN: No data has been received.
+ *
+ * - EOF: The remote end closed the connection gracefully.
+ *
+ * - Otherwise an errno value that represents an error fatal to the
+ * connection. 'rpc' will not send or receive any more messages.
+ * */
+static int
+jsonrpc_recv_chunk(struct jsonrpc *rpc)
+{
+ size_t n, used;
+
+ /* Fill our input buffer if it's empty. */
+ if (byteq_is_empty(&rpc->input)) {
+ size_t chunk;
+ int retval;
+
+ byteq_fast_forward(&rpc->input);
+ chunk = byteq_headroom(&rpc->input);
+ retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
+ if (retval < 0) {
+ if (retval == -EAGAIN) {
+ return EAGAIN;
+ } else {
+ VLOG_WARN_RL(&rl, "%s: receive error: %s",
+ rpc->name, ovs_strerror(-retval));
+ jsonrpc_error(rpc, -retval);
+ return rpc->status;
+ }
+ } else if (retval == 0) {
+ jsonrpc_error(rpc, EOF);
+ return EOF;
+ }
+ byteq_advance_head(&rpc->input, retval);
+ }
+
+ /* We have some input. Feed it into the JSON parser. */
+ if (!rpc->parser) {
+ rpc->parser = json_parser_create(0);
+ }
+ n = byteq_tailroom(&rpc->input);
+ used = json_parser_feed(rpc->parser,
+ (char *) byteq_tail(&rpc->input), n);
+ byteq_advance_tail(&rpc->input, used);
+
+ return 0;
+}
+
+/* Get the JSON-RPC message from 'rpc' if JSON parsing is complete.
+ *
+ * On success, stores the message in '*msgp' and returns 0. The caller
+ * takes ownership of '*msgp' and must eventually destroy it with
+ * jsonrpc_msg_destroy().
+ *
+ * Otherwise, it returns:
+ *
+ * - ENODATA: JSON parsing is not yet complete and we need to feed more
+ * data into the parser.
+ *
+ * - EPROTO: JSON-RPC protocol violation.
+ */
+static int
+jsonrpc_get_message(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
+{
+ /* If we have complete JSON, attempt to parse it as JSON-RPC. */
+ if (json_parser_is_done(rpc->parser)) {
+ *msgp = jsonrpc_parse_received_message(rpc);
+ if (*msgp) {
+ return 0;
+ }
+
+ if (rpc->status) {
+ const struct byteq *q = &rpc->input;
+ if (q->head <= q->size) {
+ stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
+ &this_module, rpc->name);
+ }
+ return rpc->status;
+ }
+ }
+
+ return ENODATA;
+}
+
/* Attempts to receive a message from 'rpc'.
*
* If successful, stores the received message in '*msgp' and returns 0. The
@@ -336,56 +426,13 @@ jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
}
for (i = 0; i < 50; i++) {
- size_t n, used;
-
- /* Fill our input buffer if it's empty. */
- if (byteq_is_empty(&rpc->input)) {
- size_t chunk;
- int retval;
-
- byteq_fast_forward(&rpc->input);
- chunk = byteq_headroom(&rpc->input);
- retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
- if (retval < 0) {
- if (retval == -EAGAIN) {
- return EAGAIN;
- } else {
- VLOG_WARN_RL(&rl, "%s: receive error: %s",
- rpc->name, ovs_strerror(-retval));
- jsonrpc_error(rpc, -retval);
- return rpc->status;
- }
- } else if (retval == 0) {
- jsonrpc_error(rpc, EOF);
- return EOF;
- }
- byteq_advance_head(&rpc->input, retval);
+ int retval = jsonrpc_recv_chunk(rpc);
+ if (retval) {
+ return retval;
}
-
- /* We have some input. Feed it into the JSON parser. */
- if (!rpc->parser) {
- rpc->parser = json_parser_create(0);
- }
- n = byteq_tailroom(&rpc->input);
- used = json_parser_feed(rpc->parser,
- (char *) byteq_tail(&rpc->input), n);
- byteq_advance_tail(&rpc->input, used);
-
- /* If we have complete JSON, attempt to parse it as JSON-RPC. */
- if (json_parser_is_done(rpc->parser)) {
- *msgp = jsonrpc_parse_received_message(rpc);
- if (*msgp) {
- return 0;
- }
-
- if (rpc->status) {
- const struct byteq *q = &rpc->input;
- if (q->head <= q->size) {
- stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
- &this_module, rpc->name);
- }
- return rpc->status;
- }
+ retval = jsonrpc_get_message(rpc, msgp);
+ if (retval != ENODATA) {
+ return retval;
}
}
Extract two functions from the inner loop of jsonrpc_recv(): - a function that receives message chunks from the RPC layer, and - a function that assembles a complete message if parsing is complete. This way, we can reuse some of the logic for a new set of functions in a later patch. Furthermore, this streamlines the error handling in jsonrpc_recv(), because we can handle the results of the two functions separately. Signed-off-by: Martin Morgenstern <martin.morgenstern@cloudandheat.com> --- lib/jsonrpc.c | 145 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 96 insertions(+), 49 deletions(-)