@@ -168,6 +168,33 @@ of "quiescent states", i.e. points where no RCU read-side critical
section can be active. All threads created with qemu_thread_create
participate in the RCU mechanism and need to annotate such points.
+Luckily, in most cases no manual annotation is needed, because waiting
+on condition variables (qemu_cond_wait), semaphores (qemu_sem_wait,
+qemu_sem_timedwait) or events (qemu_event_wait) implicitly marks the thread
+as quiescent for the whole duration of the wait. (There is an exception
+for semaphore waits with a zero timeout).
+
+Manual annotation is still needed in the following cases:
+
+- threads that spend their sleeping time in the kernel, for example
+ in a call to select(), poll() or WaitForMultipleObjects(). The QEMU
+ I/O thread is an example of this case.
+
+- threads that perform a lot of I/O. In QEMU, the workers used for
+ aio=thread are an example of this case (see aio_worker in block/raw-*).
+
+- threads that run continuously until they exit. The migration thread
+ is an example of this case.
+
+Regarding the second case, note that the workers run in the QEMU thread
+pool. The thread pool uses semaphores for synchronization, hence it does
+report quiescent states periodically. However, in some cases (e.g. NFS
+mounted with the "hard" option) the workers can take an arbitrarily long
+amount of time. When this happens, synchronize_rcu() will not exit and
+call_rcu() callbacks will be delayed arbitrarily. It is therefore a
+good idea to mark I/O system calls as quiescence points in the worker
+functions.
+
Marking quiescent states is done with the following three APIs:
void rcu_quiescent_state(void);
@@ -119,7 +119,9 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
{
int err;
+ rcu_thread_offline();
err = pthread_cond_wait(&cond->cond, &mutex->lock);
+ rcu_thread_online();
if (err)
error_exit(err, __func__);
}
@@ -211,6 +213,10 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
int rc;
struct timespec ts;
+ if (ms) {
+ rcu_thread_offline();
+ }
+
#if defined(__APPLE__) || defined(__NetBSD__)
rc = 0;
compute_abs_deadline(&ts, ms);
@@ -228,7 +234,10 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
--sem->count;
}
pthread_mutex_unlock(&sem->lock);
- return (rc == ETIMEDOUT ? -1 : 0);
+ if (rc == ETIMEDOUT) {
+ rc == -1;
+ }
+
#else
if (ms <= 0) {
/* This is cheaper than sem_timedwait. */
@@ -236,7 +245,7 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
rc = sem_trywait(&sem->sem);
} while (rc == -1 && errno == EINTR);
if (rc == -1 && errno == EAGAIN) {
- return -1;
+ goto out;
}
} else {
compute_abs_deadline(&ts, ms);
@@ -244,19 +253,26 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
rc = sem_timedwait(&sem->sem, &ts);
} while (rc == -1 && errno == EINTR);
if (rc == -1 && errno == ETIMEDOUT) {
- return -1;
+ goto out;
}
}
if (rc < 0) {
error_exit(errno, __func__);
}
- return 0;
+
#endif
+
+out:
+ if (ms) {
+ rcu_thread_online();
+ }
+ return rc;
}
void qemu_sem_wait(QemuSemaphore *sem)
{
int rc;
+ rcu_thread_offline();
#if defined(__APPLE__) || defined(__NetBSD__)
pthread_mutex_lock(&sem->lock);
@@ -276,6 +292,7 @@ void qemu_sem_wait(QemuSemaphore *sem)
error_exit(errno, __func__);
}
#endif
+ rcu_thread_online();
}
#ifdef __linux__
@@ -384,7 +401,11 @@ void qemu_event_wait(QemuEvent *ev)
return;
}
}
+ rcu_thread_offline();
futex_wait(ev, EV_BUSY);
+ rcu_thread_online();
+ } else {
+ rcu_quiescent_state();
}
}
@@ -12,6 +12,7 @@
*/
#include "qemu-common.h"
#include "qemu/thread.h"
+#include "qemu/rcu.h"
#include <process.h>
#include <assert.h>
#include <limits.h>
@@ -170,7 +171,9 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
* leaving mutex unlocked before we wait on semaphore.
*/
qemu_mutex_unlock(mutex);
+ rcu_thread_offline();
WaitForSingleObject(cond->sema, INFINITE);
+ rcu_thread_online();
/* Now waiters must rendez-vous with the signaling thread and
* let it continue. For cond_broadcast this has heavy contention
@@ -210,7 +213,16 @@ void qemu_sem_post(QemuSemaphore *sem)
int qemu_sem_timedwait(QemuSemaphore *sem, int ms)
{
- int rc = WaitForSingleObject(sem->sema, ms);
+ int rc;
+
+ if (ms) {
+ rcu_thread_offline();
+ }
+ rc = WaitForSingleObject(sem->sema, ms);
+ if (ms) {
+ rcu_thread_offline();
+ }
+
if (rc == WAIT_OBJECT_0) {
return 0;
}
@@ -250,7 +262,9 @@ void qemu_event_reset(QemuEvent *ev)
void qemu_event_wait(QemuEvent *ev)
{
+ rcu_thread_offline();
WaitForSingleObject(ev->event, INFINITE);
+ rcu_thread_online();
}
struct QemuThreadData {
@@ -240,9 +240,6 @@ static void *call_rcu_thread(void *opaque)
{
struct rcu_head *node;
- /* This thread is just a writer. */
- rcu_thread_offline();
-
for (;;) {
int tries = 0;
int n = atomic_read(&rcu_call_count);