On 11/9/2010 1:06 AM, Stefan Hajnoczi wrote: > On Mon, Nov 8, 2010 at 10:47 AM, Arun R Bharadwaj > <a...@linux.vnet.ibm.com> wrote: >> diff --git a/hw/virtio-9p.c b/hw/virtio-9p.c >> index 7c59988..61b65fd 100644 >> --- a/hw/virtio-9p.c >> +++ b/hw/virtio-9p.c >> @@ -18,6 +18,7 @@ >> #include "fsdev/qemu-fsdev.h" >> #include "virtio-9p-debug.h" >> #include "virtio-9p-xattr.h" >> +#include "qemu-threadlets.h" >> >> int debug_9p_pdu; >> >> @@ -33,6 +34,146 @@ enum { >> Oappend = 0x80, >> }; >> >> +struct v9fs_post_op { >> + QTAILQ_ENTRY(v9fs_post_op) node; >> + void (*func)(void *arg); >> + void *arg; >> +}; > > Structs are usually typedefed: > typedef struct V9fsPostOp { > ... > } V9fsPostOp; > > QEMU usually doesn't use struct explicitly unless a library interface > requires it. > >> + >> +static struct { >> + int rfd; >> + int wfd; >> + QemuMutex lock; >> + QTAILQ_HEAD(, v9fs_post_op) post_op_list; >> +} v9fs_async_struct; >> + >> +static void die2(int err, const char *what) >> +{ >> + fprintf(stderr, "%s failed: %s\n", what, strerror(err)); >> + abort(); >> +} >> + >> +static void die(const char *what) >> +{ >> + die2(errno, what); >> +} >> + >> +#define ASYNC_MAX_PROCESS 5 >> + >> +/** >> + * v9fs_process_post_ops: Process any pending v9fs_post_posix_operation >> + * @arg: Not used. >> + * >> + * This function serves as a callback to the iothread to be called into >> whenever >> + * the v9fs_async_struct.wfd is written into. This thread goes through the >> list >> + * of v9fs_post_posix_operations() and executes them. In the process, it >> might >> + * queue more job on the asynchronous thread pool. 
>> + */ >> +static void v9fs_process_post_ops(void *arg) >> +{ >> + int count = 0; >> + struct v9fs_post_op *post_op; >> + int ret; >> + char byte; >> + >> + qemu_mutex_lock(&v9fs_async_struct.lock); >> + do { >> + ret = read(v9fs_async_struct.rfd, &byte, sizeof(byte)); >> + } while (ret >= 0 && errno != EAGAIN); > > } while (ret > 1 || (ret == -1 && errno == EINTR)); ?
Not sure why we need to retry if ret > 1; all we need is } while (ret == -1 && errno == EINTR); > >> + >> + for (count = 0; count < ASYNC_MAX_PROCESS; count++) { >> + if (QTAILQ_EMPTY(&(v9fs_async_struct.post_op_list))) { >> + break; >> + } >> + post_op = QTAILQ_FIRST(&(v9fs_async_struct.post_op_list)); >> + QTAILQ_REMOVE(&(v9fs_async_struct.post_op_list), post_op, node); >> + >> + qemu_mutex_unlock(&v9fs_async_struct.lock); >> + post_op->func(post_op->arg); >> + qemu_free(post_op); >> + qemu_mutex_lock(&v9fs_async_struct.lock); >> + } >> + qemu_mutex_unlock(&v9fs_async_struct.lock); >> +} >> + >> +/** >> + * v9fs_async_signal: Inform the io-thread of completion of async job. >> + * >> + * This function is used to inform the iothread that a particular >> + * async-operation pertaining to v9fs has been completed and that the io >> thread >> + * can handle the v9fs_post_posix_operation. >> + * >> + * This is based on the aio_signal_handler >> + */ >> +static inline void v9fs_async_signal(void) >> +{ >> + char byte = 0; >> + ssize_t ret; >> + int tries = 0; >> + >> + qemu_mutex_lock(&v9fs_async_struct.lock); >> + do { >> + assert(tries != 100); >> + ret = write(v9fs_async_struct.wfd, &byte, sizeof(byte)); >> + tries++; >> + } while (ret < 0 && errno == EAGAIN); >> + qemu_mutex_unlock(&v9fs_async_struct.lock); >> + >> + if (ret < 0 && errno != EAGAIN) >> + die("write() in v9fs"); > > { } coding style > >> + >> + if (kill(getpid(), SIGUSR2)) die("kill failed"); > > { } coding style > >> +} >> + >> +/** >> + * v9fs_async_helper_done: Marks the completion of the v9fs_async job >> + * @func: v9fs_post_posix_func() for post-processing invoked in the context >> of >> + * the io-thread >> + * @arg: Argument to func. >> + * >> + * This function is called from the context of one of the asynchronous >> threads >> + * in the thread pool. This is called when the asynchronous thread has >> finished >> + * executing a v9fs_posix_operation. 
Its purpose is to initiate the >> process of >> + * informing the io-thread that the v9fs_posix_operation has completed. >> + */ >> +static void v9fs_async_helper_done(void (*func)(void *arg), void *arg) >> +{ >> + struct v9fs_post_op *post_op; >> + >> + post_op = qemu_mallocz(sizeof(*post_op)); >> + post_op->func = func; >> + post_op->arg = arg; >> + >> + qemu_mutex_lock(&v9fs_async_struct.lock); >> + QTAILQ_INSERT_TAIL(&(v9fs_async_struct.post_op_list), post_op, node); >> + qemu_mutex_unlock(&v9fs_async_struct.lock); >> + >> + v9fs_async_signal(); >> +} >> + >> +/** >> + * v9fs_do_async_posix: Offload v9fs_posix_operation onto async thread. >> + * @vs: V9fsOPState variable for the OP operation. >> + * @posix_fn: The posix function which has to be offloaded onto async >> thread. >> + * @post_fn_ptr: Address of the location to hold the post_fn corresponding >> to >> + * the posix_fn >> + * @post_fn: The post processing function corresponding to the posix_fn. >> + * >> + * This function is a helper to offload posix_operation on to the >> asynchronous >> + * thread pool. It sets up the associations with the post_function that >> needs to >> + * be invoked from the context of the iothread once the posix_fn has been >> + * executed. 
>> + */ >> +static void v9fs_do_async_posix(ThreadletWork *work , >> + void (*posix_fn)(ThreadletWork *work), >> + void (**post_fn_ptr)(void *arg), >> + void (*post_fn)(void *arg)) >> +{ >> + *post_fn_ptr = post_fn; >> + work->func = posix_fn; >> + submit_work(work); >> +} >> + >> static int omode_to_uflags(int8_t mode) >> { >> int ret = 0; >> @@ -3657,7 +3798,7 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, >> V9fsConf *conf) >> int i, len; >> struct stat stat; >> FsTypeEntry *fse; >> - >> + int fds[2]; >> >> s = (V9fsState *)virtio_common_init("virtio-9p", >> VIRTIO_ID_9P, >> @@ -3740,5 +3881,27 @@ VirtIODevice *virtio_9p_init(DeviceState *dev, >> V9fsConf *conf) >> s->tag_len; >> s->vdev.get_config = virtio_9p_get_config; >> >> + if (qemu_pipe(fds) == -1) { >> + fprintf(stderr, "failed to create fd's for virtio-9p\n"); >> + exit(1); >> + } >> + >> + v9fs_async_struct.rfd = fds[0]; >> + v9fs_async_struct.wfd = fds[1]; >> + >> + printf("v9fs: rfd: %d\n", v9fs_async_struct.rfd); >> + printf("v9fs: wfd: %d\n", v9fs_async_struct.wfd); >> + >> + fcntl(fds[0], F_SETFL, O_NONBLOCK); >> + fcntl(fds[1], F_SETFL, O_NONBLOCK); >> + >> + qemu_set_fd_handler(fds[0], v9fs_process_post_ops, NULL, NULL); >> + QTAILQ_INIT(&v9fs_async_struct.post_op_list); >> + qemu_mutex_init(&(v9fs_async_struct.lock)); >> + /* Create async queue. */ > > Please delete this comment, it seems outdated. 
> >> + >> + (void)v9fs_do_async_posix; >> + (void)v9fs_async_helper_done; >> + >> return &s->vdev; >> } >> diff --git a/posix-aio-compat.c b/posix-aio-compat.c >> index 9e02d18..8019be7 100644 >> --- a/posix-aio-compat.c >> +++ b/posix-aio-compat.c >> @@ -260,6 +260,8 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb) >> return nbytes; >> } >> >> +static PosixAioState *posix_aio_state; >> + >> static void aio_thread(ThreadletWork *work) >> { >> pid_t pid; >> @@ -290,6 +292,15 @@ static void aio_thread(ThreadletWork *work) >> aiocb->ret = ret; >> qemu_mutex_unlock(&aiocb_mutex); >> >> + if (posix_aio_state) { > > This test is not needed because posix_aio_state must be non-NULL here. I see a similar check in the old code also. - JV > >> + char byte = 0; >> + ssize_t ret; >> + >> + ret = write(posix_aio_state->wfd, &byte, sizeof(byte)); >> + if (ret < 0 && errno != EAGAIN) > > Please always use if () { ... } curly braces. QEMU coding style. > > Stefan >