Patchwork [RFC,v2,12/17] guest agent: worker thread class

login
register
mail settings
Submitter Michael Roth
Date April 18, 2011, 3:02 p.m.
Message ID <1303138953-1334-13-git-send-email-mdroth@linux.vnet.ibm.com>
Download mbox | patch
Permalink /patch/91777/
State New
Headers show

Comments

Michael Roth - April 18, 2011, 3:02 p.m.
Signed-off-by: Michael Roth <mdroth@linux.vnet.ibm.com>
---
 qga/guest-agent-worker.c |  173 ++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 173 insertions(+), 0 deletions(-)
 create mode 100644 qga/guest-agent-worker.c
Jes Sorensen - April 21, 2011, 8:44 a.m.
On 04/18/11 17:02, Michael Roth wrote:
> diff --git a/qga/guest-agent-worker.c b/qga/guest-agent-worker.c
> new file mode 100644
> index 0000000..e3295da
> --- /dev/null
> +++ b/qga/guest-agent-worker.c
> @@ -0,0 +1,173 @@
> +/*
> + * QEMU Guest Agent worker thread interfaces
> + *
> + * Copyright IBM Corp. 2011
> + *
> + * Authors:
> + *  Michael Roth      <mdroth@linux.vnet.ibm.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
> + * See the COPYING file in the top-level directory.
> + */
> +#include <glib.h>
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <stdbool.h>
> +#include <pthread.h>
> +#include <errno.h>
> +#include <string.h>
> +#include "guest-agent.h"
> +#include "../error.h"

Oh dear! do not do that please! Fix the Makefile to include the
appropriate path.

