@@ -34,7 +34,12 @@
#include <sys/types.h>
#include <unistd.h>
-#define EN_OPTSTR ":exportname="
+#define EN_OPTSTR ":exportname="
+#define SECTOR_SIZE 512
+
+/* 1MiB minus header size */
+#define NBD_MAX_READ ((1024*1024) - sizeof(NBDReply))
+#define NBD_MAX_WRITE ((1024*1024) - sizeof(NBDRequest))
/* #define DEBUG_NBD */
@@ -45,17 +50,161 @@
#define logout(fmt, ...) ((void)0)
#endif
-typedef struct BDRVNBDState {
+/*
+ * Here's how the I/O works.
+ * qemu creates a BDRVNBDState for us, which is the context for all reads
+ * and writes.
+ *
+ * nbd_open is called to connect to the NBD server and set up an on-read
+ * handler (nbd_aio_read_response)
+ *
+ * nbd_aio_readv/writev, called by qemu, create an NBDAIOCB (representing the
+ * I/O request to qemu).
+ * For read requests, read/writev creates a single AIOReq containing the NBD
+ * header. For write requests, 1 or more AIOReqs are created, containing the
+ * NBD header and the write data. These are pushed to reqs_to_send_head in the
+ * BDRVNBDState and the list in the NBDAIOCB. We then register a write request
+ * callback, which results in nbd_aio_write_request being called from the
+ * select() in vl.c:main_loop_wait
+ *
+ * Each time nbd_aio_write_request is called, it gets the first AIOReq in the
+ * reqs_to_send_head and writes the data to the socket.
+ * If this results in the whole AIOReq being written to the socket, it moves
+ * the AIOReq to the reqs_for_reply_head in the BDRVNBDState. If the AIOReq
+ * isn't finished, then it's left where it is, to have more of it written
+ * next time. Before exiting, we unregister the write request handler if the
+ * reqs_to_send_head queue is empty. This avoids a tight loop around the
+ * aforementioned select (since the socket is almost always ready for writing).
+ *
+ * If there's an unrecoverable error writing to the socket, we disconnect and
+ * return the entire acb with that error.
+ *
+ * On each nbd_aio_read_response call, we check the BDRVNBDState's current_req
+ * to see if we're in the middle of a read. If not, we read a header's worth of
+ * data, then try to find an AIOReq in the reqs_for_reply_head. If we don't
+ * find one, that is very odd, so we teardown the connection and return an
+ * I/O error.
+ *
+ * Once we have our AIOReq, we remove it from reqs_for_reply_head and put it
+ * in the current_req attribute, then read from the socket to the buffer (if
+ * needed). If that completes the AIOReq, we clear the current_req attribute
+ * and deallocate the AIOReq.
+ * - If the AIOReq is complete, and that's the last one for the NBDAIOCB, we
+ *   call the 'done' callback and return.
+ * - If the AIOReq isn't complete, we just return. It'll be completed in
+ * future callbacks, since it's now the current_req
+ * - If there's an unrecoverable error reading from the socket (EBADF, say).
+ * we invalidate the AIOReq and teardown the connection.
+ *
+ * Currently, there is no reconnection logic, meaning that once the connection
+ * has been broken or an error condition has occurred, the only way to regain
+ * functionality is to call nbd_close() and nbd_open - disconnect & reconnect
+ * the drive, or restart the whole process. There's plenty of scope to improve
+ * upon that.
+ */
+
+typedef struct NBDAIOCB NBDAIOCB;
+typedef struct BDRVNBDState BDRVNBDState;
+
+static int nbd_establish_connection(BDRVNBDState *s);
+static void nbd_teardown_connection(BDRVNBDState *s);
+static void nbd_register_write_request_handler(BDRVNBDState *s);
+static void nbd_unregister_write_request_handler(BDRVNBDState *s);
+
+typedef struct AIOReq {
+ NBDAIOCB *aiocb; /* Which QEMU operation this belongs to */
+
+ /* Where on the NBDAIOCB's iov does this request start? */
+ off_t iov_offset;
+
+ /* The NBD request header pertaining to this AIOReq.
+ * This specifies the handle of the request, the read offset and length.
+ */
+ NBDRequest nbd_req_hdr;
+
+ /* How many bytes have been written to the NBD server so far. This will
+ * vary between 0 and sizeof(nbd_req_hdr) + nbd_req_hdr.len
+ */
+ size_t bytes_sent;
+
+ /* How many bytes *of the payload* have been read from the NBD server so
+ * far. Varies between 0 and nbd_req_hdr.len - header byte count is kept in
+ * BDRVNBDState->nbd_rsp_offset.
+ */
+ size_t bytes_got;
+
+ /* Used to record this in the state object. waiting_sent is used to work
+ * out which queue the AIOReq is in. Before it's been sent, it's in the
+ * reqs_to_send_head. After being sent, if it's not current_req, it's in
+ * reqs_for_reply_head.
+ */
+ QTAILQ_ENTRY(AIOReq) socket_siblings;
+ bool waiting_sent;
+
+ /* Used to enter this into an NBDAIOCB */
+ QLIST_ENTRY(AIOReq) aioreq_siblings;
+} AIOReq;
+
+struct BDRVNBDState {
+ /* File descriptor for the socket to the NBD server */
int sock;
+
+ /* Size of the file being served */
off_t size;
+
+ /* block size */
size_t blocksize;
- char *export_name; /* An NBD server may export several devices */
/* If it begins with '/', this is a UNIX domain socket. Otherwise,
* it's a string of the form <hostname|ip4|\[ip6\]>:port
*/
char *host_spec;
-} BDRVNBDState;
+
+ /* An NBD server may export several devices - this is the one we want */
+ char *export_name;
+
+ /* Used to generate unique NBD handles */
+ uint64_t aioreq_seq_num;
+
+ /* AIOReqs yet to be transmitted */
+ QTAILQ_HEAD(reqs_to_send_head, AIOReq) reqs_to_send_head;
+
+ /* AIOReqs that have been transmitted and are awaiting a reply */
+ QTAILQ_HEAD(reqs_for_reply_head, AIOReq) reqs_for_reply_head;
+
+ /* AIOReq that is currently being read from the socket */
+ AIOReq *current_req;
+
+ /* Used in nbd_aio_read_response. We may need to store received header bytes
+ * between reads - we don't have an AIOReq at that point.
+ */
+ uint8_t nbd_rsp_buf[sizeof(NBDReply)];
+ size_t nbd_rsp_offset;
+
+};
+
+enum AIOCBState {
+ AIOCB_WRITE_UDATA,
+ AIOCB_READ_UDATA,
+};
+
+struct NBDAIOCB {
+ BlockDriverAIOCB common;
+ QEMUIOVector *qiov;
+ QEMUBH *bh;
+
+ enum AIOCBState aiocb_type;
+
+ int64_t sector_num;
+ int nb_sectors;
+ int ret;
+
+ bool canceled;
+
+ void (*aio_done_func)(NBDAIOCB *);
+
+ QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
+};
static int nbd_config(BDRVNBDState *s, const char *filename, int flags)
{
@@ -103,9 +252,342 @@ out:
return err;
}
-static int nbd_establish_connection(BlockDriverState *bs)
+/* Build an AIOReq for acb covering data_len bytes at device offset 'offset'
+ * (starting at 'iov_offset' within acb's qiov), queue it for transmission
+ * on s->reqs_to_send_head and link it into the acb's request list.
+ */
+static inline AIOReq *nbd_alloc_aio_req(BDRVNBDState *s, NBDAIOCB *acb,
+                                        size_t data_len,
+                                        off_t offset,
+                                        off_t iov_offset)
+{
+    AIOReq *req = qemu_malloc(sizeof(*req));
+
+    req->aiocb = acb;
+    req->iov_offset = iov_offset;
+    req->bytes_sent = 0;
+    req->bytes_got = 0;
+
+    /* The wire header: where, how much, and a unique handle */
+    req->nbd_req_hdr.from = offset;
+    req->nbd_req_hdr.len = data_len;
+    req->nbd_req_hdr.handle = s->aioreq_seq_num++;
+    req->nbd_req_hdr.type = (acb->aiocb_type == AIOCB_READ_UDATA)
+                            ? NBD_CMD_READ : NBD_CMD_WRITE;
+
+    req->waiting_sent = true;
+    QTAILQ_INSERT_TAIL(&s->reqs_to_send_head, req, socket_siblings);
+    QLIST_INSERT_HEAD(&acb->aioreq_head, req, aioreq_siblings);
+    return req;
+}
+
+/* qemu_aio flush callback: returns non-zero while any I/O is outstanding
+ * (queued for send, awaiting a reply, or mid-read).
+ */
+static int nbd_aio_flush_request(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    int busy = s->current_req != NULL ||
+               !QTAILQ_EMPTY(&s->reqs_to_send_head) ||
+               !QTAILQ_EMPTY(&s->reqs_for_reply_head);
+
+    logout("flush_request: %i\n", busy);
+    return busy;
+}
+
+/* Unlink aio_req from whichever socket queue holds it, remove it from its
+ * NBDAIOCB and free it.  Returns true if the NBDAIOCB still has other
+ * outstanding AIOReqs after this one is gone.
+ */
+static inline bool free_aio_req(BDRVNBDState *s, AIOReq *aio_req)
+{
+    NBDAIOCB *acb = aio_req->aiocb;
+
+    /* An AIOReq lives in exactly one place: reqs_to_send_head (while
+     * waiting_sent), current_req, or reqs_for_reply_head.  The previous
+     * code removed a waiting_sent request from reqs_to_send_head and then
+     * *also* from reqs_for_reply_head - a list it was never on - corrupting
+     * both queues via the element's stale links.
+     */
+    if (aio_req->waiting_sent) {
+        QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings);
+    } else if (s->current_req == aio_req) {
+        s->current_req = NULL;
+    } else {
+        QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings);
+    }
+
+    QLIST_REMOVE(aio_req, aioreq_siblings);
+    qemu_free(aio_req);
+
+    return !QLIST_EMPTY(&acb->aioreq_head);
+}
+
+/* Deliver the final result to QEMU's completion callback and release
+ * the aiocb back to the pool.
+ */
+static void nbd_finish_aiocb(NBDAIOCB *acb)
+{
+    acb->common.cb(acb->common.opaque, acb->ret);
+    qemu_aio_release(acb);
+}
+
+/* Handle an error from a socket read/write.  Transient errors (EAGAIN,
+ * EWOULDBLOCK, EINTR) are ignored; anything else fails the associated
+ * NBDAIOCB (if any) and tears down the connection.
+ *
+ * aio_req may be NULL when the error cannot be attributed to one request.
+ */
+static void nbd_handle_io_err(BDRVNBDState *s, AIOReq *aio_req, int err)
+{
+    NBDAIOCB *acb;
+    AIOReq *a, *next;
+
+    /* These are fine - no need to do anything */
+    if (err == EAGAIN || err == EWOULDBLOCK || err == EINTR) {
+        logout("Recoverable error %i (%s) - returning\n", err, strerror(err));
+        return;
+    }
+
+    /* These errors mean the request failed. So we need to trash the acb
+     * (and all associated AIOReqs) and return the error. Partial reads are
+     * fine. Partial writes aren't great, but no worse than (say) a write
+     * to a physical disc that hits a bad sector.
+     */
+    if (aio_req == NULL) {
+        logout("Error %i (%s) on NBD I/O. Killing NBD\n", err, strerror(err));
+    } else {
+        acb = aio_req->aiocb;
+        logout("Error %i (%s) on NBD request (handle %lu). Killing NBD\n", err,
+               strerror(err), aio_req->nbd_req_hdr.handle);
+        acb->ret = -err;
+        /* free_aio_req() unlinks and frees 'a'; a safe iterator is required
+         * so we never read the next pointer of a freed element.
+         */
+        QLIST_FOREACH_SAFE(a, &acb->aioreq_head, aioreq_siblings, next) {
+            free_aio_req(s, a);
+        }
+        nbd_finish_aiocb(acb);
+    }
+
+    nbd_teardown_connection(s);
+}
+
+/* select() write callback: push (part of) the AIOReq at the head of
+ * reqs_to_send_head onto the socket.  Called repeatedly until the queue
+ * drains, at which point the handler unregisters itself to avoid a busy
+ * loop around select().
+ */
+static void nbd_aio_write_request(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    AIOReq *aio_req = NULL;
+    NBDAIOCB *acb;
+    size_t total;
+    ssize_t ret;
+
+    if (QTAILQ_EMPTY(&s->reqs_to_send_head)) {
+        logout("Nothing to do in aio_write_request so unregistering handler\n");
+        nbd_unregister_write_request_handler(s);
+        return;
+    }
+
+    aio_req = QTAILQ_FIRST(&s->reqs_to_send_head);
+    acb = aio_req->aiocb;
+
+    /* Writes carry a payload after the header; reads are header-only */
+    if (acb->aiocb_type == AIOCB_WRITE_UDATA) {
+        total = sizeof(NBDRequest) + aio_req->nbd_req_hdr.len;
+    } else {
+        total = sizeof(NBDRequest);
+    }
+
+    /* Since we've not written (all of) the header yet, get on with it.
+     * We always grab the *head* of the queue in this callback, so we
+     * won't interleave writes to the socket.
+     *
+     * Creating the header buffer on the fly isn't ideal in the case of many
+     * retries, but almost all the time, this will happen exactly once.
+     */
+    if (aio_req->bytes_sent < sizeof(NBDRequest)) {
+        logout("Buffer not written in full, doing so\n");
+        uint8_t buf[sizeof(NBDRequest)];
+        QEMUIOVector hdr;
+        nbd_request_to_buf(&aio_req->nbd_req_hdr, buf);
+        qemu_iovec_init(&hdr, 1);
+        /* pass the array (decays to uint8_t *), not a pointer to it */
+        qemu_iovec_add(&hdr, buf, sizeof(NBDRequest));
+        ret = writev(s->sock, hdr.iov, hdr.niov);
+        qemu_iovec_destroy(&hdr);
+
+        if (ret == -1) {
+            nbd_handle_io_err(s, aio_req, socket_error());
+            return;
+        } else {
+            /* %zd: ret is a (signed) ssize_t, not a size_t */
+            logout("Written %zd bytes to socket (request is %zu bytes)\n", ret,
+                   sizeof(NBDRequest));
+            aio_req->bytes_sent += ret;
+        }
+    }
+
+    /* If the header is sent & we're doing a write request, send data */
+    if (acb->aiocb_type == AIOCB_WRITE_UDATA &&
+        aio_req->bytes_sent >= sizeof(NBDRequest) &&
+        aio_req->bytes_sent < total) {
+        logout("Write request - putting data in socket\n");
+        off_t offset = (aio_req->bytes_sent - sizeof(NBDRequest)) +
+                       aio_req->iov_offset;
+
+        ret = nbd_qiov_wr(s->sock, acb->qiov, total - aio_req->bytes_sent,
+                          offset, false);
+
+        if (ret < 0) {
+            nbd_handle_io_err(s, aio_req, -ret);
+            return;
+        } else {
+            logout("Written %zd bytes to socket\n", ret);
+            aio_req->bytes_sent += ret;
+        }
+    }
+
+    /* Request written. nbd_aio_read_response gets the reply */
+    if (aio_req->bytes_sent == total) {
+        logout("aio_req written to socket, moving to reqs_for_reply\n");
+        aio_req->waiting_sent = false;
+        QTAILQ_REMOVE(&s->reqs_to_send_head, aio_req, socket_siblings);
+        QTAILQ_INSERT_TAIL(&s->reqs_for_reply_head, aio_req, socket_siblings);
+    }
+
+    if (QTAILQ_EMPTY(&s->reqs_to_send_head)) {
+        logout("Write queue empty, unregistering write request handler\n");
+        nbd_unregister_write_request_handler(s);
+    }
+}
+
+/* Read (the remainder of) an NBDReply header from the socket, then find
+ * the AIOReq whose handle matches it and make it s->current_req.
+ * Returns true once a matching AIOReq has been found.
+ */
+static inline bool nbd_find_next_aioreq(BDRVNBDState *s)
+{
+    uint8_t *buf = &s->nbd_rsp_buf[s->nbd_rsp_offset];
+    size_t cnt = sizeof(NBDReply) - s->nbd_rsp_offset;
+    ssize_t ret;
+    NBDReply rsp;
+    AIOReq *aio_req;
+
+    /* Try to get enough bytes so we have a complete NBDReply */
+    ret = read(s->sock, buf, cnt);
+    logout("read %zd bytes\n", ret);
+
+    /* I/O error means we've failed.  No AIOReq can be blamed yet (no handle
+     * has been matched), so pass NULL - the previous code passed the
+     * still-uninitialized aio_req here, which is undefined behavior.
+     */
+    if (ret == -1) {
+        nbd_handle_io_err(s, NULL, socket_error());
+        return false;
+    }
+
+    /* EOF: the server closed the connection.  Without this check we would
+     * be called forever with nothing to read.
+     */
+    if (ret == 0) {
+        nbd_handle_io_err(s, NULL, EIO);
+        return false;
+    }
+
+    s->nbd_rsp_offset += ret;
+
+    /* We don't have enough data to make a full header */
+    if (s->nbd_rsp_offset < sizeof(NBDReply)) {
+        return false;
+    }
+
+    /* Turn data into NBDReply, find the matching aio_req */
+    nbd_buf_to_reply(&s->nbd_rsp_buf[0], &rsp);
+
+    /* Check the magic */
+    if (rsp.magic != NBD_REPLY_MAGIC) {
+        logout("Received invalid NBD response magic!\n");
+        nbd_handle_io_err(s, NULL, EIO);
+        return false;
+    }
+
+    QTAILQ_FOREACH(aio_req, &s->reqs_for_reply_head, socket_siblings) {
+        if (aio_req->nbd_req_hdr.handle == rsp.handle) {
+            s->current_req = aio_req;
+            break;
+        }
+    }
+
+    if (s->current_req) {
+        /* Mission accomplished! */
+        logout("Found next aio_req\n");
+        QTAILQ_REMOVE(&s->reqs_for_reply_head, aio_req, socket_siblings);
+        return true;
+    }
+
+    /* The handle in the reply head doesn't match any AIOReq. Fail. */
+    logout("cannot find aio_req for handle %lu\n", rsp.handle);
+    nbd_handle_io_err(s, NULL, EIO);
+    return false;
+}
+
+/* select() read callback: consume reply headers and payload from the
+ * socket, completing AIOReqs and, when the last AIOReq of an NBDAIOCB
+ * finishes, the NBDAIOCB itself.
+ */
+static void nbd_aio_read_response(void *opaque)
+{
+    BDRVNBDState *s = opaque;
+    uint8_t *buf = NULL;    /* Used if the aiocb has been canceled */
+    QEMUIOVector dummy;     /* Likewise - a private sink for discarded data */
+    AIOReq *aio_req = NULL;
+    NBDAIOCB *acb;
+    NBDReply rsp;
+
+    size_t total = 0;       /* number of payload bytes read */
+    ssize_t ret;
+    int rest;
+
+    /* We're not in the middle of a request */
+    if (s->current_req == NULL) {
+        /* No outstanding requests */
+        if (QTAILQ_EMPTY(&s->reqs_for_reply_head)) {
+            logout("No request outstanding, exiting\n");
+            return;
+        }
+
+        /* Couldn't grab the next aioreq */
+        if (!nbd_find_next_aioreq(s)) {
+            logout("Failed to find a new aio_req to work on, exiting\n");
+            return;
+        }
+    }
+
+    /* From here on, s->current_req and s->nbd_rsp_buf are known to be good */
+    nbd_buf_to_reply(&s->nbd_rsp_buf[0], &rsp);
+    aio_req = s->current_req;
+    acb = aio_req->aiocb;
+
+    /* NBD server returned an error for this operation */
+    if (rsp.error != 0) {
+        logout("NBD request resulted in error: %i\n", rsp.error);
+        acb->ret = -EIO;
+
+        rest = free_aio_req(s, aio_req);
+        if (!rest) {
+            logout("Signalling completion for this ACB\n");
+            acb->aio_done_func(acb);
+        }
+
+        return;
+    }
+
+    if (acb->aiocb_type == AIOCB_READ_UDATA) {
+        total = aio_req->nbd_req_hdr.len;
+    }
+
+    if (acb->aiocb_type == AIOCB_READ_UDATA && aio_req->bytes_got < total) {
+
+        size_t remaining = total - aio_req->bytes_got;
+        QEMUIOVector *qiov = acb->qiov;
+        off_t qiov_offset = aio_req->bytes_got + aio_req->iov_offset;
+
+        if (acb->canceled) {
+            /* Drain the payload into a private scratch iovec.  The previous
+             * code re-initialized acb->qiov - the *caller's* QEMUIOVector -
+             * leaking its iov array and trashing its state.
+             */
+            buf = qemu_malloc(remaining);
+            qemu_iovec_init(&dummy, 1);
+            qemu_iovec_add(&dummy, buf, remaining);
+            qiov = &dummy;
+            qiov_offset = 0;
+        }
+
+        ret = nbd_qiov_wr(s->sock, qiov, remaining, qiov_offset, true);
+        /* %zd: ret is a (signed) ssize_t */
+        logout("Read %zd of %zu bytes remaining\n", ret, remaining);
+
+        if (acb->canceled) {
+            qemu_iovec_destroy(&dummy);
+            qemu_free(buf);
+        }
+
+        if (ret < 0) {
+            nbd_handle_io_err(s, aio_req, -ret);
+            return;
+        }
+
+        aio_req->bytes_got += ret;
+    }
+
+    /* Entire request has been read */
+    if (total == aio_req->bytes_got) {
+        logout("Read all bytes of the response; clearing s->current_req\n");
+        s->nbd_rsp_offset = 0;
+
+        /* Free the aio_req. If the NBDAIOCB is finished, notify QEMU */
+        rest = free_aio_req(s, aio_req);
+        if (!rest) {
+            logout("acb complete\n");
+            acb->aio_done_func(acb);
+        }
+    }
+
+    logout("Leaving function\n");
+}
+
+static int nbd_establish_connection(BDRVNBDState *s)
{
- BDRVNBDState *s = bs->opaque;
int sock;
int ret;
off_t size;
@@ -139,23 +621,30 @@ static int nbd_establish_connection(BlockDriverState *bs)
s->sock = sock;
s->size = size;
s->blocksize = blocksize;
+ s->nbd_rsp_offset = 0;
+
+ qemu_aio_set_fd_handler(sock, nbd_aio_read_response, NULL,
+ nbd_aio_flush_request, NULL, s);
logout("Established connection with NBD server\n");
return 0;
}
-static void nbd_teardown_connection(BlockDriverState *bs)
+static void nbd_teardown_connection(BDRVNBDState *s)
{
- BDRVNBDState *s = bs->opaque;
struct nbd_request request;
request.type = NBD_CMD_DISC;
- request.handle = (uint64_t)(intptr_t)bs;
+ request.handle = s->aioreq_seq_num++;
request.from = 0;
request.len = 0;
nbd_send_request(s->sock, &request);
+ qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
closesocket(s->sock);
+ s->sock = -1;
+
+ logout("Connection to NBD server closed\n");
}
static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
@@ -169,99 +658,217 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
return result;
}
+ QTAILQ_INIT(&s->reqs_to_send_head);
+ QTAILQ_INIT(&s->reqs_for_reply_head);
+
+ s->current_req = NULL;
+ s->aioreq_seq_num = 0;
+ s->nbd_rsp_offset = 0;
+ s->sock = -1;
+
/* establish TCP connection, return error if it fails
* TODO: Configurable retry-until-timeout behaviour.
*/
- result = nbd_establish_connection(bs);
+ result = nbd_establish_connection(s);
return result;
}
-static int nbd_read(BlockDriverState *bs, int64_t sector_num,
- uint8_t *buf, int nb_sectors)
+static void nbd_close(BlockDriverState *bs)
{
BDRVNBDState *s = bs->opaque;
- struct nbd_request request;
- struct nbd_reply reply;
+ qemu_free(s->export_name);
+ qemu_free(s->host_spec);
- request.type = NBD_CMD_READ;
- request.handle = (uint64_t)(intptr_t)bs;
- request.from = sector_num * 512;;
- request.len = nb_sectors * 512;
+ nbd_teardown_connection(s);
+}
- if (nbd_send_request(s->sock, &request) == -1)
- return -errno;
+static void nbd_register_write_request_handler(BDRVNBDState *s)
+{
+ int sock = s->sock;
+ if (sock == -1) {
+ logout("Register write request handler tried when socket closed\n");
+ return;
+ }
- if (nbd_receive_reply(s->sock, &reply) == -1)
- return -errno;
+ qemu_aio_set_fd_handler(sock, nbd_aio_read_response, nbd_aio_write_request,
+ nbd_aio_flush_request, NULL, s);
+}
- if (reply.error !=0)
- return -reply.error;
+/* Drop the write-side fd handler, keeping the read handler installed.
+ * Mirrors nbd_register_write_request_handler (and now consistently uses
+ * the cached 'sock' instead of mixing it with s->sock).
+ */
+static void nbd_unregister_write_request_handler(BDRVNBDState *s)
+{
+    int sock = s->sock;
+    if (sock == -1) {
+        logout("Unregister write request handler tried when socket closed\n");
+        return;
+    }
+
+    qemu_aio_set_fd_handler(sock, nbd_aio_read_response, NULL,
+                            nbd_aio_flush_request, NULL, s);
+}
+
+/* We remove all the aiocbs currently sat in reqs_to_send_head (excepting the
+ * first, if any bytes have been transmitted). So we don't need to check for
+ * canceled in aio_write_request at all. If that finishes the acb, we call its
+ * completion function. Otherwise, we leave it alone.
+ *
+ * in nbd_aio_read_response, when we're handling a read request for an acb with
+ * canceled = true, we allocate a QEMUIOVector of the appropriate size to do
+ * the read, and throw the bytes away. Everything else goes on as normal.
+ */
+static void nbd_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+    NBDAIOCB *acb = (NBDAIOCB *)blockacb;
+    BDRVNBDState *s = acb->common.bs->opaque;
+    AIOReq *a, *next;
+
+    /* Per the design comment above: only drop requests that have not hit
+     * the wire.  Requests that are partially sent, awaiting a reply, or
+     * mid-read stay queued; nbd_aio_read_response discards their payload
+     * once acb->canceled is set.  Freeing them here (as the previous code
+     * did for *all* requests) would leave the server's replies unmatched
+     * and force a connection teardown.  A safe iterator is needed because
+     * free_aio_req() unlinks and frees the element.
+     */
+    QLIST_FOREACH_SAFE(a, &acb->aioreq_head, aioreq_siblings, next) {
+        if (a->waiting_sent && a->bytes_sent == 0) {
+            free_aio_req(s, a);
+        }
+    }
+
+    acb->canceled = true;
+    acb->ret = -EIO;
+
+    if (QLIST_EMPTY(&acb->aioreq_head)) {
+        nbd_finish_aiocb(acb);
+    }
+}
- if (reply.handle != request.handle)
+static AIOPool nbd_aio_pool = {
+ .aiocb_size = sizeof(NBDAIOCB),
+ .cancel = nbd_aio_cancel,
+};
+
+static NBDAIOCB *nbd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
+ int64_t sector_num, int nb_sectors,
+ BlockDriverCompletionFunc *cb, void *opaque)
+{
+ NBDAIOCB *acb;
+
+ acb = qemu_aio_get(&nbd_aio_pool, bs, cb, opaque);
+
+ acb->qiov = qiov;
+ acb->sector_num = sector_num;
+ acb->nb_sectors = nb_sectors;
+
+ acb->canceled = false;
+
+ acb->aio_done_func = NULL;
+ acb->bh = NULL;
+ acb->ret = 0;
+
+ QLIST_INIT(&acb->aioreq_head);
+ return acb;
+}
+
+static int nbd_schedule_bh(QEMUBHFunc *cb, NBDAIOCB *acb)
+{
+ if (acb->bh) {
+ logout("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
return -EIO;
+ }
- if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
+ acb->bh = qemu_bh_new(cb, acb);
+ if (!acb->bh) {
+ logout("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
return -EIO;
+ }
+
+ qemu_bh_schedule(acb->bh);
return 0;
}
-static int nbd_write(BlockDriverState *bs, int64_t sector_num,
- const uint8_t *buf, int nb_sectors)
+static void nbd_readv_writev_bh_cb(void *p)
{
- BDRVNBDState *s = bs->opaque;
- struct nbd_request request;
- struct nbd_reply reply;
+ NBDAIOCB *acb = p;
- request.type = NBD_CMD_WRITE;
- request.handle = (uint64_t)(intptr_t)bs;
- request.from = sector_num * 512;;
- request.len = nb_sectors * 512;
+ size_t len, done = 0;
+ size_t total = acb->nb_sectors * SECTOR_SIZE;
- if (nbd_send_request(s->sock, &request) == -1)
- return -errno;
+ /* Where the read/write starts from */
+ off_t offset = acb->sector_num * SECTOR_SIZE;
+ BDRVNBDState *s = acb->common.bs->opaque;
- if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
- return -EIO;
+ AIOReq *aio_req;
- if (nbd_receive_reply(s->sock, &reply) == -1)
- return -errno;
+ logout("Entering nbd_readv_writev_bh_cb\n");
- if (reply.error !=0)
- return -reply.error;
+ qemu_bh_delete(acb->bh);
+ acb->bh = NULL;
- if (reply.handle != request.handle)
- return -EIO;
+ while (done < total) {
+ len = (total - done);
- return 0;
+ /* Split read & write requests into segments if needed */
+ if (acb->aiocb_type == AIOCB_READ_UDATA && len > NBD_MAX_READ) {
+ len = NBD_MAX_READ;
+ }
+
+ if (acb->aiocb_type == AIOCB_WRITE_UDATA && len > NBD_MAX_WRITE) {
+ len = NBD_MAX_WRITE;
+ }
+
+ logout("Allocating an aio_req of %zu bytes\n", len);
+ aio_req = nbd_alloc_aio_req(s, acb, len, offset + done, done);
+
+ done += len;
+ }
+
+ if (QLIST_EMPTY(&acb->aioreq_head)) {
+ logout("acb->ioreq_head empty, so finishing acb now\n");
+ nbd_finish_aiocb(acb);
+ } else {
+ logout("Requests to make - registering write request callback\n");
+ nbd_register_write_request_handler(s);
+ }
+
+ logout("Leaving nbd_readv_writev_bh_cb\n");
}
-static void nbd_close(BlockDriverState *bs)
+static BlockDriverAIOCB *nbd_aio_readv(BlockDriverState *bs,
+ int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+ BlockDriverCompletionFunc *cb, void *opaque)
{
- BDRVNBDState *s = bs->opaque;
- qemu_free(s->export_name);
- qemu_free(s->host_spec);
+ NBDAIOCB *acb;
- nbd_teardown_connection(bs);
+ acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+ acb->aiocb_type = AIOCB_READ_UDATA;
+ acb->aio_done_func = nbd_finish_aiocb;
+
+ nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
+ return &acb->common;
+}
+
+static BlockDriverAIOCB *nbd_aio_writev(BlockDriverState *bs,
+ int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
+ BlockDriverCompletionFunc *cb, void *opaque)
+{
+ NBDAIOCB *acb;
+
+ acb = nbd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+ acb->aiocb_type = AIOCB_WRITE_UDATA;
+ acb->aio_done_func = nbd_finish_aiocb;
+
+ nbd_schedule_bh(nbd_readv_writev_bh_cb, acb);
+ return &acb->common;
}
static int64_t nbd_getlength(BlockDriverState *bs)
{
BDRVNBDState *s = bs->opaque;
-
return s->size;
}
static BlockDriver bdrv_nbd = {
- .format_name = "nbd",
- .instance_size = sizeof(BDRVNBDState),
- .bdrv_file_open = nbd_open,
- .bdrv_read = nbd_read,
- .bdrv_write = nbd_write,
- .bdrv_close = nbd_close,
- .bdrv_getlength = nbd_getlength,
- .protocol_name = "nbd",
+ .format_name = "nbd",
+ .instance_size = sizeof(BDRVNBDState),
+ .bdrv_file_open = nbd_open,
+ .bdrv_aio_readv = nbd_aio_readv,
+ .bdrv_aio_writev = nbd_aio_writev,
+ .bdrv_close = nbd_close,
+ .bdrv_getlength = nbd_getlength,
+ .protocol_name = "nbd"
};
static void bdrv_nbd_init(void)
@@ -49,10 +49,6 @@
/* This is all part of the "official" NBD API */
-#define NBD_REPLY_SIZE (4 + 4 + 8)
-#define NBD_REQUEST_MAGIC 0x25609513
-#define NBD_REPLY_MAGIC 0x67446698
-
#define NBD_SET_SOCK _IO(0xab, 0)
#define NBD_SET_BLKSIZE _IO(0xab, 1)
#define NBD_SET_SIZE _IO(0xab, 2)
@@ -107,6 +103,30 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
return offset;
}
+/* Read from / write to fd the byte range [offset, offset + len) of qiov,
+ * via a temporary iovec copied out of qiov.
+ * Returns the number of bytes transferred, or -errno on failure.
+ * NOTE(review): a short transfer is returned as-is; callers must cope
+ * with partial reads/writes.
+ */
+ssize_t nbd_qiov_wr(int fd, QEMUIOVector *qiov, size_t len, off_t offset,
+                    bool do_read)
+{
+    ssize_t ret;
+    QEMUIOVector spec;
+
+    /* Build an iovec covering just the requested slice of qiov */
+    qemu_iovec_init(&spec, qiov->niov);
+    qemu_iovec_copy(&spec, qiov, offset, len);
+
+    if (do_read) {
+        ret = readv(fd, spec.iov, spec.niov);
+    } else {
+        ret = writev(fd, spec.iov, spec.niov);
+    }
+
+    qemu_iovec_destroy(&spec);
+
+    if (ret == -1) {
+        return -socket_error();
+    }
+
+    return ret;
+}
+
static void combine_addr(char *buf, size_t len, const char* address,
uint16_t port)
{
@@ -429,15 +449,31 @@ int nbd_client(int fd)
}
#endif
-int nbd_send_request(int csock, struct nbd_request *request)
+/* Put the NBD header into a buffer, ready for wire transmission.
+ * Endianness is dealt with here. The caller needs to allocate a
+ * buffer of sizeof(NBDRequest) bytes.
+ */
+void nbd_request_to_buf(NBDRequest *request, uint8_t *buf)
{
- uint8_t buf[4 + 4 + 8 + 8 + 4];
+ request->magic = NBD_REQUEST_MAGIC;
+ cpu_to_be32w((uint32_t *)(buf + 0), request->magic);
+ cpu_to_be32w((uint32_t *)(buf + 4), request->type);
+ cpu_to_be64w((uint64_t *)(buf + 8), request->handle);
+ cpu_to_be64w((uint64_t *)(buf + 16), request->from);
+ cpu_to_be32w((uint32_t *)(buf + 24), request->len);
+}
+
+void nbd_buf_to_reply(const uint8_t *buf, NBDReply *reply)
+{
+ reply->magic = be32_to_cpup((uint32_t *)buf);
+ reply->error = be32_to_cpup((uint32_t *)(buf + 4));
+ reply->handle = be64_to_cpup((uint64_t *)(buf + 8));
+}
- cpu_to_be32w((uint32_t*)buf, NBD_REQUEST_MAGIC);
- cpu_to_be32w((uint32_t*)(buf + 4), request->type);
- cpu_to_be64w((uint64_t*)(buf + 8), request->handle);
- cpu_to_be64w((uint64_t*)(buf + 16), request->from);
- cpu_to_be32w((uint32_t*)(buf + 24), request->len);
+int nbd_send_request(int csock, NBDRequest *request)
+{
+ uint8_t buf[sizeof(NBDRequest)];
+ nbd_request_to_buf(request, buf);
TRACE("Sending request to client: "
"{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}",
@@ -490,8 +526,7 @@ static int nbd_receive_request(int csock, struct nbd_request *request)
int nbd_receive_reply(int csock, struct nbd_reply *reply)
{
- uint8_t buf[NBD_REPLY_SIZE];
- uint32_t magic;
+ uint8_t buf[sizeof(NBDReply)];
memset(buf, 0xAA, sizeof(buf));
@@ -506,17 +541,14 @@ int nbd_receive_reply(int csock, struct nbd_reply *reply)
[ 4 .. 7] error (0 == no error)
[ 7 .. 15] handle
*/
-
- magic = be32_to_cpup((uint32_t*)buf);
- reply->error = be32_to_cpup((uint32_t*)(buf + 4));
- reply->handle = be64_to_cpup((uint64_t*)(buf + 8));
+ nbd_buf_to_reply((uint8_t *)&buf, reply);
TRACE("Got reply: "
"{ magic = 0x%x, .error = %d, handle = %" PRIu64" }",
- magic, reply->error, reply->handle);
+ reply->magic, reply->error, reply->handle);
- if (magic != NBD_REPLY_MAGIC) {
- LOG("invalid magic (got 0x%x)", magic);
+ if (reply->magic != NBD_REPLY_MAGIC) {
+ LOG("invalid magic (got 0x%x)", reply->magic);
errno = EINVAL;
return -1;
}
@@ -558,7 +590,7 @@ int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset,
return -1;
if (request.len + NBD_REPLY_SIZE > data_size) {
- LOG("len (%u) is larger than max len (%u)",
+ LOG("len (%lu) is larger than max len (%u)",
request.len + NBD_REPLY_SIZE, data_size);
errno = EINVAL;
return -1;
@@ -39,15 +39,23 @@ struct nbd_reply {
uint64_t handle;
} __attribute__ ((__packed__));
+typedef struct nbd_request NBDRequest;
+typedef struct nbd_reply NBDReply;
+
enum {
NBD_CMD_READ = 0,
NBD_CMD_WRITE = 1,
NBD_CMD_DISC = 2
};
-#define NBD_DEFAULT_PORT 10809
+#define NBD_DEFAULT_PORT 10809
+#define NBD_REPLY_SIZE sizeof(NBDReply)
+#define NBD_REQUEST_MAGIC 0x25609513
+#define NBD_REPLY_MAGIC 0x67446698
size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read);
+ssize_t nbd_qiov_wr(int fd, QEMUIOVector *qiov, size_t len, off_t offset,
+ bool do_read);
int tcp_socket_outgoing(const char *address, uint16_t port);
int tcp_socket_incoming(const char *address, uint16_t port);
int tcp_socket_outgoing_spec(const char *address_and_port);
@@ -55,6 +63,9 @@ int tcp_socket_incoming_spec(const char *address_and_port);
int unix_socket_outgoing(const char *path);
int unix_socket_incoming(const char *path);
+void nbd_request_to_buf(NBDRequest *request, uint8_t *buf);
+void nbd_buf_to_reply(const uint8_t *buf, NBDReply *reply);
+
int nbd_negotiate(int csock, off_t size);
int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
off_t *size, size_t *blocksize);