Signed-off-by: Michael Roth <mdr...@linux.vnet.ibm.com> --- qga/guest-agent-worker.c | 179 ++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 179 insertions(+), 0 deletions(-) create mode 100644 qga/guest-agent-worker.c
diff --git a/qga/guest-agent-worker.c b/qga/guest-agent-worker.c new file mode 100644 index 0000000..e5fc845 --- /dev/null +++ b/qga/guest-agent-worker.c @@ -0,0 +1,179 @@ +/* + * QEMU Guest Agent worker thread interfaces + * + * Copyright IBM Corp. 2011 + * + * Authors: + * Michael Roth <mdr...@linux.vnet.ibm.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ +#include <glib.h> +#include <stdlib.h> +#include <stdio.h> +#include <stdbool.h> +#include <pthread.h> +#include <errno.h> +#include <string.h> +#include "error.h" +#include "qga/guest-agent-core.h" + +struct GAWorker { + pthread_t thread; + ga_worker_func execute; + pthread_mutex_t input_mutex; + pthread_cond_t input_avail_cond; + void *input; + bool input_avail; + pthread_mutex_t output_mutex; + pthread_cond_t output_avail_cond; + void *output; + Error *output_error; + bool output_avail; +}; + +static void *worker_run(void *worker_p) +{ + GAWorker *worker = worker_p; + Error *err = NULL; + void *input = NULL, *output = NULL; + + while (1) { + /* wait for input */ + pthread_mutex_lock(&worker->input_mutex); + while (!worker->input_avail) { + pthread_cond_wait(&worker->input_avail_cond, &worker->input_mutex); + } + input = worker->input; + worker->input = NULL; + worker->input_avail = false; + pthread_mutex_unlock(&worker->input_mutex); + + /* process input. input points to shared data, so if we ever add + * asynchronous dispatch, we'll need to copy the input instead + */ + worker->execute(input, &output, &err); + + /* signal waiters */ + pthread_mutex_lock(&worker->output_mutex); + worker->output = output; + worker->output_error = err; + worker->output_avail = true; + pthread_cond_signal(&worker->output_avail_cond); + pthread_mutex_unlock(&worker->output_mutex); + } + + return NULL; +} + +static void ga_worker_set_input(GAWorker *worker, void *input) +{ + pthread_mutex_lock(&worker->input_mutex); + + /* provide input for thread, and signal it */ + worker->input = input; + worker->input_avail = true; + pthread_cond_signal(&worker->input_avail_cond); + + pthread_mutex_unlock(&worker->input_mutex); +} + +static bool ga_worker_get_output(GAWorker *worker, void **output, int timeout) +{ + struct timespec ts; + GTimeVal tv; + bool timed_out = false; + int ret; + + pthread_mutex_lock(&worker->output_mutex); + + while (!worker->output_avail) { + if (timeout > 0) { + g_get_current_time(&tv); + g_time_val_add(&tv, timeout * 1000); + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = tv.tv_usec * 1000; + ret = pthread_cond_timedwait(&worker->output_avail_cond, + &worker->output_mutex, &ts); + if (ret == ETIMEDOUT) { + timed_out = true; + goto out; + } + } else { + ret = pthread_cond_wait(&worker->output_avail_cond, + &worker->output_mutex); + } + } + + /* handle output from thread */ + worker->output_avail = false; + *output = worker->output; + worker->output = NULL; + +out: + pthread_mutex_unlock(&worker->output_mutex); + return timed_out; +} + +bool ga_worker_dispatch(GAWorker *worker, void *input, void *output, + int timeout, Error **errp) +{ + ga_worker_set_input(worker, input); + return ga_worker_get_output(worker, output, timeout); +} + +static void ga_worker_start(GAWorker *worker) +{ + int ret; + + pthread_cond_init(&worker->input_avail_cond, NULL); + pthread_cond_init(&worker->output_avail_cond, NULL); + pthread_mutex_init(&worker->input_mutex, NULL); + pthread_mutex_init(&worker->output_mutex, NULL); + worker->output_avail = false; + worker->input_avail = false; + + ret = pthread_create(&worker->thread, NULL, worker_run, worker); + if (ret == -1) { + g_error("error: %s", strerror(errno)); + } +} + +static void ga_worker_stop(GAWorker *worker) +{ + int ret; + void *status; + + ret = pthread_cancel(worker->thread); + if (ret == -1) { + g_error("pthread_cancel() failed: %s", strerror(errno)); + } + + ret = pthread_join(worker->thread, &status); + if (ret) { + g_error("pthread_join() failed: %s", strerror(ret)); + } + + pthread_mutex_destroy(&worker->input_mutex); + pthread_mutex_destroy(&worker->output_mutex); + pthread_cond_destroy(&worker->input_avail_cond); + pthread_cond_destroy(&worker->input_avail_cond); +} + +GAWorker *ga_worker_new(ga_worker_func func) +{ + GAWorker *worker = g_malloc0(sizeof(GAWorker)); + + g_assert(func); + worker->execute = func; + ga_worker_start(worker); + + return worker; +} + +void ga_worker_cleanup(GAWorker *worker) +{ + ga_worker_stop(worker); + g_free(worker); +} -- 1.7.0.4