Patchwork [RFC,v4,3/3] block: add sheepdog driver for distributed storage support

login
register
mail settings
Submitter MORITA Kazutaka
Date May 28, 2010, 2:44 a.m.
Message ID <1275014699-9431-4-git-send-email-morita.kazutaka@lab.ntt.co.jp>
Download mbox | patch
Permalink /patch/53855/
State New
Headers show

Comments

MORITA Kazutaka - May 28, 2010, 2:44 a.m.
Sheepdog is a distributed storage system for QEMU. It provides highly
available block level storage volumes to VMs like Amazon EBS.  This
patch adds a qemu block driver for Sheepdog.

Sheepdog features are:
- No node in the cluster is special (no metadata node, no control
  node, etc)
- Linear scalability in performance and capacity
- No single point of failure
- Autonomous management (zero configuration)
- Useful volume management support such as snapshot and cloning
- Thin provisioning
- Autonomous load balancing

The more details are available at the project site:
    http://www.osrg.net/sheepdog/

Signed-off-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
---
 Makefile.objs    |    2 +-
 block/sheepdog.c | 1835 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 1836 insertions(+), 1 deletions(-)
 create mode 100644 block/sheepdog.c
Krumme, Chris - June 1, 2010, 2:58 p.m.
On 05/27/2010 09:44 PM, MORITA Kazutaka wrote:
> Sheepdog is a distributed storage system for QEMU. It provides highly
> available block level storage volumes to VMs like Amazon EBS.  This
> patch adds a qemu block driver for Sheepdog.
>
> Sheepdog features are:
> - No node in the cluster is special (no metadata node, no control
>    node, etc)
> - Linear scalability in performance and capacity
> - No single point of failure
> - Autonomous management (zero configuration)
> - Useful volume management support such as snapshot and cloning
> - Thin provisioning
> - Autonomous load balancing
>
> The more details are available at the project site:
>      http://www.osrg.net/sheepdog/
>
> Signed-off-by: MORITA Kazutaka<morita.kazutaka@lab.ntt.co.jp>
> ---
>   Makefile.objs    |    2 +-
>   block/sheepdog.c | 1835 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
>   2 files changed, 1836 insertions(+), 1 deletions(-)
>   create mode 100644 block/sheepdog.c
>
> diff --git a/Makefile.objs b/Makefile.objs
> index 1a942e5..527a754 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -14,7 +14,7 @@ block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>
>   block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
>   block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o
> -block-nested-y += parallels.o nbd.o blkdebug.o
> +block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o
>   block-nested-$(CONFIG_WIN32) += raw-win32.o
>   block-nested-$(CONFIG_POSIX) += raw-posix.o
>   block-nested-$(CONFIG_CURL) += curl.o
> diff --git a/block/sheepdog.c b/block/sheepdog.c
> new file mode 100644
> index 0000000..68545e8
> --- /dev/null
> +++ b/block/sheepdog.c
> @@ -0,0 +1,1835 @@
> +/*
> + * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
> + *
> + * This program is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU General Public License version
> + * 2 as published by the Free Software Foundation.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see<http://www.gnu.org/licenses/>.
> + */
> +#include<netdb.h>
> +#include<netinet/tcp.h>
> +
> +#include "qemu-common.h"
> +#include "qemu-error.h"
> +#include "block_int.h"
> +
> +#define SD_PROTO_VER 0x01
> +
> +#define SD_DEFAULT_ADDR "localhost:7000"
> +
> +#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
> +#define SD_OP_READ_OBJ       0x02
> +#define SD_OP_WRITE_OBJ      0x03
> +
> +#define SD_OP_NEW_VDI        0x11
> +#define SD_OP_LOCK_VDI       0x12
> +#define SD_OP_RELEASE_VDI    0x13
> +#define SD_OP_GET_VDI_INFO   0x14
> +#define SD_OP_READ_VDIS      0x15
> +
> +#define SD_FLAG_CMD_WRITE    0x01
> +#define SD_FLAG_CMD_COW      0x02
> +
> +#define SD_RES_SUCCESS       0x00 /* Success */
> +#define SD_RES_UNKNOWN       0x01 /* Unknown error */
> +#define SD_RES_NO_OBJ        0x02 /* No object found */
> +#define SD_RES_EIO           0x03 /* I/O error */
> +#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
> +#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
> +#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
> +#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
> +#define SD_RES_NO_VDI        0x08 /* No vdi found */
> +#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
> +#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
> +#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
> +#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
> +#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
> +#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
> +#define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
> +#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
> +#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
> +#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
> +#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
> +#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
> +#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
> +#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Sheepdog is waiting for a format operation */
> +#define SD_RES_WAIT_FOR_JOIN    0x17 /* Sheepdog is waiting for other nodes joining */
> +#define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
> +
> +/*
> + * Object ID rules
> + *
> + *  0 - 19 (20 bits): data object space
> + * 20 - 31 (12 bits): reserved data object space
> + * 32 - 55 (24 bits): vdi object space
> + * 56 - 59 ( 4 bits): reserved vdi object space
> + * 60 - 63 ( 4 bits): object type indentifier space
> + */
> +
> +#define VDI_SPACE_SHIFT   32
> +#define VDI_BIT (UINT64_C(1)<<  63)
> +#define VMSTATE_BIT (UINT64_C(1)<<  62)
> +#define MAX_DATA_OBJS (1ULL<<  20)
> +#define MAX_CHILDREN 1024
> +#define SD_MAX_VDI_LEN 256
> +#define SD_NR_VDIS   (1U<<  24)
> +#define SD_DATA_OBJ_SIZE (UINT64_C(1)<<  22)
> +
> +#define SD_INODE_SIZE (sizeof(SheepdogInode))
> +#define CURRENT_VDI_ID 0
> +
> +typedef struct SheepdogReq {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint32_t	opcode_specific[8];
> +} SheepdogReq;
> +
> +typedef struct SheepdogRsp {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint32_t        result;
> +	uint32_t	opcode_specific[7];
> +} SheepdogRsp;
> +
> +typedef struct SheepdogObjReq {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint64_t        oid;
> +	uint64_t        cow_oid;
> +	uint32_t        copies;
> +	uint32_t        rsvd;
> +	uint64_t        offset;
> +} SheepdogObjReq;
> +
> +typedef struct SheepdogObjRsp {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint32_t        result;
> +	uint32_t        copies;
> +	uint32_t        pad[6];
> +} SheepdogObjRsp;
> +
> +typedef struct SheepdogVdiReq {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint64_t	vdi_size;
> +	uint32_t        base_vdi_id;
> +	uint32_t        copies;
> +	uint32_t        snapid;
> +	uint32_t        pad[3];
> +} SheepdogVdiReq;
> +
> +typedef struct SheepdogVdiRsp {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint32_t        result;
> +	uint32_t        rsvd;
> +	uint32_t        vdi_id;
> +	uint32_t        pad[5];
> +} SheepdogVdiRsp;
> +
> +typedef struct SheepdogInode {
> +	char name[SD_MAX_VDI_LEN];
> +	uint64_t ctime;
> +	uint64_t snap_ctime;
> +	uint64_t vm_clock_nsec;
> +	uint64_t vdi_size;
> +	uint64_t vm_state_size;
> +	uint16_t copy_policy;
> +	uint8_t  nr_copies;
> +	uint8_t  block_size_shift;
> +	uint32_t snap_id;
> +	uint32_t vdi_id;
> +	uint32_t parent_vdi_id;
> +	uint32_t child_vdi_id[MAX_CHILDREN];
> +	uint32_t data_vdi_id[MAX_DATA_OBJS];
> +} SheepdogInode;
> +
> +/*
> + * 64 bit FNV-1a non-zero initial basis
> + */
> +#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
> +
> +/*
> + * 64 bit Fowler/Noll/Vo FNV-1a hash code
> + */
> +static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
> +{
> +        unsigned char *bp = (unsigned char *) buf;
> +        unsigned char *be = bp + len;
> +        while (bp<  be) {
> +                hval ^= (uint64_t) *bp++;
> +                hval += (hval<<  1) + (hval<<  4) + (hval<<  5) +
> +                        (hval<<  7) + (hval<<  8) + (hval<<  40);
> +        }
> +        return hval;
> +}
> +
> +static inline int is_data_obj_writeable(SheepdogInode *inode, unsigned int idx)
> +{
> +	return inode->vdi_id == inode->data_vdi_id[idx];
> +}
> +
> +static inline int is_data_obj(uint64_t oid)
> +{
> +	return !(VDI_BIT&  oid);
> +}
> +
> +static inline uint64_t data_oid_to_idx(uint64_t oid)
> +{
> +	return oid&  (MAX_DATA_OBJS - 1);
> +}
> +
> +static inline uint64_t vid_to_vdi_oid(uint32_t vid)
> +{
> +	return VDI_BIT | ((uint64_t)vid<<  VDI_SPACE_SHIFT);
> +}
> +
> +static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
> +{
> +	return VMSTATE_BIT | ((uint64_t)vid<<  VDI_SPACE_SHIFT) | idx;
> +}
> +
> +static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
> +{
> +	return ((uint64_t)vid<<  VDI_SPACE_SHIFT) | idx;
> +}
> +
> +#undef dprintf
> +#ifdef DEBUG_SDOG
> +#define dprintf(fmt, args...)						\
> +do {									\
> +	fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args);	\
> +} while (0)
> +#else
> +#define dprintf(fmt, args...)
> +#endif
> +
> +#define min_t(type, x, y) ({			\
> +	type __min1 = (x);			\
> +	type __min2 = (y);			\
> +	__min1<  __min2 ? __min1: __min2; })
> +
> +#define max_t(type, x, y) ({			\
> +	type __max1 = (x);			\
> +	type __max2 = (y);			\
> +	__max1>  __max2 ? __max1: __max2; })
> +
> +typedef struct SheepdogAIOCB SheepdogAIOCB;
> +
> +typedef struct AIOReq {
> +	SheepdogAIOCB *aiocb;
> +	unsigned int iov_offset;
> +
> +	uint64_t oid;
> +	uint64_t base_oid;
> +	uint64_t offset;
> +	unsigned int data_len;
> +	uint8_t flags;
> +	uint32_t id;
> +
> +	QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
> +	QLIST_ENTRY(AIOReq) aioreq_siblings;
> +} AIOReq;
> +
> +enum AIOCBState {
> +	AIOCB_WRITE_UDATA,
> +	AIOCB_READ_UDATA,
> +};
> +
> +struct SheepdogAIOCB {
> +	BlockDriverAIOCB common;
> +
> +	QEMUIOVector *qiov;
> +
> +	int64_t sector_num;
> +	int nb_sectors;
> +
> +	int ret;
> +	enum AIOCBState aiocb_type;
> +
> +	QEMUBH *bh;
> +	void (*aio_done_func)(SheepdogAIOCB *);
> +
> +	int canceled;
> +
> +	QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
> +};
> +
> +typedef struct BDRVSheepdogState {
> +	SheepdogInode inode;
> +
> +	uint32_t min_dirty_data_idx;
> +	uint32_t max_dirty_data_idx;
> +
> +	char name[SD_MAX_VDI_LEN];
> +	int is_current;
> +
> +	char *addr;
> +	int fd;
> +
> +	uint32_t aioreq_seq_num;
> +	QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
> +} BDRVSheepdogState;
> +
> +static const char * sd_strerror(int err)
> +{
> +	int i;
> +
> +	static const struct {
> +		int err;
> +		const char *desc;
> +	} errors[] = {
> +		{SD_RES_SUCCESS, "Success"},
> +		{SD_RES_UNKNOWN, "Unknown error"},
> +		{SD_RES_NO_OBJ, "No object found"},
> +		{SD_RES_EIO, "I/O error"},
> +		{SD_RES_VDI_EXIST, "VDI exists already"},
> +		{SD_RES_INVALID_PARMS, "Invalid parameters"},
> +		{SD_RES_SYSTEM_ERROR, "System error"},
> +		{SD_RES_VDI_LOCKED, "VDI is already locked"},
> +		{SD_RES_NO_VDI, "No vdi found"},
> +		{SD_RES_NO_BASE_VDI, "No base VDI found"},
> +		{SD_RES_VDI_READ, "Failed read the requested VDI"},
> +		{SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
> +		{SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
> +		{SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
> +		{SD_RES_NO_TAG, "Failed to find the requested tag"},
> +		{SD_RES_STARTUP, "The system is still booting"},
> +		{SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
> +		{SD_RES_SHUTDOWN, "The system is shutting down"},
> +		{SD_RES_NO_MEM, "Out of memory on the server"},
> +		{SD_RES_FULL_VDI, "We already have the maximum vdis"},
> +		{SD_RES_VER_MISMATCH, "Protocol version mismatch"},
> +		{SD_RES_NO_SPACE, "Server has no space for new objects"},
> +		{SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
> +		{SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
> +		{SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
> +	};
> +
> +	for (i = 0; i<  ARRAY_SIZE(errors); ++i) {
> +		if (errors[i].err == err) {
> +			return errors[i].desc;
> +		}
> +	}
> +
> +	return "Invalid error code";
> +}
> +
> +static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s,
> +				    SheepdogAIOCB *acb,
> +				    uint64_t oid, unsigned int data_len,
> +				    uint64_t offset, uint8_t flags,
> +				    uint64_t base_oid,
> +				    unsigned int iov_offset)
> +{
> +	AIOReq *aio_req;
> +
> +	aio_req = qemu_malloc(sizeof(*aio_req));
> +	aio_req->aiocb = acb;
> +	aio_req->iov_offset = iov_offset;
> +	aio_req->oid = oid;
> +	aio_req->base_oid = base_oid;
> +	aio_req->offset = offset;
> +	aio_req->data_len = data_len;
> +	aio_req->flags = flags;
> +	aio_req->id = s->aioreq_seq_num++;
> +
> +	QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
> +			  outstanding_aio_siblings);
> +	QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
> +
> +	return aio_req;
> +}
> +
> +static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
> +{
> +	SheepdogAIOCB *acb = aio_req->aiocb;
> +	QLIST_REMOVE(aio_req, outstanding_aio_siblings);
> +	QLIST_REMOVE(aio_req, aioreq_siblings);
> +	qemu_free(aio_req);
> +
> +	return !QLIST_EMPTY(&acb->aioreq_head);
> +}
> +
> +static void sd_finish_aiocb(SheepdogAIOCB *acb)
> +{
> +	if (!acb->canceled) {
> +		acb->common.cb(acb->common.opaque, acb->ret);
> +	}
> +	qemu_aio_release(acb);
> +}
> +
> +static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +	SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
> +
> +	acb->canceled = 1;
> +}
> +
> +static AIOPool sd_aio_pool = {
> +	.aiocb_size = sizeof(SheepdogAIOCB),
> +	.cancel = sd_aio_cancel,
> +};
> +
> +static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
> +				   int64_t sector_num, int nb_sectors,
> +				   BlockDriverCompletionFunc *cb,
> +				   void *opaque)
> +{
> +	SheepdogAIOCB *acb;
> +
> +	acb = qemu_aio_get(&sd_aio_pool, bs, cb, opaque);
> +
> +	acb->qiov = qiov;
> +
> +	acb->sector_num = sector_num;
> +	acb->nb_sectors = nb_sectors;
> +
> +	acb->aio_done_func = NULL;
> +	acb->canceled = 0;
> +	acb->bh = NULL;
> +	acb->ret = 0;
> +	QLIST_INIT(&acb->aioreq_head);
> +	return acb;
> +}
> +
> +static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb)
> +{
> +	if (acb->bh) {
> +		error_report("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
> +		return -EIO;
> +	}
> +
> +	acb->bh = qemu_bh_new(cb, acb);
> +	if (!acb->bh) {
> +		error_report("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
> +		return -EIO;
> +	}
> +
> +	qemu_bh_schedule(acb->bh);
> +
> +	return 0;
> +}
> +
> +static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset,
> +			int write)
> +{
> +	struct msghdr msg;
> +	int ret, diff;
> +
> +	memset(&msg, 0, sizeof(msg));
> +	msg.msg_iov = iov;
> +	msg.msg_iovlen = 1;
> +
> +	len += offset;
> +
> +	while (iov->iov_len<  len) {
> +		len -= iov->iov_len;
> +
> +		iov++;
> +		msg.msg_iovlen++;
> +	}
> +
> +	diff = iov->iov_len - len;
> +	iov->iov_len -= diff;
> +
> +	while (msg.msg_iov->iov_len<= offset) {
> +		offset -= msg.msg_iov->iov_len;
> +
> +		msg.msg_iov++;
> +		msg.msg_iovlen--;
> +	}
> +
> +	msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset;
> +	msg.msg_iov->iov_len -= offset;
> +
> +	if (write) {
> +		ret = sendmsg(sockfd,&msg, 0);
> +	} else {
> +		ret = recvmsg(sockfd,&msg, MSG_WAITALL);
> +	}
> +
> +	msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset;
> +	msg.msg_iov->iov_len += offset;
> +
> +	iov->iov_len += diff;
> +	return ret;
> +}
> +
> +static int connect_to_sdog(const char *addr)
> +{
> +	char buf[64];
> +	char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
> +	char name[256], *p;
> +	int fd, ret;
> +	struct addrinfo hints, *res, *res0;
> +	int port = 0;
> +
> +	if (!addr) {
> +		addr = SD_DEFAULT_ADDR;
> +	}
> +
> +	strcpy(name, addr);
>    

Can strlen(addr) be > sizeof(name)?

> +
> +	p = name;
> +	while (*p) {
> +		if (*p == ':') {
> +			*p++ = '\0';
>    

May also need to check for p > name + sizeof(name).

> +			break;
> +		} else {
> +			p++;
> +		}
> +	}
> +
> +	if (*p == '\0') {
> +		error_report("cannot find a port number, %s\n", name);
> +		return -1;
> +	}
> +	port = strtol(p, NULL, 10);
>    

Are negative numbers valid here?

> +	if (port == 0) {
> +		error_report("invalid port number, %s\n", p);
> +		return -1;
> +	}
> +
> +	memset(&hints, 0, sizeof(hints));
> +	snprintf(buf, sizeof(buf), "%d", port);
> +
> +	hints.ai_socktype = SOCK_STREAM;
> +
> +	ret = getaddrinfo(name, buf,&hints,&res0);
> +	if (ret) {
> +		error_report("unable to get address info %s, %m\n", name);
> +		return -1;
> +	}
> +
> +	for (res = res0; res; res = res->ai_next) {
> +		ret = getnameinfo(res->ai_addr, res->ai_addrlen,
> +				  hbuf, sizeof(hbuf), sbuf, sizeof(sbuf),
> +				  NI_NUMERICHOST | NI_NUMERICSERV);
> +		if (ret) {
> +			continue;
> +		}
> +
> +		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
> +		if (fd<  0) {
> +			continue;
> +		}
> +
> +reconnect:
> +		ret = connect(fd, res->ai_addr, res->ai_addrlen);
> +		if (ret<  0) {
> +			if (errno == EINTR) {
> +				goto reconnect;
> +			}
> +			break;
> +		}
> +
> +		dprintf("connected to %s:%d\n", name, port);
> +		goto success;
> +	}
> +	fd = -1;
> +	error_report("failed connect to %s:%d\n", name, port);
> +success:
> +	freeaddrinfo(res0);
> +	return fd;
> +}
> +
> +static int do_readv_writev(int sockfd, struct iovec *iov, int len,
> +			   int iov_offset, int write)
> +{
> +	int ret;
> +again:
> +	ret = do_send_recv(sockfd, iov, len, iov_offset, write);
> +	if (ret<  0) {
> +		if (errno == EINTR || errno == EAGAIN) {
> +			goto again;
> +		}
> +		error_report("failed to recv a rsp, %m\n");
> +		return 1;
> +	}
> +
> +	iov_offset += ret;
> +	len -= ret;
> +	if (len) {
> +		goto again;
> +	}
> +
> +	return 0;
> +}
> +
> +static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
> +{
> +	return do_readv_writev(sockfd, iov, len, iov_offset, 0);
> +}
> +
> +static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
> +{
> +	return do_readv_writev(sockfd, iov, len, iov_offset, 1);
> +}
> +
> +static int do_read_write(int sockfd, void *buf, int len, int write)
> +{
> +	struct iovec iov;
> +
> +	iov.iov_base = buf;
> +	iov.iov_len = len;
> +
> +	return do_readv_writev(sockfd,&iov, len, 0, write);
> +}
> +
> +static int do_read(int sockfd, void *buf, int len)
> +{
> +	return do_read_write(sockfd, buf, len, 0);
> +}
> +
> +static int do_write(int sockfd, void *buf, int len)
> +{
> +	return do_read_write(sockfd, buf, len, 1);
> +}
> +
> +static int send_req(int sockfd, SheepdogReq *hdr, void *data,
> +		    unsigned int *wlen)
> +{
> +	int ret;
> +	struct iovec iov[2];
> +
> +	iov[0].iov_base = hdr;
> +	iov[0].iov_len = sizeof(*hdr);
> +
> +	if (*wlen) {
> +		iov[1].iov_base = data;
> +		iov[1].iov_len = *wlen;
> +	}
> +
> +	ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
> +	if (ret) {
> +		error_report("failed to send a req, %m\n");
> +		ret = -1;
> +	}
> +
> +	return ret;
> +}
> +
> +static int do_req(int sockfd, SheepdogReq *hdr, void *data,
> +		  unsigned int *wlen, unsigned int *rlen)
> +{
> +	int ret;
> +
> +	ret = send_req(sockfd, hdr, data, wlen);
> +	if (ret) {
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	ret = do_read(sockfd, hdr, sizeof(*hdr));
> +	if (ret) {
> +		error_report("failed to get a rsp, %m\n");
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	if (*rlen>  hdr->data_length) {
> +		*rlen = hdr->data_length;
> +	}
> +
> +	if (*rlen) {
> +		ret = do_read(sockfd, data, *rlen);
> +		if (ret) {
> +			error_report("failed to get the data, %m\n");
> +			ret = -1;
> +			goto out;
> +		}
> +	}
> +	ret = 0;
> +out:
> +	return ret;
> +}
> +
> +static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
> +			   struct iovec *iov, int niov, int create,
> +			   enum AIOCBState aiocb_type);
> +
> +static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
> +{
> +	AIOReq *aio_req, *next;
> +	SheepdogAIOCB *acb;
> +	int ret;
> +
> +	QLIST_FOREACH_SAFE(aio_req,&s->outstanding_aio_head,
> +			   outstanding_aio_siblings, next) {
> +		if (id == aio_req->id) {
> +			continue;
> +		}
> +		if (aio_req->oid != oid) {
> +			continue;
> +		}
> +
> +		acb = aio_req->aiocb;
> +		ret = add_aio_request(s, aio_req, acb->qiov->iov,
> +				      acb->qiov->niov, 0, acb->aiocb_type);
> +		if (ret<  0) {
> +			error_report("add_aio_request is faled\n");
> +			free_aio_req(s, aio_req);
> +			if (QLIST_EMPTY(&acb->aioreq_head)) {
> +				sd_finish_aiocb(acb);
> +			}
> +		}
> +	}
> +}
> +
> +static void aio_read_response(void *opaque)
> +{
> +	SheepdogObjReq hdr;
> +	SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
> +	BDRVSheepdogState *s = (BDRVSheepdogState *)opaque;
> +	int fd = s->fd;
> +	int ret;
> +	AIOReq *aio_req = NULL;
> +	SheepdogAIOCB *acb;
> +	int rest;
> +	unsigned long idx;
> +
> +	if (QLIST_EMPTY(&s->outstanding_aio_head)) {
> +		return;
> +	}
> +
> +	ret = do_read(fd, (void *)rsp, sizeof(*rsp));
> +	if (ret) {
> +		error_report("failed to get the header, %m\n");
> +		return;
> +	}
> +
> +	QLIST_FOREACH(aio_req,&s->outstanding_aio_head, outstanding_aio_siblings) {
> +		if (aio_req->id == rsp->id) {
> +			break;
> +		}
> +	}
> +	if (!aio_req) {
> +		error_report("cannot find aio_req %x\n", rsp->id);
> +		return;
> +	}
> +
> +	acb = aio_req->aiocb;
> +
> +	switch (acb->aiocb_type) {
> +	case AIOCB_WRITE_UDATA:
> +		if (!is_data_obj(aio_req->oid)) {
> +			break;
> +		}
> +		idx = data_oid_to_idx(aio_req->oid);
> +
> +		if (s->inode.data_vdi_id[idx] != s->inode.vdi_id) {
> +			s->inode.data_vdi_id[idx] = s->inode.vdi_id;
> +			s->max_dirty_data_idx = max_t(uint32_t, idx,
> +						      s->max_dirty_data_idx);
> +			s->min_dirty_data_idx = min_t(uint32_t, idx,
> +						      s->min_dirty_data_idx);
> +
> +			send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx),
> +					 rsp->id);
> +		}
> +		break;
> +	case AIOCB_READ_UDATA:
> +		ret = do_readv(fd, acb->qiov->iov, rsp->data_length,
> +			       aio_req->iov_offset);
> +		if (ret) {
> +			error_report("failed to get the data, %m\n");
> +			return;
> +		}
> +		break;
> +	}
> +
> +	if (rsp->result != SD_RES_SUCCESS) {
> +		acb->ret = -EIO;
> +		error_report("%s\n", sd_strerror(rsp->result));
> +	}
> +
> +	rest = free_aio_req(s, aio_req);
> +	if (!rest) {
> +		acb->aio_done_func(acb);
> +	}
> +}
> +
> +static int aio_flush_request(void *opaque)
> +{
> +	BDRVSheepdogState *s = (BDRVSheepdogState *)opaque;
> +
> +	return !QLIST_EMPTY(&s->outstanding_aio_head);
> +}
> +
> +static int set_nonblocking(int fd)
> +{
> +	int ret;
> +
> +	ret = fcntl(fd, F_GETFL);
> +	if (ret<  0) {
> +		error_report("can't fcntl (F_GETFL), %m\n");
> +		close(fd);
> +	} else {
> +		ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
> +		if (ret<  0) {
> +			error_report("can't fcntl (O_NONBLOCK), %m\n");
> +		}
> +	}
> +
> +	return ret;
> +}
> +
> +static int set_nodelay(int fd)
> +{
> +	int ret, opt;
> +
> +	opt = 1;
> +	ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,&opt, sizeof(opt));
> +	return ret;
> +}
> +
> +/*
> + * Return a socket discriptor to read/write objects.
> + * We cannot use this discriptor for other operations because
> + * the block driver may be on waiting response from the server.
> + */
> +static int get_sheep_fd(BDRVSheepdogState *s)
> +{
> +	int ret, fd;
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd<  0) {
> +		error_report("%m\n");
> +		return -1;
> +	}
> +
> +	ret = set_nonblocking(fd);
> +	if (ret) {
> +		error_report("%m\n");
> +		close(fd);
> +		return -1;
> +	}
> +
> +	ret = set_nodelay(fd);
> +	if (ret) {
> +		error_report("%m\n");
> +		close(fd);
> +		return -1;
> +	}
> +
> +	qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request,
> +				NULL, s);
> +	s->fd = fd;
> +
> +	return fd;
> +}
> +
> +static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
> +			 char *vdi, int vdi_len, uint32_t *snapid)
> +{
> +	char *p, *q;
> +	int nr_sep;
> +
> +	p = q = strdup(filename);
> +
> +	if (!p) {
>    

I think Qemu has a version of strdup that will not return NULL.

> +		return 1;
> +	}
> +
> +	nr_sep = 0;
> +	while (*p) {
> +		if (*p == ':') {
> +			nr_sep++;
> +		}
> +		if (nr_sep == 2) {
> +			break;
> +		}
> +		p++;
> +	}
> +
> +	if (nr_sep == 2) {
> +		*p++ = '\0';
> +	} else {
> +		p = q;
> +	}
> +
> +	strncpy(vdi, p, vdi_len);
> +
> +	p = strchr(vdi, ':');
> +	if (p) {
> +		*p++ = '\0';
> +		*snapid = strtol(p, NULL, 10);
> +	} else {
> +		*snapid = CURRENT_VDI_ID; /* search current vdi */
> +	}
> +
> +	if (nr_sep == 2) {
> +		s->addr = q;
> +	} else {
> +		free(q);
> +		s->addr = NULL;
> +	}
> +
> +	return 0;
> +}
> +
> +static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid,
> +			 uint32_t *vid, int for_snapshot)
> +{
> +	int ret, fd;
> +	SheepdogVdiReq hdr;
> +	SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
> +	unsigned int wlen, rlen = 0;
> +	char buf[SD_MAX_VDI_LEN];
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd<  0) {
> +		return -1;
> +	}
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +	snprintf(buf, sizeof(buf), "%s", filename);
> +	if (for_snapshot) {
> +		hdr.opcode = SD_OP_GET_VDI_INFO;
> +	} else {
> +		hdr.opcode = SD_OP_LOCK_VDI;
> +	}
> +	wlen = SD_MAX_VDI_LEN;
> +	hdr.proto_ver = SD_PROTO_VER;
> +	hdr.data_length = SD_MAX_VDI_LEN;
> +	hdr.snapid = snapid;
> +	hdr.flags = SD_FLAG_CMD_WRITE;
> +
> +	ret = do_req(fd, (SheepdogReq *)&hdr, buf,&wlen,&rlen);
> +	if (ret) {
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	if (rsp->result != SD_RES_SUCCESS) {
> +		error_report("%s, %s\n", sd_strerror(rsp->result), filename);
> +		ret = -1;
> +		goto out;
> +	}
> +	*vid = rsp->vdi_id;
> +
> +	ret = 0;
> +out:
> +	close(fd);
> +	return ret;
> +}
> +
> +static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
> +			   struct iovec *iov, int niov, int create,
> +			   enum AIOCBState aiocb_type)
> +{
> +	int nr_copies = s->inode.nr_copies;
> +	SheepdogObjReq hdr;
> +	unsigned int wlen;
> +	int ret, opt;
> +	uint64_t oid = aio_req->oid;
> +	unsigned int datalen = aio_req->data_len;
> +	uint64_t offset = aio_req->offset;
> +	uint8_t flags = aio_req->flags;
> +	uint64_t old_oid = aio_req->base_oid;
> +
> +	if (!nr_copies) {
> +		error_report("bug\n");
> +	}
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +
> +	if (aiocb_type == AIOCB_READ_UDATA) {
> +		wlen = 0;
> +		hdr.opcode = SD_OP_READ_OBJ;
> +		hdr.flags = flags;
> +	} else if (create) {
> +		wlen = datalen;
> +		hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
> +		hdr.flags = SD_FLAG_CMD_WRITE | flags;
> +	} else {
> +		wlen = datalen;
> +		hdr.opcode = SD_OP_WRITE_OBJ;
> +		hdr.flags = SD_FLAG_CMD_WRITE | flags;
> +	}
> +
> +	hdr.oid = oid;
> +	hdr.cow_oid = old_oid;
> +	hdr.copies = s->inode.nr_copies;
> +
> +	hdr.data_length = datalen;
> +	hdr.offset = offset;
> +
> +	hdr.id = aio_req->id;
> +
> +	opt = 1;
> +	setsockopt(s->fd, SOL_TCP, TCP_CORK,&opt, sizeof(opt));
> +
> +	ret = do_write(s->fd,&hdr, sizeof(hdr));
> +	if (ret) {
> +		error_report("failed to send a req, %m\n");
> +		return -EIO;
> +	}
> +
> +	if (wlen) {
> +		ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
> +		if (ret) {
> +			error_report("failed to send a data, %m\n");
> +			return -EIO;
> +		}
> +	}
> +        opt = 0;
> +        setsockopt(s->fd, SOL_TCP, TCP_CORK,&opt, sizeof(opt));
> +
> +	return 0;
> +}
> +
> +static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
> +			     unsigned int datalen, uint64_t offset,
> +			     int write, int create)
> +{
> +	SheepdogObjReq hdr;
> +	SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
> +	unsigned int wlen, rlen;
> +	int ret;
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +
> +	if (write) {
> +		wlen = datalen;
> +		rlen = 0;
> +		hdr.flags = SD_FLAG_CMD_WRITE;
> +		if (create) {
> +			hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
> +		} else {
> +			hdr.opcode = SD_OP_WRITE_OBJ;
> +		}
> +	} else {
> +		wlen = 0;
> +		rlen = datalen;
> +		hdr.opcode = SD_OP_READ_OBJ;
> +	}
> +	hdr.oid = oid;
> +	hdr.data_length = datalen;
> +	hdr.offset = offset;
> +	hdr.copies = copies;
> +
> +	ret = do_req(fd, (SheepdogReq *)&hdr, buf,&wlen,&rlen);
> +	if (ret) {
> +		error_report("failed to send a request to the sheep\n");
> +		return -1;
> +	}
> +
> +	switch (rsp->result) {
> +	case SD_RES_SUCCESS:
> +		return 0;
> +	default:
> +		error_report("%s\n", sd_strerror(rsp->result));
> +		return -1;
> +	}
> +}
> +
> +static int read_object(int fd, char *buf, uint64_t oid, int copies,
> +		       unsigned int datalen, uint64_t offset)
> +{
> +	return read_write_object(fd, buf, oid, copies, datalen, offset, 0, 0);
> +}
> +
> +static int write_object(int fd, char *buf, uint64_t oid, int copies,
> +			unsigned int datalen, uint64_t offset, int create)
> +{
> +	return read_write_object(fd, buf, oid, copies, datalen, offset, 1, create);
> +}
> +
> +/* TODO: error cleanups */
> +static int sd_open(BlockDriverState *bs, const char *filename, int flags)
> +{
> +	int ret, fd;
> +	uint32_t vid = 0;
> +	BDRVSheepdogState *s = bs->opaque;
> +	char vdi[256];
> +	uint32_t snapid;
> +	int for_snapshot = 0;
> +	char *buf;
> +
> +	strstart(filename, "sheepdog:", (const char **)&filename);
> +
> +	buf = qemu_malloc(SD_INODE_SIZE);
> +
> +	memset(vdi, 0, sizeof(vdi));
> +	if (parse_vdiname(s, filename, vdi, sizeof(vdi),&snapid)<  0) {
> +		goto out;
> +	}
> +	s->fd = get_sheep_fd(s);
> +	if (s->fd<  0) {
>    

buf is not freed, goto out maybe.

> +		return -1;
> +	}
> +
> +	if (snapid != CURRENT_VDI_ID) {
> +		for_snapshot = 1;
> +	}
> +
> +	ret = find_vdi_name(s, vdi, snapid,&vid, for_snapshot);
> +	if (ret) {
> +		goto out;
> +	}
> +
> +	if (snapid) {
> +		dprintf("%" PRIx32 " non current inode was open.\n", vid);
> +	} else {
> +		s->is_current = 1;
> +	}
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd<  0) {
> +		error_report("failed to connect\n");
> +		goto out;
> +	}
> +
> +	ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0);
> +
> +	close(fd);
> +
> +	if (ret) {
> +		goto out;
> +	}
> +
> +	memcpy(&s->inode, buf, sizeof(s->inode));
> +	s->min_dirty_data_idx = UINT32_MAX;
> +	s->max_dirty_data_idx = 0;
> +
> +	bs->total_sectors = s->inode.vdi_size>>  9;
> +	strncpy(s->name, vdi, sizeof(s->name));
> +	qemu_free(buf);
> +
> +	QLIST_INIT(&s->outstanding_aio_head);
> +	return 0;
> +out:
> +	qemu_free(buf);
> +	return -1;
> +}
> +
> +static int do_sd_create(const char *addr, char *filename, char *tag,
> +			int64_t total_sectors, uint32_t base_vid,
> +			uint32_t *vdi_id, int snapshot)
> +{
> +	SheepdogVdiReq hdr;
> +	SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
> +	int fd, ret;
> +	unsigned int wlen, rlen = 0;
> +	char buf[SD_MAX_VDI_LEN];
> +
> +	fd = connect_to_sdog(addr);
> +	if (fd<  0) {
> +		return -1;
> +	}
> +
> +	strncpy(buf, filename, SD_MAX_VDI_LEN);
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +	hdr.opcode = SD_OP_NEW_VDI;
> +	hdr.base_vdi_id = base_vid;
> +
> +	wlen = SD_MAX_VDI_LEN;
> +
> +	hdr.flags = SD_FLAG_CMD_WRITE;
> +	hdr.snapid = snapshot;
> +
> +	hdr.data_length = wlen;
> +	hdr.vdi_size = total_sectors * 512;
>    

There is another patch on the list changing 512 to a define for sector size.

> +
> +	ret = do_req(fd, (SheepdogReq *)&hdr, buf,&wlen,&rlen);
> +
> +	close(fd);
> +
> +	if (ret) {
> +		return -1;
> +	}
> +
> +	if (rsp->result != SD_RES_SUCCESS) {
> +		error_report("%s, %s\n", sd_strerror(rsp->result), filename);
> +		return -1;
> +	}
> +
> +	if (vdi_id) {
> +		*vdi_id = rsp->vdi_id;
> +	}
> +
> +	return 0;
> +}
> +
> +static int sd_create(const char *filename, QEMUOptionParameter *options)
> +{
> +	int ret;
> +	uint32_t vid = 0;
> +	int64_t total_sectors = 0;
> +	char *backing_file = NULL;
> +
> +	strstart(filename, "sheepdog:", (const char **)&filename);
> +
> +	while (options&&  options->name) {
> +		if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> +			total_sectors = options->value.n / 512;
>    
Use define.
> +		} else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
> +			backing_file = options->value.s;
> +		}
> +		options++;
> +	}
> +
> +	if (backing_file) {
> +		BlockDriverState bs;
> +		char vdi[SD_MAX_VDI_LEN];
> +		uint32_t snapid;
> +
> +		strstart(backing_file, "sheepdog:", (const char **)&backing_file);
> +		memset(&bs, 0, sizeof(bs));
> +
> +		bs.opaque = qemu_malloc(sizeof(BDRVSheepdogState));
>    

bs seems to have a short life span, is opaque getting freed?

> +
> +		ret = sd_open(&bs, backing_file, 0);
> +		if (ret<  0) {
> +			return -1;
> +		}
> +
> +		if (parse_vdiname(bs.opaque, backing_file, vdi, sizeof(vdi),&snapid)<  0) {
> +			return -1;
> +		}
> +
> +		/* cannot clone from a current inode */
> +		if (snapid == CURRENT_VDI_ID) {
> +			return -1;
> +		}
> +
> +		ret = find_vdi_name(bs.opaque, vdi, snapid,&vid, 1);
> +		if (ret) {
> +			return -1;
> +		}
> +	}
> +
> +	return do_sd_create(NULL, (char *)filename, NULL, total_sectors, vid,
> +			    NULL, 0);
> +}
> +
> +static void sd_close(BlockDriverState *bs)
> +{
> +	BDRVSheepdogState *s = bs->opaque;
> +	SheepdogVdiReq hdr;
> +	SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
> +	unsigned int wlen, rlen = 0;
> +	int fd, ret;
> +
> +	dprintf("%s\n", s->name);
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd<  0) {
> +		return;
> +	}
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +
> +	hdr.opcode = SD_OP_RELEASE_VDI;
> +	wlen = strlen(s->name) + 1;
> +	hdr.data_length = wlen;
> +	hdr.flags = SD_FLAG_CMD_WRITE;
> +
> +	ret = do_req(fd, (SheepdogReq *)&hdr, s->name,&wlen,&rlen);
> +
> +	close(fd);
> +
> +	if (!ret&&  rsp->result != SD_RES_SUCCESS&&
> +	    rsp->result != SD_RES_VDI_NOT_LOCKED) {
> +		error_report("%s, %s\n", sd_strerror(rsp->result), s->name);
> +	}
> +
> +	close(s->fd);
> +	free(s->addr);
> +}
> +
> +static void sd_write_done(SheepdogAIOCB *acb)
> +{
> +	int ret;
> +	BDRVSheepdogState *s = acb->common.bs->opaque;
> +	struct iovec iov;
> +	AIOReq *aio_req;
> +	uint32_t offset, data_len, mn, mx;
> +
> +	mn = s->min_dirty_data_idx;
> +	mx = s->max_dirty_data_idx;
> +	if (mn<= mx) {
> +		offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
> +			mn * sizeof(s->inode.data_vdi_id[0]);
> +		data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
> +
> +		s->min_dirty_data_idx = UINT32_MAX;
> +		s->max_dirty_data_idx = 0;
> +
> +		iov.iov_base =&s->inode;
> +		iov.iov_len = sizeof(s->inode);
> +		aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
> +					data_len, offset, 0, 0, offset);
> +		ret = add_aio_request(s, aio_req,&iov, 1, 0, AIOCB_WRITE_UDATA);
> +		if (ret) {
> +			free_aio_req(s, aio_req);
> +			acb->ret = -EIO;
> +			goto out;
> +		}
> +
> +		acb->aio_done_func = sd_finish_aiocb;
> +		acb->aiocb_type = AIOCB_WRITE_UDATA;
> +		return;
> +	}
> +out:
> +	sd_finish_aiocb(acb);
> +}
> +
> +static int sd_create_branch(BDRVSheepdogState *s)
> +{
> +	int ret, fd;
> +	uint32_t vid;
> +	char *buf;
> +
> +	dprintf("%" PRIx32 " is not current.\n", s->inode.vdi_id);
> +
> +	buf = qemu_malloc(SD_INODE_SIZE);
> +
> +	ret = do_sd_create(s->addr, s->name, NULL, s->inode.vdi_size>>  9,
> +			   s->inode.vdi_id,&vid, 1);
> +	if (ret) {
> +		goto out;
> +	}
> +
> +	dprintf("%" PRIx32 " is created.\n", vid);
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd<  0) {
> +		error_report("failed to connect\n");
> +		goto out;
> +	}
> +
> +	ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
> +			  SD_INODE_SIZE, 0);
> +
> +	close(fd);
> +
> +	if (ret<  0) {
> +		goto out;
> +	}
> +
> +	memcpy(&s->inode, buf, sizeof(s->inode));
> +
> +	s->is_current = 1;
> +	ret = 0;
> +	dprintf("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
> +
> +out:
> +	qemu_free(buf);
> +
> +	return ret;
> +}
> +
> +static void sd_readv_writev_bh_cb(void *p)
> +{
> +	SheepdogAIOCB *acb = p;
> +	int ret = 0;
> +	unsigned long len, done = 0, total = acb->nb_sectors * 512;
> +	unsigned long idx = acb->sector_num * 512 / SD_DATA_OBJ_SIZE;
> +	uint64_t oid;
> +	uint64_t offset = (acb->sector_num * 512) % SD_DATA_OBJ_SIZE;
> +	BDRVSheepdogState *s = acb->common.bs->opaque;
> +	SheepdogInode *inode =&s->inode;
> +	AIOReq *aio_req;
> +
> +	qemu_bh_delete(acb->bh);
> +	acb->bh = NULL;
> +
> +	if (acb->aiocb_type == AIOCB_WRITE_UDATA&&  !s->is_current) {
> +		ret = sd_create_branch(s);
> +		if (ret) {
> +			acb->ret = -EIO;
> +			goto out;
> +		}
> +	}
> +
> +	while (done != total) {
> +		uint8_t flags = 0;
> +		uint64_t old_oid = 0;
> +		int create = 0;
> +
> +		oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
> +
> +		len = min_t(unsigned long, total - done, SD_DATA_OBJ_SIZE - offset);
> +
> +		if (!inode->data_vdi_id[idx]) {
> +			if (acb->aiocb_type == AIOCB_READ_UDATA) {
> +				goto done;
> +			}
> +
> +			create = 1;
> +		} else if (acb->aiocb_type == AIOCB_WRITE_UDATA
> +			&&  !is_data_obj_writeable(inode, idx)) {
> +			create = 1;
> +			old_oid = oid;
> +			flags = SD_FLAG_CMD_COW;
> +		}
> +
> +		if (create) {
> +			dprintf("update ino (%" PRIu32") %"
> +				PRIu64 " %" PRIu64 " %" PRIu64 "\n",
> +				inode->vdi_id, oid,
> +				vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
> +			oid = vid_to_data_oid(inode->vdi_id, idx);
> +			dprintf("new oid %lx\n", oid);
> +		}
> +
> +		aio_req = alloc_aio_req(s, acb, oid, len, offset, flags,
> +					old_oid, done);
> +
> +		if (create) {
> +			AIOReq *areq;
> +			QLIST_FOREACH(areq,&s->outstanding_aio_head,
> +				      outstanding_aio_siblings) {
> +				if (areq == aio_req) {
> +					continue;
> +				}
> +				if (areq->oid == oid) {
> +					aio_req->flags = 0;
> +					aio_req->base_oid = 0;
> +					goto done;
> +				}
> +			}
> +		}
> +
> +		ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
> +				      create, acb->aiocb_type);
> +		if (ret<  0) {
> +			error_report("add_aio_request is faled\n");
> +			free_aio_req(s, aio_req);
> +			acb->ret = -EIO;
> +			goto out;
> +		}
> +	done:
> +		offset = 0;
> +		idx++;
> +		done += len;
> +	}
> +out:
> +	if (QLIST_EMPTY(&acb->aioreq_head)) {
> +		sd_finish_aiocb(acb);
> +	}
> +}
> +
> +static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs,
> +				       int64_t sector_num,
> +				       QEMUIOVector *qiov,
> +				       int nb_sectors,
> +				       BlockDriverCompletionFunc *cb,
> +				       void *opaque)
> +{
> +	SheepdogAIOCB *acb;
> +
> +	acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
> +	acb->aio_done_func = sd_write_done;
> +	acb->aiocb_type = AIOCB_WRITE_UDATA;
> +
> +	sd_schedule_bh(sd_readv_writev_bh_cb, acb);
> +	return&acb->common;
> +}
> +
> +static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs,
> +				      int64_t sector_num,
> +				      QEMUIOVector *qiov,
> +				      int nb_sectors,
> +				      BlockDriverCompletionFunc *cb,
> +				      void *opaque)
> +{
> +	SheepdogAIOCB *acb;
> +	int i;
> +
> +	acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
> +	acb->aiocb_type = AIOCB_READ_UDATA;
> +	acb->aio_done_func = sd_finish_aiocb;
> +
> +	/*
> +	 * TODO: we can do better; we don't need to initialize
> +	 * blindly.
> +	 */
> +	for (i = 0; i<  qiov->niov; i++) {
> +		memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
> +	}
> +
> +	sd_schedule_bh(sd_readv_writev_bh_cb, acb);
> +	return&acb->common;
> +}
> +
> +static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
> +{
> +	BDRVSheepdogState *s = bs->opaque;
> +	int ret, fd;
> +	uint32_t new_vid;
> +	SheepdogInode *inode;
> +	unsigned int datalen;
> +	uint64_t offset;
> +
> +	dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %d "
> +		"is_current %d\n", sn_info->name, sn_info->id_str,
> +		s->name, sn_info->vm_state_size, s->is_current);
> +
> +	if (!s->is_current) {
> +		error_report("You can't create a snapshot of "
> +			"a non current VDI, %s (%" PRIu32 ").\n",
> +			s->name, s->inode.vdi_id);
> +
> +		return -1;
> +	}
> +
> +	dprintf("%s %s\n", sn_info->name, sn_info->id_str);
> +
> +	s->inode.vm_state_size = sn_info->vm_state_size;
> +	s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
> +	offset = 0;
> +	/* we don't need to read entire object */
> +	datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
> +
> +	/* refresh inode. */
> +	fd = connect_to_sdog(s->addr);
> +	if (fd<  0) {
> +		ret = -EIO;
> +		goto cleanup;
> +	}
> +
> +	ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
> +			   s->inode.nr_copies, datalen, offset, 0);
> +	if (ret<  0) {
> +		error_report("failed to write snapshot's inode.\n");
> +		ret = -EIO;
> +		goto cleanup;
> +	}
> +
> +	ret = do_sd_create(s->addr, s->name, NULL, s->inode.vdi_size>>  9,
> +			   s->inode.vdi_id,&new_vid, 1);
> +	if (ret<  0) {
> +		error_report("failed to create inode for snapshot. %m\n");
> +		ret = -EIO;
> +		goto cleanup;
> +	}
> +
> +	inode = (SheepdogInode *)qemu_malloc(datalen);
> +
> +	ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid),
> +			  s->inode.nr_copies, datalen, offset);
> +
> +	close(fd);
>    

