[RFC,03/22] AIO replay

Message ID 007701cf951e$60b5d300$22217900$@Dovgaluk@ispras.ru
State New

Commit Message

Pavel Dovgalyuk July 1, 2014, 11:19 a.m. UTC
These patches introduce recording and replaying of AIO events. Since AIO
callbacks run in different threads, all bottom half (BH) invocations have to
be recorded so that they can be replayed deterministically.

Signed-off-by: Pavel Dovgalyuk <pavel.dovgaluk@gmail.com>
---
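
A minimal usage sketch of the new API (not from this patch; the device,
request and callback names below are hypothetical): a device model that
completes requests from a bottom half switches from qemu_bh_new() to
qemu_bh_new_replay() and tags the BH with the current replay step, just as
the dma-helpers.c hunk below does, so that aio_bh_poll() hands the callback
to replay_add_bh_event() instead of invoking it directly:

    /* Hypothetical completion path; MyDevReq, my_dev_complete and
     * my_dev_finish do not exist in the QEMU tree. */
    static void my_dev_complete(void *opaque)
    {
        MyDevReq *req = opaque;

        my_dev_finish(req);       /* deliver the result to the guest */
        qemu_bh_delete(req->bh);
    }

    static void my_dev_start(MyDevReq *req)
    {
        /* Tag the BH with the current replay step so that record/replay
         * can fire it deterministically at the same execution point. */
        req->bh = qemu_bh_new_replay(my_dev_complete, req,
                                     replay_get_current_step());
        qemu_bh_schedule(req->bh);
    }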

Patch

diff --git a/async.c b/async.c
index 5b6fe6b..f86798a
--- a/async.c
+++ b/async.c
@@ -26,6 +26,8 @@ 
 #include "block/aio.h"
 #include "block/thread-pool.h"
 #include "qemu/main-loop.h"
+#include "replay/replay.h"
+#include "qemu/log.h"
 
 /***********************************************************/
 /* bottom halves (can be seen as timers which expire ASAP) */
@@ -38,24 +40,52 @@  struct QEMUBH {
     bool scheduled;
     bool idle;
     bool deleted;
+    bool replay;
+    uint64_t id;
 };
 
 QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
 {
-    QEMUBH *bh;
+    QEMUBH *bh, **last;
     bh = g_malloc0(sizeof(QEMUBH));
     bh->ctx = ctx;
     bh->cb = cb;
     bh->opaque = opaque;
     qemu_mutex_lock(&ctx->bh_lock);
-    bh->next = ctx->first_bh;
-    /* Make sure that the members are ready before putting bh into list */
-    smp_wmb();
-    ctx->first_bh = bh;
+    if (replay_mode != REPLAY_NONE) {
+        /* Slower, but this keeps the list a queue rather than a stack:
+         * replay will then process the BHs in the same order in which
+         * they were added to the queue. */
+        last = &ctx->first_bh;
+        while (*last) {
+            last = &(*last)->next;
+        }
+        smp_wmb();
+        *last = bh;
+    } else {
+        bh->next = ctx->first_bh;
+        /* Make sure that the members are ready before putting bh into list */
+        smp_wmb();
+        ctx->first_bh = bh;
+    }
     qemu_mutex_unlock(&ctx->bh_lock);
     return bh;
 }
 
+QEMUBH *aio_bh_new_replay(AioContext *ctx, QEMUBHFunc *cb, void *opaque, uint64_t id)
+{
+    QEMUBH *bh = aio_bh_new(ctx, cb, opaque);
+    bh->replay = true;
+    bh->id = id;
+    return bh;
+}
+
+void aio_bh_call(void *opaque)
+{
+    QEMUBH *bh = (QEMUBH *)opaque;
+    bh->cb(bh->opaque);
+}
+
 /* Multiple occurrences of aio_bh_poll cannot be called concurrently */
 int aio_bh_poll(AioContext *ctx)
 {
@@ -70,7 +100,6 @@  int aio_bh_poll(AioContext *ctx)
         smp_read_barrier_depends();
         next = bh->next;
         if (!bh->deleted && bh->scheduled) {
-            bh->scheduled = 0;
             /* Paired with write barrier in bh schedule to ensure reading for
              * idle & callbacks coming after bh's scheduling.
              */
@@ -78,7 +107,12 @@  int aio_bh_poll(AioContext *ctx)
             if (!bh->idle)
                 ret = 1;
             bh->idle = 0;
-            bh->cb(bh->opaque);
+            bh->scheduled = 0;
+            if (!bh->replay) {
+                aio_bh_call(bh);
+            } else {
+                replay_add_bh_event(bh, bh->id);
+            }
         }
     }
 
@@ -195,7 +229,7 @@  aio_ctx_check(GSource *source)
     for (bh = ctx->first_bh; bh; bh = bh->next) {
         if (!bh->deleted && bh->scheduled) {
             return true;
-	}
+        }
     }
     return aio_pending(ctx) || (timerlistgroup_deadline_ns(&ctx->tlg) == 0);
 }

diff --git a/dma-helpers.c b/dma-helpers.c
index 53cbe92..0331746
--- a/dma-helpers.c
+++ b/dma-helpers.c
@@ -12,6 +12,9 @@ 
 #include "qemu/range.h"
 #include "qemu/thread.h"
 #include "qemu/main-loop.h"
+#include "block/block_int.h"
+#include "replay/replay.h"
+#include "qemu/log.h"
 
 /* #define DEBUG_IOMMU */
 
@@ -96,7 +99,7 @@  static void continue_after_map_failure(void *opaque)
 {
     DMAAIOCB *dbs = (DMAAIOCB *)opaque;
 
-    dbs->bh = qemu_bh_new(reschedule_dma, dbs);
+    dbs->bh = qemu_bh_new_replay(reschedule_dma, dbs, replay_get_current_step());
     qemu_bh_schedule(dbs->bh);
 }
 
@@ -217,6 +220,8 @@  BlockDriverAIOCB *dma_bdrv_io(
     dbs->io_func = io_func;
     dbs->bh = NULL;
     qemu_iovec_init(&dbs->iov, sg->nsg);
+    dbs->iov.replay = true;
+    dbs->iov.replay_step = replay_get_current_step();
     dma_bdrv_cb(dbs, 0);
     return &dbs->common;
 }
@@ -226,7 +231,7 @@  BlockDriverAIOCB *dma_bdrv_read(BlockDriverState *bs,
                                 QEMUSGList *sg, uint64_t sector,
                                 void (*cb)(void *opaque, int ret), void *opaque)
 {
-    return dma_bdrv_io(bs, sg, sector, bdrv_aio_readv, cb, opaque,
+    return dma_bdrv_io(bs, sg, sector, bdrv_aio_readv_replay, cb, opaque,
                        DMA_DIRECTION_FROM_DEVICE);
 }
 
@@ -234,7 +239,7 @@  BlockDriverAIOCB *dma_bdrv_write(BlockDriverState *bs,
                                  QEMUSGList *sg, uint64_t sector,
                                  void (*cb)(void *opaque, int ret), void *opaque)
 {
-    return dma_bdrv_io(bs, sg, sector, bdrv_aio_writev, cb, opaque,
+    return dma_bdrv_io(bs, sg, sector, bdrv_aio_writev_replay, cb, opaque,
                        DMA_DIRECTION_TO_DEVICE);
 }
 

diff --git a/include/block/aio.h b/include/block/aio.h
index a92511b..6649628 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -34,6 +34,8 @@  struct BlockDriverAIOCB {
     const AIOCBInfo *aiocb_info;
     BlockDriverState *bs;
     BlockDriverCompletionFunc *cb;
+    bool replay;
+    uint64_t replay_step;
     void *opaque;
 };
 
@@ -130,6 +132,7 @@  void aio_context_release(AioContext *ctx);
  * is opaque and must be allocated prior to its use.
  */
 QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque);
+QEMUBH *aio_bh_new_replay(AioContext *ctx, QEMUBHFunc *cb, void *opaque, uint64_t id);
 
 /**
  * aio_notify: Force processing of pending events.
@@ -147,6 +150,11 @@  QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque);
 void aio_notify(AioContext *ctx);
 
 /**
+ * aio_bh_call: Calls the callback function of the specified BH.
+ */
+void aio_bh_call(void *opaque);
+
+/**
  * aio_bh_poll: Poll bottom halves for an AioContext.
  *
  * These are internal functions used by the QEMU main loop.
@@ -254,6 +262,7 @@  struct ThreadPool *aio_get_thread_pool(AioContext *ctx);
 /* Functions to operate on the main QEMU AioContext.  */
 
 bool qemu_aio_wait(void);
+bool qemu_aio_wait_no_blocking(void);
 void qemu_aio_set_event_notifier(EventNotifier *notifier,
                                  EventNotifierHandler *io_read);
 
diff --git a/include/qemu/main-loop.h b/include/qemu/main-loop.h
index 6f0200a..c221ddd 100644
--- a/include/qemu/main-loop.h
+++ b/include/qemu/main-loop.h
@@ -306,6 +306,7 @@  void qemu_iohandler_fill(GArray *pollfds);
 void qemu_iohandler_poll(GArray *pollfds, int rc);
 
 QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque);
+QEMUBH *qemu_bh_new_replay(QEMUBHFunc *cb, void *opaque, uint64_t id);
 void qemu_bh_schedule_idle(QEMUBH *bh);
 
 #endif

diff --git a/main-loop.c b/main-loop.c
index 8a85493..64dded1
--- a/main-loop.c
+++ b/main-loop.c
@@ -29,6 +29,9 @@ 
 #include "slirp/libslirp.h"
 #include "qemu/main-loop.h"
 #include "block/aio.h"
+#include "replay/replay.h"
+#include "sysemu/sysemu.h"
+#include "qemu/log.h"
 
 #ifndef _WIN32
 
@@ -499,11 +502,21 @@  QEMUBH *qemu_bh_new(QEMUBHFunc *cb, void *opaque)
     return aio_bh_new(qemu_aio_context, cb, opaque);
 }
 
+QEMUBH *qemu_bh_new_replay(QEMUBHFunc *cb, void *opaque, uint64_t id)
+{
+    return aio_bh_new_replay(qemu_aio_context, cb, opaque, id);
+}
+
 bool qemu_aio_wait(void)
 {
     return aio_poll(qemu_aio_context, true);
 }
 
+bool qemu_aio_wait_no_blocking(void)
+{
+    return aio_poll(qemu_aio_context, false);
+}
+
 #ifdef CONFIG_POSIX
 void qemu_aio_set_fd_handler(int fd,
                              IOHandler *io_read,

diff --git a/util/iov.c b/util/iov.c
index 2b4f46d..3bf7092 100644
--- a/util/iov.c
+++ b/util/iov.c
@@ -257,6 +257,8 @@  void qemu_iovec_init(QEMUIOVector *qiov, int alloc_hint)
     qiov->niov = 0;
     qiov->nalloc = alloc_hint;
     qiov->size = 0;
+    qiov->replay = false;
+    qiov->replay_step = 0;
 }
 
 void qemu_iovec_init_external(QEMUIOVector *qiov, struct iovec *iov, int niov)
@@ -267,6 +269,8 @@  void qemu_iovec_init_external(QEMUIOVector *qiov, struct iovec *iov, int niov)
     qiov->niov = niov;
     qiov->nalloc = -1;
     qiov->size = 0;
+    qiov->replay = false;
+    qiov->replay_step = 0;
     for (i = 0; i < niov; i++)
         qiov->size += iov[i].iov_len;
 }

diff --git a/include/qemu-common.h b/include/qemu-common.h
index ae76197..50ab79c 100644
--- a/include/qemu-common.h
+++ b/include/qemu-common.h
@@ -121,6 +121,7 @@  extern int use_icount;
 int qemu_main(int argc, char **argv, char **envp);
 #endif
 
+void qemu_get_timedate_no_warning(struct tm *tm, int offset);
 void qemu_get_timedate(struct tm *tm, int offset);
 int qemu_timedate_diff(struct tm *tm);
 
@@ -308,6 +309,8 @@  typedef struct QEMUIOVector {
     int niov;
     int nalloc;
     size_t size;
+    bool replay;
+    uint64_t replay_step;
 } QEMUIOVector;
 
 void qemu_iovec_init(QEMUIOVector *qiov, int alloc_hint);
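
Note on qemu_aio_wait_no_blocking(): it is simply the non-blocking
counterpart of qemu_aio_wait(), calling aio_poll() with blocking=false and
returning whether any progress was made. A hypothetical caller, presumably
the replay code later in this series, could drain pending completions
without stalling, e.g.:

    /* Hypothetical polling loop; the real caller is expected in a later
     * patch of the series.  aio_poll() runs ready handlers and BHs and
     * returns true as long as it made progress. */
    while (qemu_aio_wait_no_blocking()) {
        /* keep draining */
    }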