From patchwork Wed Nov 3 11:35:30 2010 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Michael Roth X-Patchwork-Id: 69982 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [199.232.76.165]) (using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id A2495B70D4 for ; Wed, 3 Nov 2010 22:47:52 +1100 (EST) Received: from localhost ([127.0.0.1]:35030 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PDbog-0005FN-Va for incoming@patchwork.ozlabs.org; Wed, 03 Nov 2010 07:47:47 -0400 Received: from [140.186.70.92] (port=48274 helo=eggs.gnu.org) by lists.gnu.org with esmtp (Exim 4.43) id 1PDbdc-0006if-6g for qemu-devel@nongnu.org; Wed, 03 Nov 2010 07:36:23 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1PDbda-0005QT-4L for qemu-devel@nongnu.org; Wed, 03 Nov 2010 07:36:20 -0400 Received: from e2.ny.us.ibm.com ([32.97.182.142]:59385) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1PDbdZ-0005Q6-VJ for qemu-devel@nongnu.org; Wed, 03 Nov 2010 07:36:18 -0400 Received: from d01relay07.pok.ibm.com (d01relay07.pok.ibm.com [9.56.227.147]) by e2.ny.us.ibm.com (8.14.4/8.13.1) with ESMTP id oA3BKQbK001316 for ; Wed, 3 Nov 2010 07:20:26 -0400 Received: from d01av01.pok.ibm.com (d01av01.pok.ibm.com [9.56.224.215]) by d01relay07.pok.ibm.com (8.13.8/8.13.8/NCO v10.0) with ESMTP id oA3BaFoo1617928 for ; Wed, 3 Nov 2010 07:36:15 -0400 Received: from d01av01.pok.ibm.com (loopback [127.0.0.1]) by d01av01.pok.ibm.com (8.14.4/8.13.1/NCO v10.0 AVout) with ESMTP id oA3BaFPA013686 for ; Wed, 3 Nov 2010 07:36:15 -0400 Received: from localhost.localdomain (sig-9-76-99-21.mts.ibm.com [9.76.99.21]) by d01av01.pok.ibm.com (8.14.4/8.13.1/NCO v10.0 AVin) with ESMTP id oA3Ba4ig013140; Wed, 3 Nov 2010 07:36:14 -0400 From: Michael Roth To: qemu-devel@nongnu.org Date: Wed, 3 Nov 2010 06:35:30 -0500 Message-Id: <1288784139-1110-2-git-send-email-mdroth@linux.vnet.ibm.com> X-Mailer: git-send-email 1.7.0.4 In-Reply-To: <1288784139-1110-1-git-send-email-mdroth@linux.vnet.ibm.com> References: <1288784139-1110-1-git-send-email-mdroth@linux.vnet.ibm.com> X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.6, seldom 2.4 (older, 4) Cc: aliguori@linux.vnet.ibm.com, agl@linux.vnet.ibm.com, mdroth@linux.vnet.ibm.com Subject: [Qemu-devel] [RFC][PATCH v2 01/10] virtagent: add common rpc transport defs X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.5 Precedence: list List-Id: qemu-devel.nongnu.org List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Common code for sending/recieving RPCs via http over virtproxy channel. All communication is done via asynchronous read/write handlers and using non-blocking reads/writes Signed-off-by: Michael Roth --- virtagent-common.c | 431 ++++++++++++++++++++++++++++++++++++++++++++++++++++ virtagent-common.h | 73 +++++++++ 2 files changed, 504 insertions(+), 0 deletions(-) create mode 100644 virtagent-common.c create mode 100644 virtagent-common.h diff --git a/virtagent-common.c b/virtagent-common.c new file mode 100644 index 0000000..cc58938 --- /dev/null +++ b/virtagent-common.c @@ -0,0 +1,431 @@ +/* + * virt-agent - common host/guest RPC functions + * + * Copyright IBM Corp. 2010 + * + * Authors: + * Adam Litke + * Michael Roth + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + */ + +#include "virtagent-common.h" + +#define VA_READ true +#define VA_SEND false + +enum va_rpc_type { + VA_RPC_REQUEST, + VA_RPC_RESPONSE, +}; + +typedef struct VARPCState { + char hdr[VA_HDR_LEN_MAX]; + int fd; + size_t hdr_len; + size_t hdr_pos; + enum { + VA_READ_START, + VA_READ_HDR, + VA_READ_BODY, + VA_SEND_START, + VA_SEND_HDR, + VA_SEND_BODY, + } state; + enum va_rpc_type rpc_type; + char *content; + size_t content_len; + size_t content_pos; + VARPCData *data; +} VARPCState; + +static void va_rpc_read_handler(void *opaque); +static void va_rpc_send_handler(void *opaque); + +static int end_of_header(char *buf, int end_pos) +{ + return !strncmp(buf+(end_pos-2), "\n\r\n", 3); +} + +static void va_rpc_hdr_init(VARPCState *s) { + const char *preamble; + + TRACE("called"); + /* essentially ignored in the context of virtagent, but might as well */ + if (s->rpc_type == VA_RPC_REQUEST) { + preamble = "POST /RPC2 HTTP/1.1"; + } else if (s->rpc_type == VA_RPC_RESPONSE) { + preamble = "HTTP/1.1 200 OK"; + } else { + s->hdr_len = 0; + return; + } + + s->hdr_len = sprintf(s->hdr, + "%s" EOL + "Content-Type: text/xml" EOL + "Content-Length: %u" EOL EOL, + preamble, + (uint32_t)s->content_len); +} + +static void va_rpc_parse_hdr(VARPCState *s) +{ + int i, line_pos = 0; + char line_buf[4096]; + + for (i = 0; i < VA_HDR_LEN_MAX; ++i) { + if (s->hdr[i] != '\n') { + /* read line */ + line_buf[line_pos++] = s->hdr[i]; + } else { + /* process line */ + if (strncmp(line_buf, "Content-Length: ", 16) == 0) { + s->content_len = atoi(&line_buf[16]); + return; + } + line_pos = 0; + } + } +} + +static VARPCState *va_rpc_state_new(VARPCData *data, int fd, + enum va_rpc_type rpc_type, bool read) +{ + VARPCState *s = qemu_mallocz(sizeof(VARPCState)); + + s->rpc_type = rpc_type; + s->fd = fd; + s->data = data; + if (s->data == NULL) { + goto EXIT_BAD; + } + + if (read) { + s->state = VA_READ_START; + s->content = NULL; + } else { + s->state = VA_SEND_START; + if (rpc_type == VA_RPC_REQUEST) { + s->content = XMLRPC_MEMBLOCK_CONTENTS(char, s->data->send_req_xml); + s->content_len = XMLRPC_MEMBLOCK_SIZE(char, s->data->send_req_xml); + } else if (rpc_type == VA_RPC_RESPONSE) { + s->content = XMLRPC_MEMBLOCK_CONTENTS(char, s->data->send_resp_xml); + s->content_len = XMLRPC_MEMBLOCK_SIZE(char, s->data->send_resp_xml); + } else { + LOG("unknown rcp type"); + goto EXIT_BAD; + } + va_rpc_hdr_init(s); + if (s->hdr_len == 0) { + LOG("failed to initialize http header"); + goto EXIT_BAD; + } + } + + return s; +EXIT_BAD: + qemu_free(s); + return NULL; +} + +/* called by va_rpc_read_handler after reading requests */ +static int va_rpc_send_response(VARPCData *data, int fd) +{ + VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_RESPONSE, VA_SEND); + + TRACE("called"); + if (s == NULL) { + LOG("failed to set up RPC state"); + return -1; + } + TRACE("setting up send handler for RPC request"); + vp_set_fd_handler(fd, NULL, va_rpc_send_handler, s); + + return 0; +} + +static void va_rpc_read_handler_completion(VARPCState *s) { + int ret; + + if (s->rpc_type == VA_RPC_REQUEST) { + /* server read request, call it's cb function then set up + * a send handler for the rpc response if there weren't any + * communication errors + */ + s->data->cb(s->data); + if (s->data->status == VA_RPC_STATUS_OK) { + ret = va_rpc_send_response(s->data, s->fd); + if (ret != 0) { + LOG("error setting up send handler for rpc response"); + } + } else { + LOG("error reading rpc request, skipping response"); + vp_set_fd_handler(s->fd, NULL, NULL, NULL); + closesocket(s->fd); + qemu_free(s->data); + } + } else if (s->rpc_type == VA_RPC_RESPONSE) { + /* client read response, call it's cb function and complete + * the RPC + */ + s->data->cb(s->data); + vp_set_fd_handler(s->fd, NULL, NULL, NULL); + closesocket(s->fd); + qemu_free(s->data); + } else { + LOG("unknown rpc_type"); + } + if (s->content != NULL) { + qemu_free(s->content); + } + qemu_free(s); +} + +static void va_rpc_read_handler(void *opaque) +{ + VARPCState *s = opaque; + int ret; + + TRACE("called with opaque: %p", opaque); + + switch (s->state) { + case VA_READ_START: + s->state = VA_READ_HDR; + case VA_READ_HDR: + while((ret = read(s->fd, s->hdr + s->hdr_pos, 1)) > 0 + && s->hdr_pos < VA_HDR_LEN_MAX) { + s->hdr_pos += ret; + if (end_of_header(s->hdr, s->hdr_pos - 1)) { + break; + } + } + if (ret == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } else { + LOG("error reading connection: %s", strerror(errno)); + goto out_bad; + } + } else if (ret == 0) { + LOG("connected closed unexpectedly"); + goto out_bad; + } else if (s->hdr_pos >= VA_HDR_LEN_MAX) { + LOG("http header too long"); + goto out_bad; + } else { + s->content_len = -1; + va_rpc_parse_hdr(s); + if (s->content_len == -1) { + LOG("malformed http header"); + goto out_bad; + } else if (s->content_len > VA_CONTENT_LEN_MAX) { + LOG("http content length too long"); + goto out_bad; + } + s->content = qemu_mallocz(s->content_len); + s->state = VA_READ_BODY; + } + case VA_READ_BODY: + while(s->content_pos < s->content_len) { + ret = read(s->fd, s->content + s->content_pos, + s->content_len - s->content_pos); + if (ret == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } else { + LOG("error reading connection: %s", strerror(errno)); + goto out_bad; + } + } else if (ret == 0) { + LOG("connection closed unexpectedly:" + " read %u bytes, expected %u bytes", + (unsigned int)s->content_pos, (unsigned int)s->content_len); + goto out_bad; + } + s->content_pos += ret; + } + + if (s->rpc_type == VA_RPC_REQUEST) { + s->data->req_xml = s->content; + s->data->req_xml_len = s->content_len; + } else if (s->rpc_type == VA_RPC_RESPONSE) { + s->data->resp_xml = s->content; + s->data->resp_xml_len = s->content_len; + } + s->data->status = VA_RPC_STATUS_OK; + goto out; + default: + LOG("unknown state"); + goto out_bad; + } + +out_bad: + s->data->status = VA_RPC_STATUS_ERR; +out: + va_rpc_read_handler_completion(s); +} + +/* called by va_rpc_send_handler after sending requests */ +static int va_rpc_read_response(VARPCData *data, int fd) +{ + VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_RESPONSE, VA_READ); + + TRACE("called"); + if (s == NULL) { + LOG("failed to set up RPC state"); + return -1; + } + TRACE("setting up send handler for RPC request"); + vp_set_fd_handler(fd, NULL, va_rpc_read_handler, s); + + return 0; +} + +static void va_rpc_send_handler_completion(VARPCState *s) { + int ret; + + if (s->rpc_type == VA_RPC_REQUEST) { + /* client sent request. free request's memblock, and set up read + * handler for server response if there weren't any communication + * errors + */ + XMLRPC_MEMBLOCK_FREE(char, s->data->send_req_xml); + if (s->data->status == VA_RPC_STATUS_OK) { + ret = va_rpc_read_response(s->data, s->fd); + if (ret != 0) { + LOG("error setting up read handler for rpc response"); + } + } else { + s->data->cb(s->data); + LOG("error sending rpc request, skipping response"); + vp_set_fd_handler(s->fd, NULL, NULL, NULL); + closesocket(s->fd); + qemu_free(s->data); + } + } else if (s->rpc_type == VA_RPC_RESPONSE) { + /* server sent response. call it's cb once more, then free + * response's memblock and complete the RPC + */ + s->data->cb(s->data); + XMLRPC_MEMBLOCK_FREE(char, s->data->send_resp_xml); + vp_set_fd_handler(s->fd, NULL, NULL, NULL); + closesocket(s->fd); + qemu_free(s->data); + } else { + LOG("unknown rpc_type"); + } + qemu_free(s); +} + +static void va_rpc_send_handler(void *opaque) +{ + VARPCState *s = opaque; + int ret; + + TRACE("called with opaque: %p", opaque); + + switch (s->state) { + case VA_SEND_START: + s->state = VA_SEND_HDR; + case VA_SEND_HDR: + do { + ret = write(s->fd, s->hdr + s->hdr_pos, s->hdr_len - s->hdr_pos); + if (ret <= 0) { + break; + } + s->hdr_pos += ret; + } while (s->hdr_pos < s->hdr_len); + if (ret == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } else { + LOG("error reading connection: %s", strerror(errno)); + goto out_bad; + } + } else if (ret == 0) { + LOG("connected closed unexpectedly"); + goto out_bad; + } else { + s->state = VA_SEND_BODY; + } + case VA_SEND_BODY: + do { + ret = write(s->fd, s->content + s->content_pos, + s->content_len - s->content_pos); + if (ret <= 0) { + break; + } + s->content_pos += ret; + } while (s->content_pos < s->content_len); + if (ret == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return; + } else { + LOG("error reading connection: %s", strerror(errno)); + goto out_bad; + } + } else if (ret == 0) { + LOG("connected closed unexpectedly"); + goto out_bad; + } else { + s->data->status = VA_RPC_STATUS_OK; + goto out; + } + default: + LOG("unknown state"); + goto out_bad; + } + +out_bad: + s->data->status = VA_RPC_STATUS_ERR; +out: + va_rpc_send_handler_completion(s); +} + +/* called by rpc client + * one callback to data->cb after response is read. + * data and data->send_req_xml should be allocated by caller, + * callee will de-allocate these after calling data->cb(data) + * + * if non-zero returned however, caller should free data and hanging refs + */ +int va_rpc_send_request(VARPCData *data, int fd) +{ + VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_REQUEST, VA_SEND); + + TRACE("called"); + if (s == NULL) { + LOG("failed to set up RPC state"); + return -1; + } + TRACE("setting up send handler for RPC request"); + vp_set_fd_handler(fd, NULL, va_rpc_send_handler, s); + + return 0; +} + +/* called by rpc server + * one callback to current data->cb after read, one callback after send. + * data should be allocated by caller, data->send_resp_xml should be + * allocated by first data->cb(data) callback, "callee" will de-allocate + * data and data->send_resp_xml after sending rpc response + * + * if non-zero returned however, caller should free data and hanging refs + */ +int va_rpc_read_request(VARPCData *data, int fd) +{ + VARPCState *s = va_rpc_state_new(data, fd, VA_RPC_REQUEST, VA_READ); + + TRACE("called"); + if (s == NULL) { + LOG("failed to set up RPC state"); + return -1; + } + TRACE("setting up read handler for RPC request"); + vp_set_fd_handler(fd, va_rpc_read_handler, NULL, s); + return 0; +} diff --git a/virtagent-common.h b/virtagent-common.h new file mode 100644 index 0000000..6a58bcd --- /dev/null +++ b/virtagent-common.h @@ -0,0 +1,73 @@ +/* + * virt-agent - host/guest RPC client functions + * + * Copyright IBM Corp. 2010 + * + * Authors: + * Adam Litke + * Michael Roth + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + */ +#ifndef VIRTAGENT_COMMON_H +#define VIRTAGENT_COMMON_H + +#include +#include +#include +#include "qemu-common.h" +#include "qemu_socket.h" +#include "monitor.h" +#include "virtproxy.h" + +#define DEBUG_VA + +#ifdef DEBUG_VA +#define TRACE(msg, ...) do { \ + fprintf(stderr, "%s:%s():L%d: " msg "\n", \ + __FILE__, __FUNCTION__, __LINE__, ## __VA_ARGS__); \ +} while(0) +#else +#define TRACE(msg, ...) \ + do { } while (0) +#endif + +#define LOG(msg, ...) do { \ + fprintf(stderr, "%s:%s(): " msg "\n", \ + __FILE__, __FUNCTION__, ## __VA_ARGS__); \ +} while(0) + +#define TADDR "127.0.0.1:8080" +#define URL "http://localhost:8080/RPC2" +#define NAME "QEMU virt-agent host client" +#define VERSION "1.0" +#define EOL "\r\n" + +#define VA_RPC_STATUS_OK 0 +#define VA_RPC_STATUS_ERR 1 +#define VA_HDR_LEN_MAX 4096 /* http header limit */ +#define VA_CONTENT_LEN_MAX 2*1024*1024 /* rpc/http send limit */ + +typedef void (VARPCDataCallback)(void *rpc_data); +typedef struct VARPCData { + VARPCDataCallback *cb; + int status; + void *opaque; + /* provided/allocated by caller for sending as memblocks */ + xmlrpc_mem_block *send_req_xml; + xmlrpc_mem_block *send_resp_xml; + /* recieved, and passed to cb func, as char arrays */ + char *req_xml; + int req_xml_len; + char *resp_xml; + int resp_xml_len; + /* for use by QMP functions */ + MonitorCompletion *mon_cb; + void *mon_data; +} VARPCData; + +int va_rpc_send_request(VARPCData *data, int fd); +int va_rpc_read_request(VARPCData *data, int fd); +#endif /* VIRTAGENT_COMMON_H */