Patchwork [v2,07/11] curl: make use of CURLDataCache.

login
register
mail settings
Submitter Fam Zheng
Date May 14, 2013, 2:26 a.m.
Message ID <1368498390-20738-8-git-send-email-famz@redhat.com>
Download mbox | patch
Permalink /patch/243560/
State New
Headers show

Comments

Fam Zheng - May 14, 2013, 2:26 a.m.
Make subsequecial changes to make use of introduced CURLDataCache. Moved
acb struct from CURLState to BDRVCURLState, and changed to list.

Signed-off-by: Fam Zheng <famz@redhat.com>
---
 block/curl.c | 156 +++++++++++++++++++++++++++++------------------------------
 1 file changed, 78 insertions(+), 78 deletions(-)

Patch

diff --git a/block/curl.c b/block/curl.c
index 0c4d865..97642e3 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -39,7 +39,6 @@ 
                    CURLPROTO_TFTP)
 
 #define CURL_NUM_STATES 8
-#define CURL_NUM_ACB    8
 #define SECTOR_SIZE     512
 #define READ_AHEAD_SIZE (256 * 1024)
 
@@ -52,9 +51,7 @@  typedef struct CURLAIOCB {
 
     int64_t sector_num;
     int nb_sectors;
-
-    size_t start;
-    size_t end;
+    QLIST_ENTRY(CURLAIOCB) next;
 } CURLAIOCB;
 
 typedef struct CURLDataCache {
@@ -70,15 +67,11 @@  typedef struct CURLDataCache {
 typedef struct CURLState
 {
     struct BDRVCURLState *s;
-    CURLAIOCB *acb[CURL_NUM_ACB];
     CURL *curl;
-    char *orig_buf;
-    size_t buf_start;
-    size_t buf_off;
-    size_t buf_len;
 #define CURL_RANGE_SIZE 128
     char range[CURL_RANGE_SIZE];
     char errmsg[CURL_ERROR_SIZE];
+    CURLDataCache *cache;
     char in_use;
 } CURLState;
 
@@ -93,6 +86,7 @@  typedef struct BDRVCURLState {
     CURLM *multi;
     size_t len;
     CURLState states[CURL_NUM_STATES];
+    QLIST_HEAD(, CURLAIOCB) acbs;
     QLIST_HEAD(, CURLSockInfo) socks;
     char *url;
     size_t readahead_size;
@@ -222,30 +216,31 @@  static void curl_complete_io(BDRVCURLState *bs, CURLAIOCB *acb,
 
 static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque)
 {
-    CURLState *s = ((CURLState*)opaque);
+    CURLState *s = (CURLState *)opaque;
+    CURLDataCache *c = s->cache;
     size_t realsize = size * nmemb;
-    int i;
-
-    DPRINTF("CURL: Just reading %zd bytes\n", realsize);
+    CURLAIOCB *acb;
 
-    if (!s || !s->orig_buf)
+    if (!c || !c->data) {
         goto read_end;
+    }
+    if (c->write_pos >= c->data_len) {
+        goto read_end;
+    }
+    memcpy(c->data + c->write_pos, ptr,
+           MIN(realsize, c->data_len - c->write_pos));
+    c->write_pos += realsize;
+    if (c->write_pos >= c->data_len) {
+        c->write_pos = c->data_len;
+    }
 
-    memcpy(s->orig_buf + s->buf_off, ptr, realsize);
-    s->buf_off += realsize;
-
-    for(i=0; i<CURL_NUM_ACB; i++) {
-        CURLAIOCB *acb = s->acb[i];
-
-        if (!acb)
-            continue;
-
-        if ((s->buf_off >= acb->end)) {
-            qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start,
-                                acb->end - acb->start);
-            acb->common.cb(acb->common.opaque, 0);
-            qemu_aio_release(acb);
-            s->acb[i] = NULL;
+    QLIST_FOREACH(acb, &s->s->acbs, next) {
+        size_t aio_base = acb->sector_num * SECTOR_SIZE;
+        size_t aio_len = acb->nb_sectors * SECTOR_SIZE;
+        if (aio_base >= c->base_pos &&
+            aio_base + aio_len <= c->base_pos + c->write_pos) {
+            QLIST_REMOVE(acb, next);
+            curl_complete_io(s->s, acb, c);
         }
     }
 
@@ -276,10 +271,12 @@  static void curl_fd_handler(void *arg)
         CURLMsg *msg;
         msg = curl_multi_info_read(s->multi, &msgs_in_queue);
 
-        if (!msg)
+        if (!msg) {
             break;
-        if (msg->msg == CURLMSG_NONE)
+        }
+        if (msg->msg == CURLMSG_NONE) {
             break;
+        }
 
         switch (msg->msg) {
             case CURLMSG_DONE:
@@ -289,19 +286,17 @@  static void curl_fd_handler(void *arg)
                                   CURLINFO_PRIVATE,
                                   (char **)&state);
 
-                /* ACBs for successful messages get completed in curl_read_cb */
+                /* ACBs for successful messages get completed in curl_read_cb,
+                 * fail existing acbs for now */
                 if (msg->data.result != CURLE_OK) {
-                    int i;
-                    for (i = 0; i < CURL_NUM_ACB; i++) {
-                        CURLAIOCB *acb = state->acb[i];
-
-                        if (acb == NULL) {
-                            continue;
-                        }
-
+                    CURLAIOCB *acb = QLIST_FIRST(&s->acbs);
+                    while (acb) {
+                        CURLAIOCB *next = QLIST_NEXT(acb, next);
+                        DPRINTF("EIO, %s\n", state->errmsg);
                         acb->common.cb(acb->common.opaque, -EIO);
+                        QLIST_REMOVE(acb, next);
                         qemu_aio_release(acb);
-                        state->acb[i] = NULL;
+                        acb = next;
                     }
                 }
 
@@ -318,13 +313,10 @@  static void curl_fd_handler(void *arg)
 static CURLState *curl_init_state(BDRVCURLState *s)
 {
     CURLState *state = NULL;
-    int i, j;
+    int i;
 
     do {
         for (i=0; i<CURL_NUM_STATES; i++) {
-            for (j=0; j<CURL_NUM_ACB; j++)
-                if (s->states[i].acb[j])
-                    continue;
             if (s->states[i].in_use)
                 continue;
 
@@ -381,6 +373,10 @@  static void curl_clean_state(CURLState *s)
     if (s->s->multi)
         curl_multi_remove_handle(s->s->multi, s->curl);
     s->in_use = 0;
+    if (s->cache) {
+        s->cache->use_count--;
+        assert(s->cache->use_count >= 0);
+    }
 }
 
 static void curl_parse_filename(const char *filename, QDict *options,
@@ -547,14 +543,8 @@  out_noclean:
 static int curl_aio_flush(void *opaque)
 {
     BDRVCURLState *s = opaque;
-    int i, j;
-
-    for (i=0; i < CURL_NUM_STATES; i++) {
-        for(j=0; j < CURL_NUM_ACB; j++) {
-            if (s->states[i].acb[j]) {
-                return 1;
-            }
-        }
+    if (!QLIST_EMPTY(&s->acbs)) {
+        return 1;
     }
     return 0;
 }
@@ -576,6 +566,7 @@  static void curl_readv_bh_cb(void *p)
     CURLDataCache *cache = NULL;
     CURLAIOCB *acb = p;
     BDRVCURLState *s = acb->common.bs->opaque;
+    int running;
     size_t aio_base, aio_bytes;
 
     qemu_bh_delete(acb->bh);
@@ -584,8 +575,9 @@  static void curl_readv_bh_cb(void *p)
     aio_base = acb->sector_num * SECTOR_SIZE;
     aio_bytes = acb->nb_sectors * SECTOR_SIZE;
 
-    size_t start = acb->sector_num * SECTOR_SIZE;
-    size_t end;
+    if (aio_base + aio_bytes > s->len) {
+        goto err_release;
+    }
 
     cache = curl_find_cache(s, aio_base, aio_bytes);
     if (cache) {
@@ -596,29 +588,41 @@  static void curl_readv_bh_cb(void *p)
     // No cache found, so let's start a new request
     state = curl_init_state(s);
     if (!state) {
-        acb->common.cb(acb->common.opaque, -EIO);
-        qemu_aio_release(acb);
-        return;
+        goto err_release;
     }
 
-    acb->start = 0;
-    acb->end = (acb->nb_sectors * SECTOR_SIZE);
-
-    state->buf_off = 0;
-    if (state->orig_buf)
-        g_free(state->orig_buf);
-    state->buf_start = start;
-    state->buf_len = acb->end + s->readahead_size;
-    end = MIN(start + state->buf_len, s->len) - 1;
-    state->orig_buf = g_malloc(state->buf_len);
-    state->acb[0] = acb;
-
-    snprintf(state->range, CURL_RANGE_SIZE - 1, "%zd-%zd", start, end);
-    DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n",
-            (acb->nb_sectors * SECTOR_SIZE), start, state->range);
-    curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
+    cache = g_malloc0(sizeof(CURLDataCache));
+    cache->base_pos = acb->sector_num * SECTOR_SIZE;
+    cache->data_len = aio_bytes + s->readahead_size;
+    cache->write_pos = 0;
+    cache->data = g_malloc(cache->data_len);
 
+    QLIST_INSERT_HEAD(&s->acbs, acb, next);
+    snprintf(state->range, CURL_RANGE_SIZE - 1, "%zd-%zd", cache->base_pos,
+             cache->base_pos + cache->data_len);
+    DPRINTF("Reading range: %s\n", state->range);
+    curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
+    QLIST_INSERT_HEAD(&s->cache, cache, next);
+    state->cache = cache;
+    cache->use_count++;
     curl_multi_add_handle(s->multi, state->curl);
+    /* kick off curl to start the action */
+    curl_multi_socket_action(s->multi, 0, CURL_SOCKET_TIMEOUT, &running);
+    return;
+
+err_release:
+    if (cache) {
+        if (cache->data) {
+            g_free(cache->data);
+            cache->data = NULL;
+        }
+        g_free(cache);
+        cache = NULL;
+    }
+    acb->common.cb(acb->common.opaque, -EIO);
+    qemu_aio_release(acb);
+    return;
+
 
 }
 
@@ -658,10 +662,6 @@  static void curl_close(BlockDriverState *bs)
             curl_easy_cleanup(s->states[i].curl);
             s->states[i].curl = NULL;
         }
-        if (s->states[i].orig_buf) {
-            g_free(s->states[i].orig_buf);
-            s->states[i].orig_buf = NULL;
-        }
     }
     if (s->multi)
         curl_multi_cleanup(s->multi);