Message ID | 1369280289-20928-8-git-send-email-famz@redhat.com |
---|---|
State | New |
Headers | show |
On Thu, May 23, 2013 at 11:38:05AM +0800, Fam Zheng wrote: > @@ -221,31 +215,35 @@ static void curl_complete_io(BDRVCURLState *bs, CURLAIOCB *acb, > > static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque) > { > - CURLState *s = ((CURLState*)opaque); > + CURLState *s = (CURLState *)opaque; You can drop the cast completely because opaque is void*. > + CURLDataCache *c = s->cache; > size_t realsize = size * nmemb; > - int i; > - > - DPRINTF("CURL: Just reading %zd bytes\n", realsize); > + CURLAIOCB *acb; > > - if (!s || !s->orig_buf) > + if (!c || !c->data) { > goto read_end; > + } > + if (c->write_pos >= c->data_len) { > + goto read_end; > + } > + memcpy(c->data + c->write_pos, ptr, > + MIN(realsize, c->data_len - c->write_pos)); > + c->write_pos += realsize; > + if (c->write_pos >= c->data_len) { > + c->write_pos = c->data_len; > + } > > - memcpy(s->orig_buf + s->buf_off, ptr, realsize); > - s->buf_off += realsize; Why did you add MIN(realsize, c->data_len - c->write_pos)? The original code trusts realsize to be within s->orig_buf. 
> - > - for(i=0; i<CURL_NUM_ACB; i++) { > - CURLAIOCB *acb = s->acb[i]; > - > - if (!acb) > - continue; > - > - if ((s->buf_off >= acb->end)) { > - qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start, > - acb->end - acb->start); > - acb->common.cb(acb->common.opaque, 0); > - qemu_aio_release(acb); > - s->acb[i] = NULL; > + acb = QLIST_FIRST(&s->s->acbs); > + while (acb) { > + size_t aio_base = acb->sector_num * SECTOR_SIZE; int64_t > @@ -600,29 +596,41 @@ static void curl_readv_bh_cb(void *p) > // No cache found, so let's start a new request > state = curl_init_state(s); > if (!state) { > - acb->common.cb(acb->common.opaque, -EIO); > - qemu_aio_release(acb); > - return; > + goto err_release; > } > > - acb->start = 0; > - acb->end = (acb->nb_sectors * SECTOR_SIZE); > - > - state->buf_off = 0; > - if (state->orig_buf) > - g_free(state->orig_buf); > - state->buf_start = start; > - state->buf_len = acb->end + s->readahead_size; > - end = MIN(start + state->buf_len, s->len) - 1; > - state->orig_buf = g_malloc(state->buf_len); > - state->acb[0] = acb; > - > - snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", start, end); > - DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n", > - (acb->nb_sectors * SECTOR_SIZE), start, state->range); > - curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); > + cache = g_malloc0(sizeof(CURLDataCache)); > + cache->base_pos = acb->sector_num * SECTOR_SIZE; > + cache->data_len = aio_bytes + s->readahead_size; > + cache->write_pos = 0; > + cache->data = g_malloc(cache->data_len); > > + QLIST_INSERT_HEAD(&s->acbs, acb, next); > + snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", cache->base_pos, > + cache->base_pos + cache->data_len); > + DPRINTF("Reading range: %s\n", state->range); > + curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); > + QLIST_INSERT_HEAD(&s->cache, cache, next); > + state->cache = cache; > + cache->use_count++; I don't see where you bump the use_count when a cache lookup is 
successful. Maybe I just missed it in the other patches.
On Thu, 05/23 16:23, Stefan Hajnoczi wrote: > On Thu, May 23, 2013 at 11:38:05AM +0800, Fam Zheng wrote: > > @@ -221,31 +215,35 @@ static void curl_complete_io(BDRVCURLState *bs, CURLAIOCB *acb, > > > > static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque) > > { > > - CURLState *s = ((CURLState*)opaque); > > + CURLState *s = (CURLState *)opaque; > > You can drop the cast completely because opaque is void*. > > > + CURLDataCache *c = s->cache; > > size_t realsize = size * nmemb; > > - int i; > > - > > - DPRINTF("CURL: Just reading %zd bytes\n", realsize); > > + CURLAIOCB *acb; > > > > - if (!s || !s->orig_buf) > > + if (!c || !c->data) { > > goto read_end; > > + } > > + if (c->write_pos >= c->data_len) { > > + goto read_end; > > + } > > + memcpy(c->data + c->write_pos, ptr, > > + MIN(realsize, c->data_len - c->write_pos)); > > + c->write_pos += realsize; > > + if (c->write_pos >= c->data_len) { > > + c->write_pos = c->data_len; > > + } > > > > - memcpy(s->orig_buf + s->buf_off, ptr, realsize); > > - s->buf_off += realsize; > > Why did you add MIN(realsize, c->data_len - c->write_pos)? The original > code trusts realsize to be within s->orig_buf. I don't see an evidence why it's safe here. CURL certainly doesn't know how much buffer do we have. 
(man 3 curl_easy_setopt, section CURLOPT_WRITEFUNCTION) > > > - > > - for(i=0; i<CURL_NUM_ACB; i++) { > > - CURLAIOCB *acb = s->acb[i]; > > - > > - if (!acb) > > - continue; > > - > > - if ((s->buf_off >= acb->end)) { > > - qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start, > > - acb->end - acb->start); > > - acb->common.cb(acb->common.opaque, 0); > > - qemu_aio_release(acb); > > - s->acb[i] = NULL; > > + acb = QLIST_FIRST(&s->s->acbs); > > + while (acb) { > > + size_t aio_base = acb->sector_num * SECTOR_SIZE; > > int64_t > > > @@ -600,29 +596,41 @@ static void curl_readv_bh_cb(void *p) > > // No cache found, so let's start a new request > > state = curl_init_state(s); > > if (!state) { > > - acb->common.cb(acb->common.opaque, -EIO); > > - qemu_aio_release(acb); > > - return; > > + goto err_release; > > } > > > > - acb->start = 0; > > - acb->end = (acb->nb_sectors * SECTOR_SIZE); > > - > > - state->buf_off = 0; > > - if (state->orig_buf) > > - g_free(state->orig_buf); > > - state->buf_start = start; > > - state->buf_len = acb->end + s->readahead_size; > > - end = MIN(start + state->buf_len, s->len) - 1; > > - state->orig_buf = g_malloc(state->buf_len); > > - state->acb[0] = acb; > > - > > - snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", start, end); > > - DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n", > > - (acb->nb_sectors * SECTOR_SIZE), start, state->range); > > - curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); > > + cache = g_malloc0(sizeof(CURLDataCache)); > > + cache->base_pos = acb->sector_num * SECTOR_SIZE; > > + cache->data_len = aio_bytes + s->readahead_size; > > + cache->write_pos = 0; > > + cache->data = g_malloc(cache->data_len); > > > > + QLIST_INSERT_HEAD(&s->acbs, acb, next); > > + snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", cache->base_pos, > > + cache->base_pos + cache->data_len); > > + DPRINTF("Reading range: %s\n", state->range); > > + curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); 
> > + QLIST_INSERT_HEAD(&s->cache, cache, next); > > + state->cache = cache; > > + cache->use_count++; > > I don't see where you bump the use_count when a cache lookup is > successful. Maybe I just missed it in the other patches. The use count tracks a cache's role as the receiving buffer for submitted CURL requests. It's not necessary to bump use_count when a cache lookup is successful: the data is copied to the guest immediately, so no reference to the cache is held.
On Fri, May 24, 2013 at 11:10:26AM +0800, Fam Zheng wrote: > On Thu, 05/23 16:23, Stefan Hajnoczi wrote: > > On Thu, May 23, 2013 at 11:38:05AM +0800, Fam Zheng wrote: > > > + CURLDataCache *c = s->cache; > > > size_t realsize = size * nmemb; > > > - int i; > > > - > > > - DPRINTF("CURL: Just reading %zd bytes\n", realsize); > > > + CURLAIOCB *acb; > > > > > > - if (!s || !s->orig_buf) > > > + if (!c || !c->data) { > > > goto read_end; > > > + } > > > + if (c->write_pos >= c->data_len) { > > > + goto read_end; > > > + } > > > + memcpy(c->data + c->write_pos, ptr, > > > + MIN(realsize, c->data_len - c->write_pos)); > > > + c->write_pos += realsize; > > > + if (c->write_pos >= c->data_len) { > > > + c->write_pos = c->data_len; > > > + } > > > > > > - memcpy(s->orig_buf + s->buf_off, ptr, realsize); > > > - s->buf_off += realsize; > > > > Why did you add MIN(realsize, c->data_len - c->write_pos)? The original > > code trusts realsize to be within s->orig_buf. > > I don't see an evidence why it's safe here. CURL certainly doesn't know > how much buffer do we have. (man 3 curl_easy_setopt, section > CURLOPT_WRITEFUNCTION) The HTTP request included a Range: header so we should know the total number of bytes we'll receive. That said, libcurl may not check so this is defensive programming. A malicious server shouldn't be able to overflow our buffer. A comment or note in the commit description would be nice to explain semantic changes like this. 
> > > @@ -600,29 +596,41 @@ static void curl_readv_bh_cb(void *p) > > > // No cache found, so let's start a new request > > > state = curl_init_state(s); > > > if (!state) { > > > - acb->common.cb(acb->common.opaque, -EIO); > > > - qemu_aio_release(acb); > > > - return; > > > + goto err_release; > > > } > > > > > > - acb->start = 0; > > > - acb->end = (acb->nb_sectors * SECTOR_SIZE); > > > - > > > - state->buf_off = 0; > > > - if (state->orig_buf) > > > - g_free(state->orig_buf); > > > - state->buf_start = start; > > > - state->buf_len = acb->end + s->readahead_size; > > > - end = MIN(start + state->buf_len, s->len) - 1; > > > - state->orig_buf = g_malloc(state->buf_len); > > > - state->acb[0] = acb; > > > - > > > - snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", start, end); > > > - DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n", > > > - (acb->nb_sectors * SECTOR_SIZE), start, state->range); > > > - curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); > > > + cache = g_malloc0(sizeof(CURLDataCache)); > > > + cache->base_pos = acb->sector_num * SECTOR_SIZE; > > > + cache->data_len = aio_bytes + s->readahead_size; > > > + cache->write_pos = 0; > > > + cache->data = g_malloc(cache->data_len); > > > > > > + QLIST_INSERT_HEAD(&s->acbs, acb, next); > > > + snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", cache->base_pos, > > > + cache->base_pos + cache->data_len); > > > + DPRINTF("Reading range: %s\n", state->range); > > > + curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); > > > + QLIST_INSERT_HEAD(&s->cache, cache, next); > > > + state->cache = cache; > > > + cache->use_count++; > > > > I don't see where you bump the use_count when a cache lookup is > > successful. Maybe I just missed it in the other patches. > > Use count is for serving as the receiving buffer for submitted CURL > requests. 
It's not necessary to bump use_count when cache lookup is > successful, since data is immediately copied to guest, no ref to the > cache hold. You're right. Stefan
diff --git a/block/curl.c b/block/curl.c index e387ae1..4c4752b 100644 --- a/block/curl.c +++ b/block/curl.c @@ -39,7 +39,6 @@ CURLPROTO_TFTP) #define CURL_NUM_STATES 8 -#define CURL_NUM_ACB 8 #define SECTOR_SIZE 512 #define READ_AHEAD_SIZE (256 * 1024) @@ -52,9 +51,7 @@ typedef struct CURLAIOCB { int64_t sector_num; int nb_sectors; - - size_t start; - size_t end; + QLIST_ENTRY(CURLAIOCB) next; } CURLAIOCB; typedef struct CURLDataCache { @@ -70,14 +67,10 @@ typedef struct CURLDataCache { typedef struct CURLState { struct BDRVCURLState *s; - CURLAIOCB *acb[CURL_NUM_ACB]; CURL *curl; - char *orig_buf; - size_t buf_start; - size_t buf_off; - size_t buf_len; char range[128]; char errmsg[CURL_ERROR_SIZE]; + CURLDataCache *cache; char in_use; } CURLState; @@ -92,6 +85,7 @@ typedef struct BDRVCURLState { CURLM *multi; size_t len; CURLState states[CURL_NUM_STATES]; + QLIST_HEAD(, CURLAIOCB) acbs; QLIST_HEAD(, CURLSockInfo) socks; char *url; size_t readahead_size; @@ -221,31 +215,35 @@ static void curl_complete_io(BDRVCURLState *bs, CURLAIOCB *acb, static size_t curl_read_cb(void *ptr, size_t size, size_t nmemb, void *opaque) { - CURLState *s = ((CURLState*)opaque); + CURLState *s = (CURLState *)opaque; + CURLDataCache *c = s->cache; size_t realsize = size * nmemb; - int i; - - DPRINTF("CURL: Just reading %zd bytes\n", realsize); + CURLAIOCB *acb; - if (!s || !s->orig_buf) + if (!c || !c->data) { goto read_end; + } + if (c->write_pos >= c->data_len) { + goto read_end; + } + memcpy(c->data + c->write_pos, ptr, + MIN(realsize, c->data_len - c->write_pos)); + c->write_pos += realsize; + if (c->write_pos >= c->data_len) { + c->write_pos = c->data_len; + } - memcpy(s->orig_buf + s->buf_off, ptr, realsize); - s->buf_off += realsize; - - for(i=0; i<CURL_NUM_ACB; i++) { - CURLAIOCB *acb = s->acb[i]; - - if (!acb) - continue; - - if ((s->buf_off >= acb->end)) { - qemu_iovec_from_buf(acb->qiov, 0, s->orig_buf + acb->start, - acb->end - acb->start); - 
acb->common.cb(acb->common.opaque, 0); - qemu_aio_release(acb); - s->acb[i] = NULL; + acb = QLIST_FIRST(&s->s->acbs); + while (acb) { + size_t aio_base = acb->sector_num * SECTOR_SIZE; + size_t aio_len = acb->nb_sectors * SECTOR_SIZE; + CURLAIOCB *next = QLIST_NEXT(acb, next); + if (aio_base >= c->base_pos && + aio_base + aio_len <= c->base_pos + c->write_pos) { + QLIST_REMOVE(acb, next); + curl_complete_io(s->s, acb, c); } + acb = next; } read_end: @@ -275,10 +273,12 @@ static void curl_fd_handler(void *arg) CURLMsg *msg; msg = curl_multi_info_read(s->multi, &msgs_in_queue); - if (!msg) + if (!msg) { break; - if (msg->msg == CURLMSG_NONE) + } + if (msg->msg == CURLMSG_NONE) { break; + } switch (msg->msg) { case CURLMSG_DONE: @@ -288,19 +288,17 @@ static void curl_fd_handler(void *arg) CURLINFO_PRIVATE, (char **)&state); - /* ACBs for successful messages get completed in curl_read_cb */ + /* ACBs for successful messages get completed in curl_read_cb, + * fail existing acbs for now */ if (msg->data.result != CURLE_OK) { - int i; - for (i = 0; i < CURL_NUM_ACB; i++) { - CURLAIOCB *acb = state->acb[i]; - - if (acb == NULL) { - continue; - } - + CURLAIOCB *acb = QLIST_FIRST(&s->acbs); + while (acb) { + CURLAIOCB *next = QLIST_NEXT(acb, next); + DPRINTF("EIO, %s\n", state->errmsg); acb->common.cb(acb->common.opaque, -EIO); + QLIST_REMOVE(acb, next); qemu_aio_release(acb); - state->acb[i] = NULL; + acb = next; } } @@ -317,13 +315,10 @@ static void curl_fd_handler(void *arg) static CURLState *curl_init_state(BDRVCURLState *s) { CURLState *state = NULL; - int i, j; + int i; do { for (i=0; i<CURL_NUM_STATES; i++) { - for (j=0; j<CURL_NUM_ACB; j++) - if (s->states[i].acb[j]) - continue; if (s->states[i].in_use) continue; @@ -380,6 +375,10 @@ static void curl_clean_state(CURLState *s) if (s->s->multi) curl_multi_remove_handle(s->s->multi, s->curl); s->in_use = 0; + if (s->cache) { + s->cache->use_count--; + assert(s->cache->use_count >= 0); + } } static void 
curl_parse_filename(const char *filename, QDict *options, @@ -483,6 +482,7 @@ static int curl_open(BlockDriverState *bs, QDict *options, int flags) QLIST_INIT(&s->socks); QLIST_INIT(&s->cache); + QLIST_INIT(&s->acbs); DPRINTF("CURL: Opening %s\n", file); s->url = g_strdup(file); @@ -551,14 +551,8 @@ out_noclean: static int curl_aio_flush(void *opaque) { BDRVCURLState *s = opaque; - int i, j; - - for (i=0; i < CURL_NUM_STATES; i++) { - for(j=0; j < CURL_NUM_ACB; j++) { - if (s->states[i].acb[j]) { - return 1; - } - } + if (!QLIST_EMPTY(&s->acbs)) { + return 1; } return 0; } @@ -580,6 +574,7 @@ static void curl_readv_bh_cb(void *p) CURLDataCache *cache = NULL; CURLAIOCB *acb = p; BDRVCURLState *s = acb->common.bs->opaque; + int running; size_t aio_base, aio_bytes; qemu_bh_delete(acb->bh); @@ -588,8 +583,9 @@ static void curl_readv_bh_cb(void *p) aio_base = acb->sector_num * SECTOR_SIZE; aio_bytes = acb->nb_sectors * SECTOR_SIZE; - size_t start = acb->sector_num * SECTOR_SIZE; - size_t end; + if (aio_base + aio_bytes > s->len) { + goto err_release; + } cache = curl_find_cache(s, aio_base, aio_bytes); if (cache) { @@ -600,29 +596,41 @@ static void curl_readv_bh_cb(void *p) // No cache found, so let's start a new request state = curl_init_state(s); if (!state) { - acb->common.cb(acb->common.opaque, -EIO); - qemu_aio_release(acb); - return; + goto err_release; } - acb->start = 0; - acb->end = (acb->nb_sectors * SECTOR_SIZE); - - state->buf_off = 0; - if (state->orig_buf) - g_free(state->orig_buf); - state->buf_start = start; - state->buf_len = acb->end + s->readahead_size; - end = MIN(start + state->buf_len, s->len) - 1; - state->orig_buf = g_malloc(state->buf_len); - state->acb[0] = acb; - - snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", start, end); - DPRINTF("CURL (AIO): Reading %d at %zd (%s)\n", - (acb->nb_sectors * SECTOR_SIZE), start, state->range); - curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); + cache = 
g_malloc0(sizeof(CURLDataCache)); + cache->base_pos = acb->sector_num * SECTOR_SIZE; + cache->data_len = aio_bytes + s->readahead_size; + cache->write_pos = 0; + cache->data = g_malloc(cache->data_len); + QLIST_INSERT_HEAD(&s->acbs, acb, next); + snprintf(state->range, sizeof(state->range) - 1, "%zd-%zd", cache->base_pos, + cache->base_pos + cache->data_len); + DPRINTF("Reading range: %s\n", state->range); + curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range); + QLIST_INSERT_HEAD(&s->cache, cache, next); + state->cache = cache; + cache->use_count++; curl_multi_add_handle(s->multi, state->curl); + /* kick off curl to start the action */ + curl_multi_socket_action(s->multi, 0, CURL_SOCKET_TIMEOUT, &running); + return; + +err_release: + if (cache) { + if (cache->data) { + g_free(cache->data); + cache->data = NULL; + } + g_free(cache); + cache = NULL; + } + acb->common.cb(acb->common.opaque, -EIO); + qemu_aio_release(acb); + return; + } @@ -669,14 +677,18 @@ static void curl_close(BlockDriverState *bs) curl_easy_cleanup(s->states[i].curl); s->states[i].curl = NULL; } - if (s->states[i].orig_buf) { - g_free(s->states[i].orig_buf); - s->states[i].orig_buf = NULL; - } } if (s->multi) curl_multi_cleanup(s->multi); + while (!QLIST_EMPTY(&s->acbs)) { + CURLAIOCB *acb = QLIST_FIRST(&s->acbs); + acb->common.cb(acb->common.opaque, -EIO); + QLIST_REMOVE(acb, next); + qemu_aio_release(acb); + acb = NULL; + } + while (!QLIST_EMPTY(&s->cache)) { CURLDataCache *cache = QLIST_FIRST(&s->cache); assert(cache->use_count == 0);
Make subsequent changes to use the newly introduced CURLDataCache. Move the acb bookkeeping from CURLState to BDRVCURLState, and change it from a fixed-size array to a list. Signed-off-by: Fam Zheng <famz@redhat.com> --- block/curl.c | 168 ++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 90 insertions(+), 78 deletions(-)