> +struct GAWorker {
> +    pthread_t thread;
> +    ga_worker_func execute;
> +    pthread_mutex_t input_mutex;
> +    pthread_cond_t input_avail_cond;
> +    void *input;
> +    bool input_avail;
> +    pthread_mutex_t output_mutex;
> +    pthread_cond_t output_avail_cond;

You really should use QemuMutex and friends here.

> +    void *output;
> +    Error *output_error;
> +    bool output_avail;
> +};
> +
> +static void *worker_run(void *worker_p)
> +{
> +    GAWorker *worker = worker_p;
> +    Error *err;
> +    void *input, *output;
> +
> +    while (1) {
> +        /* wait for input */
> +        pthread_mutex_lock(&worker->input_mutex);

qemu_mutex_lock()

> +        while (!worker->input_avail) {
> +            pthread_cond_wait(&worker->input_avail_cond, &worker->input_mutex);
> +        }

again

> +        input = worker->input;
> +        worker->input_avail = false;
> +        pthread_mutex_unlock(&worker->input_mutex);

and again.... I'll stop. Basically there really should be no references
to pthread_*

Jes
Michael Roth - April 21, 2011, 1:15 p.m.
On 04/21/2011 03:44 AM, Jes Sorensen wrote:
> On 04/18/11 17:02, Michael Roth wrote:
>> diff --git a/qga/guest-agent-worker.c b/qga/guest-agent-worker.c
>> new file mode 100644
>> index 0000000..e3295da
>> --- /dev/null
>> +++ b/qga/guest-agent-worker.c
>> @@ -0,0 +1,173 @@
>> +/*
>> + * QEMU Guest Agent worker thread interfaces
>> + *
>> + * Copyright IBM Corp. 2011
>> + *
>> + * Authors:
>> + *  Michael Roth<mdroth@linux.vnet.ibm.com>
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + */
>> +#include<glib.h>
>> +#include<stdlib.h>
>> +#include<stdio.h>
>> +#include<stdbool.h>
>> +#include<pthread.h>
>> +#include<errno.h>
>> +#include<string.h>
>> +#include "guest-agent.h"
>> +#include "../error.h"
>
> Oh dear! do not do that please! Fix the Makefile to include the
> appropriate path.
>
>> +struct GAWorker {
>> +    pthread_t thread;
>> +    ga_worker_func execute;
>> +    pthread_mutex_t input_mutex;
>> +    pthread_cond_t input_avail_cond;
>> +    void *input;
>> +    bool input_avail;
>> +    pthread_mutex_t output_mutex;
>> +    pthread_cond_t output_avail_cond;
>
> You really should use QemuMutex and friends here.
>
>> +    void *output;
>> +    Error *output_error;
>> +    bool output_avail;
>> +};
>> +
>> +static void *worker_run(void *worker_p)
>> +{
>> +    GAWorker *worker = worker_p;
>> +    Error *err;
>> +    void *input, *output;
>> +
>> +    while (1) {
>> +        /* wait for input */
>> +        pthread_mutex_lock(&worker->input_mutex);
>
> qemu_mutex_lock()
>
>> +        while (!worker->input_avail) {
>> +            pthread_cond_wait(&worker->input_avail_cond,&worker->input_mutex);
>> +        }
>
> again
>
>> +        input = worker->input;
>> +        worker->input_avail = false;
>> +        pthread_mutex_unlock(&worker->input_mutex);
>
> and again.... I'll stop. Basically there really should be no references
> to pthread_*

This is on the guest side of things where I'm trying to use GLib 
wherever possible to keep things somewhat portable: logging/list 
utilities/io events/etc. And I *really* wanted to use GThreads here, but 
the problem is that GThread does not have any sane means to kill off a 
thread when a timeout occurs: there's no analogue to pthread_cancel(), 
and to use signals you need to break the abstraction to get the 
underlying pid. The new QemuThread stuff is using GThread underneath the 
covers so same limitation there.

pthreads provides these things and is fairly portable however, so I 
opted to make it an explicit dependency on the guest side. So 
glib+pthreads are the current dependencies.

>
> Jes
Jes Sorensen - April 21, 2011, 1:19 p.m.
On 04/21/11 15:15, Michael Roth wrote:
> On 04/21/2011 03:44 AM, Jes Sorensen wrote:
>> and again.... I'll stop. Basically there really should be no references
>> to pthread_*
> 
> This is on the guest side of things where I'm trying to use GLib
> wherever possible to keep things somewhat portable: logging/list
> utilities/io events/etc. And I *really* wanted to use GThreads here, but
> the problem is that GThread does not have any sane means to kill off a
> thread when a timeout occurs: there's no analogue to pthread_cancel(),
> and to use signals you need to break the abstraction to get the
> underlying pid. The new QemuThread stuff is using GThread underneath the
> covers so same limitation there.
> 
> pthreads provides these things and is fairly portable however, so I
> opted to make it an explicit dependency on the guest side. So
> glib+pthreads are the current dependencies.

That is really unfortunate - is there no way around it? It really would
be ideal if we could build the guest relying on QemuThreads for portability.

Either way, please fix the include issue.

Jes

Patch

diff --git a/qga/guest-agent-worker.c b/qga/guest-agent-worker.c
new file mode 100644
index 0000000..e3295da
--- /dev/null
+++ b/qga/guest-agent-worker.c
@@ -0,0 +1,173 @@ 
+/*
+ * QEMU Guest Agent worker thread interfaces
+ *
+ * Copyright IBM Corp. 2011
+ *
+ * Authors:
+ *  Michael Roth      <mdroth@linux.vnet.ibm.com>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+#include <glib.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdbool.h>
+#include <pthread.h>
+#include <errno.h>
+#include <string.h>
+#include "guest-agent.h"
+#include "../error.h"
+
+struct GAWorker {
+    pthread_t thread;
+    ga_worker_func execute;
+    pthread_mutex_t input_mutex;
+    pthread_cond_t input_avail_cond;
+    void *input;
+    bool input_avail;
+    pthread_mutex_t output_mutex;
+    pthread_cond_t output_avail_cond;
+    void *output;
+    Error *output_error;
+    bool output_avail;
+};
+
+static void *worker_run(void *worker_p)
+{
+    GAWorker *worker = worker_p;
+    Error *err;
+    void *input, *output;
+
+    while (1) {
+        /* wait for input */
+        pthread_mutex_lock(&worker->input_mutex);
+        while (!worker->input_avail) {
+            pthread_cond_wait(&worker->input_avail_cond, &worker->input_mutex);
+        }
+        input = worker->input;
+        worker->input_avail = false;
+        pthread_mutex_unlock(&worker->input_mutex);
+
+        /* process input. input points to shared data, so if we ever add
+         * asynchronous dispatch, we'll need to copy the input instead
+         */
+        worker->execute(input, &output, &err);
+
+        /* signal waiters */
+        pthread_mutex_lock(&worker->output_mutex);
+        worker->output = output;
+        worker->output_error = err;
+        worker->output_avail = true;
+        pthread_cond_signal(&worker->output_avail_cond);
+        pthread_mutex_unlock(&worker->output_mutex);
+    }
+
+    return NULL;
+}
+
+static void ga_worker_set_input(GAWorker *worker, void *input)
+{
+    pthread_mutex_lock(&worker->input_mutex);
+
+    /* provide input for thread, and signal it */
+    worker->input = input;
+    worker->input_avail = true;
+    pthread_cond_signal(&worker->input_avail_cond);
+
+    pthread_mutex_unlock(&worker->input_mutex);
+}
+
+static bool ga_worker_get_output(GAWorker *worker, void **output, int timeout)
+{
+    struct timespec ts;
+    GTimeVal tv;
+    bool timed_out = false;
+    int ret;
+
+    pthread_mutex_lock(&worker->output_mutex);
+
+    while (!worker->output_avail) {
+        if (timeout > 0) {
+            g_get_current_time(&tv);
+            g_time_val_add(&tv, timeout * 1000);
+            ts.tv_sec = tv.tv_sec;
+            ts.tv_nsec = tv.tv_usec * 1000;
+            ret = pthread_cond_timedwait(&worker->output_avail_cond,
+                                         &worker->output_mutex, &ts);
+            if (ret == ETIMEDOUT) {
+                timed_out = true;
+                goto out;
+            }
+        } else {
+            ret = pthread_cond_wait(&worker->output_avail_cond,
+                                    &worker->output_mutex);
+        }
+    }
+
+    /* handle output from thread */
+    worker->output_avail = false;
+    *output = worker->output;
+
+out:
+    pthread_mutex_unlock(&worker->output_mutex);
+    return timed_out;
+}
+
+bool ga_worker_dispatch(GAWorker *worker, void *input, void *output,
+                        int timeout, Error **errp)
+{
+    ga_worker_set_input(worker, input);
+    return ga_worker_get_output(worker, output, timeout);
+}
+
+static void ga_worker_start(GAWorker *worker)
+{
+    int ret;
+
+    pthread_cond_init(&worker->input_avail_cond, NULL);
+    pthread_cond_init(&worker->output_avail_cond, NULL);
+    pthread_mutex_init(&worker->input_mutex, NULL);
+    pthread_mutex_init(&worker->output_mutex, NULL);
+    worker->output_avail = false;
+    worker->input_avail = false;
+
+    ret = pthread_create(&worker->thread, NULL, worker_run, worker);
+    if (ret == -1) {
+        g_error("error: %s", strerror(errno));
+    }
+}
+
+static void ga_worker_stop(GAWorker *worker)
+{
+    int ret;
+    void *status;
+
+    ret = pthread_cancel(worker->thread);
+    if (ret == -1) { 
+        g_error("pthread_cancel() failed: %s", strerror(errno));
+    }
+
+    ret = pthread_join(worker->thread, &status);
+    if (ret == -1) { 
+        g_error("pthread_join() failed: %s", strerror(errno));
+    }
+    /* TODO: should *_destroy pthread data structures here */
+}
+
+GAWorker *ga_worker_new(ga_worker_func func)
+{
+    GAWorker *worker = g_malloc0(sizeof(GAWorker));
+
+    g_assert(func);
+    worker->execute = func;
+    ga_worker_start(worker);
+
+    return worker;
+}
+
+void ga_worker_cleanup(GAWorker *worker)
+{
+    ga_worker_stop(worker);
+    g_free(worker);
+}