On 24.02.2011 17:49, Nick Thomas wrote:
> This preserves the previous behaviour where the NBD server is
> unavailable or goes away during guest execution, but switches the
> NBD backend to present the AIO interface instead of the sync IO
> interface.
>
> We also split read & write requests into 1 MiB blocks (minus header).
> This is a hard limit in the NBD servers (including qemu-nbd), but
> never seemed to come up with the previous backend code.
>
> All IO except setup and teardown is asynchronous, and we (in theory)
> handle canceled requests properly too.
>
> Signed-off-by: Nick Thomas <n...@bytemark.co.uk>
Actually, the patch doesn't even apply. So just a quick review... > block/nbd.c | 652 +++++++++++++++++++++++++++++++++++++++++++++++++++++----- > nbd.c | 76 +++++-- > nbd.h | 13 +- > 3 files changed, 662 insertions(+), 79 deletions(-) > > diff --git a/block/nbd.c b/block/nbd.c > index 1d6b225..d6594e4 100644 > --- a/block/nbd.c > +++ b/block/nbd.c > @@ -34,7 +34,12 @@ > #include <sys/types.h> > #include <unistd.h> > > -#define EN_OPTSTR ":exportname=" > +#define EN_OPTSTR ":exportname=" > +#define SECTOR_SIZE 512 > + > +/* 1MiB minus header size */ > +#define NBD_MAX_READ ((1024*1024) - sizeof(struct nbd_reply)) > +#define NBD_MAX_WRITE ((1024*1024) - sizeof(struct nbd_request)) > > /* #define DEBUG_NBD */ > > @@ -45,17 +50,148 @@ > #define logout(fmt, ...) ((void)0) > #endif > > -typedef struct BDRVNBDState { > +/* > + * Here's how the I/O works. > + * qemu creates a BDRVNBDState for us, which is the context for all reads > + * and writes. > + * > + * nbd_open is called to connect to the NBD server and set up on-read and > + * on-write handlers (aio_read_response, aio_write_request). > + * > + * nbd_aio_readv/writev, called by qemu, create an NBDAIOCB (representing > the > + * I/O request to qemu). > + * For read requests, read/writev creates a single AIOReq containing the NBD > + * header. For write requests, 1 or more AIOReqs are created, containing the > + * NBD header and the write data. These are pushed to reqs_to_send_head in > the > + * BDRVNBDState and the list in the NBDAIOCB. > + * > + * Each time aio_write_request is called by qemu, it gets the first AIOReq > + * in the reqs_to_send_head and writes the data to the socket. > + * If this results in the whole AIOReq being written to the socket, it moves > + * the AIOReq to the reqs_for_reply_head in the BDRVNBDState. If the AIOReq > + * isn't finished, then it's left where it is. to have more of it written > + * next callback. > + * > + * If there's an unrecoverable error writing to the socket, we disconnect > and > + * return the entire acb as -EIO > + * > + * Each aio_read_response, we check the BDRVNBDState's current_req attribute > + * to see if we're in the middle of a read. If not, we read a header's > worth of > + * data, then try to find an AIOReq in the reqs_for_reply_head. > + * > + * If we don't find one, then we're in a weird error state. > + * > + * Once we have our AIOReq, we remove it from reqs_for_reply_head and put it > + * in the current_req attribute, then read from the socket to the buffer (if > + * needed). If that completes the AIOReq, we clear the current_req attribute > + * and deallocate the AIOReq. > + * If the AIOReq is complete, and that's the last one for the NBDAIOCB, we > call > + * the 'done' callback' and return. > + * If the AIOReq isn't complete, we just return. It'll be completed in > future > + * callbacks, since it's now the current_req > + * > + * If there'an unrecoverable error reading from the socket, [...] Something's missing here? ;-) > + */ > + > +typedef struct NBDAIOCB NBDAIOCB; > +typedef struct BDRVNBDState BDRVNBDState; > + > +static int nbd_establish_connection(BDRVNBDState *s); > +static void nbd_teardown_connection(BDRVNBDState *s); > + > +typedef struct AIOReq { > + NBDAIOCB *aiocb; /* Which QEMU operation this belongs to */ > + > + /* Where on the NBDAIOCB's iov does this request start? */ > + off_t iov_offset; > + > + /* The NBD request header pertaining to this AIOReq. > + * This specifies the handle of the request, the read offset and length. 
> + */ > + NBDRequest nbd_req_hdr; > + > + /* How many bytes have been written to the NBD server so far. This will > + * vary between 0 and sizeof(nbd_req_hdr) + nbd_req_hdr.len > + */ > + size_t bytes_sent; > + > + /* How many bytes have been read from the NBD server so far. Varies > between > + * 0 and sizeof(nbd_rsp_hdr) + nbd_req_hdr.len > + */ > + size_t bytes_got; > + > + /* Used to record this in the state object. waiting_sent is used to work > + * out which queue the AIOReq is in. Before it's been sent, it's in the > + * reqs_to_send_head. After being sent, if it's not current_req, it's in > + * reqs_for_reply_head. > + */ > + QTAILQ_ENTRY(AIOReq) socket_siblings; > + bool waiting_sent; > + > + /* Used to enter this into an NBDAIOCB */ > + QLIST_ENTRY(AIOReq) aioreq_siblings; > +} AIOReq; > + > +struct BDRVNBDState { > + /* File descriptor for the socket to the NBD server */ > int sock; > + > + /* Size of the file being served */ > off_t size; > + > + /* block size */ > size_t blocksize; > - char *export_name; /* An NBD server may export several devices */ > > /* If it begins with '/', this is a UNIX domain socket. Otherwise, > * it's a string of the form <hostname|ip4|\[ip6\]>:port > */ > char *host_spec; > -} BDRVNBDState; > + > + /* An NBD server may export several devices - this is the one we want*/ > + char *export_name; > + > + /* Used to generate unique NBD handles */ > + uint64_t aioreq_seq_num; > + > + /* AIOReqs yet to be transmitted */ > + QTAILQ_HEAD(reqs_to_send_head, AIOReq) reqs_to_send_head; > + > + /* AIOReqs that have been transmitted and are awaiting a reply */ > + QTAILQ_HEAD(reqs_for_reply_head, AIOReq) reqs_for_reply_head; > + > + /* AIOReq that is currently being read from the socket */ > + AIOReq *current_req; > + > + /* Used in aio_read_response. We may need to store received header bytes > + * between reads - we don't have an AIOReq at that point. 
> + */ > + uint8_t nbd_rsp_buf[sizeof(NBDReply)]; > + size_t nbd_rsp_offset; > + > +}; > + > +enum AIOCBState { > + AIOCB_WRITE_UDATA, > + AIOCB_READ_UDATA, > +}; > + > +struct NBDAIOCB { > + BlockDriverAIOCB common; > + QEMUIOVector *qiov; > + QEMUBH *bh; > + > + enum AIOCBState aiocb_type; > + > + int64_t sector_num; > + int nb_sectors; > + int ret; > + > + bool canceled; > + > + void (*aio_done_func)(NBDAIOCB *); > + > + QLIST_HEAD(aioreq_head, AIOReq) aioreq_head; > +}; > > static int nbd_config(BDRVNBDState *s, const char *filename, int flags) > { > @@ -103,9 +239,307 @@ out: > return err; > } > > -static int nbd_establish_connection(BlockDriverState *bs) > +static inline AIOReq *nbd_alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb, > + size_t data_len, > + off_t offset, > + off_t iov_offset) > +{ > + AIOReq *aio_req; > + > + aio_req = qemu_malloc(sizeof(*aio_req)); > + aio_req->aiocb = acb; > + aio_req->iov_offset = iov_offset; > + aio_req->nbd_req_hdr.from = offset; > + aio_req->nbd_req_hdr.len = data_len; > + aio_req->nbd_req_hdr.handle = s->aioreq_seq_num++; > + > + if(acb->aiocb_type == AIOCB_READ_UDATA) { > + aio_req->nbd_req_hdr.type = NBD_CMD_READ; > + } else { > + aio_req->nbd_req_hdr.type = NBD_CMD_WRITE; > + } > + > + aio_req->bytes_sent = 0; > + aio_req->bytes_got = 0; > + > + QTAILQ_INSERT_TAIL(&s->reqs_to_send_head, aio_req, socket_siblings); > + aio_req->waiting_sent = true; > + QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings); > + return aio_req; > +} > + > +static int nbd_aio_flush_request(void *opaque) > +{ > + BDRVNBDState *s = opaque; > + > + return !(QTAILQ_EMPTY(&s->reqs_to_send_head) && > + QTAILQ_EMPTY(&s->reqs_for_reply_head) && > + (s->current_req == NULL)); > +} > + > +static inline int free_aio_req(BDRVNBDState *s, AIOReq *aio_req) > +{ > + NBDAIOCB *acb = aio_req->aiocb; > + QLIST_REMOVE(aio_req, aioreq_siblings); > + qemu_free(aio_req); > + > + return !QLIST_EMPTY(&acb->aioreq_head); > +} > + > +static void nbd_finish_aiocb(NBDAIOCB *acb) > +{ > + acb->common.cb(acb->common.opaque, acb->ret); > + qemu_aio_release(acb); > +} > + > +static void nbd_handle_io_err(BDRVNBDState *s, AIOReq *aio_req, int err) > +{ > + NBDAIOCB *acb; > + AIOReq *a; > + > + /* These are fine - no need to do anything */ > + if(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { > + return; > + } > + > + /* These errors mean the request failed. So we need to trash the acb > + * (and all associated AIOReqs) and return -EIO. Partial reads are > + * fine. Partial writes aren't great, but no worse than (say) a write > + * to a physical disc that hits a bad sector. > + */ > + if (aio_req == NULL) { > + logout("Error %i (%s) on NBD I/O. Killing NBD\n", err, > strerror(err)); > + } else { > + acb = aio_req->aiocb; > + logout("Error %i (%s) on NBD request (handle %lu). Killing NBD\n", > err, > + sterror(err), aio_req->nbd_req_hdr.handle); > + acb->ret = -EIO; Why don't we use the real error code? > + QLIST_FOREACH(a, &acb->aioreq_head, aioreq_siblings) { > + if (aio_req->waiting_sent) { > + QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, > socket_siblings); > + } else { > + if (s->current_req == aio_req) { > + s->current_req = NULL; > + } else { > + QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, > + socket_siblings); > + } > + } I think this logic should be part of free_aio_req. > + free_aio_req(s, a); > + } > + nbd_finish_aiocb(acb); > + } > + > + nbd_teardown_connection(s); And now there's no way to get the disk back to life without a reboot? 
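If so, that seems harsh - once the server is reachable again, we could
just retry the handshake instead of requiring a reboot. Rough sketch of
what I mean (completely untested; nbd_maybe_reconnect is a made-up name,
and I'm assuming nbd_teardown_connection sets s->sock = -1 and that
nbd_establish_connection returns a negative value on failure):

static int nbd_maybe_reconnect(BDRVNBDState *s)
{
    if (s->sock >= 0) {
        /* Connection is still up, nothing to do */
        return 0;
    }

    /* Retry the initial handshake before queueing new AIOReqs, so a
     * server restart doesn't permanently kill the device */
    if (nbd_establish_connection(s) < 0) {
        return -EIO;
    }

    return 0;
}

nbd_readv_writev_bh_cb could call this before allocating any AIOReqs and
fail the acb early if the reconnect doesn't succeed.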
Do I understand correctly that now trying to access the disk will always return -EBADF? > + return; Unnecessary return; > +} > + > +static void nbd_aio_write_request(void *opaque) > +{ > + BDRVNBDState *s = opaque; > + AIOReq *aio_req = NULL; > + NBDAIOCB *acb; > + size_t total; > + ssize_t ret; > + > + if (QTAILQ_EMPTY(&s->reqs_to_send_head)) { > + return; > + } > + > + aio_req = QTAILQ_FIRST(&s->reqs_to_send_head); > + acb = aio_req->aiocb; > + > + if(acb->aiocb_type == AIOCB_WRITE_UDATA) { Space after if is missing > + total = sizeof(NBDRequest) + aio_req->nbd_req_hdr.len; > + } else { > + total = sizeof(NBDRequest); > + } > + > + /* Since we've not written (all of) the header yet, get on with it. > + * We always grab the *head* of the queue in this callback, so we > + * won't interleave writes to the socket. > + * > + * Creating the header buffer on the fly isn't ideal in the case of many > + * retries, but almost all the time, this will happen exactly once. > + */ > + if (aio_req->bytes_sent < sizeof(NBDRequest)) { > + logout("Buffer not written in full, doing so\n"); > + uint8_t buf[sizeof(NBDRequest)]; > + QEMUIOVector hdr; > + nbd_request_to_buf(&aio_req->nbd_req_hdr, buf); > + qemu_iovec_init(&hdr, 1); > + qemu_iovec_add(&hdr, &buf, sizeof(NBDRequest)); > + ret = writev(s->sock, hdr.iov, hdr.niov); > + qemu_iovec_destroy(&hdr); > + > + if (ret == -1) { > + nbd_handle_io_err(s, aio_req, socket_error()); > + return; > + } else { > + logout("Written %zu bytes to socket (request is %zu bytes)\n", > ret, > + sizeof(NBDRequest)); > + aio_req->bytes_sent += ret; > + } > + } > + > + /* If the header is sent & we're doing a write request, send data */ > + if (acb->aiocb_type == AIOCB_WRITE_UDATA && > + aio_req->bytes_sent >= sizeof(NBDRequest) && > + aio_req->bytes_sent < total) { > + logout("Write request - putting data in socket\n"); > + off_t offset = (aio_req->bytes_sent - sizeof(NBDRequest)) + > + aio_req->iov_offset; > + > + ret = nbd_wr_aio(s->sock, acb->qiov, total - aio_req->bytes_sent, > + offset, false); > + > + if (ret < 0 ) { I guess this is the space that was missing above. :-) > + nbd_handle_io_err(s, aio_req, -ret); > + return; > + } else { > + logout("Written %zu bytes to socket\n", ret); > + aio_req->bytes_sent += ret; > + } > + } > + > + /* Request written. aio_read_response gets the reply */ > + if (aio_req->bytes_sent == total) { > + logout("aio_req written to socket, moving to reqs_for_reply\n"); > + aio_req->waiting_sent = false; > + QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings); > + QTAILQ_INSERT_TAIL(&s->reqs_for_reply_head, aio_req, socket_siblings); > + } > + > + return; > +} > + > +static void nbd_aio_read_response(void *opaque) > +{ > + BDRVNBDState *s = opaque; > + AIOReq *aio_req = NULL; > + NBDAIOCB *acb; > + NBDReply rsp; > + > + size_t total; > + ssize_t ret; > + int rest; > + > + if (s->current_req == NULL && QTAILQ_EMPTY(&s->reqs_for_reply_head)) { > + return; > + } > + > + /* Build our nbd_reply object if we've got it */ > + if (s->current_req && (s->nbd_rsp_offset == sizeof(NBDReply))) { > + nbd_buf_to_reply((uint8_t *)&s->nbd_rsp_buf, &rsp); > + } > + > + if (s->current_req == NULL) { > + /* Try to read a header */ Factor this whole block out in its own function? 
> + QEMUIOVector hdr; > + qemu_iovec_init(&hdr, 1); > + qemu_iovec_add(&hdr, ((&s->nbd_rsp_buf) + s->nbd_rsp_offset), > + (sizeof(NBDReply) - s->nbd_rsp_offset)); > + ret = readv(s->sock, hdr.iov, hdr.niov); > + qemu_iovec_destroy(&hdr); > + > + if (ret == -1) { > + nbd_handle_io_err(s, aio_req, socket_error()); > + return; > + } > + > + s->nbd_rsp_offset += ret; > + > + if (s->nbd_rsp_offset == sizeof(NBDReply)) { > + /* Turn data into NBDReply, find the matching aio_req */ > + nbd_buf_to_reply((uint8_t *)&s->nbd_rsp_buf, &rsp); > + > + /* Check the magic */ > + if (rsp.magic != NBD_REPLY_MAGIC) { > + logout("Received invalid NBD response magic!\n"); > + nbd_handle_io_err(s, NULL, EIO); > + return; > + } > + > + QTAILQ_FOREACH(aio_req, &s->reqs_for_reply_head, > socket_siblings) { > + if (aio_req->nbd_req_hdr.handle == rsp.handle) { > + s->current_req = aio_req; > + s->current_req->bytes_got = sizeof(NBDReply); > + break; > + } > + } > + > + if (!s->current_req) { > + logout("cannot find aio_req for handle %lu\n", rsp.handle); > + nbd_handle_io_err(s, NULL, EIO); > + return; > + } > + QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings); > + } else { > + /* We haven't finished reading the entire header yet. */ > + return; > + } > + } > + > + /* s->current_req and rsp are both usable now */ > + aio_req = s->current_req; > + acb = aio_req->aiocb; > + > + total = sizeof(NBDReply); > + if (acb->aiocb_type == AIOCB_READ_UDATA) { > + total += aio_req->nbd_req_hdr.len; > + } > + > + if (acb->aiocb_type == AIOCB_READ_UDATA && aio_req->bytes_got < total) { > + off_t offset = (aio_req->bytes_got - sizeof(NBDReply)) + > + aio_req->iov_offset; > + QEMUIOVector *qiov = acb->qiov; > + uint8_t *buf = NULL; > + > + if (acb->canceled) { > + buf = qemu_malloc(total - offset); > + qemu_iovec_init(qiov, 1); > + qemu_iovec_add(qiov, buf, total - offset); > + } > + > + ret = nbd_wr_aio(s->sock, qiov, total - offset, offset, true); > + > + if (acb->canceled) { > + qemu_iovec_destroy(qiov); > + qemu_free(buf); > + } > + > + if (ret < 0) { > + nbd_handle_io_err(s, aio_req, -ret); > + return; > + } else { > + aio_req->bytes_got += ret; > + } > + } > + > + /* Entire request has been read */ > + if (total == aio_req->bytes_got) { > + logout("Read all bytes of the response; removing response from > s->current_req\n"); > + s->nbd_rsp_offset = 0; > + s->current_req = NULL; > + if (rsp.error != 0) { > + acb->ret = -EIO; > + logout("NBD request resulted in error %i\n", rsp.error); > + } > + > + /* Free the aio_req. If the NBDAIOCB is finished, notify QEMU */ > + rest = free_aio_req(s, aio_req); > + if (!rest) { > + logout("acb complete\n"); > + acb->aio_done_func(acb); > + } > + } > + > + logout("Leaving nbd_aio_read_response\n"); > + return; Another useless return. 
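Coming back to the "factor this block out" comment above - I was thinking
of something along these lines (untested sketch, reusing the names from
your patch; also, a plain read() seems to do the job here, the
single-element QEMUIOVector doesn't buy us anything):

static int nbd_aio_read_reply_header(BDRVNBDState *s, NBDReply *rsp)
{
    AIOReq *aio_req;
    ssize_t ret;

    /* Pull in as much of the reply header as is currently available */
    ret = read(s->sock, s->nbd_rsp_buf + s->nbd_rsp_offset,
               sizeof(NBDReply) - s->nbd_rsp_offset);
    if (ret < 0) {
        return -socket_error();
    }
    if (ret == 0) {
        return -EIO;    /* server closed the connection */
    }
    s->nbd_rsp_offset += ret;

    if (s->nbd_rsp_offset < sizeof(NBDReply)) {
        return 1;       /* header incomplete, wait for the next callback */
    }

    nbd_buf_to_reply(s->nbd_rsp_buf, rsp);
    if (rsp->magic != NBD_REPLY_MAGIC) {
        return -EIO;
    }

    /* Find the pending request this reply belongs to, make it current */
    QTAILQ_FOREACH(aio_req, &s->reqs_for_reply_head, socket_siblings) {
        if (aio_req->nbd_req_hdr.handle == rsp->handle) {
            QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req,
                          socket_siblings);
            aio_req->bytes_got = sizeof(NBDReply);
            s->current_req = aio_req;
            return 0;
        }
    }

    return -EIO;        /* no request with this handle */
}

nbd_aio_read_response would then just return on the "incomplete" case and
pass negative return values on to nbd_handle_io_err, which should make
the control flow quite a bit easier to follow.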
> +} > + > +static int nbd_establish_connection(BDRVNBDState *s) > { > - BDRVNBDState *s = bs->opaque; > int sock; > int ret; > off_t size; > @@ -139,23 +573,29 @@ static int nbd_establish_connection(BlockDriverState > *bs) > s->sock = sock; > s->size = size; > s->blocksize = blocksize; > + s->nbd_rsp_offset = 0; > + > + qemu_aio_set_fd_handler(sock, nbd_aio_read_response, > nbd_aio_write_request, > + nbd_aio_flush_request, NULL, s); > > logout("Established connection with NBD server\n"); > return 0; > } > > -static void nbd_teardown_connection(BlockDriverState *bs) > +static void nbd_teardown_connection(BDRVNBDState *s) > { > - BDRVNBDState *s = bs->opaque; > struct nbd_request request; > > request.type = NBD_CMD_DISC; > - request.handle = (uint64_t)(intptr_t)bs; > + request.handle = s->aioreq_seq_num++; > request.from = 0; > request.len = 0; > nbd_send_request(s->sock, &request); > > + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL); > closesocket(s->sock); > + logout("Connection to NBD server closed\n"); > + return; > } > > static int nbd_open(BlockDriverState *bs, const char* filename, int flags) > @@ -169,95 +609,193 @@ static int nbd_open(BlockDriverState *bs, const char* > filename, int flags) > return result; > } > > + QTAILQ_INIT(&s->reqs_to_send_head); > + QTAILQ_INIT(&s->reqs_for_reply_head); > + > + s->current_req = NULL; > + s->aioreq_seq_num = 0; > + s->nbd_rsp_offset = 0; > + > /* establish TCP connection, return error if it fails > * TODO: Configurable retry-until-timeout behaviour. > */ > - result = nbd_establish_connection(bs); > + result = nbd_establish_connection(s); > > return result; > } > > -static int nbd_read(BlockDriverState *bs, int64_t sector_num, > - uint8_t *buf, int nb_sectors) > +static void nbd_close(BlockDriverState *bs) > { > BDRVNBDState *s = bs->opaque; > - struct nbd_request request; > - struct nbd_reply reply; > > - request.type = NBD_CMD_READ; > - request.handle = (uint64_t)(intptr_t)bs; > - request.from = sector_num * 512;; > - request.len = nb_sectors * 512; > + nbd_teardown_connection(s); > + qemu_free(s->export_name); > + qemu_free(s->host_spec); > > - if (nbd_send_request(s->sock, &request) == -1) > - return -errno; > + return; > +} > > - if (nbd_receive_reply(s->sock, &reply) == -1) > - return -errno; > +/* We remove all the aiocbs currently sat in reqs_to_send_head (excepting the > + * first, if any bytes have been transmitted). So we don't need to check for > + * canceled in aio_write_request at all.If that finishes the acb, we call its > + * completion function. Otherwise, we leave it alone. > + * > + * in nbd_aio_read_response, when we're handling a read request for an acb > with > + * canceled = true, we allocate a QEMUIOVector of the appropriate size to do > + * the read, and throw the bytes away. Everything else goes on as normal. 
> + */ > +static void nbd_aio_cancel(BlockDriverAIOCB *blockacb) { > + NBDAIOCB *acb = (NBDAIOCB *)blockacb; > + BDRVNBDState *s = acb->common.bs->opaque; > + AIOReq *a; > + > + QLIST_FOREACH(a, &acb->aioreq_head, aioreq_siblings) { > + if(a->waiting_sent && a->bytes_sent == 0) { > + QTAILQ_REMOVE(&s->reqs_to_send_head, a, socket_siblings); > + free_aio_req(s, a); > + } > + } > + > + acb->canceled = true; > + acb->ret = -EIO; > + > + if QLIST_EMPTY(&acb->aioreq_head) { > + nbd_finish_aiocb(acb); > + } > +} > + > +static AIOPool nbd_aio_pool = { > + .aiocb_size = sizeof(NBDAIOCB), > + .cancel = nbd_aio_cancel, > +}; > + > +static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov, > + int64_t sector_num, int nb_sectors, > + BlockDriverCompletionFunc *cb, void > *opaque) > +{ > + NBDAIOCB *acb; > > - if (reply.error !=0) > - return -reply.error; > + acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque); > > - if (reply.handle != request.handle) > + acb->qiov = qiov; > + acb->sector_num = sector_num; > + acb->nb_sectors = nb_sectors; > + > + acb->canceled = false; > + > + acb->aio_done_func = NULL; > + acb->bh = NULL; > + acb->ret = 0; > + > + QLIST_INIT(&acb->aioreq_head); > + return acb; > +} > + > +static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb) > +{ > + if (acb->bh) { > + logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type); > return -EIO; > + } > > - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len) > + acb->bh = qemu_bh_new(cb, acb); > + if (!acb->bh) { > + logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type); > return -EIO; > + } > + > + qemu_bh_schedule(acb->bh); > > return 0; > } > > -static int nbd_write(BlockDriverState *bs, int64_t sector_num, > - const uint8_t *buf, int nb_sectors) > +static void nbd_readv_writev_bh_cb(void *p) > { > - BDRVNBDState *s = bs->opaque; > - struct nbd_request request; > - struct nbd_reply reply; > + NBDAIOCB *acb = p; > > - request.type = NBD_CMD_WRITE; > - request.handle = (uint64_t)(intptr_t)bs; > - request.from = sector_num * 512;; > - request.len = nb_sectors * 512; > + size_t len, done = 0; > + size_t total = acb->nb_sectors * SECTOR_SIZE; > > - if (nbd_send_request(s->sock, &request) == -1) > - return -errno; > + /* Where the read/write starts from */ > + off_t offset = acb->sector_num * SECTOR_SIZE; > + BDRVNBDState *s = acb->common.bs->opaque; > > - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len) > - return -EIO; > + AIOReq *aio_req; > + > + logout("Entering nbd_readv_writev_bh_cb\n"); > > - if (nbd_receive_reply(s->sock, &reply) == -1) > - return -errno; > + qemu_bh_delete(acb->bh); > + acb->bh = NULL; > > - if (reply.error !=0) > - return -reply.error; > + while (done < total) { > + len = (total - done); > > - if (reply.handle != request.handle) > - return -EIO; > + /* Split read & write requests into segments if needed */ > + if (acb->aiocb_type == AIOCB_READ_UDATA && len > NBD_MAX_READ) { > + len = NBD_MAX_READ; > + } > > - return 0; > + if (acb->aiocb_type == AIOCB_WRITE_UDATA && len > NBD_MAX_WRITE) { > + len = NBD_MAX_WRITE; > + } > + > + logout("Allocating an aio_req of %zu bytes\n", len); > + aio_req = nbd_alloc_aio_req(s, acb, len, offset + done, done); > + > + done += len; > + } > + > + if (QLIST_EMPTY(&acb->aioreq_head)) { > + logout("acb->ioreq_head empty, so finishing acb now\n"); > + nbd_finish_aiocb(acb); > + } > + logout("Leaving nbd_readv_writev_bh_cb\n"); > + return; > } > > -static void nbd_close(BlockDriverState *bs) > +static BlockDriverAIOCB 
*nbd_aio_readv(BlockDriverState *bs, > + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, > + BlockDriverCompletionFunc *cb, void *opaque) > +{ > + NBDAIOCB *acb; > + > + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque); > + acb->aiocb_type = AIOCB_READ_UDATA; > + acb->aio_done_func = nbd_finish_aiocb; > + > + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb); > + return &acb->common; > +} > + > +static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs, > + int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, > + BlockDriverCompletionFunc *cb, void *opaque) > { > - nbd_teardown_connection(bs); > + NBDAIOCB *acb; > + > + acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque); > + acb->aiocb_type = AIOCB_WRITE_UDATA; > + acb->aio_done_func = nbd_finish_aiocb; > + > + nbd_schedule_bh(nbd_readv_writev_bh_cb, acb); > + return &acb->common; > } > > static int64_t nbd_getlength(BlockDriverState *bs) > { > BDRVNBDState *s = bs->opaque; > - > return s->size; > } > > static BlockDriver bdrv_nbd = { > - .format_name = "nbd", > - .instance_size = sizeof(BDRVNBDState), > - .bdrv_file_open = nbd_open, > - .bdrv_read = nbd_read, > - .bdrv_write = nbd_write, > - .bdrv_close = nbd_close, > - .bdrv_getlength = nbd_getlength, > - .protocol_name = "nbd", > + .format_name = "nbd", > + .instance_size = sizeof(BDRVNBDState), > + .bdrv_file_open = nbd_open, > + .bdrv_aio_readv = nbd_aio_readv, > + .bdrv_aio_writev = nbd_aio_writev, > + .bdrv_close = nbd_close, > + .bdrv_getlength = nbd_getlength, > + .protocol_name = "nbd" > }; > > static void bdrv_nbd_init(void) > diff --git a/nbd.c b/nbd.c > index a3e6d52..26d33c5 100644 > --- a/nbd.c > +++ b/nbd.c > @@ -31,7 +31,7 @@ > > #include "qemu_socket.h" > > -//#define DEBUG_NBD > +#define DEBUG_NBD > > #ifdef DEBUG_NBD > #define TRACE(msg, ...) do { \ > @@ -49,10 +49,6 @@ > > /* This is all part of the "official" NBD API */ > > -#define NBD_REPLY_SIZE (4 + 4 + 8) > -#define NBD_REQUEST_MAGIC 0x25609513 > -#define NBD_REPLY_MAGIC 0x67446698 > - > #define NBD_SET_SOCK _IO(0xab, 0) > #define NBD_SET_BLKSIZE _IO(0xab, 1) > #define NBD_SET_SIZE _IO(0xab, 2) > @@ -107,6 +103,30 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, > bool do_read) > return offset; > } > > +ssize_t nbd_wr_aio(int fd, QEMUIOVector *qiov, size_t len, off_t offset, > + bool do_read) Isn't this name misleading? The function is completely synchronous. It just happens not to block because it's only called when the socket is ready. Kevin