[1/2] rfifolock: add recursive FIFO lock

Message ID 1381312531-28723-2-git-send-email-stefanha@redhat.com
State New

Commit Message

Stefan Hajnoczi Oct. 9, 2013, 9:55 a.m. UTC
QemuMutex does not guarantee fairness and cannot be acquired
recursively:

Fairness means each locker gets a turn and the scheduler cannot cause
starvation.

Recursive locking is useful for composition: it allows a sequence of
locking operations to be invoked atomically by acquiring the lock around
them.

This patch adds RFifoLock, a recursive lock that guarantees FIFO order.
Its first user is added in the next patch.

RFifoLock has one additional feature: it can be initialized with an
optional contention callback.  The callback is invoked whenever a thread
must wait for the lock.  For example, it can be used to poke the current
owner so that they release the lock soon.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 include/qemu/rfifolock.h | 54 +++++++++++++++++++++++++++++
 tests/Makefile           |  2 ++
 tests/test-rfifolock.c   | 90 ++++++++++++++++++++++++++++++++++++++++++++++++
 util/Makefile.objs       |  1 +
 util/rfifolock.c         | 79 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 226 insertions(+)
 create mode 100644 include/qemu/rfifolock.h
 create mode 100644 tests/test-rfifolock.c
 create mode 100644 util/rfifolock.c
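
A minimal usage sketch of the API added by this patch; the contention
callback body, the helper() function, and everything apart from the
rfifolock_*() calls themselves are illustrative only:

#include "qemu/rfifolock.h"

static RFifoLock lock;

/* Illustrative contention callback: invoked whenever a thread must wait.
 * It runs with the internal mutex held, so it must not lock or unlock the
 * RFifoLock itself; typically it would poke the current owner so that the
 * owner releases the lock soon.
 */
static void contention_cb(void *opaque)
{
}

static void helper(void)
{
    rfifolock_lock(&lock);      /* nests: safe even if the caller holds it */
    /* ... */
    rfifolock_unlock(&lock);
}

static void example(void)
{
    rfifolock_init(&lock, contention_cb, NULL);

    rfifolock_lock(&lock);
    helper();                   /* composition via recursive locking */
    rfifolock_unlock(&lock);    /* outermost unlock passes the FIFO turn on */

    rfifolock_destroy(&lock);
}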

Comments

Paolo Bonzini Oct. 9, 2013, 10:05 a.m. UTC | #1
On 09/10/2013 11:55, Stefan Hajnoczi wrote:
> +    /* Take a ticket */
> +    unsigned int ticket = r->tail++;
> +
> +    if (r->nesting > 0) {
> +        if (qemu_thread_is_self(&r->owner_thread)) {
> +            r->tail--; /* put ticket back, we're nesting */
> +        } else {

ticket is dead in the "nested" path, why not move it (and the increment)
directly after the else?

Otherwise, the code is very nice.  It's very interesting that no pthread
mutex is held in the rfifolock critical section, so that the thread that
holds the next ticket has a chance to be woken up.

Paolo
Stefan Hajnoczi Oct. 9, 2013, 11:26 a.m. UTC | #2
On Wed, Oct 09, 2013 at 12:05:43PM +0200, Paolo Bonzini wrote:
> On 09/10/2013 11:55, Stefan Hajnoczi wrote:
> > +    /* Take a ticket */
> > +    unsigned int ticket = r->tail++;
> > +
> > +    if (r->nesting > 0) {
> > +        if (qemu_thread_is_self(&r->owner_thread)) {
> > +            r->tail--; /* put ticket back, we're nesting */
> > +        } else {
> 
> ticket is dead in the "nested" path, why not move it (and the increment)
> directly after the else?

The increment cannot be moved because there are 3 cases:

1. Uncontended lock: r->tail++
2. Recursive lock: do not change tail
3. Wait for contended lock: r->tail++

Case #1 needs r->tail++ too.

I find this approach clearest:

if (r->nesting == 0) {
    /* Uncontended, just take a ticket */
    r->tail++;
} else if (qemu_thread_is_self(&r->owner_thread)) {
    /* Recursive lock, no need to take a ticket */
} else {
    /* Contended case, take a ticket and wait for it */
    unsigned int ticket = r->tail++;

    while (ticket != r->head) {
        ...
    }
}

There's also a shorter approach that collapses everything into a single
if statement, but I find the if expression harder to understand:

if (r->nesting == 0 || !qemu_thread_is_self(&r->owner_thread)) {
    unsigned int ticket = r->tail++;
    while (ticket != r->head) {
        ...
    }
}

Any preferences?

Stefan
Paolo Bonzini Oct. 9, 2013, 11:36 a.m. UTC | #3
On 09/10/2013 13:26, Stefan Hajnoczi wrote:
> On Wed, Oct 09, 2013 at 12:05:43PM +0200, Paolo Bonzini wrote:
>> On 09/10/2013 11:55, Stefan Hajnoczi wrote:
>>> +    /* Take a ticket */
>>> +    unsigned int ticket = r->tail++;
>>> +
>>> +    if (r->nesting > 0) {
>>> +        if (qemu_thread_is_self(&r->owner_thread)) {
>>> +            r->tail--; /* put ticket back, we're nesting */
>>> +        } else {
>>
>> ticket is dead in the "nested" path, why not move it (and the increment)
>> directly after the else?
> 
> The increment cannot be moved because there are 3 cases:
> 
> 1. Uncontended lock: r->tail++
> 2. Recursive lock: do not change tail
> 3. Wait for contended lock: r->tail++
> 
> Case #1 needs r->tail++ too.
> 
> I find this approach clearest:
> 
> if (r->nesting == 0) {
>     /* Uncontended, just take a ticket */
>     r->tail++;
> } else if (qemu_thread_is_self(&r->owner_thread)) {
>     /* Recursive lock, no need to take a ticket */
> } else {
>     /* Contended case, take a ticket and wait for it */
>     unsigned int ticket = r->tail++;
> 
>     while (ticket != r->head) {
>         ...
>     }
> }

I like this one too.

Paolo
Wayne Xia Oct. 11, 2013, 8:55 a.m. UTC | #4
Logic looks fine to me, only a minor comment below.

> QemuMutex does not guarantee fairness and cannot be acquired
> recursively:
>
> Fairness means each locker gets a turn and the scheduler cannot cause
> starvation.
>
> Recursive locking is useful for composition: it allows a sequence of
> locking operations to be invoked atomically by acquiring the lock around
> them.
>
> This patch adds RFifoLock, a recursive lock that guarantees FIFO order.
> Its first user is added in the next patch.
>
> RFifoLock has one additional feature: it can be initialized with an
> optional contention callback.  The callback is invoked whenever a thread
> must wait for the lock.  For example, it can be used to poke the current
> owner so that they release the lock soon.
>
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>  include/qemu/rfifolock.h | 54 +++++++++++++++++++++++++++++
>  tests/Makefile           |  2 ++
>  tests/test-rfifolock.c   | 90 ++++++++++++++++++++++++++++++++++++++++++++++++
>  util/Makefile.objs       |  1 +
>  util/rfifolock.c         | 79 ++++++++++++++++++++++++++++++++++++++++++
>  5 files changed, 226 insertions(+)
>  create mode 100644 include/qemu/rfifolock.h
>  create mode 100644 tests/test-rfifolock.c
>  create mode 100644 util/rfifolock.c
>
> diff --git a/include/qemu/rfifolock.h b/include/qemu/rfifolock.h
> new file mode 100644
> index 0000000..b23ab53
> --- /dev/null
> +++ b/include/qemu/rfifolock.h
> @@ -0,0 +1,54 @@
> +/*
> + * Recursive FIFO lock
> + *
> + * Copyright Red Hat, Inc. 2013
> + *
> + * Authors:
> + *  Stefan Hajnoczi   <stefanha@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + *
> + */
> +
> +#ifndef QEMU_RFIFOLOCK_H
> +#define QEMU_RFIFOLOCK_H
> +
> +#include "qemu/thread.h"
> +
> +/* Recursive FIFO lock
> + *
> + * This lock provides more features than a plain mutex:
> + *
> + * 1. Fairness - enforces FIFO order.
> + * 2. Nesting - can be taken recursively.
> + * 3. Contention callback - optional, called when thread must wait.
> + *
> + * The recursive FIFO lock is heavyweight so prefer other synchronization
> + * primitives if you do not need its features.
> + */
> +typedef struct {
> +    QemuMutex lock;             /* protects all fields */
> +
> +    /* FIFO order */
> +    unsigned int head;          /* active ticket number */
> +    unsigned int tail;          /* waiting ticket number */
> +    QemuCond cond;              /* used to wait for our ticket number */
> +
> +    /* Nesting */
> +    QemuThread owner_thread;    /* thread that currently has ownership */
> +    unsigned int nesting;       /* amount of nesting levels */
> +
> +    /* Contention callback */
> +    void (*cb)(void *);         /* called when thread must wait, with ->lock
> +                                 * held so it may not recursively lock/unlock
> +                                 */
> +    void *cb_opaque;
> +} RFifoLock;
> +
If you respin, the struct definition could be moved to util/rfifolock.c, leaving
typedef struct RFifoLock RFifoLock;
in the header.

> +void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque);
> +void rfifolock_destroy(RFifoLock *r);
> +void rfifolock_lock(RFifoLock *r);
> +void rfifolock_unlock(RFifoLock *r);
> +
> +#endif /* QEMU_RFIFOLOCK_H */
> diff --git a/tests/Makefile b/tests/Makefile
> index 994fef1..2959ed1 100644
> --- a/tests/Makefile
> +++ b/tests/Makefile
> @@ -31,6 +31,7 @@ check-unit-y += tests/test-visitor-serialization$(EXESUF)
>  check-unit-y += tests/test-iov$(EXESUF)
>  gcov-files-test-iov-y = util/iov.c
>  check-unit-y += tests/test-aio$(EXESUF)
> +check-unit-y += tests/test-rfifolock$(EXESUF)
>  check-unit-y += tests/test-throttle$(EXESUF)
>  gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
>  gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
> @@ -121,6 +122,7 @@ tests/check-qfloat$(EXESUF): tests/check-qfloat.o libqemuutil.a
>  tests/check-qjson$(EXESUF): tests/check-qjson.o libqemuutil.a libqemustub.a
>  tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(block-obj-y) libqemuutil.a libqemustub.a
>  tests/test-aio$(EXESUF): tests/test-aio.o $(block-obj-y) libqemuutil.a libqemustub.a
> +tests/test-rfifolock$(EXESUF): tests/test-rfifolock.o libqemuutil.a libqemustub.a
>  tests/test-throttle$(EXESUF): tests/test-throttle.o $(block-obj-y) libqemuutil.a libqemustub.a
>  tests/test-thread-pool$(EXESUF): tests/test-thread-pool.o $(block-obj-y) libqemuutil.a libqemustub.a
>  tests/test-iov$(EXESUF): tests/test-iov.o libqemuutil.a
> diff --git a/tests/test-rfifolock.c b/tests/test-rfifolock.c
> new file mode 100644
> index 0000000..440dbcb
> --- /dev/null
> +++ b/tests/test-rfifolock.c
> @@ -0,0 +1,90 @@
> +/*
> + * RFifoLock tests
> + *
> + * Copyright Red Hat, Inc. 2013
> + *
> + * Authors:
> + *  Stefan Hajnoczi    <stefanha@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#include <glib.h>
> +#include "qemu-common.h"
> +#include "qemu/rfifolock.h"
> +
> +static void test_nesting(void)
> +{
> +    RFifoLock lock;
> +
> +    /* Trivial test, ensure the lock is recursive */
> +    rfifolock_init(&lock, NULL, NULL);
> +    rfifolock_lock(&lock);
> +    rfifolock_lock(&lock);
> +    rfifolock_lock(&lock);
> +    rfifolock_unlock(&lock);
> +    rfifolock_unlock(&lock);
> +    rfifolock_unlock(&lock);
> +    rfifolock_destroy(&lock);
> +}
> +
> +typedef struct {
> +    RFifoLock lock;
> +    int fd[2];
> +} CallbackTestData;
> +
> +static void rfifolock_cb(void *opaque)
> +{
> +    CallbackTestData *data = opaque;
> +    int ret;
> +    char c = 0;
> +
> +    ret = write(data->fd[1], &c, sizeof(c));
> +    g_assert(ret == 1);
> +}
> +
> +static void *callback_thread(void *opaque)
> +{
> +    CallbackTestData *data = opaque;
> +
> +    /* The other thread holds the lock so the contention callback will be
> +     * invoked...
> +     */
> +    rfifolock_lock(&data->lock);
> +    rfifolock_unlock(&data->lock);
> +    return NULL;
> +}
> +
> +static void test_callback(void)
> +{
> +    CallbackTestData data;
> +    QemuThread thread;
> +    int ret;
> +    char c;
> +
> +    rfifolock_init(&data.lock, rfifolock_cb, &data);
> +    ret = qemu_pipe(data.fd);
> +    g_assert(ret == 0);
> +
> +    /* Hold lock but allow the callback to kick us by writing to the pipe */
> +    rfifolock_lock(&data.lock);
> +    qemu_thread_create(&thread, callback_thread, &data, QEMU_THREAD_JOINABLE);
> +    ret = read(data.fd[0], &c, sizeof(c));
> +    g_assert(ret == 1);
> +    rfifolock_unlock(&data.lock);
> +    /* If we got here then the callback was invoked, as expected */
> +
> +    qemu_thread_join(&thread);
> +    close(data.fd[0]);
> +    close(data.fd[1]);
> +    rfifolock_destroy(&data.lock);
> +}
> +
> +int main(int argc, char **argv)
> +{
> +    g_test_init(&argc, &argv, NULL);
> +    g_test_add_func("/nesting", test_nesting);
> +    g_test_add_func("/callback", test_callback);
> +    return g_test_run();
> +}
> diff --git a/util/Makefile.objs b/util/Makefile.objs
> index 2bb13a2..9bc3515 100644
> --- a/util/Makefile.objs
> +++ b/util/Makefile.objs
> @@ -12,3 +12,4 @@ util-obj-y += qemu-option.o qemu-progress.o
>  util-obj-y += hexdump.o
>  util-obj-y += crc32c.o
>  util-obj-y += throttle.o
> +util-obj-y += rfifolock.o
> diff --git a/util/rfifolock.c b/util/rfifolock.c
> new file mode 100644
> index 0000000..13b08de
> --- /dev/null
> +++ b/util/rfifolock.c
> @@ -0,0 +1,79 @@
> +/*
> + * Recursive FIFO lock
> + *
> + * Copyright Red Hat, Inc. 2013
> + *
> + * Authors:
> + *  Stefan Hajnoczi   <stefanha@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + *
> + */
> +
> +#include <assert.h>
> +#include "qemu/rfifolock.h"
> +
> +void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque)
> +{
> +    qemu_mutex_init(&r->lock);
> +    r->head = 0;
> +    r->tail = 0;
> +    qemu_cond_init(&r->cond);
> +    r->nesting = 0;
> +    r->cb = cb;
> +    r->cb_opaque = opaque;
> +}
> +
> +void rfifolock_destroy(RFifoLock *r)
> +{
> +    qemu_cond_destroy(&r->cond);
> +    qemu_mutex_destroy(&r->lock);
> +}
> +
> +/*
> + * Theory of operation:
> + *
> + * In order to ensure FIFO ordering, implement a ticketlock.  Threads acquiring
> + * the lock enqueue themselves by incrementing the tail index.  When the lock
> + * is unlocked, the head is incremented and waiting threads are notified.
> + *
> + * Recursive locking does not take a ticket since the head is only incremented
> + * when the outermost recursive caller unlocks.
> + */
> +void rfifolock_lock(RFifoLock *r)
> +{
> +    qemu_mutex_lock(&r->lock);
> +
> +    /* Take a ticket */
> +    unsigned int ticket = r->tail++;
> +
> +    if (r->nesting > 0) {
> +        if (qemu_thread_is_self(&r->owner_thread)) {
> +            r->tail--; /* put ticket back, we're nesting */
> +        } else {
> +            while (ticket != r->head) {
> +                /* Invoke optional contention callback */
> +                if (r->cb) {
> +                    r->cb(r->cb_opaque);
> +                }
> +                qemu_cond_wait(&r->cond, &r->lock);
> +            }
> +        }
> +    }
> +    qemu_thread_get_self(&r->owner_thread);
> +    r->nesting++;
> +    qemu_mutex_unlock(&r->lock);
> +}
> +
> +void rfifolock_unlock(RFifoLock *r)
> +{
> +    qemu_mutex_lock(&r->lock);
> +    assert(r->nesting > 0);
> +    assert(qemu_thread_is_self(&r->owner_thread));
> +    if (--r->nesting == 0) {
> +        r->head++;
> +        qemu_cond_broadcast(&r->cond);
> +    }
> +    qemu_mutex_unlock(&r->lock);
> +}
Stefan Hajnoczi Oct. 11, 2013, 1:35 p.m. UTC | #5
On Fri, Oct 11, 2013 at 04:55:31PM +0800, Wenchao Xia wrote:
> > +/* Recursive FIFO lock
> > + *
> > + * This lock provides more features than a plain mutex:
> > + *
> > + * 1. Fairness - enforces FIFO order.
> > + * 2. Nesting - can be taken recursively.
> > + * 3. Contention callback - optional, called when thread must wait.
> > + *
> > + * The recursive FIFO lock is heavyweight so prefer other synchronization
> > + * primitives if you do not need its features.
> > + */
> > +typedef struct {
> > +    QemuMutex lock;             /* protects all fields */
> > +
> > +    /* FIFO order */
> > +    unsigned int head;          /* active ticket number */
> > +    unsigned int tail;          /* waiting ticket number */
> > +    QemuCond cond;              /* used to wait for our ticket number */
> > +
> > +    /* Nesting */
> > +    QemuThread owner_thread;    /* thread that currently has ownership */
> > +    unsigned int nesting;       /* amount of nesting levels */
> > +
> > +    /* Contention callback */
> > +    void (*cb)(void *);         /* called when thread must wait, with ->lock
> > +                                 * held so it may not recursively lock/unlock
> > +                                 */
> > +    void *cb_opaque;
> > +} RFifoLock;
> > +
> If you respin, the struct definition could be moved to util/rfifolock.c, leaving
> typedef struct RFifoLock RFifoLock;
> in the header.

Then the struct cannot be embedded as a field; it would require heap
allocation of all RFifoLocks.

This is why I chose to include the definition in the header file.

Stefan
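
A rough sketch of the opaque-pointer alternative being discussed makes the
trade-off concrete (rfifolock_new()/rfifolock_free() are hypothetical names,
not part of this patch): with only a forward declaration in the header, the
struct size is unknown to callers, so the lock cannot be embedded in another
structure and every instance must come from the heap.

/* include/qemu/rfifolock.h -- hypothetical opaque-pointer variant */
typedef struct RFifoLock RFifoLock;

RFifoLock *rfifolock_new(void (*cb)(void *), void *opaque);  /* hypothetical */
void rfifolock_free(RFifoLock *r);                           /* hypothetical */

/* util/rfifolock.c would then hold the full struct definition, and a caller
 * that today embeds "RFifoLock lock;" in its own structure would instead
 * have to store "RFifoLock *lock;" and allocate/free it at runtime.
 */
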
Wayne Xia Oct. 12, 2013, 2:48 a.m. UTC | #6
On 2013/10/11 21:35, Stefan Hajnoczi wrote:
> On Fri, Oct 11, 2013 at 04:55:31PM +0800, Wenchao Xia wrote:
>>> +/* Recursive FIFO lock
>>> + *
>>> + * This lock provides more features than a plain mutex:
>>> + *
>>> + * 1. Fairness - enforces FIFO order.
>>> + * 2. Nesting - can be taken recursively.
>>> + * 3. Contention callback - optional, called when thread must wait.
>>> + *
>>> + * The recursive FIFO lock is heavyweight so prefer other synchronization
>>> + * primitives if you do not need its features.
>>> + */
>>> +typedef struct {
>>> +    QemuMutex lock;             /* protects all fields */
>>> +
>>> +    /* FIFO order */
>>> +    unsigned int head;          /* active ticket number */
>>> +    unsigned int tail;          /* waiting ticket number */
>>> +    QemuCond cond;              /* used to wait for our ticket number */
>>> +
>>> +    /* Nesting */
>>> +    QemuThread owner_thread;    /* thread that currently has ownership */
>>> +    unsigned int nesting;       /* amount of nesting levels */
>>> +
>>> +    /* Contention callback */
>>> +    void (*cb)(void *);         /* called when thread must wait, with ->lock
>>> +                                 * held so it may not recursively lock/unlock
>>> +                                 */
>>> +    void *cb_opaque;
>>> +} RFifoLock;
>>> +
>> If you respin, the struct definition could be moved to util/rfifolock.c, leaving
>> typedef struct RFifoLock RFifoLock;
>> in the header.
> Then the struct cannot be embedded as a field; it would require heap
> allocation of all RFifoLocks.
>
> This is why I chose to include the definition in the header file.
>
> Stefan
>
OK, it is not a serious problem; the patch looks good to me.

Patch

diff --git a/include/qemu/rfifolock.h b/include/qemu/rfifolock.h
new file mode 100644
index 0000000..b23ab53
--- /dev/null
+++ b/include/qemu/rfifolock.h
@@ -0,0 +1,54 @@ 
+/*
+ * Recursive FIFO lock
+ *
+ * Copyright Red Hat, Inc. 2013
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#ifndef QEMU_RFIFOLOCK_H
+#define QEMU_RFIFOLOCK_H
+
+#include "qemu/thread.h"
+
+/* Recursive FIFO lock
+ *
+ * This lock provides more features than a plain mutex:
+ *
+ * 1. Fairness - enforces FIFO order.
+ * 2. Nesting - can be taken recursively.
+ * 3. Contention callback - optional, called when thread must wait.
+ *
+ * The recursive FIFO lock is heavyweight so prefer other synchronization
+ * primitives if you do not need its features.
+ */
+typedef struct {
+    QemuMutex lock;             /* protects all fields */
+
+    /* FIFO order */
+    unsigned int head;          /* active ticket number */
+    unsigned int tail;          /* waiting ticket number */
+    QemuCond cond;              /* used to wait for our ticket number */
+
+    /* Nesting */
+    QemuThread owner_thread;    /* thread that currently has ownership */
+    unsigned int nesting;       /* amount of nesting levels */
+
+    /* Contention callback */
+    void (*cb)(void *);         /* called when thread must wait, with ->lock
+                                 * held so it may not recursively lock/unlock
+                                 */
+    void *cb_opaque;
+} RFifoLock;
+
+void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque);
+void rfifolock_destroy(RFifoLock *r);
+void rfifolock_lock(RFifoLock *r);
+void rfifolock_unlock(RFifoLock *r);
+
+#endif /* QEMU_RFIFOLOCK_H */
diff --git a/tests/Makefile b/tests/Makefile
index 994fef1..2959ed1 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -31,6 +31,7 @@  check-unit-y += tests/test-visitor-serialization$(EXESUF)
 check-unit-y += tests/test-iov$(EXESUF)
 gcov-files-test-iov-y = util/iov.c
 check-unit-y += tests/test-aio$(EXESUF)
+check-unit-y += tests/test-rfifolock$(EXESUF)
 check-unit-y += tests/test-throttle$(EXESUF)
 gcov-files-test-aio-$(CONFIG_WIN32) = aio-win32.c
 gcov-files-test-aio-$(CONFIG_POSIX) = aio-posix.c
@@ -121,6 +122,7 @@  tests/check-qfloat$(EXESUF): tests/check-qfloat.o libqemuutil.a
 tests/check-qjson$(EXESUF): tests/check-qjson.o libqemuutil.a libqemustub.a
 tests/test-coroutine$(EXESUF): tests/test-coroutine.o $(block-obj-y) libqemuutil.a libqemustub.a
 tests/test-aio$(EXESUF): tests/test-aio.o $(block-obj-y) libqemuutil.a libqemustub.a
+tests/test-rfifolock$(EXESUF): tests/test-rfifolock.o libqemuutil.a libqemustub.a
 tests/test-throttle$(EXESUF): tests/test-throttle.o $(block-obj-y) libqemuutil.a libqemustub.a
 tests/test-thread-pool$(EXESUF): tests/test-thread-pool.o $(block-obj-y) libqemuutil.a libqemustub.a
 tests/test-iov$(EXESUF): tests/test-iov.o libqemuutil.a
diff --git a/tests/test-rfifolock.c b/tests/test-rfifolock.c
new file mode 100644
index 0000000..440dbcb
--- /dev/null
+++ b/tests/test-rfifolock.c
@@ -0,0 +1,90 @@ 
+/*
+ * RFifoLock tests
+ *
+ * Copyright Red Hat, Inc. 2013
+ *
+ * Authors:
+ *  Stefan Hajnoczi    <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include <glib.h>
+#include "qemu-common.h"
+#include "qemu/rfifolock.h"
+
+static void test_nesting(void)
+{
+    RFifoLock lock;
+
+    /* Trivial test, ensure the lock is recursive */
+    rfifolock_init(&lock, NULL, NULL);
+    rfifolock_lock(&lock);
+    rfifolock_lock(&lock);
+    rfifolock_lock(&lock);
+    rfifolock_unlock(&lock);
+    rfifolock_unlock(&lock);
+    rfifolock_unlock(&lock);
+    rfifolock_destroy(&lock);
+}
+
+typedef struct {
+    RFifoLock lock;
+    int fd[2];
+} CallbackTestData;
+
+static void rfifolock_cb(void *opaque)
+{
+    CallbackTestData *data = opaque;
+    int ret;
+    char c = 0;
+
+    ret = write(data->fd[1], &c, sizeof(c));
+    g_assert(ret == 1);
+}
+
+static void *callback_thread(void *opaque)
+{
+    CallbackTestData *data = opaque;
+
+    /* The other thread holds the lock so the contention callback will be
+     * invoked...
+     */
+    rfifolock_lock(&data->lock);
+    rfifolock_unlock(&data->lock);
+    return NULL;
+}
+
+static void test_callback(void)
+{
+    CallbackTestData data;
+    QemuThread thread;
+    int ret;
+    char c;
+
+    rfifolock_init(&data.lock, rfifolock_cb, &data);
+    ret = qemu_pipe(data.fd);
+    g_assert(ret == 0);
+
+    /* Hold lock but allow the callback to kick us by writing to the pipe */
+    rfifolock_lock(&data.lock);
+    qemu_thread_create(&thread, callback_thread, &data, QEMU_THREAD_JOINABLE);
+    ret = read(data.fd[0], &c, sizeof(c));
+    g_assert(ret == 1);
+    rfifolock_unlock(&data.lock);
+    /* If we got here then the callback was invoked, as expected */
+
+    qemu_thread_join(&thread);
+    close(data.fd[0]);
+    close(data.fd[1]);
+    rfifolock_destroy(&data.lock);
+}
+
+int main(int argc, char **argv)
+{
+    g_test_init(&argc, &argv, NULL);
+    g_test_add_func("/nesting", test_nesting);
+    g_test_add_func("/callback", test_callback);
+    return g_test_run();
+}
diff --git a/util/Makefile.objs b/util/Makefile.objs
index 2bb13a2..9bc3515 100644
--- a/util/Makefile.objs
+++ b/util/Makefile.objs
@@ -12,3 +12,4 @@  util-obj-y += qemu-option.o qemu-progress.o
 util-obj-y += hexdump.o
 util-obj-y += crc32c.o
 util-obj-y += throttle.o
+util-obj-y += rfifolock.o
diff --git a/util/rfifolock.c b/util/rfifolock.c
new file mode 100644
index 0000000..13b08de
--- /dev/null
+++ b/util/rfifolock.c
@@ -0,0 +1,79 @@ 
+/*
+ * Recursive FIFO lock
+ *
+ * Copyright Red Hat, Inc. 2013
+ *
+ * Authors:
+ *  Stefan Hajnoczi   <stefanha@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ *
+ */
+
+#include <assert.h>
+#include "qemu/rfifolock.h"
+
+void rfifolock_init(RFifoLock *r, void (*cb)(void *), void *opaque)
+{
+    qemu_mutex_init(&r->lock);
+    r->head = 0;
+    r->tail = 0;
+    qemu_cond_init(&r->cond);
+    r->nesting = 0;
+    r->cb = cb;
+    r->cb_opaque = opaque;
+}
+
+void rfifolock_destroy(RFifoLock *r)
+{
+    qemu_cond_destroy(&r->cond);
+    qemu_mutex_destroy(&r->lock);
+}
+
+/*
+ * Theory of operation:
+ *
+ * In order to ensure FIFO ordering, implement a ticketlock.  Threads acquiring
+ * the lock enqueue themselves by incrementing the tail index.  When the lock
+ * is unlocked, the head is incremented and waiting threads are notified.
+ *
+ * Recursive locking does not take a ticket since the head is only incremented
+ * when the outermost recursive caller unlocks.
+ */
+void rfifolock_lock(RFifoLock *r)
+{
+    qemu_mutex_lock(&r->lock);
+
+    /* Take a ticket */
+    unsigned int ticket = r->tail++;
+
+    if (r->nesting > 0) {
+        if (qemu_thread_is_self(&r->owner_thread)) {
+            r->tail--; /* put ticket back, we're nesting */
+        } else {
+            while (ticket != r->head) {
+                /* Invoke optional contention callback */
+                if (r->cb) {
+                    r->cb(r->cb_opaque);
+                }
+                qemu_cond_wait(&r->cond, &r->lock);
+            }
+        }
+    }
+    qemu_thread_get_self(&r->owner_thread);
+    r->nesting++;
+    qemu_mutex_unlock(&r->lock);
+}
+
+void rfifolock_unlock(RFifoLock *r)
+{
+    qemu_mutex_lock(&r->lock);
+    assert(r->nesting > 0);
+    assert(qemu_thread_is_self(&r->owner_thread));
+    if (--r->nesting == 0) {
+        r->head++;
+        qemu_cond_broadcast(&r->cond);
+    }
+    qemu_mutex_unlock(&r->lock);
+}