used for main_dispatcher only in this patch. Dispatcher is meant to be used for Main<->any low-frequency messages.
Its interface is meant to include the red_dispatcher usage: fixed-size messages per message type; some messages require an ack. Some methods are added to be used by RedDispatcher later: dispatcher_handle_read - to be called directly by the RedDispatcher epoll-based loop; dispatcher_set_opaque - to be set from the red_worker pthread; dispatcher_init - allow NULL core as used by red_worker. Read and Write behavior (changed from RFC): Sender: blocking write, blocking read for ack (if any). Reader: non-blocking check for 4 bytes (message type is uint32_t), blocking read for the rest of the message, repeat until it fails to get message_type. FDO Bugzilla: 42463. v1->v2: drop O_NONBLOCK, use poll (Paolo Bonzini); write_size/written_size (Paolo); fold red_dispatcher change into dispatcher introduction. RFC->v1: read_safe, write_safe instead of read_with_eintr (Paolo Bonzini); read_safe can be blocking or non-blocking; statics where missing, fix read_safe arithmetic (Yonit Halperin); dispatcher_handle_read: read until no data available, will block if the start of a message is available 
(Yonit) renamed DispatcherMessage's send_ack to ack (Yonit) --- server/Makefile.am | 2 + server/dispatcher.c | 206 ++++++++++++++++++++++++++++++++++++++++++++++ server/dispatcher.h | 81 ++++++++++++++++++ server/main_dispatcher.c | 102 ++++++----------------- 4 files changed, 316 insertions(+), 75 deletions(-) create mode 100644 server/dispatcher.c create mode 100644 server/dispatcher.h diff --git a/server/Makefile.am b/server/Makefile.am index 418d707..34a6b47 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -78,6 +78,8 @@ libspice_server_la_SOURCES = \ red_client_cache.h \ red_client_shared_cache.h \ red_common.h \ + dispatcher.c \ + dispatcher.h \ red_dispatcher.c \ red_dispatcher.h \ main_dispatcher.c \ diff --git a/server/dispatcher.c b/server/dispatcher.c new file mode 100644 index 0000000..e2973a4 --- /dev/null +++ b/server/dispatcher.c @@ -0,0 +1,206 @@ +#include <unistd.h> +#include <errno.h> +#include <assert.h> +#include <string.h> +#include <pthread.h> +#include <fcntl.h> +#include <poll.h> + +#include "mem.h" +#include "spice_common.h" +#include "dispatcher.h" + +#define ACK 0xffffffff + +/* + * read_safe + * helper. reads until size bytes accumulated in buf, if an error other then + * EINTR is encountered returns -1, otherwise returns 0. + * @block if 1 the read will block (the fd is always blocking). + * if 0 poll first, return immediately if no bytes available, otherwise + * read size in blocking mode. 
+ */ +static int read_safe(int fd, void *buf, size_t size, int block) +{ + int read_size = 0; + int ret; + struct pollfd pollfd = {.fd = fd, .events = POLLIN, .revents = 0}; + + if (!block) { + ret = poll(&pollfd, 1, 0); + if (ret == -1) { + return -1; + } + if (!(pollfd.revents & POLLIN)) { + return 0; + } + } + while (read_size < size) { + ret = read(fd, buf + read_size, size - read_size); + if (ret == -1) { + if (errno == EINTR) { + continue; + } + return -1; + } + if (ret == 0) { + red_error("broken pipe on read"); + errno = ECONNRESET; /* huh? */ + return -1; + } + read_size += ret; + } + return read_size; +} + +/* + * write_safe + * @return -1 for error, otherwise number of written bytes. may be zero. + */ +static int write_safe(int fd, void *buf, size_t size) +{ + int written_size = 0; + int ret; + + while (written_size < size) { + ret = write(fd, buf + written_size, size - written_size); + if (ret == -1) { + if (errno != EINTR) { + return -1; + } + continue; + } + written_size += ret; + } + return written_size; +} + +static int dispatcher_handle_single_read(Dispatcher *dispatcher) +{ + int ret; + uint32_t type; + DispatcherMessage *msg = NULL; + uint8_t *payload = dispatcher->payload; + uint32_t ack = ACK; + + if ((ret = read_safe(dispatcher->recv_fd, &type, sizeof(type), 0)) == -1) { + red_printf("error reading from dispatcher: %d\n", errno); + return 0; + } + if (ret == 0) { + /* no messsage */ + return 0; + } + msg = &dispatcher->messages[type]; + /* we call read_safe even if msg->size == 0, this has the side + * effect of setting the fd to block, so the remaining send_data + * users will get a blocking fd as expected. */ + if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) { + red_printf("error reading from dispatcher: %d\n", errno); + /* TODO: close socketpair? 
*/ + return 0; + } + if (msg->handler) { + msg->handler(dispatcher->opaque, (void *)payload); + } else { + red_printf("error: no handler for message type %d\n", type); + } + if (msg->ack && write_safe(dispatcher->recv_fd, + &ack, sizeof(ack)) == -1) { + red_printf("error writing ack for message %d\n", type); + /* TODO: close socketpair? */ + } + return 1; +} + +/* + * dispatcher_handle_recv_read + * doesn't handle being in the middle of a message. all reads are blocking. + */ +void dispatcher_handle_recv_read(Dispatcher *dispatcher) +{ + while (dispatcher_handle_single_read(dispatcher)) { + } +} + +void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type, + void *payload) +{ + DispatcherMessage *msg; + uint32_t ack; + int send_fd = dispatcher->send_fd; + + assert(dispatcher->max_message_type > message_type); + assert(dispatcher->messages[message_type].handler); + msg = &dispatcher->messages[message_type]; + pthread_mutex_lock(&dispatcher->lock); + if (write_safe(send_fd, &message_type, sizeof(message_type)) == -1) { + red_printf("error: failed to send message type for message %d\n", + message_type); + goto unlock; + } + if (write_safe(send_fd, payload, msg->size) == -1) { + red_printf("error: failed to send message body for message %d\n", + message_type); + goto unlock; + } + if (msg->ack) { + if (read_safe(send_fd, &ack, sizeof(ack), 1) == -1) { + } + if (ack != ACK) { + red_printf("error: got wrong ack value in dispatcher " + "for message %d\n", message_type); + /* TODO handling error? 
*/ + } + } +unlock: + pthread_mutex_unlock(&dispatcher->lock); +} + +void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type, + dispatcher_handle_message handler, size_t size, + int ack) +{ + DispatcherMessage *msg; + + assert(message_type < dispatcher->max_message_type); + assert(dispatcher->messages[message_type].handler == 0); + msg = &dispatcher->messages[message_type]; + msg->handler = handler; + msg->size = size; + msg->ack = ack; + if (msg->size > dispatcher->payload_size) { + dispatcher->payload = realloc(dispatcher->payload, msg->size); + dispatcher->payload_size = msg->size; + } +} + +void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type, + void *opaque) +{ + int channels[2]; + + dispatcher->opaque = opaque; + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { + red_error("socketpair failed %s", strerror(errno)); + return; + } + pthread_mutex_init(&dispatcher->lock, NULL); + dispatcher->recv_fd = channels[0]; + dispatcher->send_fd = channels[1]; + dispatcher->self = pthread_self(); + + dispatcher->messages = spice_malloc0_n(max_message_type, + sizeof(dispatcher->messages[0])); + dispatcher->max_message_type = max_message_type; +} + +void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque) +{ + dispatcher->opaque = opaque; +} + +int dispatcher_get_recv_fd(Dispatcher *dispatcher) +{ + return dispatcher->recv_fd; +} diff --git a/server/dispatcher.h b/server/dispatcher.h new file mode 100644 index 0000000..04e6b46 --- /dev/null +++ b/server/dispatcher.h @@ -0,0 +1,81 @@ +#ifndef MAIN_DISPATCHER_H +#define MAIN_DISPATCHER_H + +#include <spice.h> + +typedef struct Dispatcher Dispatcher; + +typedef void (*dispatcher_handle_message)(void *opaque, + void *payload); + +typedef struct DispatcherMessage { + size_t size; + int ack; + dispatcher_handle_message handler; +} DispatcherMessage; + +struct Dispatcher { + SpiceCoreInterface *recv_core; + int recv_fd; + int send_fd; + pthread_t self; + pthread_mutex_t 
lock; + DispatcherMessage *messages; + int stage; /* message parser stage - sender has no stages */ + size_t max_message_type; + void *payload; /* allocated as max of message sizes */ + size_t payload_size; /* used to track realloc calls */ + void *opaque; +}; + +/* + * dispatcher_send_message + * @message_type: message type + * @payload: payload + */ +void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type, + void *payload); + +/* + * dispatcher_init + * @max_message_type: number of message types. Allows upfront allocation + * of a DispatcherMessage list. + * up front, and registration in any order wanted. + */ +void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type, + void *opaque); + +/* + * dispatcher_register_handler + * @dispatcher: dispatcher + * @messsage_type: message type + * @handler: message handler + * @size: message size. Each type has a fixed associated size. + * @ack: send an ack. This is per message type - you can't send the + * same message type with and without. Register two different + * messages if that is what you want. 
+ */ +void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type, + dispatcher_handle_message handler, size_t size, + int ack); + +/* + * dispatcher_handle_recv_read + * @dispatcher: Dispatcher instance + */ +void dispatcher_handle_recv_read(Dispatcher *); + +/* + * dispatcher_get_recv_fd + * @return: receive file descriptor of the dispatcher + */ +int dispatcher_get_recv_fd(Dispatcher *); + +/* + * dispatcher_set_opaque + * @dispatcher: Dispatcher instance + * @opaque: opaque to use for callbacks + */ +void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque); + +#endif //MAIN_DISPATCHER_H diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c index 73856bf..a5967fa 100644 --- a/server/main_dispatcher.c +++ b/server/main_dispatcher.c @@ -5,6 +5,7 @@ #include <assert.h> #include "red_common.h" +#include "dispatcher.h" #include "main_dispatcher.h" /* @@ -28,11 +29,8 @@ */ typedef struct { + Dispatcher base; SpiceCoreInterface *core; - int main_fd; - int other_fd; - pthread_t self; - pthread_mutex_t lock; } MainDispatcher; MainDispatcher main_dispatcher; @@ -43,15 +41,10 @@ enum { MAIN_DISPATCHER_NUM_MESSAGES }; -typedef struct MainDispatcherMessage { - uint32_t type; - union { - struct { - int event; - SpiceChannelEventInfo *info; - } channel_event; - } data; -} MainDispatcherMessage; +typedef struct MainDispatcherChannelEventMessage { + int event; + SpiceChannelEventInfo *info; +} MainDispatcherChannelEventMessage; /* channel_event - calls core->channel_event, must be done in main thread */ static void main_dispatcher_self_handle_channel_event( @@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event( main_dispatcher.core->channel_event(event, info); } -static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg) +static void main_dispatcher_handle_channel_event(void *opaque, + void *payload) { - main_dispatcher_self_handle_channel_event(msg->data.channel_event.event, - 
msg->data.channel_event.info); + MainDispatcherChannelEventMessage *channel_event = payload; + + main_dispatcher_self_handle_channel_event(channel_event->event, + channel_event->info); } void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info) { - MainDispatcherMessage msg; - ssize_t written = 0; - ssize_t ret; + MainDispatcherChannelEventMessage msg; - if (pthread_self() == main_dispatcher.self) { + if (pthread_self() == main_dispatcher.base.self) { main_dispatcher_self_handle_channel_event(event, info); return; } - msg.type = MAIN_DISPATCHER_CHANNEL_EVENT; - msg.data.channel_event.event = event; - msg.data.channel_event.info = info; - pthread_mutex_lock(&main_dispatcher.lock); - while (written < sizeof(msg)) { - ret = write(main_dispatcher.other_fd, &msg + written, - sizeof(msg) - written); - if (ret == -1) { - assert(errno == EINTR); - continue; - } - written += ret; - } - pthread_mutex_unlock(&main_dispatcher.lock); + msg.event = event; + msg.info = info; + dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT, + &msg); } - -static void main_dispatcher_handle_read(int fd, int event, void *opaque) +static void dispatcher_handle_read(int fd, int event, void *opaque) { - int ret; - MainDispatcher *md = opaque; - MainDispatcherMessage msg; - int read_size = 0; + Dispatcher *dispatcher = opaque; - while (read_size < sizeof(msg)) { - /* blocks until sizeof(msg) is read */ - ret = read(md->main_fd, &msg + read_size, sizeof(msg) - read_size); - if (ret == -1) { - if (errno != EINTR) { - red_printf("error reading from main dispatcher: %d\n", errno); - /* TODO: close channel? 
*/ - return; - } - continue; - } - read_size += ret; - } - switch (msg.type) { - case MAIN_DISPATCHER_CHANNEL_EVENT: - main_dispatcher_handle_channel_event(&msg); - break; - default: - red_printf("error: unhandled main dispatcher message type %d\n", - msg.type); - } + dispatcher_handle_recv_read(dispatcher); } void main_dispatcher_init(SpiceCoreInterface *core) { - int channels[2]; - memset(&main_dispatcher, 0, sizeof(main_dispatcher)); main_dispatcher.core = core; - - if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { - red_error("socketpair failed %s", strerror(errno)); - return; - } - pthread_mutex_init(&main_dispatcher.lock, NULL); - main_dispatcher.main_fd = channels[0]; - main_dispatcher.other_fd = channels[1]; - main_dispatcher.self = pthread_self(); - - core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ, - main_dispatcher_handle_read, &main_dispatcher); + dispatcher_init(&main_dispatcher.base, MAIN_DISPATCHER_NUM_MESSAGES, &main_dispatcher.base); + core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ, + dispatcher_handle_read, &main_dispatcher.base); + dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT, + main_dispatcher_handle_channel_event, + sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */); } -- 1.7.7.1 _______________________________________________ Spice-devel mailing list Spice-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/spice-devel