From patchwork Wed May 22 03:16:47 2013 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Fam Zheng X-Patchwork-Id: 245496 Return-Path: X-Original-To: incoming@patchwork.ozlabs.org Delivered-To: patchwork-incoming@bilbo.ozlabs.org Received: from lists.gnu.org (lists.gnu.org [208.118.235.17]) (using TLSv1 with cipher AES256-SHA (256/256 bits)) (Client did not present a certificate) by ozlabs.org (Postfix) with ESMTPS id 491902C0099 for ; Wed, 22 May 2013 13:21:57 +1000 (EST) Received: from localhost ([::1]:38057 helo=lists.gnu.org) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1Uezch-0006Mt-FR for incoming@patchwork.ozlabs.org; Tue, 21 May 2013 23:21:55 -0400 Received: from eggs.gnu.org ([208.118.235.92]:34346) by lists.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1UezYG-0007Bm-BD for qemu-devel@nongnu.org; Tue, 21 May 2013 23:17:31 -0400 Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71) (envelope-from ) id 1UezY9-0007nx-Ha for qemu-devel@nongnu.org; Tue, 21 May 2013 23:17:20 -0400 Received: from mx1.redhat.com ([209.132.183.28]:62063) by eggs.gnu.org with esmtp (Exim 4.71) (envelope-from ) id 1UezY9-0007nq-7d for qemu-devel@nongnu.org; Tue, 21 May 2013 23:17:13 -0400 Received: from int-mx02.intmail.prod.int.phx2.redhat.com (int-mx02.intmail.prod.int.phx2.redhat.com [10.5.11.12]) by mx1.redhat.com (8.14.4/8.14.4) with ESMTP id r4M3HCCv016419 (version=TLSv1/SSLv3 cipher=DHE-RSA-AES256-SHA bits=256 verify=OK) for ; Tue, 21 May 2013 23:17:12 -0400 Received: from localhost.nay.redhat.com ([10.66.7.14]) by int-mx02.intmail.prod.int.phx2.redhat.com (8.13.8/8.13.8) with ESMTP id r4M3GqNp024814 for ; Tue, 21 May 2013 23:17:10 -0400 From: Fam Zheng To: qemu-devel@nongnu.org Date: Wed, 22 May 2013 11:16:47 +0800 Message-Id: <1369192610-25003-8-git-send-email-famz@redhat.com> In-Reply-To: <1369192610-25003-1-git-send-email-famz@redhat.com> References: <1369192610-25003-1-git-send-email-famz@redhat.com> X-Scanned-By: MIMEDefang 2.67 on 10.5.11.12 X-detected-operating-system: by eggs.gnu.org: GNU/Linux 3.x X-Received-From: 209.132.183.28 Subject: [Qemu-devel] [PATCH v4 07/10] curl: make use of CURLDataCache. X-BeenThere: qemu-devel@nongnu.org X-Mailman-Version: 2.1.14 Precedence: list List-Id: List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Sender: qemu-devel-bounces+incoming=patchwork.ozlabs.org@nongnu.org Make subsequecial changes to make use of introduced CURLDataCache. Moved acb struct from CURLState to BDRVCURLState, and changed to list. Signed-off-by: Fam Zheng --- block/curl.c | 168 ++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 90 insertions(+), 78 deletions(-) diff --git a/block/curl.c b/block/curl.c index e387ae1..4c4752b 100644 --- a/block/curl.c +++ b/block/curl.c @@ -39,7 +39,6 @@ CURLPROTO_TFTP) #define CURL_NUM_STATES 8 -#define CURL_NUM_ACB 8 #define SECTOR_SIZE 512 #define READ_AHEAD_SIZE (256 * 1024) @@ -52,9 +51,7 @@ typedef struct CURLAIOCB { int64_t sector_num; int nb_sectors; - - size_t start; - size_t end; + QLIST_ENTRY(CURLAIOCB) next; } CURLAIOCB; typedef struct CURLDataCache { @@ -70,14 +67,10 @@ typedef struct CURLDataCache { typedef struct CURLState { struct BDRVCURLState *s; - CURLAIOCB *acb[CURL_NUM_ACB]; CURL *curl; - char *orig_buf; - size_t buf_start; - size_t buf_off; - size_t buf_len; char range[128]; char errmsg[CURL_ERROR_SIZE]; + CURLDataCache *cache; char in_use; } CURLState; @@ -92,6 +85,7 @@ typedef struct BDRVCURLState { CURLM *multi; size_t len; CURLState states[CURL_NUM_STATES]; + QLIST_HEAD(, CURLAIOCB) acbs; QLIST_HEAD(, CURLSockInfo) socks; char *url; size_t readahead_size; @@ -221,31 +215,35 @@ static void curl_complete_io(BDRVCURLState *bs, CURLAIOCB *acb, static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque) { - CURLState *s = ((CURLState*)opaque); + CURLState *s = (CURLState *)opaque; + CURLDataCache *c = s->cache; size_t realsize = size * nmemb; - int i; - - DPRINTF("CURL: Just reading %zd bytes\n", realsize); + CURLAIOCB *acb; - if (!s || !s->orig_buf) + if (!c || !c->data) { goto read_end; + } + if (c->write_pos >= c->data_len) { + goto read_end; + } + memcpy(c->data + c->write_pos, ptr, + MIN(realsize, c->data_len - c->write_pos)); + c->write_pos += realsize; + if (c->write_pos >= c->data_len) { + c->write_pos = c->data_len; + } - memcpy(s->orig_buf + s->buf_off, ptr, realsize); - s->buf_off += realsize; - - for(i=0; iacb[i]; - - if (!acb) - continue; - - if ((s->buf_off >= acb->end)) { - qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start, - acb->end - acb->start); - acb->common.cb(acb->common.opaque, 0); - qemu_aio_release(acb); - s->acb[i] = NULL; + acb = QLIST_FIRST(&s->s->acbs); + while (acb) { + size_t aio_base = acb->sector_num * SECTOR_SIZE; + size_t aio_len = acb->nb_sectors * SECTOR_SIZE; + CURLAIOCB *next = QLIST_NEXT(acb, next); + if (aio_base >= c->base_pos && + aio_base + aio_len <= c->base_pos + c->write_pos) { + QLIST_REMOVE(acb, next); + curl_complete_io(s->s, acb, c); } + acb = next; } read_end: @@ -275,10 +273,12 @@ static void curl_fd_handler(void *arg) CURLMsg *msg; msg = curl_multi_info_read(s->multi, &msgs_in_queue); - if (!msg) + if (!msg) { break; - if (msg->msg == CURLMSG_NONE) + } + if (msg->msg == CURLMSG_NONE) { break; + } switch (msg->msg) { case CURLMSG_DONE: @@ -288,19 +288,17 @@ static void curl_fd_handler(void *arg) CURLINFO_PRIVATE, (char **)&state); - /* ACBs for successful messages get completed in curl_read_cb */ + /* ACBs for successful messages get completed in curl_read_cb, + * fail existing acbs for now */ if (msg->data.result != CURLE_OK) { - int i; - for (i = 0; i < CURL_NUM_ACB; i++) { - CURLAIOCB *acb = state->acb[i]; - - if (acb == NULL) { - continue; - } - + CURLAIOCB *acb = QLIST_FIRST(&s->acbs); + while (acb) { + CURLAIOCB *next = QLIST_NEXT(acb, next); + DPRINTF("EIO, %s\n", state->errmsg); acb->common.cb(acb->common.opaque, -EIO); + QLIST_REMOVE(acb, next); qemu_aio_release(acb); - state->acb[i] = NULL; + acb = next; } } @@ -317,13 +315,10 @@ static void curl_fd_handler(void *arg) static CURLState *curl_init_state(BDRVCURLState *s) { CURLState *state = NULL; - int i, j; + int i; do { for (i=0; istates[i].acb[j]) - continue; if (s->states[i].in_use) continue; @@ -380,6 +375,10 @@ static void curl_clean_state(CURLState *s) if (s->s->multi) curl_multi_remove_handle(s->s->multi, s->curl); s->in_use = 0; + if (s->cache) { + s->cache->use_count--; + assert(s->cache->use_count >= 0); + } } static void curl_parse_filename(const char *filename, QDict *options, @@ -483,6 +482,7 @@ static int curl_open(BlockDriverState *bs, QDict *options, int flags) QLIST_INIT(&s->socks); QLIST_INIT(&s->cache); + QLIST_INIT(&s->acbs); DPRINTF("CURL: Opening %s\n", file); s->url = g_strdup(file); @@ -551,14 +551,8 @@ out_noclean: static int curl_aio_flush(void *opaque) { BDRVCURLState *s = opaque; - int i, j; - - for (i=0; i < CURL_NUM_STATES; i++) { - for(j=0; j < CURL_NUM_ACB; j++) { - if (s->states[i].acb[j]) { - return 1; - } - } + if (!QLIST_EMPTY(&s->acbs)) { + return 1; } return 0; } @@ -580,6 +574,7 @@ static void curl_readv_bh_cb(void *p) CURLDataCache *cache = NULL; CURLAIOCB *acb = p; BDRVCURLState *s = acb->common.bs->opaque; + int running; size_t aio_base, aio_bytes; qemu_bh_delete(acb->bh); @@ -588,8 +583,9 @@ static void curl_readv_bh_cb(void *p) aio_base = acb->sector_num * SECTOR_SIZE; aio_bytes = acb->nb_sectors * SECTOR_SIZE; - size_t start = acb->sector_num * SECTOR_SIZE; - size_t end; + if (aio_base + aio_bytes > s->len) { + goto err_release; + } cache = curl_find_cache(s, aio_base, aio_bytes); if (cache) { @@ -600,29 +596,41 @@ static void curl_readv_bh_cb(void *p) // No cache found, so let's start a new request state = curl_init_state(s); if (!state) { - acb->common.cb(acb->common.opaque, -EIO); - qemu_aio_release(acb); - return; + goto err_release; } - acb->start = 0; - acb->end = (acb->nb_sectors * SECTOR_SIZE); - - state->buf_off = 0; - if (state->orig_buf) - g_free(state->orig_buf); - state->buf_start = start; - state->buf_len = acb->end + s->readahead_size; - end = MIN(start + state->buf_len, s->len) - 1; - state->orig_buf = g_malloc(state->buf_len); - state->acb[0] = acb; - - snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", start, end); - DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n", - (acb->nb_sectors * SECTOR_SIZE), start, state->range); - curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); + cache = g_malloc0(sizeof(CURLDataCache)); + cache->base_pos = acb->sector_num * SECTOR_SIZE; + cache->data_len = aio_bytes + s->readahead_size; + cache->write_pos = 0; + cache->data = g_malloc(cache->data_len); + QLIST_INSERT_HEAD(&s->acbs, acb, next); + snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", cache->base_pos, + cache->base_pos + cache->data_len); + DPRINTF("Reading range: %s\n", state->range); + curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); + QLIST_INSERT_HEAD(&s->cache, cache, next); + state->cache = cache; + cache->use_count++; curl_multi_add_handle(s->multi, state->curl); + /* kick off curl to start the action */ + curl_multi_socket_action(s->multi, 0, CURL_SOCKET_TIMEOUT, &running); + return; + +err_release: + if (cache) { + if (cache->data) { + g_free(cache->data); + cache->data = NULL; + } + g_free(cache); + cache = NULL; + } + acb->common.cb(acb->common.opaque, -EIO); + qemu_aio_release(acb); + return; + } @@ -669,14 +677,18 @@ static void curl_close(BlockDriverState *bs) curl_easy_cleanup(s->states[i].curl); s->states[i].curl = NULL; } - if (s->states[i].orig_buf) { - g_free(s->states[i].orig_buf); - s->states[i].orig_buf = NULL; - } } if (s->multi) curl_multi_cleanup(s->multi); + while (!QLIST_EMPTY(&s->acbs)) { + CURLAIOCB *acb = QLIST_FIRST(&s->acbs); + acb->common.cb(acb->common.opaque, -EIO); + QLIST_REMOVE(acb, next); + qemu_aio_release(acb); + acb = NULL; + } + while (!QLIST_EMPTY(&s->cache)) { CURLDataCache *cache = QLIST_FIRST(&s->cache); assert(cache->use_count == 0);