@@ -27,9 +27,31 @@
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"
+#include "qemu-thread.h"
#include "block/raw-posix-aio.h"
+static QemuMutex aiocb_mutex;
+static QemuCond aiocb_completion;
+
+typedef struct ThreadletQueue
+{
+ QemuMutex lock;
+ QemuCond cond;
+ int max_threads;
+ int min_threads;
+ int cur_threads;
+ int idle_threads;
+ QTAILQ_HEAD(, ThreadletWork) request_list;
+} ThreadletQueue;
+
+typedef struct ThreadletWork
+{
+ QTAILQ_ENTRY(ThreadletWork) node;
+ void (*func)(struct ThreadletWork *work);
+} ThreadletWork;
+
+static ThreadletQueue globalqueue;
struct qemu_paiocb {
BlockDriverAIOCB common;
@@ -51,6 +73,7 @@ struct qemu_paiocb {
struct qemu_paiocb *next;
int async_context_id;
+ ThreadletWork work;
};
typedef struct PosixAioState {
@@ -58,6 +81,45 @@ typedef struct PosixAioState {
struct qemu_paiocb *first_aio;
} PosixAioState;
+/**
+ * dequeue_work_on_queue: Cancel a task queued on a Queue.
+ * @queue: The queue containing the task to be cancelled.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+
+static int dequeue_work_on_queue(ThreadletQueue *queue, ThreadletWork *work)
+{
+ ThreadletWork *ret_work;
+ int ret = 1;
+
+ qemu_mutex_lock(&queue->lock);
+ QTAILQ_FOREACH(ret_work, &(queue->request_list), node) {
+ if (ret_work == work) {
+ QTAILQ_REMOVE(&queue->request_list, ret_work, node);
+ ret = 0;
+ break;
+ }
+ }
+ qemu_mutex_unlock(&queue->lock);
+
+ return ret;
+}
+
+/**
+ * dequeue_work: Cancel a task queued on the global queue.
+ * @work: Contains the information of the task that needs to be cancelled.
+ *
+ * Returns: 0 if the task is successfully cancelled.
+ * 1 otherwise.
+ */
+
+static int dequeue_work(ThreadletWork *work)
+{
+ return dequeue_work_on_queue(&globalqueue, work);
+}
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
@@ -397,9 +459,9 @@ static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
ssize_t ret;
- mutex_lock(&lock);
+ qemu_mutex_lock(&aiocb_mutex);
ret = aiocb->ret;
- mutex_unlock(&lock);
+ qemu_mutex_unlock(&aiocb_mutex);
return ret;
}
@@ -534,22 +596,13 @@ static void paio_remove(struct qemu_paiocb *acb)
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
- int active = 0;
-
- mutex_lock(&lock);
- if (!acb->active) {
- QTAILQ_REMOVE(&request_list, acb, node);
- acb->ret = -ECANCELED;
- } else if (acb->ret == -EINPROGRESS) {
- active = 1;
- }
- mutex_unlock(&lock);
-
- if (active) {
- /* fail safe: if the aio could not be canceled, we wait for
- it */
- while (qemu_paio_error(acb) == EINPROGRESS)
- ;
+ if (dequeue_work(&acb->work) != 0) {
+ /* Wait for running work item to complete */
+ qemu_mutex_lock(&aiocb_mutex);
+ while (acb->ret == -EINPROGRESS) {
+ qemu_cond_wait(&aiocb_completion, &aiocb_mutex);
+ }
+ qemu_mutex_unlock(&aiocb_mutex);
}
paio_remove(acb);
@@ -623,6 +676,9 @@ int paio_init(void)
if (posix_aio_state)
return 0;
+ qemu_mutex_init(&aiocb_mutex);
+ qemu_cond_init(&aiocb_completion);
+
s = qemu_malloc(sizeof(PosixAioState));
sigfillset(&act.sa_mask);
Move paio_cancel() to new infrastructure and introduce the necessary APIs for this. Signed-off-by: Arun R Bharadwaj <arun@linux.vnet.ibm.com> --- posix-aio-compat.c | 92 ++++++++++++++++++++++++++++++++++++++++++---------- 1 files changed, 74 insertions(+), 18 deletions(-)