Should you close fd twice, or let it fall through.


Thanks

Chris


> +
> +	if (ret<  0) {
> +		error_report("failed to read new inode info. %m\n");
> +		ret = -EIO;
> +		goto cleanup;
> +	}
> +
> +	memcpy(&s->inode, inode, datalen);
> +	dprintf("s->inode: name %s snap_id %x oid %x\n",
> +		s->inode.name, s->inode.snap_id, s->inode.vdi_id);
> +
> +cleanup:
> +	close(fd);
> +	return ret;
> +}
> +
> +static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
> +{
> +	BDRVSheepdogState *s = bs->opaque;
> +	BDRVSheepdogState *old_s;
> +	char vdi[SD_MAX_VDI_LEN];
> +	char *buf = NULL;
> +	uint32_t vid;
> +	uint32_t snapid = 0;
> +	int ret = -ENOENT, fd;
> +
> +	old_s = qemu_malloc(sizeof(BDRVSheepdogState));
> +
> +	memcpy(old_s, s, sizeof(BDRVSheepdogState));
> +
> +	snapid = strtol(snapshot_id, NULL, 10);
> +	if (!snapid) {
> +		error_report("Invalid snapshot_id\n");
> +		goto out;
> +	}
> +
> +	buf = qemu_malloc(SD_INODE_SIZE);
> +	strncpy(vdi, s->name, sizeof(vdi));
> +	ret = find_vdi_name(s, vdi, snapid,&vid, 1);
> +	if (ret) {
> +		error_report("Failed to find_vdi_name\n");
> +		ret = -ENOENT;
> +		goto out;
> +	}
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd<  0) {
> +		error_report("failed to connect\n");
> +		goto out;
> +	}
> +
> +	ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
> +			  SD_INODE_SIZE, 0);
> +
> +	close(fd);
> +
> +	if (ret) {
> +		ret = -ENOENT;
> +		goto out;
> +	}
> +
> +	memcpy(&s->inode, buf, sizeof(s->inode));
> +
> +	if (!s->inode.vm_state_size) {
> +		error_report("Invalid snapshot\n");
> +		ret = -ENOENT;
> +		goto out;
> +	}
> +
> +	s->is_current = 0;
> +
> +	qemu_free(buf);
> +	qemu_free(old_s);
> +
> +	return 0;
> +out:
> +	/* recover bdrv_sd_state */
> +	memcpy(s, old_s, sizeof(BDRVSheepdogState));
> +	qemu_free(buf);
> +	qemu_free(old_s);
> +
> +	error_report("failed to open. recover old bdrv_sd_state.\n");
> +
> +	return ret;
> +}
> +
> +static int sd_snapshot_delete(BlockDriverState *bs, const char *snapshot_id)
> +{
> +	/* FIXME: Delete specified snapshot id.  */
> +	return 0;
> +}
> +
> +#define DIV_ROUND_UP(n,d) (((n) + (d) - 1) / (d))
> +#define BITS_PER_BYTE		8
> +#define BITS_TO_LONGS(nr)	DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(long))
> +#define DECLARE_BITMAP(name,bits) \
> +	unsigned long name[BITS_TO_LONGS(bits)]
> +
> +#define BITS_PER_LONG (BITS_PER_BYTE * sizeof(long))
> +
> +static inline int test_bit(unsigned int nr, const unsigned long *addr)
> +{
> +	return ((1UL<<  (nr % BITS_PER_LONG))&
> +		(((unsigned long *)addr)[nr / BITS_PER_LONG])) != 0;
> +}
> +
> +static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
> +{
> +	BDRVSheepdogState *s = bs->opaque;
> +	SheepdogReq req;
> +	int i, fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
> +	QEMUSnapshotInfo *sn_tab = NULL;
> +	unsigned wlen, rlen;
> +	int found = 0;
> +	static SheepdogInode inode;
> +	unsigned long *vdi_inuse;
> +	unsigned int start_nr;
> +
> +	vdi_inuse = qemu_malloc(max);
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd<  0) {
> +		goto out;
> +	}
> +
> +	rlen = max;
> +	wlen = 0;
> +
> +	memset(&req, 0, sizeof(req));
> +
> +	req.opcode = SD_OP_READ_VDIS;
> +	req.data_length = max;
> +
> +	ret = do_req(fd, (SheepdogReq *)&req, vdi_inuse,&wlen,&rlen);
> +
> +	close(fd);
> +	if (ret) {
> +		goto out;
> +	}
> +
> +	sn_tab = qemu_mallocz(nr * sizeof(*sn_tab));
> +
> +	start_nr = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT)&  (SD_NR_VDIS - 1);
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd<  0) {
> +		error_report("failed to connect\n");
> +		goto out;
> +	}
> +
> +	/* TODO: round up */
> +	for (i = start_nr; i<  SD_NR_VDIS&&  found<  nr; i++) {
> +		if (!test_bit(i, vdi_inuse)) {
> +			break;
> +		}
> +
> +		/* we don't need to read entire object */
> +		ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(i),
> +				  0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0);
> +
> +		if (ret) {
> +			continue;
> +		}
> +
> +		if (!strcmp(inode.name, s->name)&&  inode.snap_ctime) {
> +			sn_tab[found].date_sec = inode.snap_ctime>>  32;
> +			sn_tab[found].date_nsec = inode.snap_ctime&  0xffffffff;
> +			sn_tab[found].vm_state_size = inode.vm_state_size;
> +			sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
> +
> +			snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str), "%u",
> +				 inode.snap_id);
> +			found++;
> +		}
> +	}
> +
> +	close(fd);
> +out:
> +	*psn_tab = sn_tab;
> +
> +	qemu_free(vdi_inuse);
> +
> +	return found;
> +}
> +
> +static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
> +				int64_t pos, int size, int load)
> +{
> +	int fd, create;
> +	int ret = 0;
> +	unsigned int data_len;
> +	uint64_t vmstate_oid;
> +	uint32_t vdi_index;
> +	uint64_t offset;
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd<  0) {
> +		ret = -EIO;
> +		goto cleanup;
> +	}
> +
> +	while (size) {
> +		vdi_index = pos / SD_DATA_OBJ_SIZE;
> +		offset = pos % SD_DATA_OBJ_SIZE;
> +
> +		data_len = min_t(unsigned int, size, SD_DATA_OBJ_SIZE);
> +
> +		vmstate_oid = vid_to_vmstate_oid(s->inode.vdi_id, vdi_index);
> +
> +		create = (offset == 0);
> +		if (load) {
> +			ret = read_object(fd, (char *)data, vmstate_oid,
> +					  s->inode.nr_copies, data_len, offset);
> +		} else {
> +			ret = write_object(fd, (char *)data, vmstate_oid,
> +					   s->inode.nr_copies, data_len, offset, create);
> +		}
> +
> +		if (ret<  0) {
> +			error_report("failed to save vmstate %m\n");
> +			ret = -EIO;
> +			goto cleanup;
> +		}
> +
> +		pos += data_len;
> +		size -= data_len;
> +		ret += data_len;
> +	}
> +cleanup:
> +	close(fd);
> +	return ret;
> +}
> +
> +static int sd_save_vmstate(BlockDriverState *bs, const uint8_t *data,
> +			   int64_t pos, int size)
> +{
> +	BDRVSheepdogState *s = bs->opaque;
> +
> +	return do_load_save_vmstate(s, (uint8_t *)data, pos, size, 0);
> +}
> +
> +static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
> +			   int64_t pos, int size)
> +{
> +	BDRVSheepdogState *s = bs->opaque;
> +
> +	return do_load_save_vmstate(s, data, pos, size, 1);
> +}
> +
> +
> +static QEMUOptionParameter sd_create_options[] = {
> +	{
> +		.name = BLOCK_OPT_SIZE,
> +		.type = OPT_SIZE,
> +		.help = "Virtual disk size"
> +	},
> +	{
> +		.name = BLOCK_OPT_BACKING_FILE,
> +		.type = OPT_STRING,
> +		.help = "File name of a base image"
> +	},
> +	{ NULL }
> +};
> +
> +BlockDriver bdrv_sheepdog = {
> +	.format_name    = "sheepdog",
> +	.protocol_name  = "sheepdog",
> +	.instance_size  = sizeof(BDRVSheepdogState),
> +	.bdrv_file_open = sd_open,
> +	.bdrv_close     = sd_close,
> +	.bdrv_create    = sd_create,
> +
> +	.bdrv_aio_readv     = sd_aio_readv,
> +	.bdrv_aio_writev    = sd_aio_writev,
> +
> +	.bdrv_snapshot_create   = sd_snapshot_create,
> +	.bdrv_snapshot_goto     = sd_snapshot_goto,
> +	.bdrv_snapshot_delete   = sd_snapshot_delete,
> +	.bdrv_snapshot_list     = sd_snapshot_list,
> +
> +	.bdrv_save_vmstate  = sd_save_vmstate,
> +	.bdrv_load_vmstate  = sd_load_vmstate,
> +
> +	.create_options = sd_create_options,
> +};
> +
> +static void bdrv_sheepdog_init(void)
> +{
> +	bdrv_register(&bdrv_sheepdog);
> +}
> +block_init(bdrv_sheepdog_init);
>
Kevin Wolf - June 2, 2010, 1:55 p.m.
Am 28.05.2010 04:44, schrieb MORITA Kazutaka:
> Sheepdog is a distributed storage system for QEMU. It provides highly
> available block level storage volumes to VMs like Amazon EBS.  This
> patch adds a qemu block driver for Sheepdog.
> 
> Sheepdog features are:
> - No node in the cluster is special (no metadata node, no control
>   node, etc)
> - Linear scalability in performance and capacity
> - No single point of failure
> - Autonomous management (zero configuration)
> - Useful volume management support such as snapshot and cloning
> - Thin provisioning
> - Autonomous load balancing
> 
> The more details are available at the project site:
>     http://www.osrg.net/sheepdog/
> 
> Signed-off-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
> ---
>  Makefile.objs    |    2 +-
>  block/sheepdog.c | 1835 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 1836 insertions(+), 1 deletions(-)
>  create mode 100644 block/sheepdog.c

