On Tue, Nov 08, 2011 at 09:31:27AM +0200, Yonit Halperin wrote: > Hi, > 2 comments bellow > On 11/07/2011 01:44 PM, Alon Levy wrote: > >used for main_dispatcher only in this patch. > > > >Dispatcher is meant to be used for Main<->any low frequency messages. > > > >It's interface is meant to include the red_dispatcher usage: > > fixed size messages per message type > > some messages require an ack > > > >Some methods are added to be used by RedDispatcher later: > > dispatcher_handle_read - to be called directly by RedDispatcher epoll > > based loop > > dispatcher_set_opaque - to be set from red_worker pthread > > dispatcher_init - allow NULL core as used by red_worker > > > >Read and Write behavior: (changed from RFC) > > Sender: blocking write, blocking read for ack (if any). > > Reader: non blocking check for 4 bytes (message type is uint32_t), > > blocking read for the rest of the message, repeat until fail to > > get message_type. > > > >FDO Bugzilla: 42463 > > > >v1->v2: > > drop O_NONBLOCK, use poll (Paolo Bonzini) > > write_size/written_size (Paolo) > > fold red_dispatcher change into dispatcher introduction > > > >RFC->v1: > > read_safe, write_safe instead of read_with_eintr (Paolo Bonzini) > > read_safe can be blocking or non blocking > > statics where missing, fix read_safe arithmetic (Yonit Halperin) > > dispatcher_handle_read: read until no data available. will block > > if the start of a message is available. (Yonit) > > renamed DispatcherMessage's send_ack to ack (Yonit) > >--- > > server/Makefile.am | 2 + > > server/dispatcher.c | 206 > > ++++++++++++++++++++++++++++++++++++++++++++++ > > server/dispatcher.h | 81 ++++++++++++++++++ > > server/main_dispatcher.c | 102 ++++++----------------- > > 4 files changed, 316 insertions(+), 75 deletions(-) > > create mode 100644 server/dispatcher.c > > create mode 100644 server/dispatcher.h > > > >diff --git a/server/Makefile.am b/server/Makefile.am > >index 418d707..34a6b47 100644 > >--- a/server/Makefile.am > >+++ b/server/Makefile.am > >@@ -78,6 +78,8 @@ libspice_server_la_SOURCES = \ > > red_client_cache.h \ > > red_client_shared_cache.h \ > > red_common.h \ > >+ dispatcher.c \ > >+ dispatcher.h \ > > red_dispatcher.c \ > > red_dispatcher.h \ > > main_dispatcher.c \ > >diff --git a/server/dispatcher.c b/server/dispatcher.c > >new file mode 100644 > >index 0000000..e2973a4 > >--- /dev/null > >+++ b/server/dispatcher.c > >@@ -0,0 +1,206 @@ > >+#include<unistd.h> > >+#include<errno.h> > >+#include<assert.h> > >+#include<string.h> > >+#include<pthread.h> > >+#include<fcntl.h> > >+#include<poll.h> > >+ > >+#include "mem.h" > >+#include "spice_common.h" > >+#include "dispatcher.h" > >+ > >+#define ACK 0xffffffff > >+ > >+/* > >+ * read_safe > >+ * helper. reads until size bytes accumulated in buf, if an error other then > >+ * EINTR is encountered returns -1, otherwise returns 0. > >+ * @block if 1 the read will block (the fd is always blocking). > >+ * if 0 poll first, return immediately if no bytes available, > >otherwise > >+ * read size in blocking mode. > >+ */ > >+static int read_safe(int fd, void *buf, size_t size, int block) > >+{ > >+ int read_size = 0; > >+ int ret; > >+ struct pollfd pollfd = {.fd = fd, .events = POLLIN, .revents = 0}; > >+ > >+ if (!block) { > >+ ret = poll(&pollfd, 1, 0); > >+ if (ret == -1) { > What if (ernno == EINTR)? shouldn't you call poll again? If you > don't read from the fd now, will you receive another read event?
Yes for select, no for epoll (IIUC). So yes, I'll do a loop if errno == EINTR. will fix both your other comments, thanks. > >+ return -1; > >+ } > >+ if (!(pollfd.revents& POLLIN)) { > >+ return 0; > >+ } > >+ } > >+ while (read_size< size) { > >+ ret = read(fd, buf + read_size, size - read_size); > >+ if (ret == -1) { > >+ if (errno == EINTR) { > >+ continue; > >+ } > >+ return -1; > >+ } > >+ if (ret == 0) { > >+ red_error("broken pipe on read"); > >+ errno = ECONNRESET; /* huh? */ > >+ return -1; > >+ } > >+ read_size += ret; > >+ } > >+ return read_size; > >+} > >+ > >+/* > >+ * write_safe > >+ * @return -1 for error, otherwise number of written bytes. may be zero. > >+ */ > >+static int write_safe(int fd, void *buf, size_t size) > >+{ > >+ int written_size = 0; > >+ int ret; > >+ > >+ while (written_size< size) { > >+ ret = write(fd, buf + written_size, size - written_size); > >+ if (ret == -1) { > >+ if (errno != EINTR) { > >+ return -1; > >+ } > >+ continue; > >+ } > >+ written_size += ret; > >+ } > >+ return written_size; > >+} > >+ > >+static int dispatcher_handle_single_read(Dispatcher *dispatcher) > >+{ > >+ int ret; > >+ uint32_t type; > >+ DispatcherMessage *msg = NULL; > >+ uint8_t *payload = dispatcher->payload; > >+ uint32_t ack = ACK; > >+ > >+ if ((ret = read_safe(dispatcher->recv_fd,&type, sizeof(type), 0)) == > >-1) { > >+ red_printf("error reading from dispatcher: %d\n", errno); > >+ return 0; > >+ } > >+ if (ret == 0) { > >+ /* no messsage */ > >+ return 0; > >+ } > >+ msg =&dispatcher->messages[type]; > >+ /* we call read_safe even if msg->size == 0, this has the side > >+ * effect of setting the fd to block, so the remaining send_data > >+ * users will get a blocking fd as expected. */ > obsolete comment (and the poll is not necessary if msg->size == 0) > >+ if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) { > >+ red_printf("error reading from dispatcher: %d\n", errno); > >+ /* TODO: close socketpair? */ > >+ return 0; > >+ } > >+ if (msg->handler) { > >+ msg->handler(dispatcher->opaque, (void *)payload); > >+ } else { > >+ red_printf("error: no handler for message type %d\n", type); > >+ } > >+ if (msg->ack&& write_safe(dispatcher->recv_fd, > >+&ack, sizeof(ack)) == -1) { > >+ red_printf("error writing ack for message %d\n", type); > >+ /* TODO: close socketpair? */ > >+ } > >+ return 1; > >+} > >+ > >+/* > >+ * dispatcher_handle_recv_read > >+ * doesn't handle being in the middle of a message. all reads are blocking. > >+ */ > >+void dispatcher_handle_recv_read(Dispatcher *dispatcher) > >+{ > >+ while (dispatcher_handle_single_read(dispatcher)) { > >+ } > >+} > >+ > >+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type, > >+ void *payload) > >+{ > >+ DispatcherMessage *msg; > >+ uint32_t ack; > >+ int send_fd = dispatcher->send_fd; > >+ > >+ assert(dispatcher->max_message_type> message_type); > >+ assert(dispatcher->messages[message_type].handler); > >+ msg =&dispatcher->messages[message_type]; > >+ pthread_mutex_lock(&dispatcher->lock); > >+ if (write_safe(send_fd,&message_type, sizeof(message_type)) == -1) { > >+ red_printf("error: failed to send message type for message %d\n", > >+ message_type); > >+ goto unlock; > >+ } > >+ if (write_safe(send_fd, payload, msg->size) == -1) { > >+ red_printf("error: failed to send message body for message %d\n", > >+ message_type); > >+ goto unlock; > >+ } > >+ if (msg->ack) { > >+ if (read_safe(send_fd,&ack, sizeof(ack), 1) == -1) { > missing error print, goto unlock and return? > >+ } > >+ if (ack != ACK) { > >+ red_printf("error: got wrong ack value in dispatcher " > >+ "for message %d\n", message_type); > >+ /* TODO handling error? */ > >+ } > >+ } > >+unlock: > >+ pthread_mutex_unlock(&dispatcher->lock); > >+} > >+ > >+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t > >message_type, > >+ dispatcher_handle_message handler, size_t > >size, > >+ int ack) > >+{ > >+ DispatcherMessage *msg; > >+ > >+ assert(message_type< dispatcher->max_message_type); > >+ assert(dispatcher->messages[message_type].handler == 0); > >+ msg =&dispatcher->messages[message_type]; > >+ msg->handler = handler; > >+ msg->size = size; > >+ msg->ack = ack; > >+ if (msg->size> dispatcher->payload_size) { > >+ dispatcher->payload = realloc(dispatcher->payload, msg->size); > >+ dispatcher->payload_size = msg->size; > >+ } > >+} > >+ > >+void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type, > >+ void *opaque) > >+{ > >+ int channels[2]; > >+ > >+ dispatcher->opaque = opaque; > >+ if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { > >+ red_error("socketpair failed %s", strerror(errno)); > >+ return; > >+ } > >+ pthread_mutex_init(&dispatcher->lock, NULL); > >+ dispatcher->recv_fd = channels[0]; > >+ dispatcher->send_fd = channels[1]; > >+ dispatcher->self = pthread_self(); > >+ > >+ dispatcher->messages = spice_malloc0_n(max_message_type, > >+ sizeof(dispatcher->messages[0])); > >+ dispatcher->max_message_type = max_message_type; > >+} > >+ > >+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque) > >+{ > >+ dispatcher->opaque = opaque; > >+} > >+ > >+int dispatcher_get_recv_fd(Dispatcher *dispatcher) > >+{ > >+ return dispatcher->recv_fd; > >+} > >diff --git a/server/dispatcher.h b/server/dispatcher.h > >new file mode 100644 > >index 0000000..04e6b46 > >--- /dev/null > >+++ b/server/dispatcher.h > >@@ -0,0 +1,81 @@ > >+#ifndef MAIN_DISPATCHER_H > >+#define MAIN_DISPATCHER_H > >+ > >+#include<spice.h> > >+ > >+typedef struct Dispatcher Dispatcher; > >+ > >+typedef void (*dispatcher_handle_message)(void *opaque, > >+ void *payload); > >+ > >+typedef struct DispatcherMessage { > >+ size_t size; > >+ int ack; > >+ dispatcher_handle_message handler; > >+} DispatcherMessage; > >+ > >+struct Dispatcher { > >+ SpiceCoreInterface *recv_core; > >+ int recv_fd; > >+ int send_fd; > >+ pthread_t self; > >+ pthread_mutex_t lock; > >+ DispatcherMessage *messages; > >+ int stage; /* message parser stage - sender has no stages */ > >+ size_t max_message_type; > >+ void *payload; /* allocated as max of message sizes */ > >+ size_t payload_size; /* used to track realloc calls */ > >+ void *opaque; > >+}; > >+ > >+/* > >+ * dispatcher_send_message > >+ * @message_type: message type > >+ * @payload: payload > >+ */ > >+void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type, > >+ void *payload); > >+ > >+/* > >+ * dispatcher_init > >+ * @max_message_type: number of message types. Allows upfront allocation > >+ * of a DispatcherMessage list. > >+ * up front, and registration in any order wanted. > >+ */ > >+void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type, > >+ void *opaque); > >+ > >+/* > >+ * dispatcher_register_handler > >+ * @dispatcher: dispatcher > >+ * @messsage_type: message type > >+ * @handler: message handler > >+ * @size: message size. Each type has a fixed associated size. > >+ * @ack: send an ack. This is per message type - you can't send > >the > >+ * same message type with and without. Register two > >different > >+ * messages if that is what you want. > >+ */ > >+void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t > >message_type, > >+ dispatcher_handle_message handler, size_t > >size, > >+ int ack); > >+ > >+/* > >+ * dispatcher_handle_recv_read > >+ * @dispatcher: Dispatcher instance > >+ */ > >+void dispatcher_handle_recv_read(Dispatcher *); > >+ > >+/* > >+ * dispatcher_get_recv_fd > >+ * @return: receive file descriptor of the dispatcher > >+ */ > >+int dispatcher_get_recv_fd(Dispatcher *); > >+ > >+/* > >+ * dispatcher_set_opaque > >+ * @dispatcher: Dispatcher instance > >+ * @opaque: opaque to use for callbacks > >+ */ > >+void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque); > >+ > >+#endif //MAIN_DISPATCHER_H > >diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c > >index 73856bf..a5967fa 100644 > >--- a/server/main_dispatcher.c > >+++ b/server/main_dispatcher.c > >@@ -5,6 +5,7 @@ > > #include<assert.h> > > > > #include "red_common.h" > >+#include "dispatcher.h" > > #include "main_dispatcher.h" > > > > /* > >@@ -28,11 +29,8 @@ > > */ > > > > typedef struct { > >+ Dispatcher base; > > SpiceCoreInterface *core; > >- int main_fd; > >- int other_fd; > >- pthread_t self; > >- pthread_mutex_t lock; > > } MainDispatcher; > > > > MainDispatcher main_dispatcher; > >@@ -43,15 +41,10 @@ enum { > > MAIN_DISPATCHER_NUM_MESSAGES > > }; > > > >-typedef struct MainDispatcherMessage { > >- uint32_t type; > >- union { > >- struct { > >- int event; > >- SpiceChannelEventInfo *info; > >- } channel_event; > >- } data; > >-} MainDispatcherMessage; > >+typedef struct MainDispatcherChannelEventMessage { > >+ int event; > >+ SpiceChannelEventInfo *info; > >+} MainDispatcherChannelEventMessage; > > > > /* channel_event - calls core->channel_event, must be done in main thread > > */ > > static void main_dispatcher_self_handle_channel_event( > >@@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event( > > main_dispatcher.core->channel_event(event, info); > > } > > > >-static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg) > >+static void main_dispatcher_handle_channel_event(void *opaque, > >+ void *payload) > > { > >- main_dispatcher_self_handle_channel_event(msg->data.channel_event.event, > >- msg->data.channel_event.info); > >+ MainDispatcherChannelEventMessage *channel_event = payload; > >+ > >+ main_dispatcher_self_handle_channel_event(channel_event->event, > >+ channel_event->info); > > } > > > > void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info) > > { > >- MainDispatcherMessage msg; > >- ssize_t written = 0; > >- ssize_t ret; > >+ MainDispatcherChannelEventMessage msg; > > > >- if (pthread_self() == main_dispatcher.self) { > >+ if (pthread_self() == main_dispatcher.base.self) { > > main_dispatcher_self_handle_channel_event(event, info); > > return; > > } > >- msg.type = MAIN_DISPATCHER_CHANNEL_EVENT; > >- msg.data.channel_event.event = event; > >- msg.data.channel_event.info = info; > >- pthread_mutex_lock(&main_dispatcher.lock); > >- while (written< sizeof(msg)) { > >- ret = write(main_dispatcher.other_fd,&msg + written, > >- sizeof(msg) - written); > >- if (ret == -1) { > >- assert(errno == EINTR); > >- continue; > >- } > >- written += ret; > >- } > >- pthread_mutex_unlock(&main_dispatcher.lock); > >+ msg.event = event; > >+ msg.info = info; > >+ dispatcher_send_message(&main_dispatcher.base, > >MAIN_DISPATCHER_CHANNEL_EVENT, > >+&msg); > > } > > > >- > >-static void main_dispatcher_handle_read(int fd, int event, void *opaque) > >+static void dispatcher_handle_read(int fd, int event, void *opaque) > > { > >- int ret; > >- MainDispatcher *md = opaque; > >- MainDispatcherMessage msg; > >- int read_size = 0; > >+ Dispatcher *dispatcher = opaque; > > > >- while (read_size< sizeof(msg)) { > >- /* blocks until sizeof(msg) is read */ > >- ret = read(md->main_fd,&msg + read_size, sizeof(msg) - read_size); > >- if (ret == -1) { > >- if (errno != EINTR) { > >- red_printf("error reading from main dispatcher: %d\n", > >errno); > >- /* TODO: close channel? */ > >- return; > >- } > >- continue; > >- } > >- read_size += ret; > >- } > >- switch (msg.type) { > >- case MAIN_DISPATCHER_CHANNEL_EVENT: > >- main_dispatcher_handle_channel_event(&msg); > >- break; > >- default: > >- red_printf("error: unhandled main dispatcher message type %d\n", > >- msg.type); > >- } > >+ dispatcher_handle_recv_read(dispatcher); > > } > > > > void main_dispatcher_init(SpiceCoreInterface *core) > > { > >- int channels[2]; > >- > > memset(&main_dispatcher, 0, sizeof(main_dispatcher)); > > main_dispatcher.core = core; > >- > >- if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { > >- red_error("socketpair failed %s", strerror(errno)); > >- return; > >- } > >- pthread_mutex_init(&main_dispatcher.lock, NULL); > >- main_dispatcher.main_fd = channels[0]; > >- main_dispatcher.other_fd = channels[1]; > >- main_dispatcher.self = pthread_self(); > >- > >- core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ, > >- main_dispatcher_handle_read,&main_dispatcher); > >+ dispatcher_init(&main_dispatcher.base, > >MAIN_DISPATCHER_NUM_MESSAGES,&main_dispatcher.base); > >+ core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ, > >+ dispatcher_handle_read,&main_dispatcher.base); > >+ dispatcher_register_handler(&main_dispatcher.base, > >MAIN_DISPATCHER_CHANNEL_EVENT, > >+ main_dispatcher_handle_channel_event, > >+ sizeof(MainDispatcherChannelEventMessage), > >0 /* no ack */); > > } > _______________________________________________ Spice-devel mailing list Spice-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/spice-devel