Implement reconnect. To achieve this: 1. Move from a boolean quit variable to a state variable. Four states are introduced: connecting-wait: reconnecting is in progress and only a small number of reconnect attempts have been made so far, so all requests wait for the connection. connecting-nowait: reconnecting is in progress and many reconnect attempts have already been made, so all requests return errors. connected: normal state. quit: exiting after a fatal error or on close.
Possible transitions are: * -> quit; connecting-* -> connected; connecting-wait -> connecting-nowait; connected -> connecting-wait. 2. Implement reconnect in connection_co: in connecting-* mode, connection_co tries to reconnect every NBD_RECONNECT_NS. Making this parameter configurable (as well as NBD_RECONNECT_ATTEMPTS, which bounds the number of attempts before the transition from connecting-wait to connecting-nowait) may be done in a follow-up patch. 3. Retry NBD requests on a channel error if we are in the connecting-wait state. 4. In init, wait for the connection until the transition to connecting-nowait, so NBD_RECONNECT_ATTEMPTS also bounds the number of failed attempts for the initial connection. Signed-off-by: Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com> --- block/nbd-client.h | 2 + block/nbd-client.c | 170 ++++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 123 insertions(+), 49 deletions(-) diff --git a/block/nbd-client.h b/block/nbd-client.h index 2561e1ea42..1249f2eb52 100644 --- a/block/nbd-client.h +++ b/block/nbd-client.h @@ -44,6 +44,8 @@ typedef struct NBDClientSession { bool receiving; int connect_status; Error *connect_err; + int connect_attempts; + bool wait_in_flight; NBDClientRequest requests[MAX_NBD_REQUESTS]; NBDReply reply; diff --git a/block/nbd-client.c b/block/nbd-client.c index f22ed7f404..49b1f67047 100644 --- a/block/nbd-client.c +++ b/block/nbd-client.c @@ -41,10 +41,16 @@ static int nbd_client_connect(BlockDriverState *bs, const char *hostname, Error **errp); -/* @ret would be used for reconnect in future */ static void nbd_channel_error(NBDClientSession *s, int ret) { - s->state = NBD_CLIENT_QUIT; + if (ret == -EIO) { + if (s->state == NBD_CLIENT_CONNECTED) { + s->state = NBD_CLIENT_CONNECTING_WAIT; + s->connect_attempts = 0; + } + } else { + s->state = NBD_CLIENT_QUIT; + } } static void nbd_recv_coroutines_wake_all(NBDClientSession *s) @@ -90,6 +96,19 @@ typedef struct NBDConnection { 
uint64_t reconnect_timeout; } NBDConnection; +static bool 
nbd_client_connecting(NBDClientSession *client) +{ + return client->state == NBD_CLIENT_CONNECTING_WAIT || + client->state == NBD_CLIENT_CONNECTING_NOWAIT || + client->state == NBD_CLIENT_CONNECTING_INIT; +} + +static bool nbd_client_connecting_wait(NBDClientSession *client) +{ + return client->state == NBD_CLIENT_CONNECTING_WAIT || + client->state == NBD_CLIENT_CONNECTING_INIT; +} + static coroutine_fn void nbd_connection_entry(void *opaque) { NBDConnection *con = opaque; @@ -98,26 +117,55 @@ static coroutine_fn void nbd_connection_entry(void *opaque) int ret = 0; Error *local_err = NULL; - if (con->reconnect_attempts != 0) { - error_setg(&s->connect_err, "Reconnect is not supported yet"); - s->connect_status = -EINVAL; - nbd_channel_error(s, s->connect_status); - return; - } + while (s->state != NBD_CLIENT_QUIT) { + assert(s->reply.handle == 0); - s->connect_status = nbd_client_connect(con->bs, con->saddr, - con->export, con->tlscreds, - con->hostname, &s->connect_err); - if (s->connect_status < 0) { - nbd_channel_error(s, s->connect_status); - return; - } + if (nbd_client_connecting(s)) { + if (s->connect_attempts == con->reconnect_attempts) { + s->state = NBD_CLIENT_CONNECTING_NOWAIT; + qemu_co_queue_restart_all(&s->free_sema); + } - /* successfully connected */ - s->state = NBD_CLIENT_CONNECTED; + qemu_co_mutex_lock(&s->send_mutex); + + while (s->in_flight > 0) { + qemu_co_mutex_unlock(&s->send_mutex); + nbd_recv_coroutines_wake_all(s); + s->wait_in_flight = true; + qemu_coroutine_yield(); + s->wait_in_flight = false; + qemu_co_mutex_lock(&s->send_mutex); + } + + qemu_co_mutex_unlock(&s->send_mutex); + + /* Now we are sure, that nobody accessing the channel now and nobody + * will try to access the channel, until we set state to CONNECTED + */ + + s->connect_status = nbd_client_connect(con->bs, con->saddr, + con->export, con->tlscreds, + con->hostname, &local_err); + s->connect_attempts++; + error_free(s->connect_err); + s->connect_err = NULL; + 
error_propagate(&s->connect_err, local_err); + local_err = NULL; + if (s->connect_status == -EINVAL) { + /* Protocol error or something like this */ + nbd_channel_error(s, s->connect_status); + continue; + } + if (s->connect_status < 0) { + qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, con->reconnect_timeout); + continue; + } + + /* successfully connected */ + s->state = NBD_CLIENT_CONNECTED; + qemu_co_queue_restart_all(&s->free_sema); + } - while (s->state != NBD_CLIENT_QUIT) { - assert(s->reply.handle == 0); s->receiving = true; ret = nbd_receive_reply(s->ioc, &s->reply, &local_err); s->receiving = false; @@ -158,6 +206,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque) qemu_coroutine_yield(); } + qemu_co_queue_restart_all(&s->free_sema); nbd_recv_coroutines_wake_all(s); s->connection_co = NULL; } @@ -170,7 +219,7 @@ static int nbd_co_send_request(BlockDriverState *bs, int rc, i = -1; qemu_co_mutex_lock(&s->send_mutex); - while (s->in_flight == MAX_NBD_REQUESTS) { + while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) { qemu_co_queue_wait(&s->free_sema, &s->send_mutex); } @@ -221,7 +270,11 @@ err: s->requests[i].coroutine = NULL; s->in_flight--; } - qemu_co_queue_next(&s->free_sema); + if (s->in_flight == 0 && s->wait_in_flight) { + aio_co_wake(s->connection_co); + } else { + qemu_co_queue_next(&s->free_sema); + } } qemu_co_mutex_unlock(&s->send_mutex); return rc; @@ -671,7 +724,11 @@ break_loop: qemu_co_mutex_lock(&s->send_mutex); s->in_flight--; - qemu_co_queue_next(&s->free_sema); + if (s->in_flight == 0 && s->wait_in_flight) { + aio_co_wake(s->connection_co); + } else { + qemu_co_queue_next(&s->free_sema); + } qemu_co_mutex_unlock(&s->send_mutex); return false; @@ -820,16 +877,21 @@ static int nbd_co_request(BlockDriverState *bs, NBDRequest *request, } else { assert(request->type != NBD_CMD_WRITE); } - ret = nbd_co_send_request(bs, request, write_qiov); - if (ret < 0) { - return ret; - } - ret = nbd_co_receive_return_code(client, 
request->handle, - &request_ret, &local_err); - if (local_err) { - error_report_err(local_err); - } + do { + ret = nbd_co_send_request(bs, request, write_qiov); + if (ret < 0) { + continue; + } + + ret = nbd_co_receive_return_code(client, request->handle, + &request_ret, &local_err); + if (local_err) { + error_report_err(local_err); + local_err = NULL; + } + } while (ret < 0 && nbd_client_connecting_wait(client)); + return ret ? ret : request_ret; } @@ -851,16 +913,21 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset, if (!bytes) { return 0; } - ret = nbd_co_send_request(bs, &request, NULL); - if (ret < 0) { - return ret; - } - ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov, - &request_ret, &local_err); - if (local_err) { - error_report_err(local_err); - } + do { + ret = nbd_co_send_request(bs, &request, NULL); + if (ret < 0) { + continue; + } + + ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov, + &request_ret, &local_err); + if (local_err) { + error_report_err(local_err); + local_err = NULL; + } + } while (ret < 0 && nbd_client_connecting_wait(client)); + return ret ? ret : request_ret; } @@ -974,16 +1041,21 @@ int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs, return BDRV_BLOCK_DATA; } - ret = nbd_co_send_request(bs, &request, NULL); - if (ret < 0) { - return ret; - } + do { + ret = nbd_co_send_request(bs, &request, NULL); + if (ret < 0) { + continue; + } + + ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes, + &extent, &request_ret, + &local_err); + if (local_err) { + error_report_err(local_err); + local_err = NULL; + } + } while (ret < 0 && nbd_client_connecting_wait(client)); - ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes, - &extent, &request_ret, &local_err); - if (local_err) { - error_report_err(local_err); - } if (ret < 0 || request_ret < 0) { return ret ? ret : request_ret; } -- 2.11.1