One general thing: The code uses some mix of spaces and tabs for
indentation, with the greatest part using tabs. According to
CODING_STYLE it should consistently use four spaces instead.

> diff --git a/Makefile.objs b/Makefile.objs
> index 1a942e5..527a754 100644
> --- a/Makefile.objs
> +++ b/Makefile.objs
> @@ -14,7 +14,7 @@ block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
>  
>  block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
>  block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o
> -block-nested-y += parallels.o nbd.o blkdebug.o
> +block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o
>  block-nested-$(CONFIG_WIN32) += raw-win32.o
>  block-nested-$(CONFIG_POSIX) += raw-posix.o
>  block-nested-$(CONFIG_CURL) += curl.o
> diff --git a/block/sheepdog.c b/block/sheepdog.c
> new file mode 100644
> index 0000000..68545e8
> --- /dev/null
> +++ b/block/sheepdog.c
> @@ -0,0 +1,1835 @@
> +/*
> + * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
> + *
> + * This program is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU General Public License version
> + * 2 as published by the Free Software Foundation.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +#include <netdb.h>
> +#include <netinet/tcp.h>
> +
> +#include "qemu-common.h"
> +#include "qemu-error.h"
> +#include "block_int.h"
> +
> +#define SD_PROTO_VER 0x01
> +
> +#define SD_DEFAULT_ADDR "localhost:7000"
> +
> +#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
> +#define SD_OP_READ_OBJ       0x02
> +#define SD_OP_WRITE_OBJ      0x03
> +
> +#define SD_OP_NEW_VDI        0x11
> +#define SD_OP_LOCK_VDI       0x12
> +#define SD_OP_RELEASE_VDI    0x13
> +#define SD_OP_GET_VDI_INFO   0x14
> +#define SD_OP_READ_VDIS      0x15
> +
> +#define SD_FLAG_CMD_WRITE    0x01
> +#define SD_FLAG_CMD_COW      0x02
> +
> +#define SD_RES_SUCCESS       0x00 /* Success */
> +#define SD_RES_UNKNOWN       0x01 /* Unknown error */
> +#define SD_RES_NO_OBJ        0x02 /* No object found */
> +#define SD_RES_EIO           0x03 /* I/O error */
> +#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
> +#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
> +#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
> +#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
> +#define SD_RES_NO_VDI        0x08 /* No vdi found */
> +#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
> +#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
> +#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
> +#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
> +#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
> +#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
> +#define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
> +#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
> +#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
> +#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
> +#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
> +#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
> +#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
> +#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Sheepdog is waiting for a format operation */
> +#define SD_RES_WAIT_FOR_JOIN    0x17 /* Sheepdog is waiting for other nodes joining */
> +#define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
> +
> +/*
> + * Object ID rules
> + *
> + *  0 - 19 (20 bits): data object space
> + * 20 - 31 (12 bits): reserved data object space
> + * 32 - 55 (24 bits): vdi object space
> + * 56 - 59 ( 4 bits): reserved vdi object space
> + * 60 - 63 ( 4 bits): object type indentifier space
> + */
> +
> +#define VDI_SPACE_SHIFT   32
> +#define VDI_BIT (UINT64_C(1) << 63)
> +#define VMSTATE_BIT (UINT64_C(1) << 62)
> +#define MAX_DATA_OBJS (1ULL << 20)
> +#define MAX_CHILDREN 1024
> +#define SD_MAX_VDI_LEN 256
> +#define SD_NR_VDIS   (1U << 24)
> +#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
> +
> +#define SD_INODE_SIZE (sizeof(SheepdogInode))
> +#define CURRENT_VDI_ID 0
> +
> +typedef struct SheepdogReq {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint32_t	opcode_specific[8];
> +} SheepdogReq;
> +
> +typedef struct SheepdogRsp {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint32_t        result;
> +	uint32_t	opcode_specific[7];
> +} SheepdogRsp;
> +
> +typedef struct SheepdogObjReq {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint64_t        oid;
> +	uint64_t        cow_oid;
> +	uint32_t        copies;
> +	uint32_t        rsvd;
> +	uint64_t        offset;
> +} SheepdogObjReq;
> +
> +typedef struct SheepdogObjRsp {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint32_t        result;
> +	uint32_t        copies;
> +	uint32_t        pad[6];
> +} SheepdogObjRsp;
> +
> +typedef struct SheepdogVdiReq {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint64_t	vdi_size;
> +	uint32_t        base_vdi_id;
> +	uint32_t        copies;
> +	uint32_t        snapid;
> +	uint32_t        pad[3];
> +} SheepdogVdiReq;
> +
> +typedef struct SheepdogVdiRsp {
> +	uint8_t		proto_ver;
> +	uint8_t		opcode;
> +	uint16_t	flags;
> +	uint32_t	epoch;
> +	uint32_t        id;
> +	uint32_t        data_length;
> +	uint32_t        result;
> +	uint32_t        rsvd;
> +	uint32_t        vdi_id;
> +	uint32_t        pad[5];
> +} SheepdogVdiRsp;
> +
> +typedef struct SheepdogInode {
> +	char name[SD_MAX_VDI_LEN];
> +	uint64_t ctime;
> +	uint64_t snap_ctime;
> +	uint64_t vm_clock_nsec;
> +	uint64_t vdi_size;
> +	uint64_t vm_state_size;
> +	uint16_t copy_policy;
> +	uint8_t  nr_copies;
> +	uint8_t  block_size_shift;
> +	uint32_t snap_id;
> +	uint32_t vdi_id;
> +	uint32_t parent_vdi_id;
> +	uint32_t child_vdi_id[MAX_CHILDREN];
> +	uint32_t data_vdi_id[MAX_DATA_OBJS];

Wow, this is a huge array. :-)

So Sheepdog has a fixed limit of 16 TB, right?


