This patch avoids retrying send/recv in the AIO path when the sheepdog connection is not ready for the operation.
Signed-off-by: MORITA Kazutaka <morita.kazut...@lab.ntt.co.jp> --- block/sheepdog.c | 417 +++++++++++++++++++++++++++++++++++++----------------- 1 files changed, 289 insertions(+), 128 deletions(-) diff --git a/block/sheepdog.c b/block/sheepdog.c index a54e0de..cedf806 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -242,6 +242,19 @@ static inline int is_snapshot(struct SheepdogInode *inode) typedef struct SheepdogAIOCB SheepdogAIOCB; +enum ConnectionState { + C_IO_HEADER, + C_IO_DATA, + C_IO_END, + C_IO_CLOSED, +}; + +enum AIOReqState { + AIO_PENDING, /* not ready for sending this request */ + AIO_SEND_OBJREQ, /* send this request */ + AIO_RECV_OBJREQ, /* receive a result of this request */ +}; + typedef struct AIOReq { SheepdogAIOCB *aiocb; unsigned int iov_offset; @@ -253,6 +266,9 @@ typedef struct AIOReq { uint8_t flags; uint32_t id; + enum AIOReqState state; + struct SheepdogObjReq hdr; + QLIST_ENTRY(AIOReq) outstanding_aio_siblings; QLIST_ENTRY(AIOReq) aioreq_siblings; } AIOReq; @@ -348,12 +364,14 @@ static const char * sd_strerror(int err) * 1. In the sd_aio_readv/writev, read/write requests are added to the * QEMU Bottom Halves. * - * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O - * requests to the server and link the requests to the - * outstanding_list in the BDRVSheepdogState. we exits the - * function without waiting for receiving the response. + * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we set up the + * I/O requests to the server and link the requests to the + * outstanding_list in the BDRVSheepdogState. + * + * 3. We send the request in aio_send_request, the fd handler to the + * sheepdog connection. * - * 3. We receive the response in aio_read_response, the fd handler to + * 4. We receive the response in aio_read_response, the fd handler to * the sheepdog connection. If metadata update is needed, we send * the write request to the vdi object in sd_write_done, the write * completion function. 
The AIOCB callback is not called until all @@ -377,8 +395,6 @@ static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb, aio_req->flags = flags; aio_req->id = s->aioreq_seq_num++; - QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req, - outstanding_aio_siblings); QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings); return aio_req; @@ -640,20 +656,17 @@ static int do_readv_writev(int sockfd, struct iovec *iov, int len, again: ret = do_send_recv(sockfd, iov, len, iov_offset, write); if (ret < 0) { - if (errno == EINTR || errno == EAGAIN) { + if (errno == EINTR) { goto again; } + if (errno == EAGAIN) { + return 0; + } error_report("failed to recv a rsp, %s\n", strerror(errno)); - return 1; - } - - iov_offset += ret; - len -= ret; - if (len) { - goto again; + return -errno; } - return 0; + return ret; } static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset) @@ -666,30 +679,30 @@ static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset) return do_readv_writev(sockfd, iov, len, iov_offset, 1); } -static int do_read_write(int sockfd, void *buf, int len, int write) +static int do_read_write(int sockfd, void *buf, int len, int skip, int write) { struct iovec iov; iov.iov_base = buf; - iov.iov_len = len; + iov.iov_len = len + skip; - return do_readv_writev(sockfd, &iov, len, 0, write); + return do_readv_writev(sockfd, &iov, len, skip, write); } -static int do_read(int sockfd, void *buf, int len) +static int do_read(int sockfd, void *buf, int len, int skip) { - return do_read_write(sockfd, buf, len, 0); + return do_read_write(sockfd, buf, len, skip, 0); } -static int do_write(int sockfd, void *buf, int len) +static int do_write(int sockfd, void *buf, int len, int skip) { - return do_read_write(sockfd, buf, len, 1); + return do_read_write(sockfd, buf, len, skip, 1); } static int send_req(int sockfd, SheepdogReq *hdr, void *data, unsigned int *wlen) { - int ret; + int ret, done = 0; struct iovec iov[2]; 
iov[0].iov_base = hdr; @@ -700,19 +713,23 @@ static int send_req(int sockfd, SheepdogReq *hdr, void *data, iov[1].iov_len = *wlen; } - ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0); - if (ret) { - error_report("failed to send a req, %s\n", strerror(errno)); - ret = -1; + while (done < sizeof(*hdr) + *wlen) { + ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen - done, done); + if (ret < 0) { + error_report("failed to send a req, %s\n", strerror(errno)); + ret = -1; + } + done += ret; } - return ret; + return 0; } +/* This function shouldn't be used for asynchronous I/O */ static int do_req(int sockfd, SheepdogReq *hdr, void *data, unsigned int *wlen, unsigned int *rlen) { - int ret; + int ret, done; ret = send_req(sockfd, hdr, data, wlen); if (ret) { @@ -720,33 +737,39 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, goto out; } - ret = do_read(sockfd, hdr, sizeof(*hdr)); - if (ret) { - error_report("failed to get a rsp, %s\n", strerror(errno)); - ret = -1; - goto out; + done = 0; + while (done < sizeof(*hdr)) { + ret = do_read(sockfd, hdr, sizeof(*hdr) - done, done); + if (ret < 0) { + error_report("failed to get a rsp, %s\n", strerror(errno)); + ret = -1; + goto out; + } + done += ret; } if (*rlen > hdr->data_length) { *rlen = hdr->data_length; } - if (*rlen) { - ret = do_read(sockfd, data, *rlen); - if (ret) { + done = 0; + while (done < *rlen) { + ret = do_read(sockfd, data, *rlen - done, done); + if (ret < 0) { error_report("failed to get the data, %s\n", strerror(errno)); ret = -1; goto out; } + done += ret; } ret = 0; out: return ret; } -static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, - struct iovec *iov, int niov, int create, - enum AIOCBState aiocb_type); +static void setup_aio_header(BDRVSheepdogState *s, AIOReq *aio_req, int create, + enum AIOCBState aiocb_type); +static int sd_update_fd_handler(BDRVSheepdogState *s); /* * This function searchs pending requests to the object `oid', and @@ -756,10 +779,12 @@ static 
void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id) { AIOReq *aio_req, *next; SheepdogAIOCB *acb; - int ret; QLIST_FOREACH_SAFE(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings, next) { + if (aio_req->state != AIO_PENDING) { + continue; + } if (id == aio_req->id) { continue; } @@ -768,15 +793,9 @@ static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id) } acb = aio_req->aiocb; - ret = add_aio_request(s, aio_req, acb->qiov->iov, - acb->qiov->niov, 0, acb->aiocb_type); - if (ret < 0) { - error_report("add_aio_request is failed\n"); - free_aio_req(s, aio_req); - if (QLIST_EMPTY(&acb->aioreq_head)) { - sd_finish_aiocb(acb); - } - } + aio_req->state = AIO_SEND_OBJREQ; + setup_aio_header(s, aio_req, 0, acb->aiocb_type); + sd_update_fd_handler(s); } } @@ -788,38 +807,92 @@ static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id) */ static void aio_read_response(void *opaque) { - SheepdogObjRsp rsp; + static SheepdogObjRsp rsp; BDRVSheepdogState *s = opaque; int fd = s->fd; int ret; - AIOReq *aio_req = NULL; - SheepdogAIOCB *acb; + static AIOReq *aio_req; + static SheepdogAIOCB *acb; int rest; unsigned long idx; + static int done; + static enum ConnectionState conn_state = C_IO_HEADER; if (QLIST_EMPTY(&s->outstanding_aio_head)) { return; } - /* read a header */ - ret = do_read(fd, &rsp, sizeof(rsp)); - if (ret) { - error_report("failed to get the header, %s\n", strerror(errno)); - return; - } + switch (conn_state) { + case C_IO_HEADER: + /* read a header */ + ret = do_read(fd, &rsp, sizeof(rsp) - done, done); + if (ret < 0) { + error_report("failed to get the header, %s\n", strerror(errno)); + conn_state = C_IO_CLOSED; + break; + } + done += ret; + if (done < sizeof(rsp)) { + break; + } + done = 0; - /* find the right aio_req from the outstanding_aio list */ - QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) { - if (aio_req->id == rsp.id) { + /* find the right aio_req from the 
outstanding_aio list */ + QLIST_FOREACH(aio_req, &s->outstanding_aio_head, + outstanding_aio_siblings) { + if (aio_req->state == AIO_RECV_OBJREQ && aio_req->id == rsp.id) { + break; + } + } + if (!aio_req) { + error_report("bug: cannot find aio_req %x\n", rsp.id); + return; + } + acb = aio_req->aiocb; + + if (rsp.result != SD_RES_SUCCESS) { + acb->ret = -EIO; + error_report("%s\n", sd_strerror(rsp.result)); + conn_state = C_IO_END; break; } + + if (acb->aiocb_type == AIOCB_WRITE_UDATA) { + conn_state = C_IO_END; + break; + } + conn_state = C_IO_DATA; + case C_IO_DATA: + ret = do_readv(fd, acb->qiov->iov, aio_req->data_len - done, + aio_req->iov_offset + done); + if (ret < 0) { + error_report("failed to get the data, %s\n", strerror(errno)); + conn_state = C_IO_CLOSED; + } + + done += ret; + if (done < aio_req->data_len) { + break; + } + done = 0; + conn_state = C_IO_END; + break; + default: + error_report("bug: invalid rx state %d", conn_state); + break; } - if (!aio_req) { - error_report("cannot find aio_req %x\n", rsp.id); - return; + + if (conn_state == C_IO_CLOSED) { + acb->ret = -EIO; + rest = free_aio_req(s, aio_req); + if (!rest) { + acb->aio_done_func(acb); + } } - acb = aio_req->aiocb; + if (conn_state != C_IO_END) { + return; + } switch (acb->aiocb_type) { case AIOCB_WRITE_UDATA: @@ -848,19 +921,10 @@ static void aio_read_response(void *opaque) } break; case AIOCB_READ_UDATA: - ret = do_readv(fd, acb->qiov->iov, rsp.data_length, - aio_req->iov_offset); - if (ret) { - error_report("failed to get the data, %s\n", strerror(errno)); - return; - } break; } - if (rsp.result != SD_RES_SUCCESS) { - acb->ret = -EIO; - error_report("%s\n", sd_strerror(rsp.result)); - } + conn_state = C_IO_HEADER; rest = free_aio_req(s, aio_req); if (!rest) { @@ -870,6 +934,8 @@ static void aio_read_response(void *opaque) */ acb->aio_done_func(acb); } + + sd_update_fd_handler(s); } static int aio_flush_request(void *opaque) @@ -905,6 +971,129 @@ static int set_nodelay(int fd) } /* + 
* Send I/O requests. + * + * This function is registered as a fd handler, and called from the + * main loop when s->fd is ready for sending requests. + */ +static void aio_send_request(void *opaque) +{ + BDRVSheepdogState *s = opaque; + int ret, rest; + AIOReq *areq; + static AIOReq *aio_req; + static SheepdogAIOCB *acb; + static int done; + static enum ConnectionState conn_state = C_IO_HEADER; + + set_cork(s->fd, 1); + + switch (conn_state) { + case C_IO_HEADER: + if (!aio_req) { + /* find the oldest aio_req from the outstanding_aio list */ + QLIST_FOREACH(areq, &s->outstanding_aio_head, + outstanding_aio_siblings) { + if (areq->state == AIO_SEND_OBJREQ) { + aio_req = areq; + } + } + if (!aio_req) { + error_report("bug: cannot find aio_req to be send"); + return; + } + acb = aio_req->aiocb; + } + + /* send a header */ + ret = do_write(s->fd, &aio_req->hdr, sizeof(aio_req->hdr) - done, done); + if (ret < 0) { + error_report("failed to send a req, %s\n", strerror(errno)); + conn_state = C_IO_CLOSED; + break; + } + done += ret; + if (done < sizeof(aio_req->hdr)) { + break; + } + done = 0; + if (acb->aiocb_type == AIOCB_READ_UDATA) { + conn_state = C_IO_END; + break; + } + conn_state = C_IO_DATA; + case C_IO_DATA: + if (is_data_obj(aio_req->oid)) { + ret = do_writev(s->fd, acb->qiov->iov, aio_req->data_len - done, + aio_req->iov_offset + done); + } else { + ret = do_write(s->fd, (uint8_t *)&s->inode + aio_req->offset, + aio_req->data_len - done, done); + } + if (ret < 0) { + error_report("failed to send a data, %s\n", strerror(errno)); + conn_state = C_IO_CLOSED; + } + done += ret; + if (done < aio_req->data_len) { + break; + } + done = 0; + conn_state = C_IO_END; + break; + default: + error_report("bug: invalid tx state %d", conn_state); + break; + } + + set_cork(s->fd, 0); + + if (conn_state == C_IO_CLOSED) { + acb->ret = -EIO; + rest = free_aio_req(s, aio_req); + if (!rest) { + /* + * We've finished all requests which belong to the AIOCB, so + * we can call the 
callback now. + */ + acb->aio_done_func(acb); + } + } + + if (conn_state != C_IO_END) { + return; + } + + aio_req->state = AIO_RECV_OBJREQ; + + conn_state = C_IO_HEADER; + aio_req = NULL; + + sd_update_fd_handler(s); +} + +/* + * Check outstanding requests and set proper fd handlers + */ +static int sd_update_fd_handler(BDRVSheepdogState *s) +{ + IOHandler *io_read = NULL, *io_write = NULL; + AIOReq *areq; + + QLIST_FOREACH(areq, &s->outstanding_aio_head, outstanding_aio_siblings) { + if (areq->state == AIO_SEND_OBJREQ) { + io_write = aio_send_request; + } + if (areq->state == AIO_RECV_OBJREQ) { + io_read = aio_read_response; + } + } + + return qemu_aio_set_fd_handler(s->fd, io_read, io_write, + aio_flush_request, NULL, s); +} + +/* * Return a socket discriptor to read/write objects. * * We cannot use this discriptor for other operations because @@ -929,8 +1118,6 @@ static int get_sheep_fd(BDRVSheepdogState *s) return -1; } - qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request, - NULL, s); return fd; } @@ -1053,14 +1240,12 @@ out: return ret; } -static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, - struct iovec *iov, int niov, int create, - enum AIOCBState aiocb_type) +static void setup_aio_header(BDRVSheepdogState *s, AIOReq *aio_req, int create, + enum AIOCBState aiocb_type) { int nr_copies = s->inode.nr_copies; SheepdogObjReq hdr; unsigned int wlen; - int ret; uint64_t oid = aio_req->oid; unsigned int datalen = aio_req->data_len; uint64_t offset = aio_req->offset; @@ -1096,26 +1281,7 @@ static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, hdr.id = aio_req->id; - set_cork(s->fd, 1); - - /* send a header */ - ret = do_write(s->fd, &hdr, sizeof(hdr)); - if (ret) { - error_report("failed to send a req, %s\n", strerror(errno)); - return -EIO; - } - - if (wlen) { - ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset); - if (ret) { - error_report("failed to send a data, %s\n", strerror(errno)); - return -EIO; - } - } - - 
set_cork(s->fd, 0); - - return 0; + memcpy(&aio_req->hdr, &hdr, sizeof(hdr)); } static int read_write_object(int fd, char *buf, uint64_t oid, int copies, @@ -1440,9 +1606,7 @@ static int sd_truncate(BlockDriverState *bs, int64_t offset) */ static void sd_write_done(SheepdogAIOCB *acb) { - int ret; BDRVSheepdogState *s = acb->common.bs->opaque; - struct iovec iov; AIOReq *aio_req; uint32_t offset, data_len, mn, mx; @@ -1457,22 +1621,20 @@ static void sd_write_done(SheepdogAIOCB *acb) s->min_dirty_data_idx = UINT32_MAX; s->max_dirty_data_idx = 0; - iov.iov_base = &s->inode; - iov.iov_len = sizeof(s->inode); aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id), data_len, offset, 0, 0, offset); - ret = add_aio_request(s, aio_req, &iov, 1, 0, AIOCB_WRITE_UDATA); - if (ret) { - free_aio_req(s, aio_req); - acb->ret = -EIO; - goto out; - } + aio_req->state = AIO_SEND_OBJREQ; + setup_aio_header(s, aio_req, 0, AIOCB_WRITE_UDATA); acb->aio_done_func = sd_finish_aiocb; acb->aiocb_type = AIOCB_WRITE_UDATA; + + QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req, + outstanding_aio_siblings); + sd_update_fd_handler(s); return; } -out: + sd_finish_aiocb(acb); } @@ -1525,13 +1687,12 @@ out: } /* - * Send I/O requests to the server. + * Set up I/O requests * - * This function sends requests to the server, links the requests to - * the outstanding_list in BDRVSheepdogState, and exits without - * waiting the response. The responses are received in the - * `aio_read_response' function which is called from the main loop as - * a fd handler. + * This function creates asynchronous I/O requests and links them to + * the outstanding_list in BDRVSheepdogState. The requests are sent + * in the `aio_send_requests' function which is called from the main + * loop as a fd handler. 
*/ static void sd_readv_writev_bh_cb(void *p) { @@ -1553,6 +1714,7 @@ static void sd_readv_writev_bh_cb(void *p) * In the case we open the snapshot VDI, Sheepdog creates the * writable VDI when we do a write operation first. */ + /* FIXME: we shouldn't block here */ ret = sd_create_branch(s); if (ret) { acb->ret = -EIO; @@ -1592,6 +1754,9 @@ static void sd_readv_writev_bh_cb(void *p) } aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done); + aio_req->state = AIO_SEND_OBJREQ; + QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req, + outstanding_aio_siblings); if (create) { AIOReq *areq; @@ -1609,19 +1774,13 @@ static void sd_readv_writev_bh_cb(void *p) */ aio_req->flags = 0; aio_req->base_oid = 0; + aio_req->state = AIO_PENDING; goto done; } } } - ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, - create, acb->aiocb_type); - if (ret < 0) { - error_report("add_aio_request is failed\n"); - free_aio_req(s, aio_req); - acb->ret = -EIO; - goto out; - } + setup_aio_header(s, aio_req, create, acb->aiocb_type); done: offset = 0; idx++; @@ -1631,6 +1790,8 @@ out: if (QLIST_EMPTY(&acb->aioreq_head)) { sd_finish_aiocb(acb); } + + sd_update_fd_handler(s); } static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs, int64_t sector_num, -- 1.5.6.5