Use set_fd_handler instead of a listening thread. Change the reading/parsing function to be state-machine based, because they no longer have their own thread.
--- Makefile | 6 +- Makefile.objs | 10 +- configure | 3 +- replication/repagent_client.c | 28 ++++++- replication/repcmd_listener.c | 166 +++++++++++++++++++++++++---------------- replication/repcmd_listener.h | 8 +- 6 files changed, 141 insertions(+), 80 deletions(-) diff --git a/Makefile b/Makefile index fbd77df..b6379fb 100644 --- a/Makefile +++ b/Makefile @@ -156,9 +156,9 @@ tools-obj-y = $(oslib-obj-y) $(trace-obj-y) qemu-tool.o qemu-timer.o \ qemu-timer-common.o main-loop.o notify.o iohandler.o cutils.o async.o tools-obj-$(CONFIG_POSIX) += compatfd.o -qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y) -qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y) -qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y) +qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) +qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) +qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) qemu-bridge-helper$(EXESUF): qemu-bridge-helper.o qemu-bridge-helper.o: $(GENERATED_HEADERS) diff --git a/Makefile.objs b/Makefile.objs index a28eefb..01413a2 100755 --- a/Makefile.objs +++ b/Makefile.objs @@ -30,6 +30,11 @@ block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y) block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o +# Replication agent +replication-nested-y = repagent_client.o repagent.o repcmd_listener.o +replication-obj-y = $(addprefix replication/, $(replication-nested-y)) +block-obj-y += $(replication-obj-y) + block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o @@ -423,11 +428,6 @@ common-obj-y += qmp-marshal.o qapi-visit.o qapi-types.o $(qapi-obj-y) common-obj-y += qmp.o hmp.o ###################################################################### -# replication -replication-nested-y = repagent_client.o repagent.o repcmd_listener.o -replication-obj-y = $(addprefix replication/, $(replication-nested-y)) - -###################################################################### # guest agent qga-nested-y = commands.o guest-agent-command-state.o diff --git a/configure b/configure index f97394f..83b74c2 100755 --- a/configure +++ b/configure @@ -2883,7 +2883,6 @@ echo "curses support $curses" echo "curl support $curl" echo "mingw32 support $mingw32" echo "Audio drivers $audio_drv_list" -echo "Replication $replication" echo "Extra audio cards $audio_card_list" echo "Block whitelist $block_drv_whitelist" echo "Mixer emulation $mixemu" @@ -3904,3 +3903,5 @@ symlink $source_path/Makefile.user $d/Makefile if test "$docs" = "yes" ; then mkdir -p QMP fi + +echo "Replication $replication" diff --git a/replication/repagent_client.c b/replication/repagent_client.c index 4dd9ea4..eaa0a28 100644 --- a/replication/repagent_client.c +++ b/replication/repagent_client.c @@ -3,6 +3,7 @@ #include "repcmd_listener.h" #include "repagent_client.h" #include "repagent.h" +#include "main-loop.h" #include <string.h> #include <stdlib.h> @@ -26,6 +27,15 @@ typedef struct repagent_client_state { static repagent_client_state g_client_state = { 0 }; +static void repagent_client_read(void *opaque) +{ + printf("repagent_client_read\n"); + int bytes_read = repcmd_listener_socket_read_next_buf(g_client_state.hsock); + if (bytes_read <= 0) { + g_client_state.is_connected = 0; + } +} + void *repagent_listen(void *pParam) { rephub_params *pServerParams = (rephub_params *) pParam; @@ -80,13 +90,25 @@ void *repagent_listen(void *pParam) } retries = 0; - g_client_state.is_connected = 1; repagent_client_connected(); - repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL); - close(g_client_state.hsock); + repcmd_listener_init(repagent_process_cmd, NULL); + g_client_state.is_connected = 1; + static int c; + /* repcmd_listener_socket_thread_listener(g_client_state.hsock); */ + qemu_set_fd_handler(g_client_state.hsock, repagent_client_read, NULL, + NULL); + while (g_client_state.is_connected) { + printf("Connected (%d)...\n", c++); + usleep(1 * 1000 * 1000); + } + /* Unregister */ + qemu_set_fd_handler(g_client_state.hsock, NULL, NULL, NULL); + printf("Disconnected\n"); g_client_state.is_connected = 0; + close(g_client_state.hsock); + } return 0; } diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c index a211927..c1ce97f 100644 --- a/replication/repcmd_listener.c +++ b/replication/repcmd_listener.c @@ -26,93 +26,129 @@ #define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj)) + +typedef struct RepCmdRxCmdState { + RepCmd curCmd; + uint8_t *pReadBuf; + int bytesToGet; + int bytesGotten; + int isGotHeader; + uint8_t *pdata; +} RepCmdRxCmdState; + typedef struct RepCmdListenerState { int is_terminate_receive; + pfn_received_cmd_cb receive_cb; + void *opaque; + int hsock; + RepCmdRxCmdState cur_cmd; } RepCmdListenerState; static RepCmdListenerState g_listenerState = { 0 }; -/* Returns 0 for initiated termination or socket error value on error */ -int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr) +static int repcmd_listener_process_rx(int bytecount); + +void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque) { - RepCmd curCmd; - uint8_t *pReadBuf = (uint8_t *) &curCmd; - int bytesToGet = sizeof(RepCmd); - int bytesGotten = 0; - int isGotHeader = 0; - uint8_t *pdata = NULL; + ZERO_MEM_OBJ(&g_listenerState); + g_listenerState.receive_cb = callback; + g_listenerState.opaque = opaque; - assert(callback != NULL); + g_listenerState.cur_cmd.bytesToGet = sizeof(RepCmd); + g_listenerState.cur_cmd.pReadBuf = + (uint8_t *) &g_listenerState.cur_cmd.curCmd; +} +int repcmd_listener_socket_read_next_buf(int hsock) +{ + RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd; + int bytecount = recv(hsock, cmd_state->pReadBuf + cmd_state->bytesGotten, + cmd_state->bytesToGet - cmd_state->bytesGotten, 0); + return repcmd_listener_process_rx(bytecount); +} + +/* Returns 0 for initiated termination or socket error value on error */ +int repcmd_listener_socket_thread_listener(int hsock) +{ + int ret = 0; /* receive loop */ while (!g_listenerState.is_terminate_receive) { - int bytecount; - - bytecount = recv(hsock, pReadBuf + bytesGotten, - bytesToGet - bytesGotten, 0); - if (bytecount == -1) { - fprintf(stderr, "Error receiving data %d\n", errno); - return errno; + ret = repcmd_listener_socket_read_next_buf(hsock); + if (ret <= 0) { + return ret; } + } + return 0; +} - if (bytecount == 0) { - printf("Disconnected\n"); - return 0; - } - bytesGotten += bytecount; +static int repcmd_listener_process_rx(int bytecount) +{ + RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd; + if (bytecount == -1) { + fprintf(stderr, "Error receiving data %d\n", errno); + return errno; + } + + if (bytecount == 0) { + printf("Disconnected\n"); + return 0; + } + cmd_state->bytesGotten += bytecount; /* printf("Recieved bytes %d, got %d/%d\n", - bytecount, bytesGotten, bytesToGet); */ - /* print content */ - if (0) { - int i; - for (i = 0; i < bytecount ; i += 4) { - /*printf("%d/%d", i, bytecount/4); */ - printf("%#x ", - *(int *) (&pReadBuf[bytesGotten - bytecount + i])); + bytecount, cmd_state->bytesGotten, cmd_state->bytesToGet); */ + /* print content */ + if (0) { + int i; + for (i = 0; i < bytecount ; i += 4) { + /*printf("%d/%d", i, bytecount/4); */ + printf( + "%#x ", + *(int *) (&cmd_state->pReadBuf[cmd_state->bytesGotten + - bytecount + i])); - } - printf("\n"); } - assert(bytesGotten <= bytesToGet); - if (bytesGotten == bytesToGet) { - int isGotData = 0; - bytesGotten = 0; - if (!isGotHeader) { - /* We just got the header */ - isGotHeader = 1; - - assert(curCmd.hdr.magic1 == REPCMD_MAGIC1); - assert(curCmd.magic2 == REPCMD_MAGIC2); - if (curCmd.hdr.data_size_bytes > 0) { - pdata = (uint8_t *)REPCMD_MALLOC( - curCmd.hdr.data_size_bytes); -/* printf("malloc %p\n", pdata); */ - pReadBuf = pdata; - } else { - /* no data */ - isGotData = 1; - pdata = NULL; - } - bytesToGet = curCmd.hdr.data_size_bytes; + printf("\n"); + } + assert(cmd_state->bytesGotten <= cmd_state->bytesToGet); + if (cmd_state->bytesGotten == cmd_state->bytesToGet) { + int isGotData = 0; + cmd_state->bytesGotten = 0; + if (!cmd_state->isGotHeader) { + /* We just got the header */ + cmd_state->isGotHeader = 1; + + assert(cmd_state->curCmd.hdr.magic1 == REPCMD_MAGIC1); + assert(cmd_state->curCmd.magic2 == REPCMD_MAGIC2); + if (cmd_state->curCmd.hdr.data_size_bytes > 0) { + cmd_state->pdata = (uint8_t *)REPCMD_MALLOC( + cmd_state->curCmd.hdr.data_size_bytes); +/* printf("malloc %p\n", cmd_state->pdata); */ + cmd_state->pReadBuf = cmd_state->pdata; } else { + /* no data */ isGotData = 1; + cmd_state->pdata = NULL; } + cmd_state->bytesToGet = cmd_state->curCmd.hdr.data_size_bytes; + } else { + isGotData = 1; + } - if (isGotData) { - /* Got command and data */ - (*callback)(&curCmd, pdata, clientPtr); - - /* It's the callee responsibility to free pData */ - pdata = NULL; - ZERO_MEM_OBJ(&curCmd); - pReadBuf = (uint8_t *) &curCmd; - bytesGotten = 0; - bytesToGet = sizeof(RepCmd); - isGotHeader = 0; - } + if (isGotData) { + /* Got command and data */ + (*g_listenerState.receive_cb)(&cmd_state->curCmd, cmd_state->pdata, + g_listenerState.opaque); + + /* It's the callee responsibility to free cmd_state->pdata */ + cmd_state->pdata = NULL; + ZERO_MEM_OBJ(&cmd_state->curCmd); + cmd_state->pReadBuf = (uint8_t *) &cmd_state->curCmd; + cmd_state->bytesGotten = 0; + cmd_state->bytesToGet = sizeof(RepCmd); + cmd_state->isGotHeader = 0; } } - return 0; + return bytecount; } RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata) diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h index c09a12e..19b9ea9 100644 --- a/replication/repcmd_listener.h +++ b/replication/repcmd_listener.h @@ -24,9 +24,11 @@ #ifndef REPCMD_LISTENER_H #define REPCMD_LISTENER_H #include <stdint.h> -typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd, - uint8_t *pData, void *clientPtr); +typedef void (*pfn_received_cmd_cb)(RepCmd *pcmd, + uint8_t *pdata, void *opaque); -int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr); +void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque); +int repcmd_listener_socket_read_next_buf(int hsock); +int repcmd_listener_socket_thread_listener(int hsock); #endif /* REPCMD_LISTENER_H */ -- 1.7.6.5