That means RedClient tracks a ring of channels. Right now there will be only a single client because of the disconnection mechanism - whenever a new client comes we disconnect all existing clients. But this patch adds already a ring of clients to reds.c (stored in RedServer).
There is a known problem handling many connections and disconnections at the same time, trigerrable easily by the following script: export NEW_DISPLAY=:3.0 Xephyr $NEW_DISPLAY -noreset & for ((i = 0 ; i < 5; ++i)); do for ((j = 0 ; j < 10; ++j)); do DISPLAY=$NEW_DISPLAY c_win7x86_qxl_tests & done sleep 2; done I fixed a few of the problems resulting from this in the same patch. This required already introducing a few other changes: * make sure all removal of channels happens in the main thread, for that two additional dispatcher calls are added to remove a specific channel client (RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT and RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT). * change some asserts in input channel. * make main channel disconnect not recursive * introduce disconnect call back to red_channel_create_parser The remaining abort is from a double free in the main channel, still can't find it (doesn't happen when running under valgrind - probably due to the slowness resulting from that), but is easy to see when running under gdb. --- server/inputs_channel.c | 59 ++++++++++-------- server/main_channel.c | 14 +++- server/main_channel.h | 3 +- server/red_channel.c | 95 ++++++++++++++++++++++++++--- server/red_channel.h | 25 +++++++- server/red_dispatcher.c | 39 +++++++++++- server/red_dispatcher.h | 7 ++- server/red_tunnel_worker.c | 14 ++-- server/red_worker.c | 147 +++++++++++++++++++++++++++++++++----------- server/red_worker.h | 2 + server/reds.c | 60 ++++++++++++------ server/reds.h | 6 +- server/smartcard.c | 4 +- server/snd_worker.c | 12 ++-- 14 files changed, 370 insertions(+), 117 deletions(-) diff --git a/server/inputs_channel.c b/server/inputs_channel.c index e350689..0fd4bd6 100644 --- a/server/inputs_channel.c +++ b/server/inputs_channel.c @@ -429,10 +429,16 @@ static void inputs_relase_keys(void) kbd_push_scan(keyboard, 0x38 | 0x80); //LALT } -static void inputs_channel_on_error(RedChannelClient *rcc) +static void inputs_channel_disconnect(RedChannelClient *rcc) { inputs_relase_keys(); - red_channel_client_destroy(rcc); + red_channel_client_disconnect(rcc); +} + +static void inputs_channel_on_error(RedChannelClient *rcc) +{ + red_printf(""); + inputs_channel_disconnect(rcc); } static void inputs_shutdown(Channel *channel) @@ -491,36 +497,39 @@ static void inputs_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item) { } -static void inputs_link(Channel *channel, RedsStream *stream, int migration, +static void inputs_link(Channel *channel, RedClient *client, RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) { - InputsChannel *inputs_channel; RedChannelClient *rcc; - ASSERT(channel->data == NULL); - - red_printf("input channel create"); - g_inputs_channel = inputs_channel = (InputsChannel*)red_channel_create_parser( - sizeof(*inputs_channel), core, migration, FALSE /* handle_acks */ - ,inputs_channel_config_socket - ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL) - ,inputs_channel_handle_parsed - ,inputs_channel_alloc_msg_rcv_buf - ,inputs_channel_release_msg_rcv_buf - ,inputs_channel_hold_pipe_item - ,inputs_channel_send_item - ,inputs_channel_release_pipe_item - ,inputs_channel_on_error - ,inputs_channel_on_error - ,NULL - ,NULL - ,NULL); - ASSERT(inputs_channel); + ASSERT(channel->data == g_inputs_channel); + + if (channel->data == NULL) { + red_printf("input channel create"); + g_inputs_channel = (InputsChannel*)red_channel_create_parser( + sizeof(InputsChannel), core, migration, FALSE /* handle_acks */ + ,inputs_channel_config_socket + ,inputs_channel_disconnect + ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL) + ,inputs_channel_handle_parsed + ,inputs_channel_alloc_msg_rcv_buf + ,inputs_channel_release_msg_rcv_buf + ,inputs_channel_hold_pipe_item + ,inputs_channel_send_item + ,inputs_channel_release_pipe_item + ,inputs_channel_on_error + ,inputs_channel_on_error + ,NULL + ,NULL + ,NULL); + } + channel->data = g_inputs_channel; + ASSERT(g_inputs_channel); red_printf("input channel client create"); - rcc = red_channel_client_create(sizeof(RedChannelClient), &g_inputs_channel->base, stream); + rcc = red_channel_client_create(sizeof(RedChannelClient), &g_inputs_channel->base, + client, stream); ASSERT(rcc); - channel->data = inputs_channel; inputs_pipe_add_init(rcc); } diff --git a/server/main_channel.c b/server/main_channel.c index a7fca14..9c07abd 100644 --- a/server/main_channel.c +++ b/server/main_channel.c @@ -139,6 +139,11 @@ enum NetTestStage { static uint64_t latency = 0; uint64_t bitrate_per_sec = ~0; +static void main_channel_client_disconnect(RedChannelClient *rcc) +{ + red_channel_client_disconnect(rcc); +} + static void main_disconnect(MainChannel *main_chan) { red_channel_destroy(&main_chan->base); @@ -788,7 +793,7 @@ static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint static void main_channel_on_error(RedChannelClient *rcc) { - reds_disconnect(); + reds_client_disconnect(rcc->client); } static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header) @@ -819,19 +824,20 @@ static int main_channel_handle_migrate_flush_mark(RedChannelClient *rcc) return TRUE; } -MainChannelClient *main_channel_link(Channel *channel, RedsStream *stream, int migration, +MainChannelClient *main_channel_link(Channel *channel, RedClient *client, + RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) { MainChannel *main_chan; MainChannelClient *mcc; - ASSERT(channel->data == NULL); if (channel->data == NULL) { red_printf("create main channel"); channel->data = red_channel_create_parser( sizeof(*main_chan), core, migration, FALSE /* handle_acks */ ,main_channel_config_socket + ,main_channel_client_disconnect ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL) ,main_channel_handle_parsed ,main_channel_alloc_msg_rcv_buf @@ -849,7 +855,7 @@ MainChannelClient *main_channel_link(Channel *channel, RedsStream *stream, int m main_chan = (MainChannel*)channel->data; red_printf("add main channel client"); mcc = (MainChannelClient*) - red_channel_client_create(sizeof(MainChannelClient), &main_chan->base, stream); + red_channel_client_create(sizeof(MainChannelClient), &main_chan->base, client, stream); return mcc; } diff --git a/server/main_channel.h b/server/main_channel.h index f715b43..ed17d53 100644 --- a/server/main_channel.h +++ b/server/main_channel.h @@ -45,11 +45,10 @@ struct MainMigrateData { }; typedef struct MainChannel MainChannel; -typedef struct MainChannelClient MainChannelClient; Channel *main_channel_init(void); /* This is a 'clone' from the reds.h Channel.link callback */ -MainChannelClient *main_channel_link(struct Channel *, +MainChannelClient *main_channel_link(struct Channel *, RedClient *client, RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps); void main_channel_close(MainChannel *main_chan); // not destroy, just socket close diff --git a/server/red_channel.c b/server/red_channel.c index 60b6d89..d950e59 100644 --- a/server/red_channel.c +++ b/server/red_channel.c @@ -34,6 +34,7 @@ #include "generated_marshallers.h" static void red_channel_client_event(int fd, int event, void *data); +static void red_client_add_channel(RedClient *client, RedChannelClient *rcc); /* return the number of bytes read. -1 in case of error */ static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) @@ -356,11 +357,13 @@ static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc) { ASSERT(rcc && !channel->rcc); channel->rcc = rcc; + channel->clients_num++; } RedChannelClient *red_channel_client_create( int size, RedChannel *channel, + RedClient *client, RedsStream *stream) { RedChannelClient *rcc; @@ -369,6 +372,7 @@ RedChannelClient *red_channel_client_create( rcc = spice_malloc0(size); rcc->stream = stream; rcc->channel = channel; + rcc->client = client; rcc->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + // block flags) rcc->ack_data.client_generation = ~0; @@ -392,8 +396,9 @@ RedChannelClient *red_channel_client_create( stream->watch = channel->core->watch_add(stream->socket, SPICE_WATCH_EVENT_READ, red_channel_client_event, rcc); - rcc->id = 0; + rcc->id = channel->clients_num; red_channel_add_client(channel, rcc); + red_client_add_channel(client, rcc); return rcc; error: free(rcc); @@ -453,10 +458,6 @@ RedChannel *red_channel_create(int size, return channel; } -static void do_nothing_disconnect(RedChannelClient *rcc) -{ -} - static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg) { return TRUE; @@ -466,6 +467,7 @@ RedChannel *red_channel_create_parser(int size, SpiceCoreInterface *core, int migrate, int handle_acks, channel_configure_socket_proc config_socket, + channel_disconnect_proc disconnect, spice_parse_channel_func_t parser, channel_handle_parsed_proc handle_parsed, channel_alloc_msg_recv_buf_proc alloc_recv_buf, @@ -480,7 +482,7 @@ RedChannel *red_channel_create_parser(int size, channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial) { RedChannel *channel = red_channel_create(size, - core, migrate, handle_acks, config_socket, do_nothing_disconnect, + core, migrate, handle_acks, config_socket, disconnect, do_nothing_handle_message, alloc_recv_buf, release_recv_buf, hold_item, send_item, release_item, handle_migrate_flush_mark, handle_migrate_data, handle_migrate_data_get_serial); @@ -516,7 +518,7 @@ void red_channel_destroy(RedChannel *channel) free(channel); } -static void red_channel_client_shutdown(RedChannelClient *rcc) +void red_channel_client_shutdown(RedChannelClient *rcc) { if (rcc->stream && !rcc->stream->shutdown) { rcc->channel->core->watch_remove(rcc->stream->watch); @@ -862,6 +864,15 @@ void red_channel_ack_set_client_window(RedChannel* channel, int client_window) } } +static void red_channel_client_remove(RedChannelClient *rcc) +{ + ring_remove(&rcc->client_link); + rcc->client->channels_num--; + ASSERT(rcc->channel->rcc == rcc); + rcc->channel->rcc = NULL; + rcc->channel->clients_num--; +} + void red_channel_client_disconnect(RedChannelClient *rcc) { red_printf("%p (channel %p)", rcc, rcc->channel); @@ -874,7 +885,7 @@ void red_channel_client_disconnect(RedChannelClient *rcc) rcc->send_data.item = NULL; rcc->send_data.blocked = FALSE; rcc->send_data.size = 0; - rcc->channel->rcc = NULL; + red_channel_client_remove(rcc); } void red_channel_disconnect(RedChannel *channel) @@ -982,3 +993,71 @@ void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, red_channel_client_pipe_remove(rcc, item); red_channel_client_release_item(rcc, item, FALSE); } + +/* + * RedClient implementation - kept in red_channel.c because they are + * pretty tied together. + */ + +RedClient *red_client_new() +{ + RedClient *client; + + client = spice_malloc0(sizeof(RedClient)); + ring_init(&client->channels); + return client; +} + +void red_client_shutdown(RedClient *client) +{ + RingItem *link, *next; + + red_printf("#channels %d", client->channels_num); + RING_FOREACH_SAFE(link, next, &client->channels) { + red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, client_link)); + } +} + +void red_client_destroy(RedClient *client) +{ + RingItem *link, *next; + RedChannelClient *rcc; + + red_printf("destroy client with #channels %d", client->channels_num); + RING_FOREACH_SAFE(link, next, &client->channels) { + // some channels may be in other threads, so disconnection + // is not synchronous. + rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link); + rcc->channel->disconnect(rcc); // this may call another thread. it also frees. (eventually - doesn't have to be in sync) + } + free(client); +} + +void red_client_disconnect(RedClient *client) +{ + RingItem *link, *next; + RedChannelClient *rcc; + + red_printf("#channels %d", client->channels_num); + RING_FOREACH_SAFE(link, next, &client->channels) { + // some channels may be in other threads, so disconnection + // is not synchronous. + rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link); + rcc->channel->disconnect(rcc); + } +} + +static void red_client_add_channel(RedClient *client, RedChannelClient *rcc) +{ + ASSERT(rcc && client); + ring_add(&client->channels, &rcc->client_link); + client->channels_num++; +} + +MainChannelClient *red_client_get_main(RedClient *client) { + return client->mcc; +} + +void red_client_set_main(RedClient *client, MainChannelClient *mcc) { + client->mcc = mcc; +} diff --git a/server/red_channel.h b/server/red_channel.h index 2bd3054..874fdf0 100644 --- a/server/red_channel.h +++ b/server/red_channel.h @@ -140,7 +140,9 @@ typedef uint64_t (*channel_handle_migrate_data_get_serial_proc)(RedChannelClient struct RedChannelClient { RingItem channel_link; + RingItem client_link; RedChannel *channel; + RedClient *client; RedsStream *stream; struct { uint32_t generation; @@ -172,6 +174,7 @@ struct RedChannel { int handle_acks; RedChannelClient *rcc; + uint32_t clients_num; OutgoingHandlerInterface outgoing_cb; IncomingHandlerInterface incoming_cb; @@ -219,6 +222,7 @@ RedChannel *red_channel_create_parser(int size, SpiceCoreInterface *core, int migrate, int handle_acks, channel_configure_socket_proc config_socket, + channel_disconnect_proc disconnect, spice_parse_channel_func_t parser, channel_handle_parsed_proc handle_parsed, channel_alloc_msg_recv_buf_proc alloc_recv_buf, @@ -231,13 +235,19 @@ RedChannel *red_channel_create_parser(int size, channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark, channel_handle_migrate_data_proc handle_migrate_data, channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial); -RedChannelClient *red_channel_client_create(int size, RedChannel *channel, +RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, RedsStream *stream); int red_channel_is_connected(RedChannel *channel); void red_channel_client_destroy(RedChannelClient *rcc); void red_channel_destroy(RedChannel *channel); +/* shutdown is the only safe thing to do out of the client/channel + * thread. It will not touch the rings, just shutdown the socket. + * It should be followed by some way to gurantee a disconnection. */ +void red_channel_client_shutdown(RedChannelClient *rcc); +void red_channel_shutdown(RedChannel *channel); + /* should be called when a new channel is ready to send messages */ void red_channel_init_outgoing_messages_window(RedChannel *channel); @@ -350,4 +360,17 @@ typedef void (*channel_client_visitor_data)(RedChannelClient *rcc, void *data); void red_channel_apply_clients(RedChannel *channel, channel_client_visitor v); void red_channel_apply_clients_data(RedChannel *channel, channel_client_visitor_data v, void *data); +struct RedClient { + RingItem link; + Ring channels; + int channels_num; + int disconnecting; + MainChannelClient *mcc; +}; + +RedClient *red_client_new(); +void red_client_destroy(RedClient *client); +void red_client_set_main(RedClient *client, MainChannelClient *mcc); +MainChannelClient *red_client_get_main(RedClient *client); + #endif diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c index 56446ab..5463053 100644 --- a/server/red_dispatcher.c +++ b/server/red_dispatcher.c @@ -74,7 +74,8 @@ extern spice_wan_compression_t zlib_glz_state; static RedDispatcher *dispatchers = NULL; -static void red_dispatcher_set_peer(Channel *channel, RedsStream *stream, int migration, +static void red_dispatcher_set_peer(Channel *channel, RedClient *client, + RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) { @@ -84,6 +85,7 @@ static void red_dispatcher_set_peer(Channel *channel, RedsStream *stream, int mi dispatcher = (RedDispatcher *)channel->data; RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CONNECT; write_message(dispatcher->channel, &message); + send_data(dispatcher->channel, &client, sizeof(RedClient *)); send_data(dispatcher->channel, &stream, sizeof(RedsStream *)); send_data(dispatcher->channel, &migration, sizeof(int)); } @@ -104,7 +106,7 @@ static void red_dispatcher_migrate(Channel *channel) write_message(dispatcher->channel, &message); } -static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStream *stream, +static void red_dispatcher_set_cursor_peer(Channel *channel, RedClient *client, RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) @@ -113,6 +115,7 @@ static void red_dispatcher_set_cursor_peer(Channel *channel, RedsStream *stream, red_printf(""); RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CONNECT; write_message(dispatcher->channel, &message); + send_data(dispatcher->channel, &client, sizeof(RedClient *)); send_data(dispatcher->channel, &stream, sizeof(RedsStream *)); send_data(dispatcher->channel, &migration, sizeof(int)); } @@ -388,9 +391,37 @@ static void qxl_worker_stop(QXLWorker *qxl_worker) ASSERT(message == RED_WORKER_MESSAGE_READY); } +static void red_dispatcher_send_disconnect(RedDispatcher *dispatcher, + struct RedChannelClient *rcc, RedWorkerMessage message) +{ + write_message(dispatcher->channel, &message); + send_data(dispatcher->channel, &rcc, sizeof(struct RedChannelClient *)); +} + +void red_dispatcher_disconnect_display_client(RedDispatcher *dispatcher, + struct RedChannelClient *rcc) +{ + RedWorkerMessage message = RED_WORKER_MESSAGE_STOP; + + red_dispatcher_send_disconnect(dispatcher, rcc, + RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT); + read_message(dispatcher->channel, &message); + ASSERT(message == RED_WORKER_MESSAGE_READY); +} + +void red_dispatcher_disconnect_cursor_client(RedDispatcher *dispatcher, + struct RedChannelClient *rcc) +{ + RedWorkerMessage message = RED_WORKER_MESSAGE_STOP; + + red_dispatcher_send_disconnect(dispatcher, rcc, + RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT); + read_message(dispatcher->channel, &message); + ASSERT(message == RED_WORKER_MESSAGE_READY); +} + static void qxl_worker_loadvm_commands(QXLWorker *qxl_worker, - struct QXLCommandExt *ext, - uint32_t count) + struct QXLCommandExt *ext, uint32_t count) { RedDispatcher *dispatcher = (RedDispatcher *)qxl_worker; RedWorkerMessage message = RED_WORKER_MESSAGE_LOADVM_COMMANDS; diff --git a/server/red_dispatcher.h b/server/red_dispatcher.h index 3f3c1ae..dc711db 100644 --- a/server/red_dispatcher.h +++ b/server/red_dispatcher.h @@ -18,6 +18,7 @@ #ifndef _H_RED_DISPATCHER #define _H_RED_DISPATCHER +struct RedChannelClient; struct RedDispatcher *red_dispatcher_init(QXLInstance *qxl); @@ -29,5 +30,9 @@ int red_dispatcher_count(void); int red_dispatcher_add_renderer(const char *name); uint32_t red_dispatcher_qxl_ram_size(void); int red_dispatcher_qxl_count(void); -#endif +void red_dispatcher_disconnect_display_client(struct RedDispatcher *dispatcher, + struct RedChannelClient *rcc); +void red_dispatcher_disconnect_cursor_client(struct RedDispatcher *dispatcher, + struct RedChannelClient *rcc); +#endif diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c index c18c773..a07cf49 100644 --- a/server/red_tunnel_worker.c +++ b/server/red_tunnel_worker.c @@ -602,9 +602,9 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, /* reds interface */ -static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int migration, - int num_common_caps, uint32_t *common_caps, int num_caps, - uint32_t *caps); +static void handle_tunnel_channel_link(Channel *channel, RedClient *client, RedsStream *stream, + int migration, int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps); static void handle_tunnel_channel_shutdown(struct Channel *channel); static void handle_tunnel_channel_migrate(struct Channel *channel); @@ -3434,9 +3434,9 @@ static void tunnel_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item) { } -static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int migration, - int num_common_caps, uint32_t *common_caps, int num_caps, - uint32_t *caps) +static void handle_tunnel_channel_link(Channel *channel, RedClient *client, RedsStream *stream, + int migration, int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps) { TunnelChannel *tunnel_channel; TunnelWorker *worker = (TunnelWorker *)channel->data; @@ -3462,7 +3462,7 @@ static void handle_tunnel_channel_link(Channel *channel, RedsStream *stream, int if (!tunnel_channel) { return; } - red_channel_client_create(sizeof(RedChannelClient), &tunnel_channel->base, stream); + red_channel_client_create(sizeof(RedChannelClient), &tunnel_channel->base, client, stream); tunnel_channel->worker = worker; tunnel_channel->worker->channel = tunnel_channel; diff --git a/server/red_worker.c b/server/red_worker.c index 1379b51..5a86f0c 100644 --- a/server/red_worker.c +++ b/server/red_worker.c @@ -61,6 +61,7 @@ #include "generated_marshallers.h" #include "zlib_encoder.h" #include "red_channel.h" +#include "red_dispatcher.h" //#define COMPRESS_STAT //#define DUMP_BITMAP @@ -9026,6 +9027,7 @@ static void free_common_channel_from_listener(EventListener *ctx) free(common); } + static void worker_watch_update_mask(SpiceWatch *watch, int event_mask) { } @@ -9046,13 +9048,15 @@ SpiceCoreInterface worker_core = { }; static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id, - RedsStream *stream, int migrate, + RedClient *client, RedsStream *stream, int migrate, event_listener_action_proc handler, channel_disconnect_proc disconnect, channel_send_pipe_item_proc send_item, channel_hold_pipe_item_proc hold_item, channel_release_pipe_item_proc release_item, channel_handle_parsed_proc handle_parsed, + channel_on_incoming_error_proc on_incoming_error, + channel_on_outgoing_error_proc on_outgoing_error, channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark, channel_handle_migrate_data_proc handle_migrate_data, channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial) @@ -9064,6 +9068,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i channel = red_channel_create_parser(size, &worker_core, migrate, TRUE /* handle_acks */, common_channel_config_socket, + disconnect, spice_get_client_channel_parser(channel_id, NULL), handle_parsed, common_alloc_recv_buf, @@ -9071,8 +9076,8 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i hold_item, send_item, release_item, - red_channel_client_default_peer_on_error, - red_channel_client_default_peer_on_error, + on_incoming_error, + on_outgoing_error, handle_migrate_flush_mark, handle_migrate_data, handle_migrate_data_get_serial); @@ -9080,7 +9085,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i if (!channel) { goto error; } - red_channel_client_create(sizeof(RedChannelClient), channel, stream); + red_channel_client_create(sizeof(RedChannelClient), channel, client, stream); common->id = worker->id; common->listener.refs = 1; common->listener.action = handler; @@ -9171,25 +9176,72 @@ static void display_channel_release_item(RedChannelClient *rcc, PipeItem *item, } } -static void handle_new_display_channel(RedWorker *worker, RedsStream *stream, int migrate) +static void display_channel_on_incoming_error(RedChannelClient *rcc) +{ + red_printf(""); + red_channel_client_shutdown(rcc); +} + +static void display_channel_on_outgoing_error(RedChannelClient *rcc) +{ + red_printf(""); + red_channel_client_shutdown(rcc); +} + +static void cursor_channel_on_incoming_error(RedChannelClient *rcc) +{ + red_printf(""); + red_channel_client_shutdown(rcc); +} + +static void cursor_channel_on_outgoing_error(RedChannelClient *rcc) +{ + red_printf(""); + red_channel_client_shutdown(rcc); +} + +// call this from dispatcher thread context +static void dispatch_display_channel_client_disconnect(RedChannelClient *rcc) +{ + RedWorker *worker = ((DisplayChannel*)rcc->channel)->common.worker; + struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher; + + red_printf(""); + red_dispatcher_disconnect_display_client(dispatcher, rcc); +} + +// call this from dispatcher thread context +static void dispatch_cursor_channel_client_disconnect(RedChannelClient *rcc) +{ + RedWorker *worker = ((CursorChannel*)rcc->channel)->common.worker; + struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher; + + red_printf(""); + red_dispatcher_disconnect_cursor_client(dispatcher, rcc); +} + +static void handle_new_display_channel(RedWorker *worker, RedClient *client, RedsStream *stream, + int migrate) { DisplayChannel *display_channel; size_t stream_buf_size; red_disconnect_all_display_TODO_remove_me((RedChannel *)worker->display_channel); - if (!(display_channel = (DisplayChannel *)__new_channel(worker, sizeof(*display_channel), - SPICE_CHANNEL_DISPLAY, stream, - migrate, handle_channel_events, - red_disconnect_display, - display_channel_send_item, - display_channel_hold_pipe_item, - display_channel_release_item, - display_channel_handle_message, - display_channel_handle_migrate_mark, - display_channel_handle_migrate_data, - display_channel_handle_migrate_data_get_serial - ))) { + if (!(display_channel = (DisplayChannel *)__new_channel( + worker, sizeof(*display_channel), + SPICE_CHANNEL_DISPLAY, client, stream, + migrate, handle_channel_events, + dispatch_display_channel_client_disconnect, + display_channel_send_item, + display_channel_hold_pipe_item, + display_channel_release_item, + display_channel_handle_message, + display_channel_on_incoming_error, + display_channel_on_outgoing_error, + display_channel_handle_migrate_mark, + display_channel_handle_migrate_data, + display_channel_handle_migrate_data_get_serial))) { return; } #ifdef RED_STATISTICS @@ -9254,11 +9306,6 @@ static void handle_new_display_channel(RedWorker *worker, RedsStream *stream, in stat_compress_init(&display_channel->jpeg_alpha_stat, jpeg_alpha_stat_name); } -static void red_disconnect_cursor_client(RedChannelClient *rcc) -{ - red_disconnect_cursor(rcc->channel); -} - static void red_disconnect_cursor(RedChannel *channel) { CommonChannel *common; @@ -9315,23 +9362,27 @@ static void cursor_channel_release_item(RedChannelClient *rcc, PipeItem *item, i } } -static void red_connect_cursor(RedWorker *worker, RedsStream *stream, int migrate) +static void red_connect_cursor(RedWorker *worker, RedClient *client, RedsStream *stream, + int migrate) { CursorChannel *channel; red_disconnect_cursor((RedChannel *)worker->cursor_channel); - if (!(channel = (CursorChannel *)__new_channel(worker, sizeof(*channel), - SPICE_CHANNEL_CURSOR, stream, migrate, - handle_channel_events, - red_disconnect_cursor_client, - cursor_channel_send_item, - cursor_channel_hold_pipe_item, - cursor_channel_release_item, - red_channel_client_handle_message, - NULL, - NULL, - NULL))) { + if (!(channel = (CursorChannel *)__new_channel( + worker, sizeof(*channel), + SPICE_CHANNEL_CURSOR, client, stream, migrate, + handle_channel_events, + dispatch_cursor_channel_client_disconnect, + cursor_channel_send_item, + cursor_channel_hold_pipe_item, + cursor_channel_release_item, + red_channel_client_handle_message, + cursor_channel_on_incoming_error, + cursor_channel_on_outgoing_error, + NULL, + NULL, + NULL))) { return; } #ifdef RED_STATISTICS @@ -9769,12 +9820,24 @@ static void handle_dev_input(EventListener *listener, uint32_t events) break; case RED_WORKER_MESSAGE_DISPLAY_CONNECT: { RedsStream *stream; + RedClient *client; int migrate; red_printf("connect"); + receive_data(worker->channel, &client, sizeof(RedClient *)); receive_data(worker->channel, &stream, sizeof(RedsStream *)); receive_data(worker->channel, &migrate, sizeof(int)); - handle_new_display_channel(worker, stream, migrate); + handle_new_display_channel(worker, client, stream, migrate); + break; + } + case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT: { + RedChannelClient *rcc; + + red_printf("disconnect display client"); + receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); + red_disconnect_display(rcc); + message = RED_WORKER_MESSAGE_READY; + write_message(worker->channel, &message); break; } case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: @@ -9817,12 +9880,24 @@ static void handle_dev_input(EventListener *listener, uint32_t events) break; case RED_WORKER_MESSAGE_CURSOR_CONNECT: { RedsStream *stream; + RedClient *client; int migrate; red_printf("cursor connect"); + receive_data(worker->channel, &client, sizeof(RedClient *)); receive_data(worker->channel, &stream, sizeof(RedsStream *)); receive_data(worker->channel, &migrate, sizeof(int)); - red_connect_cursor(worker, stream, migrate); + red_connect_cursor(worker, client, stream, migrate); + break; + } + case RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT: { + RedChannelClient *rcc; + + red_printf("disconnect cursor client"); + receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); + red_disconnect_cursor(rcc->channel); /* TODO - assumes a single client */ + message = RED_WORKER_MESSAGE_READY; + write_message(worker->channel, &message); break; } case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: diff --git a/server/red_worker.h b/server/red_worker.h index b4e2ed2..f79412f 100644 --- a/server/red_worker.h +++ b/server/red_worker.h @@ -51,11 +51,13 @@ enum { RED_WORKER_MESSAGE_READY, RED_WORKER_MESSAGE_DISPLAY_CONNECT, RED_WORKER_MESSAGE_DISPLAY_DISCONNECT, + RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT, RED_WORKER_MESSAGE_DISPLAY_MIGRATE, RED_WORKER_MESSAGE_START, RED_WORKER_MESSAGE_STOP, RED_WORKER_MESSAGE_CURSOR_CONNECT, RED_WORKER_MESSAGE_CURSOR_DISCONNECT, + RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT, RED_WORKER_MESSAGE_CURSOR_MIGRATE, RED_WORKER_MESSAGE_SET_COMPRESSION, RED_WORKER_MESSAGE_SET_STREAMING_VIDEO, diff --git a/server/reds.c b/server/reds.c index 9e3133e..347fa4b 100644 --- a/server/reds.c +++ b/server/reds.c @@ -199,6 +199,8 @@ typedef struct RedsState { int disconnecting; VDIPortState agent_state; int pending_mouse_event; + Ring clients; + int num_clients; uint32_t link_id; Channel *main_channel_factory; MainChannel *main_channel; @@ -536,15 +538,6 @@ static Channel *reds_find_channel(uint32_t type, uint32_t id) return channel; } -static void reds_shatdown_channels() -{ - Channel *channel = reds->channels; - while (channel) { - channel->shutdown(channel); - channel = channel->next; - } -} - static void reds_mig_cleanup() { if (reds->mig_inprogress) { @@ -589,14 +582,14 @@ static int reds_main_channel_connected(void) return !!reds->main_channel; } -void reds_disconnect() +void reds_client_disconnect(RedClient *client) { - if (!reds_main_channel_connected() || reds->disconnecting) { + if (!reds_main_channel_connected() || client->disconnecting) { return; } red_printf(""); - reds->disconnecting = TRUE; + client->disconnecting = TRUE; reds->link_id = 0; /* Reset write filter to start with clean state on client reconnect */ @@ -616,14 +609,26 @@ void reds_disconnect() } } - reds_shatdown_channels(); - reds->main_channel_factory->shutdown(reds->main_channel_factory); - reds->main_channel_factory->data = NULL; - reds->main_channel = NULL; + ring_remove(&client->link); + reds->num_clients--; + red_client_destroy(client); + reds_mig_cleanup(); reds->disconnecting = FALSE; } +// TODO: go over all usage of reds_disconnect, most/some of it should be converted to +// reds_client_disconnect +static void reds_disconnect(void) +{ + RingItem *link, *next; + + red_printf(""); + RING_FOREACH_SAFE(link, next, &reds->clients) { + reds_client_disconnect(SPICE_CONTAINEROF(link, RedClient, link)); + } +} + static void reds_mig_disconnect() { if (reds_main_channel_connected()) { @@ -1361,6 +1366,7 @@ void reds_on_main_receive_migrate_data(MainMigrateData *data, uint8_t *end) static int sync_write(RedsStream *stream, const void *in_buf, size_t n) { const uint8_t *buf = (uint8_t *)in_buf; + while (n) { int now = reds_stream_write(stream, buf, n); if (now <= 0) { @@ -1460,7 +1466,6 @@ static int reds_send_link_ack(RedLinkInfo *link) BIO_get_mem_ptr(bio, &bmBuf); memcpy(ack.pub_key, bmBuf->data, sizeof(ack.pub_key)); - if (!sync_write(link->stream, &header, sizeof(header))) goto end; if (!sync_write(link->stream, &ack, sizeof(ack))) @@ -1518,6 +1523,7 @@ static void reds_send_link_result(RedLinkInfo *link, uint32_t error) // actually be joined with reds_handle_other_links, become reds_handle_link static void reds_handle_main_link(RedLinkInfo *link) { + RedClient *client; RedsStream *stream; SpiceLinkMess *link_mess; uint32_t *caps; @@ -1560,13 +1566,17 @@ static void reds_handle_main_link(RedLinkInfo *link) if (!reds->main_channel_factory) { reds->main_channel_factory = main_channel_init(); } - mcc = main_channel_link(reds->main_channel_factory, + client = red_client_new(); + ring_add(&reds->clients, &client->link); + reds->num_clients++; + mcc = main_channel_link(reds->main_channel_factory, client, stream, reds->mig_target, link_mess->num_common_caps, link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps, link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL); reds->main_channel = (MainChannel*)reds->main_channel_factory->data; ASSERT(reds->main_channel); free(link_mess); + red_client_set_main(client, mcc); if (vdagent) { SpiceCharDeviceInterface *sif; @@ -1628,11 +1638,21 @@ static void openssl_init(RedLinkInfo *link) static void reds_handle_other_links(RedLinkInfo *link) { Channel *channel; + RedClient *client = NULL; RedsStream *stream; SpiceLinkMess *link_mess; uint32_t *caps; link_mess = link->link_mess; + if (reds->num_clients == 1) { + client = SPICE_CONTAINEROF(ring_get_head(&reds->clients), RedClient, link); + } + + if (!client) { + reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID); + reds_link_free(link); + return; + } if (!reds->link_id || reds->link_id != link_mess->connection_id) { reds_send_link_result(link, SPICE_LINK_ERR_BAD_CONNECTION_ID); @@ -1660,7 +1680,7 @@ static void reds_handle_other_links(RedLinkInfo *link) link->link_mess = NULL; reds_link_free(link); caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset); - channel->link(channel, stream, reds->mig_target, link_mess->num_common_caps, + channel->link(channel, client, stream, reds->mig_target, link_mess->num_common_caps, link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps, link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL); free(link_mess); @@ -3467,6 +3487,8 @@ static int do_spice_init(SpiceCoreInterface *core_interface) reds->listen_socket = -1; reds->secure_listen_socket = -1; init_vd_agent_resources(); + ring_init(&reds->clients); + reds->num_clients = 0; if (!(reds->mig_timer = core->timer_add(migrate_timout, NULL))) { red_error("migration timer create failed"); diff --git a/server/reds.h b/server/reds.h index 464803d..29ce15f 100644 --- a/server/reds.h +++ b/server/reds.h @@ -33,6 +33,7 @@ #define __visible__ __attribute__ ((visibility ("default"))) typedef struct RedsStream RedsStream; +typedef struct RedClient RedClient; typedef struct MainChannelClient MainChannelClient; #if HAVE_SASL @@ -93,7 +94,8 @@ typedef struct Channel { uint32_t *common_caps; int num_caps; uint32_t *caps; - void (*link)(struct Channel *, RedsStream *stream, int migration, int num_common_caps, + void (*link)(struct Channel *, RedClient *client, RedsStream *stream, + int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps); void (*shutdown)(struct Channel *); void (*migrate)(struct Channel *); @@ -136,7 +138,7 @@ extern uint64_t bitrate_per_sec; #define IS_LOW_BANDWIDTH() (bitrate_per_sec < 10 * 1024 * 1024) // Temporary measures to make splitting reds.c to inputs_channel.c easier -void reds_disconnect(void); +void reds_client_disconnect(RedClient *client); // Temporary (?) for splitting main channel typedef struct MainMigrateData MainMigrateData; diff --git a/server/smartcard.c b/server/smartcard.c index 36855a1..48072a3 100644 --- a/server/smartcard.c +++ b/server/smartcard.c @@ -487,7 +487,7 @@ static void smartcard_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *it { } -static void smartcard_link(Channel *channel, RedsStream *stream, +static void smartcard_link(Channel *channel, RedClient *client, RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) @@ -516,7 +516,7 @@ static void smartcard_link(Channel *channel, RedsStream *stream, red_printf("ERROR: smartcard channel creation failed"); return; } - red_channel_client_create(sizeof(RedChannelClient), channel->data, stream); + red_channel_client_create(sizeof(RedChannelClient), channel->data, client, stream); red_channel_init_outgoing_messages_window((RedChannel*)channel->data); } diff --git a/server/snd_worker.c b/server/snd_worker.c index 8da11e1..71b7f19 100644 --- a/server/snd_worker.c +++ b/server/snd_worker.c @@ -936,9 +936,9 @@ static void snd_playback_cleanup(SndChannel *channel) celt051_mode_destroy(playback_channel->celt_mode); } -static void snd_set_playback_peer(Channel *channel, RedsStream *stream, int migration, - int num_common_caps, uint32_t *common_caps, int num_caps, - uint32_t *caps) +static void snd_set_playback_peer(Channel *channel, RedClient *client, RedsStream *stream, + int migration, int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps) { SndWorker *worker = (SndWorker *)channel; SpicePlaybackState *st = SPICE_CONTAINEROF(worker, SpicePlaybackState, worker); @@ -1102,9 +1102,9 @@ static void snd_record_cleanup(SndChannel *channel) celt051_mode_destroy(record_channel->celt_mode); } -static void snd_set_record_peer(Channel *channel, RedsStream *stream, int migration, - int num_common_caps, uint32_t *common_caps, int num_caps, - uint32_t *caps) +static void snd_set_record_peer(Channel *channel, RedClient *client, RedsStream *stream, + int migration, int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps) { SndWorker *worker = (SndWorker *)channel; SpiceRecordState *st = SPICE_CONTAINEROF(worker, SpiceRecordState, worker); -- 1.7.5.1 _______________________________________________ Spice-devel mailing list Spice-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/spice-devel