> +} SheepdogInode;
> +
> +/*
> + * 64 bit FNV-1a non-zero initial basis
> + */
> +#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
> +
> +/*
> + * 64 bit Fowler/Noll/Vo FNV-1a hash code
> + */
> +static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
> +{
> +        unsigned char *bp = (unsigned char *) buf;
> +        unsigned char *be = bp + len;
> +        while (bp < be) {
> +                hval ^= (uint64_t) *bp++;
> +                hval += (hval << 1) + (hval << 4) + (hval << 5) +
> +                        (hval << 7) + (hval << 8) + (hval << 40);
> +        }
> +        return hval;
> +}
> +
> +static inline int is_data_obj_writeable(SheepdogInode *inode, unsigned int idx)
> +{
> +	return inode->vdi_id == inode->data_vdi_id[idx];
> +}
> +
> +static inline int is_data_obj(uint64_t oid)
> +{
> +	return !(VDI_BIT & oid);
> +}
> +
> +static inline uint64_t data_oid_to_idx(uint64_t oid)
> +{
> +	return oid & (MAX_DATA_OBJS - 1);
> +}
> +
> +static inline uint64_t vid_to_vdi_oid(uint32_t vid)
> +{
> +	return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
> +}
> +
> +static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
> +{
> +	return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
> +}
> +
> +static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
> +{
> +	return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
> +}
> +
> +#undef dprintf
> +#ifdef DEBUG_SDOG
> +#define dprintf(fmt, args...)						\
> +do {									\
> +	fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args);	\
> +} while (0)
> +#else
> +#define dprintf(fmt, args...)
> +#endif
> +
> +#define min_t(type, x, y) ({			\
> +	type __min1 = (x);			\
> +	type __min2 = (y);			\
> +	__min1 < __min2 ? __min1: __min2; })
> +
> +#define max_t(type, x, y) ({			\
> +	type __max1 = (x);			\
> +	type __max2 = (y);			\
> +	__max1 > __max2 ? __max1: __max2; })
> +
> +typedef struct SheepdogAIOCB SheepdogAIOCB;
> +
> +typedef struct AIOReq {
> +	SheepdogAIOCB *aiocb;
> +	unsigned int iov_offset;
> +
> +	uint64_t oid;
> +	uint64_t base_oid;
> +	uint64_t offset;
> +	unsigned int data_len;
> +	uint8_t flags;
> +	uint32_t id;
> +
> +	QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
> +	QLIST_ENTRY(AIOReq) aioreq_siblings;
> +} AIOReq;
> +
> +enum AIOCBState {
> +	AIOCB_WRITE_UDATA,
> +	AIOCB_READ_UDATA,
> +};
> +
> +struct SheepdogAIOCB {
> +	BlockDriverAIOCB common;
> +
> +	QEMUIOVector *qiov;
> +
> +	int64_t sector_num;
> +	int nb_sectors;
> +
> +	int ret;
> +	enum AIOCBState aiocb_type;
> +
> +	QEMUBH *bh;
> +	void (*aio_done_func)(SheepdogAIOCB *);
> +
> +	int canceled;
> +
> +	QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
> +};
> +
> +typedef struct BDRVSheepdogState {
> +	SheepdogInode inode;
> +
> +	uint32_t min_dirty_data_idx;
> +	uint32_t max_dirty_data_idx;
> +
> +	char name[SD_MAX_VDI_LEN];
> +	int is_current;
> +
> +	char *addr;
> +	int fd;
> +
> +	uint32_t aioreq_seq_num;
> +	QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
> +} BDRVSheepdogState;
> +
> +static const char * sd_strerror(int err)
> +{
> +	int i;
> +
> +	static const struct {
> +		int err;
> +		const char *desc;
> +	} errors[] = {
> +		{SD_RES_SUCCESS, "Success"},
> +		{SD_RES_UNKNOWN, "Unknown error"},
> +		{SD_RES_NO_OBJ, "No object found"},
> +		{SD_RES_EIO, "I/O error"},
> +		{SD_RES_VDI_EXIST, "VDI exists already"},
> +		{SD_RES_INVALID_PARMS, "Invalid parameters"},
> +		{SD_RES_SYSTEM_ERROR, "System error"},
> +		{SD_RES_VDI_LOCKED, "VDI is already locked"},
> +		{SD_RES_NO_VDI, "No vdi found"},
> +		{SD_RES_NO_BASE_VDI, "No base VDI found"},
> +		{SD_RES_VDI_READ, "Failed read the requested VDI"},
> +		{SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
> +		{SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
> +		{SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
> +		{SD_RES_NO_TAG, "Failed to find the requested tag"},
> +		{SD_RES_STARTUP, "The system is still booting"},
> +		{SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
> +		{SD_RES_SHUTDOWN, "The system is shutting down"},
> +		{SD_RES_NO_MEM, "Out of memory on the server"},
> +		{SD_RES_FULL_VDI, "We already have the maximum vdis"},
> +		{SD_RES_VER_MISMATCH, "Protocol version mismatch"},
> +		{SD_RES_NO_SPACE, "Server has no space for new objects"},
> +		{SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
> +		{SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
> +		{SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
> +	};
> +
> +	for (i = 0; i < ARRAY_SIZE(errors); ++i) {
> +		if (errors[i].err == err) {
> +			return errors[i].desc;
> +		}
> +	}
> +
> +	return "Invalid error code";
> +}
> +
> +static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s,
> +				    SheepdogAIOCB *acb,
> +				    uint64_t oid, unsigned int data_len,
> +				    uint64_t offset, uint8_t flags,
> +				    uint64_t base_oid,
> +				    unsigned int iov_offset)
> +{
> +	AIOReq *aio_req;
> +
> +	aio_req = qemu_malloc(sizeof(*aio_req));
> +	aio_req->aiocb = acb;
> +	aio_req->iov_offset = iov_offset;
> +	aio_req->oid = oid;
> +	aio_req->base_oid = base_oid;
> +	aio_req->offset = offset;
> +	aio_req->data_len = data_len;
> +	aio_req->flags = flags;
> +	aio_req->id = s->aioreq_seq_num++;
> +
> +	QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
> +			  outstanding_aio_siblings);
> +	QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
> +
> +	return aio_req;
> +}
> +
> +static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
> +{
> +	SheepdogAIOCB *acb = aio_req->aiocb;
> +	QLIST_REMOVE(aio_req, outstanding_aio_siblings);
> +	QLIST_REMOVE(aio_req, aioreq_siblings);
> +	qemu_free(aio_req);
> +
> +	return !QLIST_EMPTY(&acb->aioreq_head);
> +}
> +
> +static void sd_finish_aiocb(SheepdogAIOCB *acb)
> +{
> +	if (!acb->canceled) {
> +		acb->common.cb(acb->common.opaque, acb->ret);
> +	}
> +	qemu_aio_release(acb);
> +}
> +
> +static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> +	SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
> +
> +	acb->canceled = 1;
> +}

Does this provide the right semantics? You haven't really cancelled the
request, but you pretend to. So you actually complete the request in the
background and then throw the return code away.

I seem to remember that posix-aio-compat.c waits at this point for
completion of the requests, calls the callbacks and only afterwards
returns from aio_cancel when no more requests are in flight.

Or if you can really cancel requests, it would be the best option, of
course.

> +
> +static AIOPool sd_aio_pool = {
> +	.aiocb_size = sizeof(SheepdogAIOCB),
> +	.cancel = sd_aio_cancel,
> +};
> +
> +static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
> +				   int64_t sector_num, int nb_sectors,
> +				   BlockDriverCompletionFunc *cb,
> +				   void *opaque)
> +{
> +	SheepdogAIOCB *acb;
> +
> +	acb = qemu_aio_get(&sd_aio_pool, bs, cb, opaque);
> +
> +	acb->qiov = qiov;
> +
> +	acb->sector_num = sector_num;
> +	acb->nb_sectors = nb_sectors;
> +
> +	acb->aio_done_func = NULL;
> +	acb->canceled = 0;
> +	acb->bh = NULL;
> +	acb->ret = 0;
> +	QLIST_INIT(&acb->aioreq_head);
> +	return acb;
> +}
> +
> +static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb)
> +{
> +	if (acb->bh) {
> +		error_report("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
> +		return -EIO;
> +	}
> +
> +	acb->bh = qemu_bh_new(cb, acb);
> +	if (!acb->bh) {
> +		error_report("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
> +		return -EIO;
> +	}
> +
> +	qemu_bh_schedule(acb->bh);
> +
> +	return 0;
> +}
> +
> +static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset,
> +			int write)

I've spent at least 15 minutes figuring out what this function does. I
think I've got it now more or less, but I've come to the conclusion that
this code needs more comments.

I'd suggest to add a header comment to all non-trivial functions and
maybe somewhere on the top a general description of how things work.

As far as I understood now, there are basically two parts of request
handling:

1. The request is sent to the server. Its AIOCB is saved in a list in
the BDRVSheepdogState. It doesn't pass a callback or anything for the
completion.

2. aio_read_response is registered as a fd handler to the sheepdog
connection. When the server responds, it searches the right AIOCB in the
list and the second part of request handling starts.

do_send_recv is the function that is used to do all communication with
the server. The iov stuff looks like it's only used for some data, but
seems this is not true - it's also used for the metadata of the protocol.

Did I understand it right so far?

> +{
> +	struct msghdr msg;
> +	int ret, diff;
> +
> +	memset(&msg, 0, sizeof(msg));
> +	msg.msg_iov = iov;
> +	msg.msg_iovlen = 1;
> +
> +	len += offset;
> +
> +	while (iov->iov_len < len) {
> +		len -= iov->iov_len;
> +
> +		iov++;
> +		msg.msg_iovlen++;
> +	}

You're counting the number of elements in the iov here. qemu_iovec would
already have these (and also len), wouldn't it make sense to use it as
the abstraction? Though I'm not sure where these iovecs come from, so
the answer might be no.

> +
> +	diff = iov->iov_len - len;
> +	iov->iov_len -= diff;
> +
> +	while (msg.msg_iov->iov_len <= offset) {
> +		offset -= msg.msg_iov->iov_len;
> +
> +		msg.msg_iov++;
> +		msg.msg_iovlen--;
> +	}
> +
> +	msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset;
> +	msg.msg_iov->iov_len -= offset;
> +
> +	if (write) {
> +		ret = sendmsg(sockfd, &msg, 0);
> +	} else {
> +		ret = recvmsg(sockfd, &msg, MSG_WAITALL);
> +	}
> +
> +	msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset;
> +	msg.msg_iov->iov_len += offset;
> +
> +	iov->iov_len += diff;
> +	return ret;
> +}
> +
> +static int connect_to_sdog(const char *addr)
> +{
> +	char buf[64];
> +	char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
> +	char name[256], *p;
> +	int fd, ret;
> +	struct addrinfo hints, *res, *res0;
> +	int port = 0;
> +
> +	if (!addr) {
> +		addr = SD_DEFAULT_ADDR;
> +	}
> +
> +	strcpy(name, addr);

This smells like buffer overflows. In practice it's s->addr for all
callers and I think this values comes indirectly from filename in
sd_open - for which I didn't find a length check, so it could overflow
indeed.

> +
> +	p = name;
> +	while (*p) {
> +		if (*p == ':') {
> +			*p++ = '\0';
> +			break;
> +		} else {
> +			p++;
> +		}
> +	}
> +
> +	if (*p == '\0') {
> +		error_report("cannot find a port number, %s\n", name);
> +		return -1;
> +	}
> +	port = strtol(p, NULL, 10);
> +	if (port == 0) {
> +		error_report("invalid port number, %s\n", p);
> +		return -1;
> +	}
> +
> +	memset(&hints, 0, sizeof(hints));
> +	snprintf(buf, sizeof(buf), "%d", port);
> +
> +	hints.ai_socktype = SOCK_STREAM;
> +
> +	ret = getaddrinfo(name, buf, &hints, &res0);
> +	if (ret) {
> +		error_report("unable to get address info %s, %m\n", name);
> +		return -1;
> +	}
> +
> +	for (res = res0; res; res = res->ai_next) {
> +		ret = getnameinfo(res->ai_addr, res->ai_addrlen,
> +				  hbuf, sizeof(hbuf), sbuf, sizeof(sbuf),
> +				  NI_NUMERICHOST | NI_NUMERICSERV);
> +		if (ret) {
> +			continue;
> +		}
> +
> +		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
> +		if (fd < 0) {
> +			continue;
> +		}
> +
> +reconnect:
> +		ret = connect(fd, res->ai_addr, res->ai_addrlen);
> +		if (ret < 0) {
> +			if (errno == EINTR) {
> +				goto reconnect;
> +			}
> +			break;
> +		}
> +
> +		dprintf("connected to %s:%d\n", name, port);
> +		goto success;
> +	}
> +	fd = -1;
> +	error_report("failed connect to %s:%d\n", name, port);
> +success:
> +	freeaddrinfo(res0);
> +	return fd;
> +}
> +
> +static int do_readv_writev(int sockfd, struct iovec *iov, int len,
> +			   int iov_offset, int write)
> +{
> +	int ret;
> +again:
> +	ret = do_send_recv(sockfd, iov, len, iov_offset, write);
> +	if (ret < 0) {
> +		if (errno == EINTR || errno == EAGAIN) {
> +			goto again;
> +		}
> +		error_report("failed to recv a rsp, %m\n");
> +		return 1;
> +	}
> +
> +	iov_offset += ret;
> +	len -= ret;
> +	if (len) {
> +		goto again;
> +	}
> +
> +	return 0;
> +}
> +
> +static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
> +{
> +	return do_readv_writev(sockfd, iov, len, iov_offset, 0);
> +}
> +
> +static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
> +{
> +	return do_readv_writev(sockfd, iov, len, iov_offset, 1);
> +}
> +
> +static int do_read_write(int sockfd, void *buf, int len, int write)
> +{
> +	struct iovec iov;
> +
> +	iov.iov_base = buf;
> +	iov.iov_len = len;
> +
> +	return do_readv_writev(sockfd, &iov, len, 0, write);
> +}
> +
> +static int do_read(int sockfd, void *buf, int len)
> +{
> +	return do_read_write(sockfd, buf, len, 0);
> +}
> +
> +static int do_write(int sockfd, void *buf, int len)
> +{
> +	return do_read_write(sockfd, buf, len, 1);
> +}
> +
> +static int send_req(int sockfd, SheepdogReq *hdr, void *data,
> +		    unsigned int *wlen)
> +{
> +	int ret;
> +	struct iovec iov[2];
> +
> +	iov[0].iov_base = hdr;
> +	iov[0].iov_len = sizeof(*hdr);
> +
> +	if (*wlen) {
> +		iov[1].iov_base = data;
> +		iov[1].iov_len = *wlen;
> +	}
> +
> +	ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
> +	if (ret) {
> +		error_report("failed to send a req, %m\n");
> +		ret = -1;
> +	}
> +
> +	return ret;
> +}
> +
> +static int do_req(int sockfd, SheepdogReq *hdr, void *data,
> +		  unsigned int *wlen, unsigned int *rlen)
> +{
> +	int ret;
> +
> +	ret = send_req(sockfd, hdr, data, wlen);
> +	if (ret) {
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	ret = do_read(sockfd, hdr, sizeof(*hdr));
> +	if (ret) {
> +		error_report("failed to get a rsp, %m\n");
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	if (*rlen > hdr->data_length) {
> +		*rlen = hdr->data_length;
> +	}
> +
> +	if (*rlen) {
> +		ret = do_read(sockfd, data, *rlen);
> +		if (ret) {
> +			error_report("failed to get the data, %m\n");
> +			ret = -1;
> +			goto out;
> +		}
> +	}
> +	ret = 0;
> +out:
> +	return ret;
> +}
> +
> +static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
> +			   struct iovec *iov, int niov, int create,
> +			   enum AIOCBState aiocb_type);
> +
> +static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
> +{
> +	AIOReq *aio_req, *next;
> +	SheepdogAIOCB *acb;
> +	int ret;
> +
> +	QLIST_FOREACH_SAFE(aio_req, &s->outstanding_aio_head,
> +			   outstanding_aio_siblings, next) {
> +		if (id == aio_req->id) {
> +			continue;
> +		}
> +		if (aio_req->oid != oid) {
> +			continue;
> +		}
> +
> +		acb = aio_req->aiocb;
> +		ret = add_aio_request(s, aio_req, acb->qiov->iov,
> +				      acb->qiov->niov, 0, acb->aiocb_type);
> +		if (ret < 0) {
> +			error_report("add_aio_request is faled\n");
> +			free_aio_req(s, aio_req);
> +			if (QLIST_EMPTY(&acb->aioreq_head)) {
> +				sd_finish_aiocb(acb);
> +			}
> +		}
> +	}
> +}
> +
> +static void aio_read_response(void *opaque)
> +{
> +	SheepdogObjReq hdr;
> +	SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;

Why do you declare an otherwise unused variable hdr, take a pointer to
it and cast it to an incompatible type? This is scary.

> +	BDRVSheepdogState *s = (BDRVSheepdogState *)opaque;
> +	int fd = s->fd;
> +	int ret;
> +	AIOReq *aio_req = NULL;
> +	SheepdogAIOCB *acb;
> +	int rest;
> +	unsigned long idx;
> +
> +	if (QLIST_EMPTY(&s->outstanding_aio_head)) {
> +		return;
> +	}
> +
> +	ret = do_read(fd, (void *)rsp, sizeof(*rsp));

This cast looks scary, too. But do_read wants a void* anyway, so it's
not even necessary. Please drop it.

> +	if (ret) {
> +		error_report("failed to get the header, %m\n");
> +		return;
> +	}
> +
> +	QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) {
> +		if (aio_req->id == rsp->id) {
> +			break;
> +		}
> +	}
> +	if (!aio_req) {
> +		error_report("cannot find aio_req %x\n", rsp->id);
> +		return;
> +	}
> +
> +	acb = aio_req->aiocb;
> +
> +	switch (acb->aiocb_type) {
> +	case AIOCB_WRITE_UDATA:
> +		if (!is_data_obj(aio_req->oid)) {
> +			break;
> +		}
> +		idx = data_oid_to_idx(aio_req->oid);
> +
> +		if (s->inode.data_vdi_id[idx] != s->inode.vdi_id) {
> +			s->inode.data_vdi_id[idx] = s->inode.vdi_id;
> +			s->max_dirty_data_idx = max_t(uint32_t, idx,
> +						      s->max_dirty_data_idx);
> +			s->min_dirty_data_idx = min_t(uint32_t, idx,
> +						      s->min_dirty_data_idx);
> +
> +			send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx),
> +					 rsp->id);
> +		}
> +		break;
> +	case AIOCB_READ_UDATA:
> +		ret = do_readv(fd, acb->qiov->iov, rsp->data_length,
> +			       aio_req->iov_offset);
> +		if (ret) {
> +			error_report("failed to get the data, %m\n");
> +			return;
> +		}
> +		break;
> +	}
> +
> +	if (rsp->result != SD_RES_SUCCESS) {
> +		acb->ret = -EIO;
> +		error_report("%s\n", sd_strerror(rsp->result));
> +	}
> +
> +	rest = free_aio_req(s, aio_req);
> +	if (!rest) {
> +		acb->aio_done_func(acb);
> +	}
> +}
> +
> +static int aio_flush_request(void *opaque)
> +{
> +	BDRVSheepdogState *s = (BDRVSheepdogState *)opaque;

Unnecessary cast.

> +
> +	return !QLIST_EMPTY(&s->outstanding_aio_head);
> +}
> +
> +static int set_nonblocking(int fd)
> +{
> +	int ret;
> +
> +	ret = fcntl(fd, F_GETFL);
> +	if (ret < 0) {
> +		error_report("can't fcntl (F_GETFL), %m\n");
> +		close(fd);
> +	} else {
> +		ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
> +		if (ret < 0) {
> +			error_report("can't fcntl (O_NONBLOCK), %m\n");
> +		}
> +	}
> +
> +	return ret;
> +}
> +
> +static int set_nodelay(int fd)
> +{
> +	int ret, opt;
> +
> +	opt = 1;
> +	ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
> +	return ret;
> +}
> +
> +/*
> + * Return a socket discriptor to read/write objects.
> + * We cannot use this discriptor for other operations because
> + * the block driver may be on waiting response from the server.
> + */
> +static int get_sheep_fd(BDRVSheepdogState *s)
> +{
> +	int ret, fd;
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd < 0) {
> +		error_report("%m\n");

%m is Linux specific, as far as I know.

> +		return -1;
> +	}
> +
> +	ret = set_nonblocking(fd);
> +	if (ret) {
> +		error_report("%m\n");
> +		close(fd);
> +		return -1;
> +	}
> +
> +	ret = set_nodelay(fd);
> +	if (ret) {
> +		error_report("%m\n");
> +		close(fd);
> +		return -1;
> +	}
> +
> +	qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request,
> +				NULL, s);
> +	s->fd = fd;
> +
> +	return fd;
> +}
> +
> +static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
> +			 char *vdi, int vdi_len, uint32_t *snapid)
> +{
> +	char *p, *q;
> +	int nr_sep;
> +
> +	p = q = strdup(filename);
> +
> +	if (!p) {
> +		return 1;
> +	}
> +
> +	nr_sep = 0;
> +	while (*p) {
> +		if (*p == ':') {
> +			nr_sep++;
> +		}
> +		if (nr_sep == 2) {
> +			break;
> +		}
> +		p++;
> +	}
> +
> +	if (nr_sep == 2) {
> +		*p++ = '\0';
> +	} else {
> +		p = q;
> +	}
> +
> +	strncpy(vdi, p, vdi_len);
> +
> +	p = strchr(vdi, ':');
> +	if (p) {
> +		*p++ = '\0';
> +		*snapid = strtol(p, NULL, 10);
> +	} else {
> +		*snapid = CURRENT_VDI_ID; /* search current vdi */
> +	}
> +
> +	if (nr_sep == 2) {
> +		s->addr = q;
> +	} else {
> +		free(q);
> +		s->addr = NULL;
> +	}
> +
> +	return 0;
> +}
> +
> +static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid,
> +			 uint32_t *vid, int for_snapshot)
> +{
> +	int ret, fd;
> +	SheepdogVdiReq hdr;
> +	SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
> +	unsigned int wlen, rlen = 0;
> +	char buf[SD_MAX_VDI_LEN];
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd < 0) {
> +		return -1;
> +	}
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +	snprintf(buf, sizeof(buf), "%s", filename);
> +	if (for_snapshot) {
> +		hdr.opcode = SD_OP_GET_VDI_INFO;
> +	} else {
> +		hdr.opcode = SD_OP_LOCK_VDI;
> +	}
> +	wlen = SD_MAX_VDI_LEN;
> +	hdr.proto_ver = SD_PROTO_VER;
> +	hdr.data_length = SD_MAX_VDI_LEN;
> +	hdr.snapid = snapid;
> +	hdr.flags = SD_FLAG_CMD_WRITE;
> +
> +	ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
> +	if (ret) {
> +		ret = -1;
> +		goto out;
> +	}
> +
> +	if (rsp->result != SD_RES_SUCCESS) {
> +		error_report("%s, %s\n", sd_strerror(rsp->result), filename);
> +		ret = -1;
> +		goto out;
> +	}
> +	*vid = rsp->vdi_id;
> +
> +	ret = 0;
> +out:
> +	close(fd);
> +	return ret;
> +}
> +
> +static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
> +			   struct iovec *iov, int niov, int create,
> +			   enum AIOCBState aiocb_type)
> +{
> +	int nr_copies = s->inode.nr_copies;
> +	SheepdogObjReq hdr;
> +	unsigned int wlen;
> +	int ret, opt;
> +	uint64_t oid = aio_req->oid;
> +	unsigned int datalen = aio_req->data_len;
> +	uint64_t offset = aio_req->offset;
> +	uint8_t flags = aio_req->flags;
> +	uint64_t old_oid = aio_req->base_oid;
> +
> +	if (!nr_copies) {
> +		error_report("bug\n");
> +	}
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +
> +	if (aiocb_type == AIOCB_READ_UDATA) {
> +		wlen = 0;
> +		hdr.opcode = SD_OP_READ_OBJ;
> +		hdr.flags = flags;
> +	} else if (create) {
> +		wlen = datalen;
> +		hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
> +		hdr.flags = SD_FLAG_CMD_WRITE | flags;
> +	} else {
> +		wlen = datalen;
> +		hdr.opcode = SD_OP_WRITE_OBJ;
> +		hdr.flags = SD_FLAG_CMD_WRITE | flags;
> +	}
> +
> +	hdr.oid = oid;
> +	hdr.cow_oid = old_oid;
> +	hdr.copies = s->inode.nr_copies;
> +
> +	hdr.data_length = datalen;
> +	hdr.offset = offset;
> +
> +	hdr.id = aio_req->id;
> +
> +	opt = 1;
> +	setsockopt(s->fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
> +
> +	ret = do_write(s->fd, &hdr, sizeof(hdr));
> +	if (ret) {
> +		error_report("failed to send a req, %m\n");
> +		return -EIO;
> +	}
> +
> +	if (wlen) {
> +		ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
> +		if (ret) {
> +			error_report("failed to send a data, %m\n");
> +			return -EIO;
> +		}
> +	}
> +        opt = 0;
> +        setsockopt(s->fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
> +
> +	return 0;
> +}
> +
> +static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
> +			     unsigned int datalen, uint64_t offset,
> +			     int write, int create)
> +{
> +	SheepdogObjReq hdr;
> +	SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
> +	unsigned int wlen, rlen;
> +	int ret;
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +
> +	if (write) {
> +		wlen = datalen;
> +		rlen = 0;
> +		hdr.flags = SD_FLAG_CMD_WRITE;
> +		if (create) {
> +			hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
> +		} else {
> +			hdr.opcode = SD_OP_WRITE_OBJ;
> +		}
> +	} else {
> +		wlen = 0;
> +		rlen = datalen;
> +		hdr.opcode = SD_OP_READ_OBJ;
> +	}
> +	hdr.oid = oid;
> +	hdr.data_length = datalen;
> +	hdr.offset = offset;
> +	hdr.copies = copies;
> +
> +	ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
> +	if (ret) {
> +		error_report("failed to send a request to the sheep\n");
> +		return -1;
> +	}
> +
> +	switch (rsp->result) {
> +	case SD_RES_SUCCESS:
> +		return 0;
> +	default:
> +		error_report("%s\n", sd_strerror(rsp->result));
> +		return -1;
> +	}
> +}
> +
> +static int read_object(int fd, char *buf, uint64_t oid, int copies,
> +		       unsigned int datalen, uint64_t offset)
> +{
> +	return read_write_object(fd, buf, oid, copies, datalen, offset, 0, 0);
> +}
> +
> +static int write_object(int fd, char *buf, uint64_t oid, int copies,
> +			unsigned int datalen, uint64_t offset, int create)
> +{
> +	return read_write_object(fd, buf, oid, copies, datalen, offset, 1, create);
> +}
> +
> +/* TODO: error cleanups */
> +static int sd_open(BlockDriverState *bs, const char *filename, int flags)
> +{
> +	int ret, fd;
> +	uint32_t vid = 0;
> +	BDRVSheepdogState *s = bs->opaque;
> +	char vdi[256];
> +	uint32_t snapid;
> +	int for_snapshot = 0;
> +	char *buf;
> +
> +	strstart(filename, "sheepdog:", (const char **)&filename);
> +
> +	buf = qemu_malloc(SD_INODE_SIZE);
> +
> +	memset(vdi, 0, sizeof(vdi));
> +	if (parse_vdiname(s, filename, vdi, sizeof(vdi), &snapid) < 0) {
> +		goto out;
> +	}
> +	s->fd = get_sheep_fd(s);
> +	if (s->fd < 0) {
> +		return -1;
> +	}
> +
> +	if (snapid != CURRENT_VDI_ID) {
> +		for_snapshot = 1;
> +	}
> +
> +	ret = find_vdi_name(s, vdi, snapid, &vid, for_snapshot);
> +	if (ret) {
> +		goto out;
> +	}
> +
> +	if (snapid) {
> +		dprintf("%" PRIx32 " non current inode was open.\n", vid);
> +	} else {
> +		s->is_current = 1;
> +	}
> +
> +	fd = connect_to_sdog(s->addr);
> +	if (fd < 0) {
> +		error_report("failed to connect\n");
> +		goto out;
> +	}
> +
> +	ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0);
> +
> +	close(fd);
> +
> +	if (ret) {
> +		goto out;
> +	}
> +
> +	memcpy(&s->inode, buf, sizeof(s->inode));
> +	s->min_dirty_data_idx = UINT32_MAX;
> +	s->max_dirty_data_idx = 0;
> +
> +	bs->total_sectors = s->inode.vdi_size >> 9;
> +	strncpy(s->name, vdi, sizeof(s->name));
> +	qemu_free(buf);
> +
> +	QLIST_INIT(&s->outstanding_aio_head);
> +	return 0;
> +out:
> +	qemu_free(buf);
> +	return -1;
> +}
> +
> +static int do_sd_create(const char *addr, char *filename, char *tag,
> +			int64_t total_sectors, uint32_t base_vid,
> +			uint32_t *vdi_id, int snapshot)
> +{
> +	SheepdogVdiReq hdr;
> +	SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
> +	int fd, ret;
> +	unsigned int wlen, rlen = 0;
> +	char buf[SD_MAX_VDI_LEN];
> +
> +	fd = connect_to_sdog(addr);
> +	if (fd < 0) {
> +		return -1;
> +	}
> +
> +	strncpy(buf, filename, SD_MAX_VDI_LEN);
> +
> +	memset(&hdr, 0, sizeof(hdr));
> +	hdr.opcode = SD_OP_NEW_VDI;
> +	hdr.base_vdi_id = base_vid;
> +
> +	wlen = SD_MAX_VDI_LEN;
> +
> +	hdr.flags = SD_FLAG_CMD_WRITE;
> +	hdr.snapid = snapshot;
> +
> +	hdr.data_length = wlen;
> +	hdr.vdi_size = total_sectors * 512;
> +
> +	ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
> +
> +	close(fd);
> +
> +	if (ret) {
> +		return -1;
> +	}
> +
> +	if (rsp->result != SD_RES_SUCCESS) {
> +		error_report("%s, %s\n", sd_strerror(rsp->result), filename);
> +		return -1;
> +	}
> +
> +	if (vdi_id) {
> +		*vdi_id = rsp->vdi_id;
> +	}
> +
> +	return 0;
> +}
> +
> +static int sd_create(const char *filename, QEMUOptionParameter *options)
> +{
> +	int ret;
> +	uint32_t vid = 0;
> +	int64_t total_sectors = 0;
> +	char *backing_file = NULL;
> +
> +	strstart(filename, "sheepdog:", (const char **)&filename);
> +
> +	while (options && options->name) {
> +		if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> +			total_sectors = options->value.n / 512;
> +		} else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
> +			backing_file = options->value.s;
> +		}
> +		options++;
> +	}
> +
> +	if (backing_file) {
> +		BlockDriverState bs;
> +		char vdi[SD_MAX_VDI_LEN];
> +		uint32_t snapid;
> +
> +		strstart(backing_file, "sheepdog:", (const char **)&backing_file);
> +		memset(&bs, 0, sizeof(bs));
> +
> +		bs.opaque = qemu_malloc(sizeof(BDRVSheepdogState));
> +
> +		ret = sd_open(&bs, backing_file, 0);
> +		if (ret < 0) {
> +			return -1;
> +		}
> +
> +		if (parse_vdiname(bs.opaque, backing_file, vdi, sizeof(vdi), &snapid) < 0) {
> +			return -1;
> +		}
> +
> +		/* cannot clone from a current inode */
> +		if (snapid == CURRENT_VDI_ID) {
> +			return -1;
> +		}
> +
> +		ret = find_vdi_name(bs.opaque, vdi, snapid, &vid, 1);
> +		if (ret) {
> +			return -1;
> +		}

Is it really necessary to do Sheepdog specific stuff here? With other
formats I can use any format I like for a backing file. VMDK did assume
that backing files could only be VMDK initially, but it was considered a
bug.

If Sheepdog can't handle other image formats (e.g. because it can't
communicate that a request touches unallocated space), you need to check
this at least an return an error if something else is used.


One more thing, I'll ask the same as for Ceph here: Do you have any
specific tests for Sheepdog that could be used or should we try to use
generic qemu-iotests cases?

Kevin
MORITA Kazutaka - June 3, 2010, 3:31 p.m.
At Tue, 01 Jun 2010 09:58:04 -0500,
Thanks for your comments!

Chris Krumme wrote:
> 
> On 05/27/2010 09:44 PM, MORITA Kazutaka wrote:
> > Sheepdog is a distributed storage system for QEMU. It provides highly

> > +
> > +static int connect_to_sdog(const char *addr)
> > +{
> > +	char buf[64];
> > +	char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
> > +	char name[256], *p;
> > +	int fd, ret;
> > +	struct addrinfo hints, *res, *res0;
> > +	int port = 0;
> > +
> > +	if (!addr) {
> > +		addr = SD_DEFAULT_ADDR;
> > +	}
> > +
> > +	strcpy(name, addr);
> >    
> 
> Can strlen(addr) be > sizeof(name)?
> 

Yes, we should check the length of addr. This would causes overflows.

> > +
> > +	p = name;
> > +	while (*p) {
> > +		if (*p == ':') {
> > +			*p++ = '\0';
> >    
> 
> May also need to check for p > name + sizeof(name).
> 

p should be NULL-terminated, so the check is not required, I think.

> > +			break;
> > +		} else {
> > +			p++;
> > +		}
> > +	}
> > +
> > +	if (*p == '\0') {
> > +		error_report("cannot find a port number, %s\n", name);
> > +		return -1;
> > +	}
> > +	port = strtol(p, NULL, 10);
> >    
> 
> Are negative numbers valid here?
> 

No. It is better to use strtoul.


> > +
> > +static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
> > +			 char *vdi, int vdi_len, uint32_t *snapid)
> > +{
> > +	char *p, *q;
> > +	int nr_sep;
> > +
> > +	p = q = strdup(filename);
> > +
> > +	if (!p) {
> >    
> 
> I think Qemu has a version of strdup that will not return NULL.
> 

Yes. We can use qemu_strdup here.


> > +
> > +/* TODO: error cleanups */
> > +static int sd_open(BlockDriverState *bs, const char *filename, int flags)
> > +{
> > +	int ret, fd;
> > +	uint32_t vid = 0;
> > +	BDRVSheepdogState *s = bs->opaque;
> > +	char vdi[256];
> > +	uint32_t snapid;
> > +	int for_snapshot = 0;
> > +	char *buf;
> > +
> > +	strstart(filename, "sheepdog:", (const char **)&filename);
> > +
> > +	buf = qemu_malloc(SD_INODE_SIZE);
> > +
> > +	memset(vdi, 0, sizeof(vdi));
> > +	if (parse_vdiname(s, filename, vdi, sizeof(vdi),&snapid)<  0) {
> > +		goto out;
> > +	}
> > +	s->fd = get_sheep_fd(s);
> > +	if (s->fd<  0) {
> >    
> 
> buf is not freed, goto out maybe.
> 

Yes, we should goto out here.


> > +
> > +static int do_sd_create(const char *addr, char *filename, char *tag,
> > +			int64_t total_sectors, uint32_t base_vid,
> > +			uint32_t *vdi_id, int snapshot)
> > +{
> > +	SheepdogVdiReq hdr;
> > +	SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
> > +	int fd, ret;
> > +	unsigned int wlen, rlen = 0;
> > +	char buf[SD_MAX_VDI_LEN];
> > +
> > +	fd = connect_to_sdog(addr);
> > +	if (fd<  0) {
> > +		return -1;
> > +	}
> > +
> > +	strncpy(buf, filename, SD_MAX_VDI_LEN);
> > +
> > +	memset(&hdr, 0, sizeof(hdr));
> > +	hdr.opcode = SD_OP_NEW_VDI;
> > +	hdr.base_vdi_id = base_vid;
> > +
> > +	wlen = SD_MAX_VDI_LEN;
> > +
> > +	hdr.flags = SD_FLAG_CMD_WRITE;
> > +	hdr.snapid = snapshot;
> > +
> > +	hdr.data_length = wlen;
> > +	hdr.vdi_size = total_sectors * 512;
> >    
> 
> There is another patch on the list changing 512 to a define for sector size.
> 

OK. We'll define SECTOR_SIZE.


> > +
> > +	ret = do_req(fd, (SheepdogReq *)&hdr, buf,&wlen,&rlen);
> > +
> > +	close(fd);
> > +
> > +	if (ret) {
> > +		return -1;
> > +	}
> > +
> > +	if (rsp->result != SD_RES_SUCCESS) {
> > +		error_report("%s, %s\n", sd_strerror(rsp->result), filename);
> > +		return -1;
> > +	}
> > +
> > +	if (vdi_id) {
> > +		*vdi_id = rsp->vdi_id;
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> > +static int sd_create(const char *filename, QEMUOptionParameter *options)
> > +{
> > +	int ret;
> > +	uint32_t vid = 0;
> > +	int64_t total_sectors = 0;
> > +	char *backing_file = NULL;
> > +
> > +	strstart(filename, "sheepdog:", (const char **)&filename);
> > +
> > +	while (options&&  options->name) {
> > +		if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> > +			total_sectors = options->value.n / 512;
> >    
> Use define.
> > +		} else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
> > +			backing_file = options->value.s;
> > +		}
> > +		options++;
> > +	}
> > +
> > +	if (backing_file) {
> > +		BlockDriverState bs;
> > +		char vdi[SD_MAX_VDI_LEN];
> > +		uint32_t snapid;
> > +
> > +		strstart(backing_file, "sheepdog:", (const char **)&backing_file);
> > +		memset(&bs, 0, sizeof(bs));
> > +
> > +		bs.opaque = qemu_malloc(sizeof(BDRVSheepdogState));
> >    
> 
> bs seems to have a short life span, is opaque getting freed?
> 

No, we should free it.


> > +
> > +static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
> > +{
> > +	BDRVSheepdogState *s = bs->opaque;
> > +	int ret, fd;
> > +	uint32_t new_vid;
> > +	SheepdogInode *inode;
> > +	unsigned int datalen;
> > +	uint64_t offset;
> > +
> > +	dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %d "
> > +		"is_current %d\n", sn_info->name, sn_info->id_str,
> > +		s->name, sn_info->vm_state_size, s->is_current);
> > +
> > +	if (!s->is_current) {
> > +		error_report("You can't create a snapshot of "
> > +			"a non current VDI, %s (%" PRIu32 ").\n",
> > +			s->name, s->inode.vdi_id);
> > +
> > +		return -1;
> > +	}
> > +
> > +	dprintf("%s %s\n", sn_info->name, sn_info->id_str);
> > +
> > +	s->inode.vm_state_size = sn_info->vm_state_size;
> > +	s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
> > +	offset = 0;
> > +	/* we don't need to read entire object */
> > +	datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
> > +
> > +	/* refresh inode. */
> > +	fd = connect_to_sdog(s->addr);
> > +	if (fd<  0) {
> > +		ret = -EIO;
> > +		goto cleanup;
> > +	}
> > +
> > +	ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
> > +			   s->inode.nr_copies, datalen, offset, 0);
> > +	if (ret<  0) {
> > +		error_report("failed to write snapshot's inode.\n");
> > +		ret = -EIO;
> > +		goto cleanup;
> > +	}
> > +
> > +	ret = do_sd_create(s->addr, s->name, NULL, s->inode.vdi_size>>  9,
> > +			   s->inode.vdi_id,&new_vid, 1);
> > +	if (ret<  0) {
> > +		error_report("failed to create inode for snapshot. %m\n");
> > +		ret = -EIO;
> > +		goto cleanup;
> > +	}
> > +
> > +	inode = (SheepdogInode *)qemu_malloc(datalen);
> > +
> > +	ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid),
> > +			  s->inode.nr_copies, datalen, offset);
> > +
> > +	close(fd);
> >    
> 
> Should you close fd twice, or let it fall through.
> 

We should remove close() here.


I'll address the comments in my next post.

Thanks,

Kazutaka
MORITA Kazutaka - June 3, 2010, 4:23 p.m.
At Wed, 02 Jun 2010 15:55:42 +0200,
Kevin Wolf wrote:
> 
> Am 28.05.2010 04:44, schrieb MORITA Kazutaka:
> > Sheepdog is a distributed storage system for QEMU. It provides highly
> > available block level storage volumes to VMs like Amazon EBS.  This
> > patch adds a qemu block driver for Sheepdog.
> > 
> > Sheepdog features are:
> > - No node in the cluster is special (no metadata node, no control
> >   node, etc)
> > - Linear scalability in performance and capacity
> > - No single point of failure
> > - Autonomous management (zero configuration)
> > - Useful volume management support such as snapshot and cloning
> > - Thin provisioning
> > - Autonomous load balancing
> > 
> > The more details are available at the project site:
> >     http://www.osrg.net/sheepdog/
> > 
> > Signed-off-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp>
> > ---
> >  Makefile.objs    |    2 +-
> >  block/sheepdog.c | 1835 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
> >  2 files changed, 1836 insertions(+), 1 deletions(-)
> >  create mode 100644 block/sheepdog.c
> 
> One general thing: The code uses some mix of spaces and tabs for
> indentation, with the greatest part using tabs. According to
> CODING_STYLE it should consistently use four spaces instead.
> 

OK.  I'll fix the indentation according to CODYING_STYLE.


> > +
> > +typedef struct SheepdogInode {
> > +	char name[SD_MAX_VDI_LEN];
> > +	uint64_t ctime;
> > +	uint64_t snap_ctime;
> > +	uint64_t vm_clock_nsec;
> > +	uint64_t vdi_size;
> > +	uint64_t vm_state_size;
> > +	uint16_t copy_policy;
> > +	uint8_t  nr_copies;
> > +	uint8_t  block_size_shift;
> > +	uint32_t snap_id;
> > +	uint32_t vdi_id;
> > +	uint32_t parent_vdi_id;
> > +	uint32_t child_vdi_id[MAX_CHILDREN];
> > +	uint32_t data_vdi_id[MAX_DATA_OBJS];
> 
> Wow, this is a huge array. :-)
> 
> So Sheepdog has a fixed limit of 16 TB, right?
> 

MAX_DATA_OBJS is (1 << 20), and the size of a object is 4 MB.  So the
limit of the Sheepdog image size is 4 TB.

These values are hard-coded, and I guess they should be configurable.


> 
> > +} SheepdogInode;
> > +

> > +
> > +static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
> > +{
> > +	SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
> > +
> > +	acb->canceled = 1;
> > +}
> 
> Does this provide the right semantics? You haven't really cancelled the
> request, but you pretend to. So you actually complete the request in the
> background and then throw the return code away.
> 
> I seem to remember that posix-aio-compat.c waits at this point for
> completion of the requests, calls the callbacks and only afterwards
> returns from aio_cancel when no more requests are in flight.
> 
> Or if you can really cancel requests, it would be the best option, of
> course.
> 

Sheepdog cannot cancel the requests which are already sent to the
servers.  So, as you say, we pretend to cancel the requests without
waiting for completion of them.  However, are there any situation
where pretending to cancel causes problems in practice?

To wait for completion of the requests here, we may need to create
another thread for processing I/O like posix-aio-compat.c.


> > +
> > +static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset,
> > +			int write)
> 
> I've spent at least 15 minutes figuring out what this function does. I
> think I've got it now more or less, but I've come to the conclusion that
> this code needs more comments.
> 
> I'd suggest to add a header comment to all non-trivial functions and
> maybe somewhere on the top a general description of how things work.
> 
> As far as I understood now, there are basically two parts of request
> handling:
> 
> 1. The request is sent to the server. Its AIOCB is saved in a list in
> the BDRVSheepdogState. It doesn't pass a callback or anything for the
> completion.
> 
> 2. aio_read_response is registered as a fd handler to the sheepdog
> connection. When the server responds, it searches the right AIOCB in the
> list and the second part of request handling starts.
> 
> do_send_recv is the function that is used to do all communication with
> the server. The iov stuff looks like it's only used for some data, but
> seems this is not true - it's also used for the metadata of the protocol.
> 
> Did I understand it right so far?
> 

Yes, exactly.  I'll add comments to make codes more readable.


> > +{
> > +	struct msghdr msg;
> > +	int ret, diff;
> > +
> > +	memset(&msg, 0, sizeof(msg));
> > +	msg.msg_iov = iov;
> > +	msg.msg_iovlen = 1;
> > +
> > +	len += offset;
> > +
> > +	while (iov->iov_len < len) {
> > +		len -= iov->iov_len;
> > +
> > +		iov++;
> > +		msg.msg_iovlen++;
> > +	}
> 
> You're counting the number of elements in the iov here. qemu_iovec would
> already have these (and also len), wouldn't it make sense to use it as
> the abstraction? Though I'm not sure where these iovecs come from, so
> the answer might be no.
> 

We uses struct msghdr for sendmsg/recvmsg here, so using iovec
directly looks simpler.


> > +
> > +	diff = iov->iov_len - len;
> > +	iov->iov_len -= diff;
> > +
> > +	while (msg.msg_iov->iov_len <= offset) {
> > +		offset -= msg.msg_iov->iov_len;
> > +
> > +		msg.msg_iov++;
> > +		msg.msg_iovlen--;
> > +	}
> > +
> > +	msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset;
> > +	msg.msg_iov->iov_len -= offset;
> > +
> > +	if (write) {
> > +		ret = sendmsg(sockfd, &msg, 0);
> > +	} else {
> > +		ret = recvmsg(sockfd, &msg, MSG_WAITALL);
> > +	}
> > +
> > +	msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset;
> > +	msg.msg_iov->iov_len += offset;
> > +
> > +	iov->iov_len += diff;
> > +	return ret;
> > +}
> > +
> > +static int connect_to_sdog(const char *addr)
> > +{
> > +	char buf[64];
> > +	char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
> > +	char name[256], *p;
> > +	int fd, ret;
> > +	struct addrinfo hints, *res, *res0;
> > +	int port = 0;
> > +
> > +	if (!addr) {
> > +		addr = SD_DEFAULT_ADDR;
> > +	}
> > +
> > +	strcpy(name, addr);
> 
> This smells like buffer overflows. In practice it's s->addr for all
> callers and I think this values comes indirectly from filename in
> sd_open - for which I didn't find a length check, so it could overflow
> indeed.
> 

Yes, this would causes overflows.  I'll fix it for the next post.


> > +
> > +static void aio_read_response(void *opaque)
> > +{
> > +	SheepdogObjReq hdr;
> > +	SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
> 
> Why do you declare an otherwise unused variable hdr, take a pointer to
> it and cast it to an incompatible type? This is scary.
> 

We don't need hdr, so we should remove it.


> > +	BDRVSheepdogState *s = (BDRVSheepdogState *)opaque;
> > +	int fd = s->fd;
> > +	int ret;
> > +	AIOReq *aio_req = NULL;
> > +	SheepdogAIOCB *acb;
> > +	int rest;
> > +	unsigned long idx;
> > +
> > +	if (QLIST_EMPTY(&s->outstanding_aio_head)) {
> > +		return;
> > +	}
> > +
> > +	ret = do_read(fd, (void *)rsp, sizeof(*rsp));
> 
> This cast looks scary, too. But do_read wants a void* anyway, so it's
> not even necessary. Please drop it.
> 

OK. I'll do it.


> > +	if (ret) {
> > +		error_report("failed to get the header, %m\n");
> > +		return;
> > +	}
> > +
> > +	QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) {
> > +		if (aio_req->id == rsp->id) {
> > +			break;
> > +		}
> > +	}
> > +	if (!aio_req) {
> > +		error_report("cannot find aio_req %x\n", rsp->id);
> > +		return;
> > +	}
> > +
> > +	acb = aio_req->aiocb;
> > +
> > +	switch (acb->aiocb_type) {
> > +	case AIOCB_WRITE_UDATA:
> > +		if (!is_data_obj(aio_req->oid)) {
> > +			break;
> > +		}
> > +		idx = data_oid_to_idx(aio_req->oid);
> > +
> > +		if (s->inode.data_vdi_id[idx] != s->inode.vdi_id) {
> > +			s->inode.data_vdi_id[idx] = s->inode.vdi_id;
> > +			s->max_dirty_data_idx = max_t(uint32_t, idx,
> > +						      s->max_dirty_data_idx);
> > +			s->min_dirty_data_idx = min_t(uint32_t, idx,
> > +						      s->min_dirty_data_idx);
> > +
> > +			send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx),
> > +					 rsp->id);
> > +		}
> > +		break;
> > +	case AIOCB_READ_UDATA:
> > +		ret = do_readv(fd, acb->qiov->iov, rsp->data_length,
> > +			       aio_req->iov_offset);
> > +		if (ret) {
> > +			error_report("failed to get the data, %m\n");
> > +			return;
> > +		}
> > +		break;
> > +	}
> > +
> > +	if (rsp->result != SD_RES_SUCCESS) {
> > +		acb->ret = -EIO;
> > +		error_report("%s\n", sd_strerror(rsp->result));
> > +	}
> > +
> > +	rest = free_aio_req(s, aio_req);
> > +	if (!rest) {
> > +		acb->aio_done_func(acb);
> > +	}
> > +}
> > +
> > +static int aio_flush_request(void *opaque)
> > +{
> > +	BDRVSheepdogState *s = (BDRVSheepdogState *)opaque;
> 
> Unnecessary cast.
> 

I'll drop it.

> > +
> > +	return !QLIST_EMPTY(&s->outstanding_aio_head);
> > +}
> > +
> > +static int set_nonblocking(int fd)
> > +{
> > +	int ret;
> > +
> > +	ret = fcntl(fd, F_GETFL);
> > +	if (ret < 0) {
> > +		error_report("can't fcntl (F_GETFL), %m\n");
> > +		close(fd);
> > +	} else {
> > +		ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
> > +		if (ret < 0) {
> > +			error_report("can't fcntl (O_NONBLOCK), %m\n");
> > +		}
> > +	}
> > +
> > +	return ret;
> > +}
> > +
> > +static int set_nodelay(int fd)
> > +{
> > +	int ret, opt;
> > +
> > +	opt = 1;
> > +	ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
> > +	return ret;
> > +}
> > +
> > +/*
> > + * Return a socket discriptor to read/write objects.
> > + * We cannot use this discriptor for other operations because
> > + * the block driver may be on waiting response from the server.
> > + */
> > +static int get_sheep_fd(BDRVSheepdogState *s)
> > +{
> > +	int ret, fd;
> > +
> > +	fd = connect_to_sdog(s->addr);
> > +	if (fd < 0) {
> > +		error_report("%m\n");
> 
> %m is Linux specific, as far as I know.
> 

Yes, we'll use strerror(errno) instead of it.


> > +
> > +static int sd_create(const char *filename, QEMUOptionParameter *options)
> > +{
> > +	int ret;
> > +	uint32_t vid = 0;
> > +	int64_t total_sectors = 0;
> > +	char *backing_file = NULL;
> > +
> > +	strstart(filename, "sheepdog:", (const char **)&filename);
> > +
> > +	while (options && options->name) {
> > +		if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
> > +			total_sectors = options->value.n / 512;
> > +		} else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
> > +			backing_file = options->value.s;
> > +		}
> > +		options++;
> > +	}
> > +
> > +	if (backing_file) {
> > +		BlockDriverState bs;
> > +		char vdi[SD_MAX_VDI_LEN];
> > +		uint32_t snapid;
> > +
> > +		strstart(backing_file, "sheepdog:", (const char **)&backing_file);
> > +		memset(&bs, 0, sizeof(bs));
> > +
> > +		bs.opaque = qemu_malloc(sizeof(BDRVSheepdogState));
> > +
> > +		ret = sd_open(&bs, backing_file, 0);
> > +		if (ret < 0) {
> > +			return -1;
> > +		}
> > +
> > +		if (parse_vdiname(bs.opaque, backing_file, vdi, sizeof(vdi), &snapid) < 0) {
> > +			return -1;
> > +		}
> > +
> > +		/* cannot clone from a current inode */
> > +		if (snapid == CURRENT_VDI_ID) {
> > +			return -1;
> > +		}
> > +
> > +		ret = find_vdi_name(bs.opaque, vdi, snapid, &vid, 1);
> > +		if (ret) {
> > +			return -1;
> > +		}
> 
> Is it really necessary to do Sheepdog specific stuff here? With other
> formats I can use any format I like for a backing file. VMDK did assume
> that backing files could only be VMDK initially, but it was considered a
> bug.
> 
> If Sheepdog can't handle other image formats (e.g. because it can't
> communicate that a request touches unallocated space), you need to check
> this at least an return an error if something else is used.
> 

Sheepdog cannot handle other formats because a copy-on-write of
Sheepdog assumes that the base image is also Sheepdog one.  So we'll
check the format here to avoid other formats.

> 
> One more thing, I'll ask the same as for Ceph here: Do you have any
> specific tests for Sheepdog that could be used or should we try to use
> generic qemu-iotests cases?
> 

No, we don't have such tests, and it is great if qemu-iotests could
support protocol tests.


Thanks,

Kazutaka
Kevin Wolf - June 4, 2010, 11:04 a.m.
Am 03.06.2010 18:23, schrieb MORITA Kazutaka:
>>> +static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
>>> +{
>>> +	SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
>>> +
>>> +	acb->canceled = 1;
>>> +}
>>
>> Does this provide the right semantics? You haven't really cancelled the
>> request, but you pretend to. So you actually complete the request in the
>> background and then throw the return code away.
>>
>> I seem to remember that posix-aio-compat.c waits at this point for
>> completion of the requests, calls the callbacks and only afterwards
>> returns from aio_cancel when no more requests are in flight.
>>
>> Or if you can really cancel requests, it would be the best option, of
>> course.
>>
> 
> Sheepdog cannot cancel the requests which are already sent to the
> servers.  So, as you say, we pretend to cancel the requests without
> waiting for completion of them.  However, are there any situation
> where pretending to cancel causes problems in practice?

I'm not sure how often it would happen in practice, but if the guest OS
thinks the old value is on disk when in fact the new one is, this could
lead to corruption. I think if it can happen, even without evidence that
it actually does, it's already relevant enough.

> To wait for completion of the requests here, we may need to create
> another thread for processing I/O like posix-aio-compat.c.

I don't think you need a thread to get the same behaviour, you just need
to call the fd handlers like in the main loop. It would probably be the
first driver doing this, though, and it's not an often used code path,
so it might be a bad idea.

Maybe it's reasonable to just complete the request with -EIO? This way
the guest couldn't make any assumption about the data written. On the
other hand, it could be unhappy about failed requests, but that's
probably better than corruption.

Kevin
MORITA Kazutaka - June 6, 2010, 5:05 p.m.
At Fri, 04 Jun 2010 13:04:00 +0200,
Kevin Wolf wrote:
> 
> Am 03.06.2010 18:23, schrieb MORITA Kazutaka:
> >>> +static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
> >>> +{
> >>> +	SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
> >>> +
> >>> +	acb->canceled = 1;
> >>> +}
> >>
> >> Does this provide the right semantics? You haven't really cancelled the
> >> request, but you pretend to. So you actually complete the request in the
> >> background and then throw the return code away.
> >>
> >> I seem to remember that posix-aio-compat.c waits at this point for
> >> completion of the requests, calls the callbacks and only afterwards
> >> returns from aio_cancel when no more requests are in flight.
> >>
> >> Or if you can really cancel requests, it would be the best option, of
> >> course.
> >>
> > 
> > Sheepdog cannot cancel the requests which are already sent to the
> > servers.  So, as you say, we pretend to cancel the requests without
> > waiting for completion of them.  However, are there any situation
> > where pretending to cancel causes problems in practice?
> 
> I'm not sure how often it would happen in practice, but if the guest OS
> thinks the old value is on disk when in fact the new one is, this could
> lead to corruption. I think if it can happen, even without evidence that
> it actually does, it's already relevant enough.
> 

I agree.

> > To wait for completion of the requests here, we may need to create
> > another thread for processing I/O like posix-aio-compat.c.
> 
> I don't think you need a thread to get the same behaviour, you just need
> to call the fd handlers like in the main loop. It would probably be the
> first driver doing this, though, and it's not an often used code path,
> so it might be a bad idea.
> 
> Maybe it's reasonable to just complete the request with -EIO? This way
> the guest couldn't make any assumption about the data written. On the
> other hand, it could be unhappy about failed requests, but that's
> probably better than corruption.
> 

Completing with -EIO looks good to me.  Thanks for the advice.
I'll send an updated patch tomorrow.

Regards,

Kazutaka

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 1a942e5..527a754 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -14,7 +14,7 @@  block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
 block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
 block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o
-block-nested-y += parallels.o nbd.o blkdebug.o
+block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_CURL) += curl.o
diff --git a/block/sheepdog.c b/block/sheepdog.c
new file mode 100644
index 0000000..68545e8
--- /dev/null
+++ b/block/sheepdog.c
@@ -0,0 +1,1835 @@ 
+/*
+ * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <netdb.h>
+#include <netinet/tcp.h>
+
+#include "qemu-common.h"
+#include "qemu-error.h"
+#include "block_int.h"
+
+#define SD_PROTO_VER 0x01
+
+#define SD_DEFAULT_ADDR "localhost:7000"
+
+#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
+#define SD_OP_READ_OBJ       0x02
+#define SD_OP_WRITE_OBJ      0x03
+
+#define SD_OP_NEW_VDI        0x11
+#define SD_OP_LOCK_VDI       0x12
+#define SD_OP_RELEASE_VDI    0x13
+#define SD_OP_GET_VDI_INFO   0x14
+#define SD_OP_READ_VDIS      0x15
+
+#define SD_FLAG_CMD_WRITE    0x01
+#define SD_FLAG_CMD_COW      0x02
+
+#define SD_RES_SUCCESS       0x00 /* Success */
+#define SD_RES_UNKNOWN       0x01 /* Unknown error */
+#define SD_RES_NO_OBJ        0x02 /* No object found */
+#define SD_RES_EIO           0x03 /* I/O error */
+#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
+#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
+#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
+#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
+#define SD_RES_NO_VDI        0x08 /* No vdi found */
+#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
+#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
+#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
+#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
+#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
+#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
+#define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
+#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
+#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
+#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
+#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
+#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
+#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
+#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Sheepdog is waiting for a format operation */
+#define SD_RES_WAIT_FOR_JOIN    0x17 /* Sheepdog is waiting for other nodes joining */
+#define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
+
+/*
+ * Object ID rules
+ *
+ *  0 - 19 (20 bits): data object space
+ * 20 - 31 (12 bits): reserved data object space
+ * 32 - 55 (24 bits): vdi object space
+ * 56 - 59 ( 4 bits): reserved vdi object space
+ * 60 - 63 ( 4 bits): object type indentifier space
+ */
+
+#define VDI_SPACE_SHIFT   32
+#define VDI_BIT (UINT64_C(1) << 63)
+#define VMSTATE_BIT (UINT64_C(1) << 62)
+#define MAX_DATA_OBJS (1ULL << 20)
+#define MAX_CHILDREN 1024
+#define SD_MAX_VDI_LEN 256
+#define SD_NR_VDIS   (1U << 24)
+#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
+
+#define SD_INODE_SIZE (sizeof(SheepdogInode))
+#define CURRENT_VDI_ID 0
+
+typedef struct SheepdogReq {
+	uint8_t		proto_ver;
+	uint8_t		opcode;
+	uint16_t	flags;
+	uint32_t	epoch;
+	uint32_t        id;
+	uint32_t        data_length;
+	uint32_t	opcode_specific[8];
+} SheepdogReq;
+
+typedef struct SheepdogRsp {
+	uint8_t		proto_ver;
+	uint8_t		opcode;
+	uint16_t	flags;
+	uint32_t	epoch;
+	uint32_t        id;
+	uint32_t        data_length;
+	uint32_t        result;
+	uint32_t	opcode_specific[7];
+} SheepdogRsp;
+
+typedef struct SheepdogObjReq {
+	uint8_t		proto_ver;
+	uint8_t		opcode;
+	uint16_t	flags;
+	uint32_t	epoch;
+	uint32_t        id;
+	uint32_t        data_length;
+	uint64_t        oid;
+	uint64_t        cow_oid;
+	uint32_t        copies;
+	uint32_t        rsvd;
+	uint64_t        offset;
+} SheepdogObjReq;
+
+typedef struct SheepdogObjRsp {
+	uint8_t		proto_ver;
+	uint8_t		opcode;
+	uint16_t	flags;
+	uint32_t	epoch;
+	uint32_t        id;
+	uint32_t        data_length;
+	uint32_t        result;
+	uint32_t        copies;
+	uint32_t        pad[6];
+} SheepdogObjRsp;
+
+typedef struct SheepdogVdiReq {
+	uint8_t		proto_ver;
+	uint8_t		opcode;
+	uint16_t	flags;
+	uint32_t	epoch;
+	uint32_t        id;
+	uint32_t        data_length;
+	uint64_t	vdi_size;
+	uint32_t        base_vdi_id;
+	uint32_t        copies;
+	uint32_t        snapid;
+	uint32_t        pad[3];
+} SheepdogVdiReq;
+
+typedef struct SheepdogVdiRsp {
+	uint8_t		proto_ver;
+	uint8_t		opcode;
+	uint16_t	flags;
+	uint32_t	epoch;
+	uint32_t        id;
+	uint32_t        data_length;
+	uint32_t        result;
+	uint32_t        rsvd;
+	uint32_t        vdi_id;
+	uint32_t        pad[5];
+} SheepdogVdiRsp;
+
+typedef struct SheepdogInode {
+	char name[SD_MAX_VDI_LEN];
+	uint64_t ctime;
+	uint64_t snap_ctime;
+	uint64_t vm_clock_nsec;
+	uint64_t vdi_size;
+	uint64_t vm_state_size;
+	uint16_t copy_policy;
+	uint8_t  nr_copies;
+	uint8_t  block_size_shift;
+	uint32_t snap_id;
+	uint32_t vdi_id;
+	uint32_t parent_vdi_id;
+	uint32_t child_vdi_id[MAX_CHILDREN];
+	uint32_t data_vdi_id[MAX_DATA_OBJS];
+} SheepdogInode;
+
+/*
+ * 64 bit FNV-1a non-zero initial basis
+ */
+#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
+
+/*
+ * 64 bit Fowler/Noll/Vo FNV-1a hash code
+ */
+static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
+{
+        unsigned char *bp = (unsigned char *) buf;
+        unsigned char *be = bp + len;
+        while (bp < be) {
+                hval ^= (uint64_t) *bp++;
+                hval += (hval << 1) + (hval << 4) + (hval << 5) +
+                        (hval << 7) + (hval << 8) + (hval << 40);
+        }
+        return hval;
+}
+
+static inline int is_data_obj_writeable(SheepdogInode *inode, unsigned int idx)
+{
+	return inode->vdi_id == inode->data_vdi_id[idx];
+}
+
+static inline int is_data_obj(uint64_t oid)
+{
+	return !(VDI_BIT & oid);
+}
+
+static inline uint64_t data_oid_to_idx(uint64_t oid)
+{
+	return oid & (MAX_DATA_OBJS - 1);
+}
+
+static inline uint64_t vid_to_vdi_oid(uint32_t vid)
+{
+	return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
+}
+
+static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
+{
+	return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
+}
+
+static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
+{
+	return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
+}
+
+#undef dprintf
+#ifdef DEBUG_SDOG
+#define dprintf(fmt, args...)						\
+do {									\
+	fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args);	\
+} while (0)
+#else
+#define dprintf(fmt, args...)
+#endif
+
+#define min_t(type, x, y) ({			\
+	type __min1 = (x);			\
+	type __min2 = (y);			\
+	__min1 < __min2 ? __min1: __min2; })
+
+#define max_t(type, x, y) ({			\
+	type __max1 = (x);			\
+	type __max2 = (y);			\
+	__max1 > __max2 ? __max1: __max2; })
+
+typedef struct SheepdogAIOCB SheepdogAIOCB;
+
+typedef struct AIOReq {
+	SheepdogAIOCB *aiocb;
+	unsigned int iov_offset;
+
+	uint64_t oid;
+	uint64_t base_oid;
+	uint64_t offset;
+	unsigned int data_len;
+	uint8_t flags;
+	uint32_t id;
+
+	QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
+	QLIST_ENTRY(AIOReq) aioreq_siblings;
+} AIOReq;
+
+enum AIOCBState {
+	AIOCB_WRITE_UDATA,
+	AIOCB_READ_UDATA,
+};
+
+struct SheepdogAIOCB {
+	BlockDriverAIOCB common;
+
+	QEMUIOVector *qiov;
+
+	int64_t sector_num;
+	int nb_sectors;
+
+	int ret;
+	enum AIOCBState aiocb_type;
+
+	QEMUBH *bh;
+	void (*aio_done_func)(SheepdogAIOCB *);
+
+	int canceled;
+
+	QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
+};
+
+typedef struct BDRVSheepdogState {
+	SheepdogInode inode;
+
+	uint32_t min_dirty_data_idx;
+	uint32_t max_dirty_data_idx;
+
+	char name[SD_MAX_VDI_LEN];
+	int is_current;
+
+	char *addr;
+	int fd;
+
+	uint32_t aioreq_seq_num;
+	QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
+} BDRVSheepdogState;
+
+static const char * sd_strerror(int err)
+{
+	int i;
+
+	static const struct {
+		int err;
+		const char *desc;
+	} errors[] = {
+		{SD_RES_SUCCESS, "Success"},
+		{SD_RES_UNKNOWN, "Unknown error"},
+		{SD_RES_NO_OBJ, "No object found"},
+		{SD_RES_EIO, "I/O error"},
+		{SD_RES_VDI_EXIST, "VDI exists already"},
+		{SD_RES_INVALID_PARMS, "Invalid parameters"},
+		{SD_RES_SYSTEM_ERROR, "System error"},
+		{SD_RES_VDI_LOCKED, "VDI is already locked"},
+		{SD_RES_NO_VDI, "No vdi found"},
+		{SD_RES_NO_BASE_VDI, "No base VDI found"},
+		{SD_RES_VDI_READ, "Failed read the requested VDI"},
+		{SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
+		{SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
+		{SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
+		{SD_RES_NO_TAG, "Failed to find the requested tag"},
+		{SD_RES_STARTUP, "The system is still booting"},
+		{SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
+		{SD_RES_SHUTDOWN, "The system is shutting down"},
+		{SD_RES_NO_MEM, "Out of memory on the server"},
+		{SD_RES_FULL_VDI, "We already have the maximum vdis"},
+		{SD_RES_VER_MISMATCH, "Protocol version mismatch"},
+		{SD_RES_NO_SPACE, "Server has no space for new objects"},
+		{SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
+		{SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
+		{SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
+	};
+
+	for (i = 0; i < ARRAY_SIZE(errors); ++i) {
+		if (errors[i].err == err) {
+			return errors[i].desc;
+		}
+	}
+
+	return "Invalid error code";
+}
+
+static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s,
+				    SheepdogAIOCB *acb,
+				    uint64_t oid, unsigned int data_len,
+				    uint64_t offset, uint8_t flags,
+				    uint64_t base_oid,
+				    unsigned int iov_offset)
+{
+	AIOReq *aio_req;
+
+	aio_req = qemu_malloc(sizeof(*aio_req));
+	aio_req->aiocb = acb;
+	aio_req->iov_offset = iov_offset;
+	aio_req->oid = oid;
+	aio_req->base_oid = base_oid;
+	aio_req->offset = offset;
+	aio_req->data_len = data_len;
+	aio_req->flags = flags;
+	aio_req->id = s->aioreq_seq_num++;
+
+	QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
+			  outstanding_aio_siblings);
+	QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
+
+	return aio_req;
+}
+
+static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
+{
+	SheepdogAIOCB *acb = aio_req->aiocb;
+	QLIST_REMOVE(aio_req, outstanding_aio_siblings);
+	QLIST_REMOVE(aio_req, aioreq_siblings);
+	qemu_free(aio_req);
+
+	return !QLIST_EMPTY(&acb->aioreq_head);
+}
+
+static void sd_finish_aiocb(SheepdogAIOCB *acb)
+{
+	if (!acb->canceled) {
+		acb->common.cb(acb->common.opaque, acb->ret);
+	}
+	qemu_aio_release(acb);
+}
+
+static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+	SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
+
+	acb->canceled = 1;
+}
+
+static AIOPool sd_aio_pool = {
+	.aiocb_size = sizeof(SheepdogAIOCB),
+	.cancel = sd_aio_cancel,
+};
+
+static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
+				   int64_t sector_num, int nb_sectors,
+				   BlockDriverCompletionFunc *cb,
+				   void *opaque)
+{
+	SheepdogAIOCB *acb;
+
+	acb = qemu_aio_get(&sd_aio_pool, bs, cb, opaque);
+
+	acb->qiov = qiov;
+
+	acb->sector_num = sector_num;
+	acb->nb_sectors = nb_sectors;
+
+	acb->aio_done_func = NULL;
+	acb->canceled = 0;
+	acb->bh = NULL;
+	acb->ret = 0;
+	QLIST_INIT(&acb->aioreq_head);
+	return acb;
+}
+
+static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb)
+{
+	if (acb->bh) {
+		error_report("bug: %d %d\n", acb->aiocb_type, acb->aiocb_type);
+		return -EIO;
+	}
+
+	acb->bh = qemu_bh_new(cb, acb);
+	if (!acb->bh) {
+		error_report("oom: %d %d\n", acb->aiocb_type, acb->aiocb_type);
+		return -EIO;
+	}
+
+	qemu_bh_schedule(acb->bh);
+
+	return 0;
+}
+
+static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset,
+			int write)
+{
+	struct msghdr msg;
+	int ret, diff;
+
+	memset(&msg, 0, sizeof(msg));
+	msg.msg_iov = iov;
+	msg.msg_iovlen = 1;
+
+	len += offset;
+
+	while (iov->iov_len < len) {
+		len -= iov->iov_len;
+
+		iov++;
+		msg.msg_iovlen++;
+	}
+
+	diff = iov->iov_len - len;
+	iov->iov_len -= diff;
+
+	while (msg.msg_iov->iov_len <= offset) {
+		offset -= msg.msg_iov->iov_len;
+
+		msg.msg_iov++;
+		msg.msg_iovlen--;
+	}
+
+	msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset;
+	msg.msg_iov->iov_len -= offset;
+
+	if (write) {
+		ret = sendmsg(sockfd, &msg, 0);
+	} else {
+		ret = recvmsg(sockfd, &msg, MSG_WAITALL);
+	}
+
+	msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset;
+	msg.msg_iov->iov_len += offset;
+
+	iov->iov_len += diff;
+	return ret;
+}
+
+static int connect_to_sdog(const char *addr)
+{
+	char buf[64];
+	char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
+	char name[256], *p;
+	int fd, ret;
+	struct addrinfo hints, *res, *res0;
+	int port = 0;
+
+	if (!addr) {
+		addr = SD_DEFAULT_ADDR;
+	}
+
+	strcpy(name, addr);
+
+	p = name;
+	while (*p) {
+		if (*p == ':') {
+			*p++ = '\0';
+			break;
+		} else {
+			p++;
+		}
+	}
+
+	if (*p == '\0') {
+		error_report("cannot find a port number, %s\n", name);
+		return -1;
+	}
+	port = strtol(p, NULL, 10);
+	if (port == 0) {
+		error_report("invalid port number, %s\n", p);
+		return -1;
+	}
+
+	memset(&hints, 0, sizeof(hints));
+	snprintf(buf, sizeof(buf), "%d", port);
+
+	hints.ai_socktype = SOCK_STREAM;
+
+	ret = getaddrinfo(name, buf, &hints, &res0);
+	if (ret) {
+		error_report("unable to get address info %s, %m\n", name);
+		return -1;
+	}
+
+	for (res = res0; res; res = res->ai_next) {
+		ret = getnameinfo(res->ai_addr, res->ai_addrlen,
+				  hbuf, sizeof(hbuf), sbuf, sizeof(sbuf),
+				  NI_NUMERICHOST | NI_NUMERICSERV);
+		if (ret) {
+			continue;
+		}
+
+		fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+		if (fd < 0) {
+			continue;
+		}
+
+reconnect:
+		ret = connect(fd, res->ai_addr, res->ai_addrlen);
+		if (ret < 0) {
+			if (errno == EINTR) {
+				goto reconnect;
+			}
+			break;
+		}
+
+		dprintf("connected to %s:%d\n", name, port);
+		goto success;
+	}
+	fd = -1;
+	error_report("failed connect to %s:%d\n", name, port);
+success:
+	freeaddrinfo(res0);
+	return fd;
+}
+
+static int do_readv_writev(int sockfd, struct iovec *iov, int len,
+			   int iov_offset, int write)
+{
+	int ret;
+again:
+	ret = do_send_recv(sockfd, iov, len, iov_offset, write);
+	if (ret < 0) {
+		if (errno == EINTR || errno == EAGAIN) {
+			goto again;
+		}
+		error_report("failed to recv a rsp, %m\n");
+		return 1;
+	}
+
+	iov_offset += ret;
+	len -= ret;
+	if (len) {
+		goto again;
+	}
+
+	return 0;
+}
+
+static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
+{
+	return do_readv_writev(sockfd, iov, len, iov_offset, 0);
+}
+
+static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
+{
+	return do_readv_writev(sockfd, iov, len, iov_offset, 1);
+}
+
+static int do_read_write(int sockfd, void *buf, int len, int write)
+{
+	struct iovec iov;
+
+	iov.iov_base = buf;
+	iov.iov_len = len;
+
+	return do_readv_writev(sockfd, &iov, len, 0, write);
+}
+
+static int do_read(int sockfd, void *buf, int len)
+{
+	return do_read_write(sockfd, buf, len, 0);
+}
+
+static int do_write(int sockfd, void *buf, int len)
+{
+	return do_read_write(sockfd, buf, len, 1);
+}
+
+static int send_req(int sockfd, SheepdogReq *hdr, void *data,
+		    unsigned int *wlen)
+{
+	int ret;
+	struct iovec iov[2];
+
+	iov[0].iov_base = hdr;
+	iov[0].iov_len = sizeof(*hdr);
+
+	if (*wlen) {
+		iov[1].iov_base = data;
+		iov[1].iov_len = *wlen;
+	}
+
+	ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
+	if (ret) {
+		error_report("failed to send a req, %m\n");
+		ret = -1;
+	}
+
+	return ret;
+}
+
+static int do_req(int sockfd, SheepdogReq *hdr, void *data,
+		  unsigned int *wlen, unsigned int *rlen)
+{
+	int ret;
+
+	ret = send_req(sockfd, hdr, data, wlen);
+	if (ret) {
+		ret = -1;
+		goto out;
+	}
+
+	ret = do_read(sockfd, hdr, sizeof(*hdr));
+	if (ret) {
+		error_report("failed to get a rsp, %m\n");
+		ret = -1;
+		goto out;
+	}
+
+	if (*rlen > hdr->data_length) {
+		*rlen = hdr->data_length;
+	}
+
+	if (*rlen) {
+		ret = do_read(sockfd, data, *rlen);
+		if (ret) {
+			error_report("failed to get the data, %m\n");
+			ret = -1;
+			goto out;
+		}
+	}
+	ret = 0;
+out:
+	return ret;
+}
+
+static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
+			   struct iovec *iov, int niov, int create,
+			   enum AIOCBState aiocb_type);
+
+static void send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
+{
+	AIOReq *aio_req, *next;
+	SheepdogAIOCB *acb;
+	int ret;
+
+	QLIST_FOREACH_SAFE(aio_req, &s->outstanding_aio_head,
+			   outstanding_aio_siblings, next) {
+		if (id == aio_req->id) {
+			continue;
+		}
+		if (aio_req->oid != oid) {
+			continue;
+		}
+
+		acb = aio_req->aiocb;
+		ret = add_aio_request(s, aio_req, acb->qiov->iov,
+				      acb->qiov->niov, 0, acb->aiocb_type);
+		if (ret < 0) {
+			error_report("add_aio_request is faled\n");
+			free_aio_req(s, aio_req);
+			if (QLIST_EMPTY(&acb->aioreq_head)) {
+				sd_finish_aiocb(acb);
+			}
+		}
+	}
+}
+
+static void aio_read_response(void *opaque)
+{
+	SheepdogObjReq hdr;
+	SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
+	BDRVSheepdogState *s = (BDRVSheepdogState *)opaque;
+	int fd = s->fd;
+	int ret;
+	AIOReq *aio_req = NULL;
+	SheepdogAIOCB *acb;
+	int rest;
+	unsigned long idx;
+
+	if (QLIST_EMPTY(&s->outstanding_aio_head)) {
+		return;
+	}
+
+	ret = do_read(fd, (void *)rsp, sizeof(*rsp));
+	if (ret) {
+		error_report("failed to get the header, %m\n");
+		return;
+	}
+
+	QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) {
+		if (aio_req->id == rsp->id) {
+			break;
+		}
+	}
+	if (!aio_req) {
+		error_report("cannot find aio_req %x\n", rsp->id);
+		return;
+	}
+
+	acb = aio_req->aiocb;
+
+	switch (acb->aiocb_type) {
+	case AIOCB_WRITE_UDATA:
+		if (!is_data_obj(aio_req->oid)) {
+			break;
+		}
+		idx = data_oid_to_idx(aio_req->oid);
+
+		if (s->inode.data_vdi_id[idx] != s->inode.vdi_id) {
+			s->inode.data_vdi_id[idx] = s->inode.vdi_id;
+			s->max_dirty_data_idx = max_t(uint32_t, idx,
+						      s->max_dirty_data_idx);
+			s->min_dirty_data_idx = min_t(uint32_t, idx,
+						      s->min_dirty_data_idx);
+
+			send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx),
+					 rsp->id);
+		}
+		break;
+	case AIOCB_READ_UDATA:
+		ret = do_readv(fd, acb->qiov->iov, rsp->data_length,
+			       aio_req->iov_offset);
+		if (ret) {
+			error_report("failed to get the data, %m\n");
+			return;
+		}
+		break;
+	}
+
+	if (rsp->result != SD_RES_SUCCESS) {
+		acb->ret = -EIO;
+		error_report("%s\n", sd_strerror(rsp->result));
+	}
+
+	rest = free_aio_req(s, aio_req);
+	if (!rest) {
+		acb->aio_done_func(acb);
+	}
+}
+
+static int aio_flush_request(void *opaque)
+{
+	BDRVSheepdogState *s = (BDRVSheepdogState *)opaque;
+
+	return !QLIST_EMPTY(&s->outstanding_aio_head);
+}
+
+static int set_nonblocking(int fd)
+{
+	int ret;
+
+	ret = fcntl(fd, F_GETFL);
+	if (ret < 0) {
+		error_report("can't fcntl (F_GETFL), %m\n");
+		close(fd);
+	} else {
+		ret = fcntl(fd, F_SETFL, ret | O_NONBLOCK);
+		if (ret < 0) {
+			error_report("can't fcntl (O_NONBLOCK), %m\n");
+		}
+	}
+
+	return ret;
+}
+
+static int set_nodelay(int fd)
+{
+	int ret, opt;
+
+	opt = 1;
+	ret = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
+	return ret;
+}
+
+/*
+ * Return a socket discriptor to read/write objects.
+ * We cannot use this discriptor for other operations because
+ * the block driver may be on waiting response from the server.
+ */
+static int get_sheep_fd(BDRVSheepdogState *s)
+{
+	int ret, fd;
+
+	fd = connect_to_sdog(s->addr);
+	if (fd < 0) {
+		error_report("%m\n");
+		return -1;
+	}
+
+	ret = set_nonblocking(fd);
+	if (ret) {
+		error_report("%m\n");
+		close(fd);
+		return -1;
+	}
+
+	ret = set_nodelay(fd);
+	if (ret) {
+		error_report("%m\n");
+		close(fd);
+		return -1;
+	}
+
+	qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request,
+				NULL, s);
+	s->fd = fd;
+
+	return fd;
+}
+
+static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
+			 char *vdi, int vdi_len, uint32_t *snapid)
+{
+	char *p, *q;
+	int nr_sep;
+
+	p = q = strdup(filename);
+
+	if (!p) {
+		return 1;
+	}
+
+	nr_sep = 0;
+	while (*p) {
+		if (*p == ':') {
+			nr_sep++;
+		}
+		if (nr_sep == 2) {
+			break;
+		}
+		p++;
+	}
+
+	if (nr_sep == 2) {
+		*p++ = '\0';
+	} else {
+		p = q;
+	}
+
+	strncpy(vdi, p, vdi_len);
+
+	p = strchr(vdi, ':');
+	if (p) {
+		*p++ = '\0';
+		*snapid = strtol(p, NULL, 10);
+	} else {
+		*snapid = CURRENT_VDI_ID; /* search current vdi */
+	}
+
+	if (nr_sep == 2) {
+		s->addr = q;
+	} else {
+		free(q);
+		s->addr = NULL;
+	}
+
+	return 0;
+}
+
+static int find_vdi_name(BDRVSheepdogState *s, char *filename, uint32_t snapid,
+			 uint32_t *vid, int for_snapshot)
+{
+	int ret, fd;
+	SheepdogVdiReq hdr;
+	SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
+	unsigned int wlen, rlen = 0;
+	char buf[SD_MAX_VDI_LEN];
+
+	fd = connect_to_sdog(s->addr);
+	if (fd < 0) {
+		return -1;
+	}
+
+	memset(&hdr, 0, sizeof(hdr));
+	snprintf(buf, sizeof(buf), "%s", filename);
+	if (for_snapshot) {
+		hdr.opcode = SD_OP_GET_VDI_INFO;
+	} else {
+		hdr.opcode = SD_OP_LOCK_VDI;
+	}
+	wlen = SD_MAX_VDI_LEN;
+	hdr.proto_ver = SD_PROTO_VER;
+	hdr.data_length = SD_MAX_VDI_LEN;
+	hdr.snapid = snapid;
+	hdr.flags = SD_FLAG_CMD_WRITE;
+
+	ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
+	if (ret) {
+		ret = -1;
+		goto out;
+	}
+
+	if (rsp->result != SD_RES_SUCCESS) {
+		error_report("%s, %s\n", sd_strerror(rsp->result), filename);
+		ret = -1;
+		goto out;
+	}
+	*vid = rsp->vdi_id;
+
+	ret = 0;
+out:
+	close(fd);
+	return ret;
+}
+
+static int add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
+			   struct iovec *iov, int niov, int create,
+			   enum AIOCBState aiocb_type)
+{
+	int nr_copies = s->inode.nr_copies;
+	SheepdogObjReq hdr;
+	unsigned int wlen;
+	int ret, opt;
+	uint64_t oid = aio_req->oid;
+	unsigned int datalen = aio_req->data_len;
+	uint64_t offset = aio_req->offset;
+	uint8_t flags = aio_req->flags;
+	uint64_t old_oid = aio_req->base_oid;
+
+	if (!nr_copies) {
+		error_report("bug\n");
+	}
+
+	memset(&hdr, 0, sizeof(hdr));
+
+	if (aiocb_type == AIOCB_READ_UDATA) {
+		wlen = 0;
+		hdr.opcode = SD_OP_READ_OBJ;
+		hdr.flags = flags;
+	} else if (create) {
+		wlen = datalen;
+		hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
+		hdr.flags = SD_FLAG_CMD_WRITE | flags;
+	} else {
+		wlen = datalen;
+		hdr.opcode = SD_OP_WRITE_OBJ;
+		hdr.flags = SD_FLAG_CMD_WRITE | flags;
+	}
+
+	hdr.oid = oid;
+	hdr.cow_oid = old_oid;
+	hdr.copies = s->inode.nr_copies;
+
+	hdr.data_length = datalen;
+	hdr.offset = offset;
+
+	hdr.id = aio_req->id;
+
+	opt = 1;
+	setsockopt(s->fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+	ret = do_write(s->fd, &hdr, sizeof(hdr));
+	if (ret) {
+		error_report("failed to send a req, %m\n");
+		return -EIO;
+	}
+
+	if (wlen) {
+		ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
+		if (ret) {
+			error_report("failed to send a data, %m\n");
+			return -EIO;
+		}
+	}
+        opt = 0;
+        setsockopt(s->fd, SOL_TCP, TCP_CORK, &opt, sizeof(opt));
+
+	return 0;
+}
+
+static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
+			     unsigned int datalen, uint64_t offset,
+			     int write, int create)
+{
+	SheepdogObjReq hdr;
+	SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
+	unsigned int wlen, rlen;
+	int ret;
+
+	memset(&hdr, 0, sizeof(hdr));
+
+	if (write) {
+		wlen = datalen;
+		rlen = 0;
+		hdr.flags = SD_FLAG_CMD_WRITE;
+		if (create) {
+			hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
+		} else {
+			hdr.opcode = SD_OP_WRITE_OBJ;
+		}
+	} else {
+		wlen = 0;
+		rlen = datalen;
+		hdr.opcode = SD_OP_READ_OBJ;
+	}
+	hdr.oid = oid;
+	hdr.data_length = datalen;
+	hdr.offset = offset;
+	hdr.copies = copies;
+
+	ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
+	if (ret) {
+		error_report("failed to send a request to the sheep\n");
+		return -1;
+	}
+
+	switch (rsp->result) {
+	case SD_RES_SUCCESS:
+		return 0;
+	default:
+		error_report("%s\n", sd_strerror(rsp->result));
+		return -1;
+	}
+}
+
+static int read_object(int fd, char *buf, uint64_t oid, int copies,
+		       unsigned int datalen, uint64_t offset)
+{
+	return read_write_object(fd, buf, oid, copies, datalen, offset, 0, 0);
+}
+
+static int write_object(int fd, char *buf, uint64_t oid, int copies,
+			unsigned int datalen, uint64_t offset, int create)
+{
+	return read_write_object(fd, buf, oid, copies, datalen, offset, 1, create);
+}
+
+/* TODO: error cleanups */
+static int sd_open(BlockDriverState *bs, const char *filename, int flags)
+{
+	int ret, fd;
+	uint32_t vid = 0;
+	BDRVSheepdogState *s = bs->opaque;
+	char vdi[256];
+	uint32_t snapid;
+	int for_snapshot = 0;
+	char *buf;
+
+	strstart(filename, "sheepdog:", (const char **)&filename);
+
+	buf = qemu_malloc(SD_INODE_SIZE);
+
+	memset(vdi, 0, sizeof(vdi));
+	if (parse_vdiname(s, filename, vdi, sizeof(vdi), &snapid) < 0) {
+		goto out;
+	}
+	s->fd = get_sheep_fd(s);
+	if (s->fd < 0) {
+		return -1;
+	}
+
+	if (snapid != CURRENT_VDI_ID) {
+		for_snapshot = 1;
+	}
+
+	ret = find_vdi_name(s, vdi, snapid, &vid, for_snapshot);
+	if (ret) {
+		goto out;
+	}
+
+	if (snapid) {
+		dprintf("%" PRIx32 " non current inode was open.\n", vid);
+	} else {
+		s->is_current = 1;
+	}
+
+	fd = connect_to_sdog(s->addr);
+	if (fd < 0) {
+		error_report("failed to connect\n");
+		goto out;
+	}
+
+	ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0);
+
+	close(fd);
+
+	if (ret) {
+		goto out;
+	}
+
+	memcpy(&s->inode, buf, sizeof(s->inode));
+	s->min_dirty_data_idx = UINT32_MAX;
+	s->max_dirty_data_idx = 0;
+
+	bs->total_sectors = s->inode.vdi_size >> 9;
+	strncpy(s->name, vdi, sizeof(s->name));
+	qemu_free(buf);
+
+	QLIST_INIT(&s->outstanding_aio_head);
+	return 0;
+out:
+	qemu_free(buf);
+	return -1;
+}
+
+static int do_sd_create(const char *addr, char *filename, char *tag,
+			int64_t total_sectors, uint32_t base_vid,
+			uint32_t *vdi_id, int snapshot)
+{
+	SheepdogVdiReq hdr;
+	SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
+	int fd, ret;
+	unsigned int wlen, rlen = 0;
+	char buf[SD_MAX_VDI_LEN];
+
+	fd = connect_to_sdog(addr);
+	if (fd < 0) {
+		return -1;
+	}
+
+	strncpy(buf, filename, SD_MAX_VDI_LEN);
+
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.opcode = SD_OP_NEW_VDI;
+	hdr.base_vdi_id = base_vid;
+
+	wlen = SD_MAX_VDI_LEN;
+
+	hdr.flags = SD_FLAG_CMD_WRITE;
+	hdr.snapid = snapshot;
+
+	hdr.data_length = wlen;
+	hdr.vdi_size = total_sectors * 512;
+
+	ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
+
+	close(fd);
+
+	if (ret) {
+		return -1;
+	}
+
+	if (rsp->result != SD_RES_SUCCESS) {
+		error_report("%s, %s\n", sd_strerror(rsp->result), filename);
+		return -1;
+	}
+
+	if (vdi_id) {
+		*vdi_id = rsp->vdi_id;
+	}
+
+	return 0;
+}
+
+static int sd_create(const char *filename, QEMUOptionParameter *options)
+{
+	int ret;
+	uint32_t vid = 0;
+	int64_t total_sectors = 0;
+	char *backing_file = NULL;
+
+	strstart(filename, "sheepdog:", (const char **)&filename);
+
+	while (options && options->name) {
+		if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
+			total_sectors = options->value.n / 512;
+		} else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
+			backing_file = options->value.s;
+		}
+		options++;
+	}
+
+	if (backing_file) {
+		BlockDriverState bs;
+		char vdi[SD_MAX_VDI_LEN];
+		uint32_t snapid;
+
+		strstart(backing_file, "sheepdog:", (const char **)&backing_file);
+		memset(&bs, 0, sizeof(bs));
+
+		bs.opaque = qemu_malloc(sizeof(BDRVSheepdogState));
+
+		ret = sd_open(&bs, backing_file, 0);
+		if (ret < 0) {
+			return -1;
+		}
+
+		if (parse_vdiname(bs.opaque, backing_file, vdi, sizeof(vdi), &snapid) < 0) {
+			return -1;
+		}
+
+		/* cannot clone from a current inode */
+		if (snapid == CURRENT_VDI_ID) {
+			return -1;
+		}
+
+		ret = find_vdi_name(bs.opaque, vdi, snapid, &vid, 1);
+		if (ret) {
+			return -1;
+		}
+	}
+
+	return do_sd_create(NULL, (char *)filename, NULL, total_sectors, vid,
+			    NULL, 0);
+}
+
+static void sd_close(BlockDriverState *bs)
+{
+	BDRVSheepdogState *s = bs->opaque;
+	SheepdogVdiReq hdr;
+	SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
+	unsigned int wlen, rlen = 0;
+	int fd, ret;
+
+	dprintf("%s\n", s->name);
+
+	fd = connect_to_sdog(s->addr);
+	if (fd < 0) {
+		return;
+	}
+
+	memset(&hdr, 0, sizeof(hdr));
+
+	hdr.opcode = SD_OP_RELEASE_VDI;
+	wlen = strlen(s->name) + 1;
+	hdr.data_length = wlen;
+	hdr.flags = SD_FLAG_CMD_WRITE;
+
+	ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);
+
+	close(fd);
+
+	if (!ret && rsp->result != SD_RES_SUCCESS &&
+	    rsp->result != SD_RES_VDI_NOT_LOCKED) {
+		error_report("%s, %s\n", sd_strerror(rsp->result), s->name);
+	}
+
+	close(s->fd);
+	free(s->addr);
+}
+
+static void sd_write_done(SheepdogAIOCB *acb)
+{
+	int ret;
+	BDRVSheepdogState *s = acb->common.bs->opaque;
+	struct iovec iov;
+	AIOReq *aio_req;
+	uint32_t offset, data_len, mn, mx;
+
+	mn = s->min_dirty_data_idx;
+	mx = s->max_dirty_data_idx;
+	if (mn <= mx) {
+		offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
+			mn * sizeof(s->inode.data_vdi_id[0]);
+		data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
+
+		s->min_dirty_data_idx = UINT32_MAX;
+		s->max_dirty_data_idx = 0;
+
+		iov.iov_base = &s->inode;
+		iov.iov_len = sizeof(s->inode);
+		aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
+					data_len, offset, 0, 0, offset);
+		ret = add_aio_request(s, aio_req, &iov, 1, 0, AIOCB_WRITE_UDATA);
+		if (ret) {
+			free_aio_req(s, aio_req);
+			acb->ret = -EIO;
+			goto out;
+		}
+
+		acb->aio_done_func = sd_finish_aiocb;
+		acb->aiocb_type = AIOCB_WRITE_UDATA;
+		return;
+	}
+out:
+	sd_finish_aiocb(acb);
+}
+
+static int sd_create_branch(BDRVSheepdogState *s)
+{
+	int ret, fd;
+	uint32_t vid;
+	char *buf;
+
+	dprintf("%" PRIx32 " is not current.\n", s->inode.vdi_id);
+
+	buf = qemu_malloc(SD_INODE_SIZE);
+
+	ret = do_sd_create(s->addr, s->name, NULL, s->inode.vdi_size >> 9,
+			   s->inode.vdi_id, &vid, 1);
+	if (ret) {
+		goto out;
+	}
+
+	dprintf("%" PRIx32 " is created.\n", vid);
+
+	fd = connect_to_sdog(s->addr);
+	if (fd < 0) {
+		error_report("failed to connect\n");
+		goto out;
+	}
+
+	ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
+			  SD_INODE_SIZE, 0);
+
+	close(fd);
+
+	if (ret < 0) {
+		goto out;
+	}
+
+	memcpy(&s->inode, buf, sizeof(s->inode));
+
+	s->is_current = 1;
+	ret = 0;
+	dprintf("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
+
+out:
+	qemu_free(buf);
+
+	return ret;
+}
+
+static void sd_readv_writev_bh_cb(void *p)
+{
+	SheepdogAIOCB *acb = p;
+	int ret = 0;
+	unsigned long len, done = 0, total = acb->nb_sectors * 512;
+	unsigned long idx = acb->sector_num * 512 / SD_DATA_OBJ_SIZE;
+	uint64_t oid;
+	uint64_t offset = (acb->sector_num * 512) % SD_DATA_OBJ_SIZE;
+	BDRVSheepdogState *s = acb->common.bs->opaque;
+	SheepdogInode *inode = &s->inode;
+	AIOReq *aio_req;
+
+	qemu_bh_delete(acb->bh);
+	acb->bh = NULL;
+
+	if (acb->aiocb_type == AIOCB_WRITE_UDATA && !s->is_current) {
+		ret = sd_create_branch(s);
+		if (ret) {
+			acb->ret = -EIO;
+			goto out;
+		}
+	}
+
+	while (done != total) {
+		uint8_t flags = 0;
+		uint64_t old_oid = 0;
+		int create = 0;
+
+		oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
+
+		len = min_t(unsigned long, total - done, SD_DATA_OBJ_SIZE - offset);
+
+		if (!inode->data_vdi_id[idx]) {
+			if (acb->aiocb_type == AIOCB_READ_UDATA) {
+				goto done;
+			}
+
+			create = 1;
+		} else if (acb->aiocb_type == AIOCB_WRITE_UDATA
+			   && !is_data_obj_writeable(inode, idx)) {
+			create = 1;
+			old_oid = oid;
+			flags = SD_FLAG_CMD_COW;
+		}
+
+		if (create) {
+			dprintf("update ino (%" PRIu32") %"
+				PRIu64 " %" PRIu64 " %" PRIu64 "\n",
+				inode->vdi_id, oid,
+				vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
+			oid = vid_to_data_oid(inode->vdi_id, idx);
+			dprintf("new oid %lx\n", oid);
+		}
+
+		aio_req = alloc_aio_req(s, acb, oid, len, offset, flags,
+					old_oid, done);
+
+		if (create) {
+			AIOReq *areq;
+			QLIST_FOREACH(areq, &s->outstanding_aio_head,
+				      outstanding_aio_siblings) {
+				if (areq == aio_req) {
+					continue;
+				}
+				if (areq->oid == oid) {
+					aio_req->flags = 0;
+					aio_req->base_oid = 0;
+					goto done;
+				}
+			}
+		}
+
+		ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
+				      create, acb->aiocb_type);
+		if (ret < 0) {
+			error_report("add_aio_request is faled\n");
+			free_aio_req(s, aio_req);
+			acb->ret = -EIO;
+			goto out;
+		}
+	done:
+		offset = 0;
+		idx++;
+		done += len;
+	}
+out:
+	if (QLIST_EMPTY(&acb->aioreq_head)) {
+		sd_finish_aiocb(acb);
+	}
+}
+
+static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs,
+				       int64_t sector_num,
+				       QEMUIOVector *qiov,
+				       int nb_sectors,
+				       BlockDriverCompletionFunc *cb,
+				       void *opaque)
+{
+	SheepdogAIOCB *acb;
+
+	acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+	acb->aio_done_func = sd_write_done;
+	acb->aiocb_type = AIOCB_WRITE_UDATA;
+
+	sd_schedule_bh(sd_readv_writev_bh_cb, acb);
+	return &acb->common;
+}
+
+static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs,
+				      int64_t sector_num,
+				      QEMUIOVector *qiov,
+				      int nb_sectors,
+				      BlockDriverCompletionFunc *cb,
+				      void *opaque)
+{
+	SheepdogAIOCB *acb;
+	int i;
+
+	acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+	acb->aiocb_type = AIOCB_READ_UDATA;
+	acb->aio_done_func = sd_finish_aiocb;
+
+	/*
+	 * TODO: we can do better; we don't need to initialize
+	 * blindly.
+	 */
+	for (i = 0; i < qiov->niov; i++) {
+		memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
+	}
+
+	sd_schedule_bh(sd_readv_writev_bh_cb, acb);
+	return &acb->common;
+}
+
+static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
+{
+	BDRVSheepdogState *s = bs->opaque;
+	int ret, fd;
+	uint32_t new_vid;
+	SheepdogInode *inode;
+	unsigned int datalen;
+	uint64_t offset;
+
+	dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %d "
+		"is_current %d\n", sn_info->name, sn_info->id_str,
+		s->name, sn_info->vm_state_size, s->is_current);
+
+	if (!s->is_current) {
+		error_report("You can't create a snapshot of "
+			"a non current VDI, %s (%" PRIu32 ").\n",
+			s->name, s->inode.vdi_id);
+
+		return -1;
+	}
+
+	dprintf("%s %s\n", sn_info->name, sn_info->id_str);
+
+	s->inode.vm_state_size = sn_info->vm_state_size;
+	s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
+	offset = 0;
+	/* we don't need to read entire object */
+	datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
+
+	/* refresh inode. */
+	fd = connect_to_sdog(s->addr);
+	if (fd < 0) {
+		ret = -EIO;
+		goto cleanup;
+	}
+
+	ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
+			   s->inode.nr_copies, datalen, offset, 0);
+	if (ret < 0) {
+		error_report("failed to write snapshot's inode.\n");
+		ret = -EIO;
+		goto cleanup;
+	}
+
+	ret = do_sd_create(s->addr, s->name, NULL, s->inode.vdi_size >> 9,
+			   s->inode.vdi_id, &new_vid, 1);
+	if (ret < 0) {
+		error_report("failed to create inode for snapshot. %m\n");
+		ret = -EIO;
+		goto cleanup;
+	}
+
+	inode = (SheepdogInode *)qemu_malloc(datalen);
+
+	ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid),
+			  s->inode.nr_copies, datalen, offset);
+
+	close(fd);
+
+	if (ret < 0) {
+		error_report("failed to read new inode info. %m\n");
+		ret = -EIO;
+		goto cleanup;
+	}
+
+	memcpy(&s->inode, inode, datalen);
+	dprintf("s->inode: name %s snap_id %x oid %x\n",
+		s->inode.name, s->inode.snap_id, s->inode.vdi_id);
+
+cleanup:
+	close(fd);
+	return ret;
+}
+
+static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
+{
+	BDRVSheepdogState *s = bs->opaque;
+	BDRVSheepdogState *old_s;
+	char vdi[SD_MAX_VDI_LEN];
+	char *buf = NULL;
+	uint32_t vid;
+	uint32_t snapid = 0;
+	int ret = -ENOENT, fd;
+
+	old_s = qemu_malloc(sizeof(BDRVSheepdogState));
+
+	memcpy(old_s, s, sizeof(BDRVSheepdogState));
+
+	snapid = strtol(snapshot_id, NULL, 10);
+	if (!snapid) {
+		error_report("Invalid snapshot_id\n");
+		goto out;
+	}
+
+	buf = qemu_malloc(SD_INODE_SIZE);
+	strncpy(vdi, s->name, sizeof(vdi));
+	ret = find_vdi_name(s, vdi, snapid, &vid, 1);
+	if (ret) {
+		error_report("Failed to find_vdi_name\n");
+		ret = -ENOENT;
+		goto out;
+	}
+
+	fd = connect_to_sdog(s->addr);
+	if (fd < 0) {
+		error_report("failed to connect\n");
+		goto out;
+	}
+
+	ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
+			  SD_INODE_SIZE, 0);
+
+	close(fd);
+
+	if (ret) {
+		ret = -ENOENT;
+		goto out;
+	}
+
+	memcpy(&s->inode, buf, sizeof(s->inode));
+
+	if (!s->inode.vm_state_size) {
+		error_report("Invalid snapshot\n");
+		ret = -ENOENT;
+		goto out;
+	}
+
+	s->is_current = 0;
+
+	qemu_free(buf);
+	qemu_free(old_s);
+
+	return 0;
+out:
+	/* recover bdrv_sd_state */
+	memcpy(s, old_s, sizeof(BDRVSheepdogState));
+	qemu_free(buf);
+	qemu_free(old_s);
+
+	error_report("failed to open. recover old bdrv_sd_state.\n");
+
+	return ret;
+}
+
+static int sd_snapshot_delete(BlockDriverState *bs, const char *snapshot_id)
+{
+	/* FIXME: Delete specified snapshot id.  */
+	return 0;
+}
+
+#define DIV_ROUND_UP(n,d) (((n) + (d) - 1) / (d))
+#define BITS_PER_BYTE		8
+#define BITS_TO_LONGS(nr)	DIV_ROUND_UP(nr, BITS_PER_BYTE * sizeof(long))
+#define DECLARE_BITMAP(name,bits) \
+	unsigned long name[BITS_TO_LONGS(bits)]
+
+#define BITS_PER_LONG (BITS_PER_BYTE * sizeof(long))
+
+static inline int test_bit(unsigned int nr, const unsigned long *addr)
+{
+	return ((1UL << (nr % BITS_PER_LONG)) &
+		(((unsigned long *)addr)[nr / BITS_PER_LONG])) != 0;
+}
+
+static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
+{
+	BDRVSheepdogState *s = bs->opaque;
+	SheepdogReq req;
+	int i, fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
+	QEMUSnapshotInfo *sn_tab = NULL;
+	unsigned wlen, rlen;
+	int found = 0;
+	static SheepdogInode inode;
+	unsigned long *vdi_inuse;
+	unsigned int start_nr;
+
+	vdi_inuse = qemu_malloc(max);
+
+	fd = connect_to_sdog(s->addr);
+	if (fd < 0) {
+		goto out;
+	}
+
+	rlen = max;
+	wlen = 0;
+
+	memset(&req, 0, sizeof(req));
+
+	req.opcode = SD_OP_READ_VDIS;
+	req.data_length = max;
+
+	ret = do_req(fd, (SheepdogReq *)&req, vdi_inuse, &wlen, &rlen);
+
+	close(fd);
+	if (ret) {
+		goto out;
+	}
+
+	sn_tab = qemu_mallocz(nr * sizeof(*sn_tab));
+
+	start_nr = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT) & (SD_NR_VDIS - 1);
+
+	fd = connect_to_sdog(s->addr);
+	if (fd < 0) {
+		error_report("failed to connect\n");
+		goto out;
+	}
+
+	/* TODO: round up */
+	for (i = start_nr; i < SD_NR_VDIS && found < nr; i++) {
+		if (!test_bit(i, vdi_inuse)) {
+			break;
+		}
+
+		/* we don't need to read entire object */
+		ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(i),
+				  0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0);
+
+		if (ret) {
+			continue;
+		}
+
+		if (!strcmp(inode.name, s->name) && inode.snap_ctime) {
+			sn_tab[found].date_sec = inode.snap_ctime >> 32;
+			sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
+			sn_tab[found].vm_state_size = inode.vm_state_size;
+			sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
+
+			snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str), "%u",
+				 inode.snap_id);
+			found++;
+		}
+	}
+
+	close(fd);
+out:
+	*psn_tab = sn_tab;
+
+	qemu_free(vdi_inuse);
+
+	return found;
+}
+
+static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
+				int64_t pos, int size, int load)
+{
+	int fd, create;
+	int ret = 0;
+	unsigned int data_len;
+	uint64_t vmstate_oid;
+	uint32_t vdi_index;
+	uint64_t offset;
+
+	fd = connect_to_sdog(s->addr);
+	if (fd < 0) {
+		ret = -EIO;
+		goto cleanup;
+	}
+
+	while (size) {
+		vdi_index = pos / SD_DATA_OBJ_SIZE;
+		offset = pos % SD_DATA_OBJ_SIZE;
+
+		data_len = min_t(unsigned int, size, SD_DATA_OBJ_SIZE);
+
+		vmstate_oid = vid_to_vmstate_oid(s->inode.vdi_id, vdi_index);
+
+		create = (offset == 0);
+		if (load) {
+			ret = read_object(fd, (char *)data, vmstate_oid,
+					  s->inode.nr_copies, data_len, offset);
+		} else {
+			ret = write_object(fd, (char *)data, vmstate_oid,
+					   s->inode.nr_copies, data_len, offset, create);
+		}
+
+		if (ret < 0) {
+			error_report("failed to save vmstate %m\n");
+			ret = -EIO;
+			goto cleanup;
+		}
+
+		pos += data_len;
+		size -= data_len;
+		ret += data_len;
+	}
+cleanup:
+	close(fd);
+	return ret;
+}
+
+static int sd_save_vmstate(BlockDriverState *bs, const uint8_t *data,
+			   int64_t pos, int size)
+{
+	BDRVSheepdogState *s = bs->opaque;
+
+	return do_load_save_vmstate(s, (uint8_t *)data, pos, size, 0);
+}
+
+static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
+			   int64_t pos, int size)
+{
+	BDRVSheepdogState *s = bs->opaque;
+
+	return do_load_save_vmstate(s, data, pos, size, 1);
+}
+
+
+static QEMUOptionParameter sd_create_options[] = {
+	{
+		.name = BLOCK_OPT_SIZE,
+		.type = OPT_SIZE,
+		.help = "Virtual disk size"
+	},
+	{
+		.name = BLOCK_OPT_BACKING_FILE,
+		.type = OPT_STRING,
+		.help = "File name of a base image"
+	},
+	{ NULL }
+};
+
+BlockDriver bdrv_sheepdog = {
+	.format_name    = "sheepdog",
+	.protocol_name  = "sheepdog",
+	.instance_size  = sizeof(BDRVSheepdogState),
+	.bdrv_file_open = sd_open,
+	.bdrv_close     = sd_close,
+	.bdrv_create    = sd_create,
+
+	.bdrv_aio_readv     = sd_aio_readv,
+	.bdrv_aio_writev    = sd_aio_writev,
+
+	.bdrv_snapshot_create   = sd_snapshot_create,
+	.bdrv_snapshot_goto     = sd_snapshot_goto,
+	.bdrv_snapshot_delete   = sd_snapshot_delete,
+	.bdrv_snapshot_list     = sd_snapshot_list,
+
+	.bdrv_save_vmstate  = sd_save_vmstate,
+	.bdrv_load_vmstate  = sd_load_vmstate,
+
+	.create_options = sd_create_options,
+};
+
+static void bdrv_sheepdog_init(void)
+{
+	bdrv_register(&bdrv_sheepdog);
+}
+block_init(bdrv_sheepdog_init);