Hello,

Issue in tapdisk-vbd.c

Why td_queue_write(parent,treq); is called in  static void
__tapdisk_vbd_reissue_td_request(td_vbd_t *vbd,td_image_t *image,
td_request_t treq) function as we can't write in parent vhd because it is
read only.

I have attached the code of tapdisk-vbd.c and block-vhd.c.

And please tell me about how to write into child vhd after reading from
parent vhd in block-vhd.c in case of VHD_BM_BIT_CLEAR in function
vhd_queue_read().

Thanks.
/*
 * Copyright (C) Citrix Systems Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; version 2.1 only
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <regex.h>
#include <unistd.h>
#include <stdlib.h>
#include <libgen.h>
#include <sys/mman.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/types.h>

#include "debug.h"
#include "libvhd.h"
#include "tapdisk-blktap.h"
#include "tapdisk-image.h"
#include "tapdisk-driver.h"
#include "tapdisk-server.h"
#include "tapdisk-vbd.h"
#include "tapdisk-disktype.h"
#include "tapdisk-interface.h"
#include "tapdisk-stats.h"
#include "tapdisk-storage.h"
#include "tapdisk-nbdserver.h"
#include "td-stats.h"
#include "tapdisk-utils.h"
#include "md5.h"

#define DBG(_level, _f, _a...) tlog_write(_level, _f, ##_a)
#define ERR(_err, _f, _a...) tlog_error(_err, _f, ##_a)

#define INFO(_f, _a...)            tlog_syslog(TLOG_INFO, "vbd: " _f, ##_a)
#define ERROR(_f, _a...)           tlog_syslog(TLOG_WARN, "vbd: " _f, ##_a)

#define TD_VBD_EIO_RETRIES          10
#define TD_VBD_EIO_SLEEP            1
#define TD_VBD_WATCHDOG_TIMEOUT     10

static void tapdisk_vbd_complete_vbd_request(td_vbd_t *, td_vbd_request_t *);
static int  tapdisk_vbd_queue_ready(td_vbd_t *);
static void tapdisk_vbd_check_queue_state(td_vbd_t *);

/*
 * initialization
 */

static void
tapdisk_vbd_mark_progress(td_vbd_t *vbd)
{
	gettimeofday(&vbd->ts, NULL);
}

td_vbd_t*
tapdisk_vbd_create(uint16_t uuid)
{
	td_vbd_t *vbd;

	vbd = calloc(1, sizeof(td_vbd_t));
	if (!vbd) {
		EPRINTF("failed to allocate tapdisk state\n");
		return NULL;
	}

    shm_init(&vbd->rrd.shm);

	vbd->uuid        = uuid;
	vbd->req_timeout = TD_VBD_REQUEST_TIMEOUT;

	INIT_LIST_HEAD(&vbd->images);
	INIT_LIST_HEAD(&vbd->new_requests);
	INIT_LIST_HEAD(&vbd->pending_requests);
	INIT_LIST_HEAD(&vbd->failed_requests);
	INIT_LIST_HEAD(&vbd->completed_requests);
	INIT_LIST_HEAD(&vbd->next);
    INIT_LIST_HEAD(&vbd->rings);
    INIT_LIST_HEAD(&vbd->dead_rings);
	tapdisk_vbd_mark_progress(vbd);

	return vbd;
}

int
tapdisk_vbd_initialize(int rfd, int wfd, uint16_t uuid)
{
	td_vbd_t *vbd;

	vbd = tapdisk_server_get_vbd(uuid);
	if (vbd) {
		EPRINTF("duplicate vbds! %u\n", uuid);
		return -EEXIST;
	}

	vbd = tapdisk_vbd_create(uuid);

	tapdisk_server_add_vbd(vbd);

	return 0;
}

static inline void
tapdisk_vbd_add_image(td_vbd_t *vbd, td_image_t *image)
{
	list_add_tail(&image->next, &vbd->images);
}

static inline int
tapdisk_vbd_is_last_image(td_vbd_t *vbd, td_image_t *image)
{
	return list_is_last(&image->next, &vbd->images);
}

static inline td_image_t *
tapdisk_vbd_first_image(td_vbd_t *vbd)
{
	td_image_t *image = NULL;
	if (!list_empty(&vbd->images))
		image = list_entry(vbd->images.next, td_image_t, next);
	return image;
}

static inline td_image_t *
tapdisk_vbd_last_image(td_vbd_t *vbd)
{
	td_image_t *image = NULL;
	if (!list_empty(&vbd->images))
		image = list_entry(vbd->images.prev, td_image_t, next);
	return image;
}

static inline td_image_t *
tapdisk_vbd_next_image(td_image_t *image)
{
	return list_entry(image->next.next, td_image_t, next);
}

static int
tapdisk_vbd_validate_chain(td_vbd_t *vbd)
{
	return tapdisk_image_validate_chain(&vbd->images);
}

static int
vbd_stats_destroy(td_vbd_t *vbd) {

    int err = 0;

    ASSERT(vbd);

    err = shm_destroy(&vbd->rrd.shm);
    if (unlikely(err)) {
        EPRINTF("failed to destroy RRD file: %s\n", strerror(err));
        goto out;
    }

    free(vbd->rrd.shm.path);
    vbd->rrd.shm.path = NULL;

out:
    return -err;
}

static int
vbd_stats_create(td_vbd_t *vbd) {

    int err;

    ASSERT(vbd);

	err = mkdir("/dev/shm/metrics", S_IRUSR | S_IWUSR);
	if (likely(err)) {
        err = errno;
        if (unlikely(err != EEXIST))
    		goto out;
        else
            err = 0;
    }

    /*
     * FIXME Rename this to something like "vbd3-domid-devid". Consider
     * consolidating this with the io_ring shared memory file. Check if blkback
     * exports the same information in some sysfs file and if so move this to
     * the ring location.
     */
    err = asprintf(&vbd->rrd.shm.path, "/dev/shm/metrics/tap-%d-%d", getpid(),
            vbd->uuid);
    if (err == -1) {
        err = errno;
        vbd->rrd.shm.path = NULL;
        EPRINTF("failed to create metric file: %s\n", strerror(err));
        goto out;
    }
    err = 0;

    vbd->rrd.shm.size = PAGE_SIZE;
    err = shm_create(&vbd->rrd.shm);
    if (err)
        EPRINTF("failed to create RRD: %s\n", strerror(err));

out:
    if (err) {
        int err2 = vbd_stats_destroy(vbd);
        if (err2)
            EPRINTF("failed to clean up failed RRD shared memory creation: "
                    "%s (error ignored)\n", strerror(-err2));
    }
    return -err;
}

void
tapdisk_vbd_close_vdi(td_vbd_t *vbd)
{
    int err;

    err = vbd_stats_destroy(vbd);
    if (err) {
        EPRINTF("failed to destroy RRD stats file: %s (error ignored)\n",
                strerror(-err));
    }

	tapdisk_image_close_chain(&vbd->images);

	if (vbd->secondary &&
	    vbd->secondary_mode != TD_VBD_SECONDARY_MIRROR) {
		tapdisk_image_close(vbd->secondary);
		vbd->secondary = NULL;
	}

	if (vbd->retired) {
		tapdisk_image_close(vbd->retired);
		vbd->retired = NULL;
	}

	td_flag_set(vbd->state, TD_VBD_CLOSED);
}

static int
tapdisk_vbd_add_block_cache(td_vbd_t *vbd)
{
	td_image_t *cache, *image, *target, *tmp;
	int err;

	target = NULL;

	tapdisk_vbd_for_each_image(vbd, image, tmp)
		if (td_flag_test(image->flags, TD_OPEN_RDONLY) &&
		    td_flag_test(image->flags, TD_OPEN_SHAREABLE)) {
			target = image;
			break;
		}

	if (!target)
		return 0;

	cache = tapdisk_image_allocate(target->name,
				       DISK_TYPE_BLOCK_CACHE,
				       target->flags);
	if (!cache)
		return -ENOMEM;

	/* try to load existing cache */
	err = td_load(cache);
	if (!err)
		goto done;

	/* hack driver to send open() correct image size */
	if (!target->driver) {
		err = -ENODEV;
		goto fail;
	}

	cache->driver = tapdisk_driver_allocate(cache->type,
						cache->name,
						cache->flags);
	if (!cache->driver) {
		err = -ENOMEM;
		goto fail;
	}

	cache->driver->info = target->driver->info;

	/* try to open new cache */
	err = td_open(cache);
	if (!err)
		goto done;

fail:
	/* give up */
	tapdisk_image_free(target);
	return err;

done:
	/* insert cache before image */
	list_add(&cache->next, target->next.prev);
	return 0;
}

static int
tapdisk_vbd_add_local_cache(td_vbd_t *vbd)
{
	td_image_t *cache, *parent;
	int err;

	parent = tapdisk_vbd_first_image(vbd);
	if (tapdisk_vbd_is_last_image(vbd, parent)) {
		DPRINTF("Single-image chain, nothing to cache");
		return 0;
	}

	cache = tapdisk_image_allocate(parent->name,
				       DISK_TYPE_LCACHE,
				       parent->flags);

	if (!cache)
		return -ENOMEM;

	/* try to load existing cache */
	err = td_load(cache);
	if (!err)
		goto done;

	cache->driver = tapdisk_driver_allocate(cache->type,
						cache->name,
						cache->flags);
	if (!cache->driver) {
		err = -ENOMEM;
		goto fail;
	}

	cache->driver->info = parent->driver->info;

	/* try to open new cache */
	err = td_open(cache);
	if (!err)
		goto done;

fail:
	tapdisk_image_free(cache);
	return err;

done:
	/* insert cache right above leaf image */
	list_add(&cache->next, &parent->next);

	DPRINTF("Added local_cache driver\n");
	return 0;
}

int
tapdisk_vbd_add_secondary(td_vbd_t *vbd)
{
	td_image_t *leaf, *second = NULL;
	const char *path;
	int type, err;

	if (strcmp(vbd->secondary_name, "null") == 0) {
		DPRINTF("Removing secondary image\n");
		vbd->secondary_mode = TD_VBD_SECONDARY_DISABLED;
		vbd->secondary = NULL;
		vbd->nbd_mirror_failed = 0;
		return 0;
	}

	DPRINTF("Adding secondary image: %s\n", vbd->secondary_name);

	type = tapdisk_disktype_parse_params(vbd->secondary_name, &path);
	if (type < 0)
		return type;

	leaf = tapdisk_vbd_first_image(vbd);
	if (!leaf) {
		err = -EINVAL;
		goto fail;
	}

	err = tapdisk_image_open(type, path, leaf->flags, &second);
	if (err) {
		if (type == DISK_TYPE_NBD)
			vbd->nbd_mirror_failed = 1;

		vbd->secondary=NULL;
		vbd->secondary_mode=TD_VBD_SECONDARY_DISABLED;
		
		goto fail;
	}

	if (second->info.size != leaf->info.size) {
		EPRINTF("Secondary image size %"PRIu64" != image size %"PRIu64"\n",
			second->info.size, leaf->info.size);
		err = -EINVAL;
		goto fail;
	}

	vbd->secondary = second;
	leaf->flags |= TD_IGNORE_ENOSPC;
	if (td_flag_test(vbd->flags, TD_OPEN_STANDBY)) {
		DPRINTF("In standby mode\n");
		vbd->secondary_mode = TD_VBD_SECONDARY_STANDBY;
	} else {
		DPRINTF("In mirror mode\n");
		vbd->secondary_mode = TD_VBD_SECONDARY_MIRROR;
		/*
		 * we actually need this image to also be part of the chain, 
		 * since it may already contain data
		 */
		list_add(&second->next, &leaf->next);
	}

	DPRINTF("Added secondary image\n");
	return 0;

fail:
	if (second)
		tapdisk_image_close(second);
	return err;
}

static void signal_enospc(td_vbd_t *vbd)
{
	int fd, err;
	char *fn;

	err = asprintf(&fn, BLKTAP2_ENOSPC_SIGNAL_FILE"%d", vbd->tap->minor);
	if (err == -1) {
		EPRINTF("Failed to signal ENOSPC condition\n");
		return;
	}

	fd = open(fn, O_WRONLY | O_CREAT | O_NONBLOCK, 0666);
	if (fd == -1)
		EPRINTF("Failed to open file to signal ENOSPC condition\n");
	else
		close(fd);

	free(fn);
}

#if 0
static int
tapdisk_vbd_open_index(td_vbd_t *vbd)
{
	int err;
	char *path;
	td_flag_t flags;
	td_image_t *last, *image;

	last = tapdisk_vbd_last_image(vbd);
	err  = asprintf(&path, "%s.bat", last->name);
	if (err == -1)
		return -errno;

	err = access(path, R_OK);
	if (err == -1) {
		free(path);
		return -errno;
	}

	flags = vbd->flags | TD_OPEN_RDONLY | TD_OPEN_SHAREABLE;
	image = tapdisk_image_allocate(path, DISK_TYPE_VINDEX, flags);
	if (!image) {
		err = -ENOMEM;
		goto fail;
	}

	err = td_open(image);
	if (err)
		goto fail;

	tapdisk_vbd_add_image(vbd, image);
	return 0;

fail:
	if (image)
		tapdisk_image_free(image);
	free(path);
	return err;
}
#endif

static int
tapdisk_vbd_add_dirty_log(td_vbd_t *vbd)
{
	int err;
	td_driver_t *driver;
	td_image_t *log, *parent;

	driver = NULL;
	log    = NULL;

	parent = tapdisk_vbd_first_image(vbd);

	log    = tapdisk_image_allocate(parent->name,
					DISK_TYPE_LOG,
					parent->flags);
	if (!log)
		return -ENOMEM;

	driver = tapdisk_driver_allocate(log->type,
					 log->name,
					 log->flags);
	if (!driver) {
		err = -ENOMEM;
		goto fail;
	}

	driver->info = parent->driver->info;
	log->driver  = driver;

	err = td_open(log);
	if (err)
		goto fail;

	tapdisk_vbd_add_image(vbd, log);
	return 0;

fail:
	tapdisk_image_free(log);
	return err;
}

int
tapdisk_vbd_open_vdi(td_vbd_t *vbd, const char *name, td_flag_t flags, int prt_devnum)
{
	char *tmp = vbd->name;
	int err;

	if (!list_empty(&vbd->images)) {
		err = -EBUSY;
		goto fail;
	}

	if (!name && !vbd->name) {
		err = -EINVAL;
		goto fail;
	}

	if (name) {
		vbd->name = strdup(name);
		if (!vbd->name) {
			err = -errno;
			goto fail;
		}
	}

	err = tapdisk_image_open_chain(vbd->name, flags, prt_devnum, &vbd->images);
	if (err)
		goto fail;

	td_flag_clear(vbd->state, TD_VBD_CLOSED);
	vbd->flags = flags;

	if (td_flag_test(vbd->flags, TD_OPEN_LOG_DIRTY)) {
		err = tapdisk_vbd_add_dirty_log(vbd);
		if (err)
			goto fail;
	}

	if (td_flag_test(vbd->flags, TD_OPEN_ADD_CACHE)) {
		err = tapdisk_vbd_add_block_cache(vbd);
		if (err)
			goto fail;
	}

	if (td_flag_test(vbd->flags, TD_OPEN_LOCAL_CACHE)) {
		err = tapdisk_vbd_add_local_cache(vbd);
		if (err)
			goto fail;
	}

	err = tapdisk_vbd_validate_chain(vbd);
	if (err)
		goto fail;

	if (td_flag_test(vbd->flags, TD_OPEN_SECONDARY)) {
		err = tapdisk_vbd_add_secondary(vbd);
		if (err) {
			if (vbd->nbd_mirror_failed != 1)
				goto fail;
			INFO("Ignoring failed NBD secondary attach\n");
			err = 0;
		}
	}

    err = vbd_stats_create(vbd);
    if (err)
        goto fail;

	if (tmp != vbd->name)
		free(tmp);

	return err;

fail:
	if (vbd->name != tmp) {
		free(vbd->name);
		vbd->name = tmp;
	}

	if (!list_empty(&vbd->images))
		tapdisk_image_close_chain(&vbd->images);

	vbd->flags = 0;

	return err;
}

void
tapdisk_vbd_detach(td_vbd_t *vbd)
{
	td_blktap_t *tap = vbd->tap;

	if (tap) {
		tapdisk_blktap_close(tap);
		vbd->tap = NULL;
	}
}

int
tapdisk_vbd_attach(td_vbd_t *vbd, const char *devname, int minor)
{

	if (vbd->tap)
		return -EALREADY;

	return tapdisk_blktap_open(devname, vbd, &vbd->tap);
}

/*
int
tapdisk_vbd_open(td_vbd_t *vbd, const char *name,
		 int minor, const char *ring, td_flag_t flags)
{
	int err;

	err = tapdisk_vbd_open_vdi(vbd, name, flags, -1);
	if (err)
		goto out;

	err = tapdisk_vbd_attach(vbd, ring, minor);
	if (err)
		goto out;

	return 0;

out:
	tapdisk_vbd_detach(vbd);
	tapdisk_vbd_close_vdi(vbd);
	free(vbd->name);
	vbd->name = NULL;
	return err;
}
*/

static void
tapdisk_vbd_queue_count(td_vbd_t *vbd, int *new,
			int *pending, int *failed, int *completed)
{
	int n, p, f, c;
	td_vbd_request_t *vreq, *tvreq;

	n = 0;
	p = 0;
	f = 0;
	c = 0;

	tapdisk_vbd_for_each_request(vreq, tvreq, &vbd->new_requests)
		n++;

	tapdisk_vbd_for_each_request(vreq, tvreq, &vbd->pending_requests)
		p++;

	tapdisk_vbd_for_each_request(vreq, tvreq, &vbd->failed_requests)
		f++;

	tapdisk_vbd_for_each_request(vreq, tvreq, &vbd->completed_requests)
		c++;

	*new       = n;
	*pending   = p;
	*failed    = f;
	*completed = c;
}

static int
tapdisk_vbd_shutdown(td_vbd_t *vbd)
{
	int new, pending, failed, completed;

	if (!list_empty(&vbd->pending_requests))
		return -EAGAIN;

	tapdisk_vbd_queue_count(vbd, &new, &pending, &failed, &completed);

	DPRINTF("%s: state: 0x%08x, new: 0x%02x, pending: 0x%02x, "
		"failed: 0x%02x, completed: 0x%02x\n", 
		vbd->name, vbd->state, new, pending, failed, completed);
	DPRINTF("last activity: %010ld.%06ld, errors: 0x%04"PRIx64", "
		"retries: 0x%04"PRIx64", received: 0x%08"PRIx64", "
		"returned: 0x%08"PRIx64", kicked: 0x%08"PRIx64"\n",
		vbd->ts.tv_sec, vbd->ts.tv_usec,
		vbd->errors, vbd->retries, vbd->received, vbd->returned,
		vbd->kicked);

	tapdisk_vbd_close_vdi(vbd);
	tapdisk_vbd_detach(vbd);
	tapdisk_server_remove_vbd(vbd);
	free(vbd->name);
	free(vbd);

	return 0;
}

int
tapdisk_vbd_close(td_vbd_t *vbd)
{
	/*
	 * don't close if any requests are pending in the aio layer
	 */
	if (!list_empty(&vbd->pending_requests))
		goto fail;

	/* 
	 * if the queue is still active and we have more
	 * requests, try to complete them before closing.
	 */
	if (tapdisk_vbd_queue_ready(vbd) &&
	    (!list_empty(&vbd->new_requests) ||
	     !list_empty(&vbd->failed_requests) ||
	     !list_empty(&vbd->completed_requests)))
		goto fail;

	return tapdisk_vbd_shutdown(vbd);

fail:
	td_flag_set(vbd->state, TD_VBD_SHUTDOWN_REQUESTED);
	DBG(TLOG_WARN, "%s: requests pending\n", vbd->name);
	return -EAGAIN;
}

/*
 * control operations
 */

void
tapdisk_vbd_debug(td_vbd_t *vbd)
{
	td_image_t *image, *tmp;
	int new, pending, failed, completed;

	tapdisk_vbd_queue_count(vbd, &new, &pending, &failed, &completed);

	DBG(TLOG_WARN, "%s: state: 0x%08x, new: 0x%02x, pending: 0x%02x, "
	    "failed: 0x%02x, completed: 0x%02x, last activity: %010ld.%06ld, "
	    "errors: 0x%04"PRIx64", retries: 0x%04"PRIx64", "
	    "received: 0x%08"PRIx64", returned: 0x%08"PRIx64", "
	    "kicked: 0x%08"PRIx64"\n",
	    vbd->name, vbd->state, new, pending, failed, completed,
	    vbd->ts.tv_sec, vbd->ts.tv_usec, vbd->errors, vbd->retries,
	    vbd->received, vbd->returned, vbd->kicked);

	tapdisk_vbd_for_each_image(vbd, image, tmp)
		td_debug(image);
}

static void
tapdisk_vbd_drop_log(td_vbd_t *vbd)
{
	if (td_flag_test(vbd->state, TD_VBD_LOG_DROPPED))
		return;

	tapdisk_vbd_debug(vbd);
	tlog_precious(0);
	td_flag_set(vbd->state, TD_VBD_LOG_DROPPED);
}

int
tapdisk_vbd_get_disk_info(td_vbd_t *vbd, td_disk_info_t *info)
{
	if (list_empty(&vbd->images))
		return -EINVAL;

	*info = tapdisk_vbd_first_image(vbd)->info;
	return 0;
}

static int
tapdisk_vbd_queue_ready(td_vbd_t *vbd)
{
	return (!td_flag_test(vbd->state, TD_VBD_DEAD) &&
		!td_flag_test(vbd->state, TD_VBD_CLOSED) &&
		!td_flag_test(vbd->state, TD_VBD_QUIESCED) &&
		!td_flag_test(vbd->state, TD_VBD_QUIESCE_REQUESTED));
}

int
tapdisk_vbd_retry_needed(td_vbd_t *vbd)
{
	return !(list_empty(&vbd->failed_requests) &&
		 list_empty(&vbd->new_requests));
}

int
tapdisk_vbd_lock(td_vbd_t *vbd)
{
	return 0;
}

int
tapdisk_vbd_quiesce_queue(td_vbd_t *vbd)
{
	if (!list_empty(&vbd->pending_requests)) {
		td_flag_set(vbd->state, TD_VBD_QUIESCE_REQUESTED);
		return -EAGAIN;
	}

	td_flag_clear(vbd->state, TD_VBD_QUIESCE_REQUESTED);
	td_flag_set(vbd->state, TD_VBD_QUIESCED);
	return 0;
}

int
tapdisk_vbd_start_queue(td_vbd_t *vbd)
{
	td_flag_clear(vbd->state, TD_VBD_QUIESCED);
	td_flag_clear(vbd->state, TD_VBD_QUIESCE_REQUESTED);
	tapdisk_vbd_mark_progress(vbd);
	return 0;
}

int
tapdisk_vbd_kill_queue(td_vbd_t *vbd)
{
	tapdisk_vbd_quiesce_queue(vbd);
	td_flag_set(vbd->state, TD_VBD_DEAD);
	return 0;
}

#if 0
static int
tapdisk_vbd_open_image(td_vbd_t *vbd, td_image_t *image)
{
	int err;
	td_image_t *parent;

	err = td_open(image);
	if (err)
		return err;

	if (!tapdisk_vbd_is_last_image(vbd, image)) {
		parent = tapdisk_vbd_next_image(image);
		err    = td_validate_parent(image, parent);
		if (err) {
			td_close(image);
			return err;
		}
	}

	return 0;
}
#endif

int
tapdisk_vbd_pause(td_vbd_t *vbd)
{
	int err;
    struct td_xenblkif *blkif;

	INFO("pause requested\n");

	td_flag_set(vbd->state, TD_VBD_PAUSE_REQUESTED);

	if (vbd->nbdserver)
		tapdisk_nbdserver_pause(vbd->nbdserver);

	err = tapdisk_vbd_quiesce_queue(vbd);
	if (err)
		return err;

    list_for_each_entry(blkif, &vbd->rings, entry)
		tapdisk_xenblkif_suspend(blkif);

	tapdisk_vbd_close_vdi(vbd);

	INFO("pause completed\n");

	if (!list_empty(&vbd->failed_requests))
		INFO("warning: failed requests pending\n");

	td_flag_clear(vbd->state, TD_VBD_PAUSE_REQUESTED);
	td_flag_set(vbd->state, TD_VBD_PAUSED);

	return 0;
}

int
tapdisk_vbd_resume(td_vbd_t *vbd, const char *name)
{
	int i, err;
    struct td_xenblkif *blkif;

	DBG(TLOG_DBG, "resume requested\n");

	if (!td_flag_test(vbd->state, TD_VBD_PAUSED)) {
		EPRINTF("resume request for unpaused vbd %s\n", vbd->name);
		return -EINVAL;
	}

	for (i = 0; i < TD_VBD_EIO_RETRIES; i++) {
		err = tapdisk_vbd_open_vdi(vbd, name, vbd->flags | TD_OPEN_STRICT, -1);
		if (!err)
			break;

		sleep(TD_VBD_EIO_SLEEP);
	}

	if (!err) {
		td_disk_info_t disk_info;
		err = tapdisk_vbd_get_disk_info(vbd, &disk_info);
		if (err) {
			EPRINTF("VBD %d failed to get disk info: %s\n", vbd->uuid,
					strerror(-err));
			goto resume_failed;
		}
		if (vbd->disk_info.size != disk_info.size
				|| vbd->disk_info.sector_size != disk_info.sector_size
				|| vbd->disk_info.info != disk_info.info) {
			EPRINTF("VBD %d cannot change disk info\n", vbd->uuid);
			err = -EMEDIUMTYPE;
			goto resume_failed;
		}
	}
resume_failed:
	if (err) {
		td_flag_set(vbd->state, TD_VBD_RESUME_FAILED);
		tapdisk_vbd_close_vdi(vbd);
		return err;
	}
	td_flag_clear(vbd->state, TD_VBD_RESUME_FAILED);

	DBG(TLOG_DBG, "resume completed\n");

	tapdisk_vbd_start_queue(vbd);
	td_flag_clear(vbd->state, TD_VBD_PAUSED);
	td_flag_clear(vbd->state, TD_VBD_PAUSE_REQUESTED);
	tapdisk_vbd_check_state(vbd);

	if (vbd->nbdserver)
		tapdisk_nbdserver_unpause(vbd->nbdserver);

    list_for_each_entry(blkif, &vbd->rings, entry)
		tapdisk_xenblkif_resume(blkif);


	DBG(TLOG_DBG, "state checked\n");

	return 0;
}

static int
tapdisk_vbd_request_ttl(td_vbd_request_t *vreq,
			const struct timeval *now)
{
	struct timeval delta;
	timersub(now, &vreq->ts, &delta);
	return vreq->vbd->req_timeout - delta.tv_sec;
}

static int
__tapdisk_vbd_request_timeout(td_vbd_request_t *vreq,
			      const struct timeval *now)
{
	int timeout;

	timeout = tapdisk_vbd_request_ttl(vreq, now) < 0;
	if (timeout)
		ERR(vreq->error,
		    "req %s timed out, retried %d times\n",
		    vreq->name, vreq->num_retries);

	return timeout;
}

static int
tapdisk_vbd_request_timeout(td_vbd_request_t *vreq)
{
	struct timeval now;
	gettimeofday(&now, NULL);
	return __tapdisk_vbd_request_timeout(vreq, &now);
}

static void
tapdisk_vbd_check_queue_state(td_vbd_t *vbd)
{
	td_vbd_request_t *vreq, *tmp;
	struct timeval now;

	gettimeofday(&now, NULL);
	tapdisk_vbd_for_each_request(vreq, tmp, &vbd->failed_requests)
		if (__tapdisk_vbd_request_timeout(vreq, &now))
			tapdisk_vbd_complete_vbd_request(vbd, vreq);

	if (!list_empty(&vbd->new_requests) ||
	    !list_empty(&vbd->failed_requests))
		tapdisk_vbd_issue_requests(vbd);

}

static inline int
tapdisk_vbd_produce_rrds(td_vbd_t *vbd) {

	td_image_t *leaf;
	int off = 0, size = 0;
	int err;
	int i, j;
	char *buf;
	int json_str_len_off, md5sum_str_len_off, json_data_off, json_data_len;
	const int json_str_len = 8 + 1, md5sum_str_len = 32 + 1;
	char tmp[md5sum_str_len + 1];
	time_t t;
	MD5_CTX md5_ctx;
	unsigned char md5_out[MD5_DIGEST_LENGTH];

	ASSERT(vbd);

	buf = vbd->rrd.shm.mem;

	/*
	 * If no VDI has been opened yet there's nothing to report.
	 */
	if (!buf)
		return 0;

	/*
	 * Produce RRDs every five seconds.
	 */
	t = time(NULL);
	if (t - vbd->rrd.last < 5)
		return 0;
	vbd->rrd.last = t;

	size = vbd->rrd.shm.size - off;
	err = tapdisk_snprintf(buf, &off, &size, 0, "DATASOURCES\n");
	if (err)
		return err;

	/*
	 * reserve space for JSON string length
	 */
	json_str_len_off = off;
	off += json_str_len, size -= json_str_len;

	/*
	 * reserve space for MD5 sum of JSON string
	 */
	md5sum_str_len_off = off;
	off += md5sum_str_len, size -= md5sum_str_len;

	json_data_off = off;
	err = tapdisk_snprintf(buf, &off, &size, 0,	"{\n");
	err += tapdisk_snprintf(buf, &off, &size, 1, "\"timestamp\": %lu,\n",
			time(NULL));
	err += tapdisk_snprintf(buf, &off, &size, 1, "\"datasources\": {\n");
	if (err)
		return err;

	leaf = tapdisk_vbd_first_image(vbd);

	/*
	 * XXX We're only reporting RRDs for leaves. We could traverse the list
	 * of parent and report RRDs for each one of them, if there is something
	 * to report. However, for internal VHD files there's nothing to report
	 * so that would end up in a useless traverse of the list. We could address
	 * this issue by keeping a list of images that do have an RRD callback.
	 */
	if (leaf && leaf->driver->ops->td_rrd) {
		err = leaf->driver->ops->td_rrd(leaf->driver, buf, &off, &size);
		if (err)
			return err;
		err = tapdisk_snprintf(buf, &off, &size, 0, ",\n");
		if (err)
			return err;
	}

	err += tapdisk_snprintf(buf, &off, &size, 2, "\"io_errors\": {\n");
	err += tapdisk_snprintf(buf, &off, &size, 3,
			"\"description\": \"Number of I/O errors\",\n");
	err += tapdisk_snprintf(buf, &off, &size, 3, "\"owner\": \"host\",\n");
	err += tapdisk_snprintf(buf, &off, &size, 3,  "\"type\": "
			"\"absolute\",\n");
	err += tapdisk_snprintf(buf, &off, &size, 3, "\"units\": \"units\",\n");
	err += tapdisk_snprintf(buf, &off, &size, 3, "\"min\": \"0.00\",\n");
	err += tapdisk_snprintf(buf, &off, &size, 3, "\"max\": \"inf\",\n");
	err += tapdisk_snprintf(buf, &off, &size, 3, "\"value\": \"%llu\",\n",
			vbd->errors);
	err += tapdisk_snprintf(buf, &off, &size, 3, "\"value_type\": \"float\"\n");
	err += tapdisk_snprintf(buf, &off, &size, 2, "}\n");
	err += tapdisk_snprintf(buf, &off, &size, 1, "}\n");
	err += tapdisk_snprintf(buf, &off, &size, 0, "}\n");
	if (err)
		return err;

	json_data_len = off - json_str_len;
	sprintf(tmp, "%08x\n", json_data_len);
	strncpy(buf + json_str_len_off, tmp, json_str_len);

	MD5_Init(&md5_ctx);
	MD5_Update(&md5_ctx, buf + json_data_off, json_data_len);
	MD5_Final(md5_out, &md5_ctx);
	for (i = 0, j = 0; i < MD5_DIGEST_LENGTH; i++)
		j += sprintf(buf + md5sum_str_len_off + j, "%02x", md5_out[i]);
	buf[(md5sum_str_len_off + j)] = '\n';

	memset(buf + off, '\0', size - off);
	return msync(buf, vbd->rrd.shm.size, MS_ASYNC);
}

void
tapdisk_vbd_check_state(td_vbd_t *vbd)
{
    struct td_xenblkif *blkif;

	tapdisk_vbd_produce_rrds(vbd);

    /*
     * TODO don't ignore return value
     */
    list_for_each_entry(blkif, &vbd->rings, entry)
		tapdisk_xenblkif_ring_stats_update(blkif);

	tapdisk_vbd_check_queue_state(vbd);

	if (td_flag_test(vbd->state, TD_VBD_QUIESCE_REQUESTED))
		tapdisk_vbd_quiesce_queue(vbd);

	if (td_flag_test(vbd->state, TD_VBD_PAUSE_REQUESTED))
		tapdisk_vbd_pause(vbd);

	if (td_flag_test(vbd->state, TD_VBD_SHUTDOWN_REQUESTED))
		tapdisk_vbd_close(vbd);
}

void
tapdisk_vbd_check_progress(td_vbd_t *vbd)
{
	time_t diff;
	struct timeval now, delta;

	if (list_empty(&vbd->pending_requests))
		return;

	gettimeofday(&now, NULL);
	timersub(&now, &vbd->ts, &delta);
	diff = delta.tv_sec;

	if (diff >= TD_VBD_WATCHDOG_TIMEOUT && tapdisk_vbd_queue_ready(vbd)) {
		DBG(TLOG_WARN, "%s: watchdog timeout: pending requests "
		    "idle for %ld seconds\n", vbd->name, diff);
		tapdisk_vbd_drop_log(vbd);
		return;
	}

	tapdisk_server_set_max_timeout(TD_VBD_WATCHDOG_TIMEOUT - diff);
}

/*
 * request submission 
 */

static int
tapdisk_vbd_check_queue(td_vbd_t *vbd)
{
	if (list_empty(&vbd->images))
		return -ENOSYS;

	if (!tapdisk_vbd_queue_ready(vbd))
		return -EAGAIN;

	return 0;
}

static int
tapdisk_vbd_request_should_retry(td_vbd_t *vbd, td_vbd_request_t *vreq)
{
	if (td_flag_test(vbd->state, TD_VBD_DEAD) ||
	    td_flag_test(vbd->state, TD_VBD_SHUTDOWN_REQUESTED))
		return 0;

	switch (abs(vreq->error)) {
	case EPERM:
	case ENOSYS:
	case ESTALE:
	case ENOSPC:
	case EFAULT:
		return 0;
	}

	if (tapdisk_vbd_request_timeout(vreq))
		return 0;

	return 1;
}

static void
tapdisk_vbd_complete_vbd_request(td_vbd_t *vbd, td_vbd_request_t *vreq)
{
	if (!vreq->submitting && !vreq->secs_pending) {
		if (vreq->error &&
		    tapdisk_vbd_request_should_retry(vbd, vreq))
			tapdisk_vbd_move_request(vreq, &vbd->failed_requests);
		else
			tapdisk_vbd_move_request(vreq, &vbd->completed_requests);
	}
}

static void
FIXME_maybe_count_enospc_redirect(td_vbd_t *vbd, td_request_t treq)
{
	int write = treq.op == TD_OP_WRITE;
	if (write &&
	    treq.image == tapdisk_vbd_first_image(vbd) &&
	    vbd->FIXME_enospc_redirect_count_enabled)
		vbd->FIXME_enospc_redirect_count += treq.secs;
}

static void
__tapdisk_vbd_complete_td_request(td_vbd_t *vbd, td_vbd_request_t *vreq,
				  td_request_t treq, int res)
{
	td_image_t *image = treq.image;
	int err;

	err = (res <= 0 ? res : -res);
	vbd->secs_pending  -= treq.secs;
	vreq->secs_pending -= treq.secs;

	if (err != -EBUSY) {
		int write = treq.op == TD_OP_WRITE;
		td_sector_count_add(&image->stats.hits, treq.secs, write);
		if (err)
			td_sector_count_add(&image->stats.fail,
					    treq.secs, write);

		FIXME_maybe_count_enospc_redirect(vbd, treq);
	}

	if (err) {
		if (err != -EBUSY) {
			if (!vreq->error &&
			    err != vreq->prev_error)
				tlog_drv_error(image->driver, err,
					       "req %s: %s 0x%04x secs @ 0x%08"PRIx64" - %s",
					       vreq->name,
					       (treq.op == TD_OP_WRITE ? "write" : "read"),
					       treq.secs, treq.sec, strerror(abs(err)));
			vbd->errors++;
		}
		vreq->error = (vreq->error ? : err);
	}

	tapdisk_vbd_complete_vbd_request(vbd, vreq);
}

static void
__tapdisk_vbd_reissue_td_request(td_vbd_t *vbd,
				 td_image_t *image, td_request_t treq)
{
	td_image_t *parent;
	td_vbd_request_t *vreq;

	vreq = treq.vreq;
	gettimeofday(&vreq->last_try, NULL);

	vreq->submitting++;

	if (tapdisk_vbd_is_last_image(vbd, image)) {
		memset(treq.buf, 0, treq.secs << SECTOR_SHIFT);
		td_complete_request(treq, 0);
		goto done;
	}

	parent     = tapdisk_vbd_next_image(image);
	treq.image = parent;

	/* return zeros for requests that extend beyond end of parent image */
	if (treq.sec + treq.secs > parent->info.size) {
		td_request_t clone  = treq;

		if (parent->info.size > treq.sec) {
			int secs    = parent->info.size - treq.sec;
			clone.sec  += secs;
			clone.secs -= secs;
			clone.buf  += (secs << SECTOR_SHIFT);
			treq.secs   = secs;
		} else
			treq.secs   = 0;

		memset(clone.buf, 0, clone.secs << SECTOR_SHIFT);
		td_complete_request(clone, 0);

		if (!treq.secs)
			goto done;
	}

	switch (treq.op) {
	case TD_OP_WRITE:
		td_queue_write(parent, treq);
		break;

	case TD_OP_READ:
		td_queue_read(parent, treq);
		break;
	}

done:
	vreq->submitting--;
	if (!vreq->secs_pending)
		tapdisk_vbd_complete_vbd_request(vbd, vreq);
}

void
tapdisk_vbd_forward_request(td_request_t treq)
{
	td_vbd_t *vbd;
	td_image_t *image;
	td_vbd_request_t *vreq;

	image = treq.image;
	vreq  = treq.vreq;
	vbd   = vreq->vbd;

	tapdisk_vbd_mark_progress(vbd);

	if (tapdisk_vbd_queue_ready(vbd))
		__tapdisk_vbd_reissue_td_request(vbd, image, treq);
	else
		__tapdisk_vbd_complete_td_request(vbd, vreq, treq, -EBUSY);
}

void
tapdisk_vbd_complete_td_request(td_request_t treq, int res)
{
	td_vbd_t *vbd;
	td_image_t *image, *leaf;
	td_vbd_request_t *vreq;

	image = treq.image;
	vreq  = treq.vreq;
	vbd   = vreq->vbd;

	tapdisk_vbd_mark_progress(vbd);

	if (abs(res) == ENOSPC && td_flag_test(image->flags,
				TD_IGNORE_ENOSPC)) {
		res = 0;
		leaf = tapdisk_vbd_first_image(vbd);
		if (vbd->secondary_mode == TD_VBD_SECONDARY_MIRROR) {
			DPRINTF("ENOSPC: disabling mirroring\n");
			list_del_init(&leaf->next);
			vbd->retired = leaf;
		} else if (vbd->secondary_mode == TD_VBD_SECONDARY_STANDBY) {
			DPRINTF("ENOSPC: failing over to secondary image\n");
			list_add(&vbd->secondary->next, leaf->next.prev);
			vbd->FIXME_enospc_redirect_count_enabled = 1;
		}
		if (vbd->secondary_mode != TD_VBD_SECONDARY_DISABLED) {
			vbd->secondary = NULL;
			vbd->secondary_mode = TD_VBD_SECONDARY_DISABLED;
			signal_enospc(vbd);
		}
	}

	if (res != 0 && image->type == DISK_TYPE_NBD && 
			((image == vbd->secondary) || 
			 (image == vbd->retired))) {
		ERROR("Got non-zero res for NBD secondary - disabling "
				"mirroring: %s",vreq->name);
		vbd->nbd_mirror_failed = 1;
		res = 0; /* Pretend the writes have completed successfully */

		/* It was the secondary that timed out - disable secondary */
		list_del_init(&image->next);
		vbd->retired = image;
		if (vbd->secondary_mode != TD_VBD_SECONDARY_DISABLED) {
			vbd->secondary = NULL;
			vbd->secondary_mode = TD_VBD_SECONDARY_DISABLED;
		}
	}

	DBG(TLOG_DBG, "%s: req %s seg %d sec 0x%08"PRIx64
	    " secs 0x%04x buf %p op %d res %d\n", image->name,
	    vreq->name, treq.sidx, treq.sec, treq.secs,
	    treq.buf, vreq->op, res);

	__tapdisk_vbd_complete_td_request(vbd, vreq, treq, res);
}

static inline void
queue_mirror_req(td_vbd_t *vbd, td_request_t clone)
{
	clone.image = vbd->secondary;
	td_queue_write(vbd->secondary, clone);
}

static int
tapdisk_vbd_issue_request(td_vbd_t *vbd, td_vbd_request_t *vreq)
{
	td_image_t *image;
	td_request_t treq;
	td_sector_t sec;
	int i, err;

	sec    = vreq->sec;
	image  = tapdisk_vbd_first_image(vbd);

	vreq->submitting = 1;

	tapdisk_vbd_mark_progress(vbd);
	vreq->last_try = vbd->ts;

	tapdisk_vbd_move_request(vreq, &vbd->pending_requests);

	err = tapdisk_vbd_check_queue(vbd);
	if (err) {
		vreq->error = err;
		goto fail;
	}

	err = tapdisk_image_check_request(image, vreq);
	if (err) {
		vreq->error = err;
		goto fail;
	}

	for (i = 0; i < vreq->iovcnt; i++) {
		struct td_iovec *iov = &vreq->iov[i];

		treq.sidx           = i;
		treq.buf            = iov->base;
		treq.sec            = sec;
		treq.secs           = iov->secs;
		treq.image          = image;
		treq.cb             = tapdisk_vbd_complete_td_request;
		treq.cb_data        = NULL;
		treq.vreq           = vreq;


		vreq->secs_pending += iov->secs;
		vbd->secs_pending  += iov->secs;
		if (vbd->secondary_mode == TD_VBD_SECONDARY_MIRROR &&
		    vreq->op == TD_OP_WRITE) {
			vreq->secs_pending += iov->secs;
			vbd->secs_pending  += iov->secs;
		}

		switch (vreq->op) {
		case TD_OP_WRITE:
			treq.op = TD_OP_WRITE;
			/*
			 * it's important to queue the mirror request before 
			 * queuing the main one. If the main image runs into 
			 * ENOSPC, the mirroring could be disabled before 
			 * td_queue_write returns, so if the mirror request was 
			 * queued after (which would then not happen), we'd 
			 * lose that write and cause the process to hang with 
			 * unacknowledged writes
			 */
			if (vbd->secondary_mode == TD_VBD_SECONDARY_MIRROR)
				queue_mirror_req(vbd, treq);
			td_queue_write(treq.image, treq);
			break;

		case TD_OP_READ:
			treq.op = TD_OP_READ;
			td_queue_read(treq.image, treq);
			break;
		}

		DBG(TLOG_DBG, "%s: req %s seg %d sec 0x%08"PRIx64" secs 0x%04x "
		    "buf %p op %d\n", image->name, vreq->name, i, treq.sec, treq.secs,
		    treq.buf, vreq->op);
		sec += iov->secs;
	}

	err = 0;

out:
	vreq->submitting--;
	if (!vreq->secs_pending) {
		err = (err ? : vreq->error);
		tapdisk_vbd_complete_vbd_request(vbd, vreq);
	}

	return err;

fail:
	vreq->error = err;
	goto out;
}

static int
tapdisk_vbd_request_completed(td_vbd_t *vbd, td_vbd_request_t *vreq)
{
	return vreq->list_head == &vbd->completed_requests;
}

static int
tapdisk_vbd_reissue_failed_requests(td_vbd_t *vbd)
{
	int err;
	struct timeval now;
	td_vbd_request_t *vreq, *tmp;

	err = 0;
	gettimeofday(&now, NULL);

	tapdisk_vbd_for_each_request(vreq, tmp, &vbd->failed_requests) {
		if (vreq->secs_pending)
			continue;

		if (td_flag_test(vbd->state, TD_VBD_SHUTDOWN_REQUESTED)) {
			tapdisk_vbd_complete_vbd_request(vbd, vreq);
			continue;
		}

		if (vreq->error != -EBUSY &&
		    now.tv_sec - vreq->last_try.tv_sec < TD_VBD_RETRY_INTERVAL)
			continue;

		vbd->retries++;
		vreq->num_retries++;

		vreq->prev_error = vreq->error;
		vreq->error      = 0;

		DBG(TLOG_DBG, "retry #%d of req %s, "
		    "sec 0x%08"PRIx64", iovcnt: %d\n", vreq->num_retries,
		    vreq->name, vreq->sec, vreq->iovcnt);

		err = tapdisk_vbd_issue_request(vbd, vreq);
		/*
		 * if this request failed, but was not completed,
		 * we'll back off for a while.
		 */
		if (err && !tapdisk_vbd_request_completed(vbd, vreq))
			break;
	}

	return 0;
}

static void
tapdisk_vbd_count_new_request(td_vbd_t *vbd, td_vbd_request_t *vreq)
{
	struct td_iovec *iov;
	int write;

	write = vreq->op == TD_OP_WRITE;

	for (iov = &vreq->iov[0]; iov < &vreq->iov[vreq->iovcnt]; iov++)
		td_sector_count_add(&vbd->secs, iov->secs, write);
}

static int
tapdisk_vbd_issue_new_requests(td_vbd_t *vbd)
{
	int err;
	td_vbd_request_t *vreq, *tmp;

	tapdisk_vbd_for_each_request(vreq, tmp, &vbd->new_requests) {
		err = tapdisk_vbd_issue_request(vbd, vreq);
		/*
		 * if this request failed, but was not completed,
		 * we'll back off for a while.
		 */
		if (err && !tapdisk_vbd_request_completed(vbd, vreq))
			return err;

		tapdisk_vbd_count_new_request(vbd, vreq);
	}

	return 0;
}

int
tapdisk_vbd_recheck_state(td_vbd_t *vbd)
{
	if (list_empty(&vbd->new_requests))
		return 0;

	if (td_flag_test(vbd->state, TD_VBD_QUIESCED) ||
	    td_flag_test(vbd->state, TD_VBD_QUIESCE_REQUESTED))
		return 0;

	tapdisk_vbd_issue_new_requests(vbd);

	return 1;
}

static int
tapdisk_vbd_kill_requests(td_vbd_t *vbd)
{
	td_vbd_request_t *vreq, *tmp;

	tapdisk_vbd_for_each_request(vreq, tmp, &vbd->new_requests) {
		vreq->error = -ESHUTDOWN;
		tapdisk_vbd_move_request(vreq, &vbd->completed_requests);
	}

	tapdisk_vbd_for_each_request(vreq, tmp, &vbd->failed_requests) {
		vreq->error = -ESHUTDOWN;
		tapdisk_vbd_move_request(vreq, &vbd->completed_requests);
	}

	return 0;
}

int
tapdisk_vbd_issue_requests(td_vbd_t *vbd)
{
	int err;

	if (td_flag_test(vbd->state, TD_VBD_DEAD))
		return tapdisk_vbd_kill_requests(vbd);

	if (td_flag_test(vbd->state, TD_VBD_QUIESCED) ||
	    td_flag_test(vbd->state, TD_VBD_QUIESCE_REQUESTED)) {

		if (td_flag_test(vbd->state, TD_VBD_RESUME_FAILED))
			return tapdisk_vbd_kill_requests(vbd);
		else
			return -EAGAIN;
	}

	err = tapdisk_vbd_reissue_failed_requests(vbd);
	if (err)
		return err;

	return tapdisk_vbd_issue_new_requests(vbd);
}

int
tapdisk_vbd_queue_request(td_vbd_t *vbd, td_vbd_request_t *vreq)
{
	gettimeofday(&vreq->ts, NULL);
	vreq->vbd = vbd;

	list_add_tail(&vreq->next, &vbd->new_requests);
	vbd->received++;

	return 0;
}

void
tapdisk_vbd_kick(td_vbd_t *vbd)
{
	const struct list_head *list = &vbd->completed_requests;
	td_vbd_request_t *vreq, *prev, *next;

	vbd->kicked++;

	while (!list_empty(list)) {

		/*
		 * Take one request off the completed requests list, and then look for
		 * other requests in the same list that have the same token and
		 * complete them. This way we complete requests against the same token
		 * in one go before we proceed to completing requests with other
		 * tokens. The token is usually used to point back to some other
		 * structure, e.g. a blktap or a tapdisk3 connexion. Once all requests
		 * with a specific token have been completed, proceed to the next one
		 * until the list is empty.
		 */
		prev = list_entry(list->next, td_vbd_request_t, next);
		list_del(&prev->next);

		tapdisk_vbd_for_each_request(vreq, next, list) {
			if (vreq->token == prev->token) {

				prev->cb(prev, prev->error, prev->token, 0);
				vbd->returned++;

				list_del(&vreq->next);
				prev = vreq;
			}
		}

		prev->cb(prev, prev->error, prev->token, 1);
		vbd->returned++;
	}
}

int
tapdisk_vbd_start_nbdserver(td_vbd_t *vbd)
{
	td_disk_info_t info;
	int err;

	err = tapdisk_vbd_get_disk_info(vbd, &info);

	if (err)
		return err;

	vbd->nbdserver = tapdisk_nbdserver_alloc(vbd, info);

	if (!vbd->nbdserver) {
		EPRINTF("Error starting nbd server");
		return -1;
	}

	err = tapdisk_nbdserver_listen_unix(vbd->nbdserver);
	if (err) {
		tapdisk_nbdserver_free(vbd->nbdserver);
		EPRINTF("failed to listen on the UNIX domain socket: %s\n",
				strerror(-err));
		return err;
	}

	return 0;
}


static int
tapdisk_vbd_reqs_outstanding(td_vbd_t *vbd)
{
	int new, pending, failed, completed;

	ASSERT(vbd);

	tapdisk_vbd_queue_count(vbd, &new, &pending, &failed, &completed);

	return new + pending + failed + completed;
}


void
tapdisk_vbd_stats(td_vbd_t *vbd, td_stats_t *st)
{
	td_image_t *image, *next;
    struct td_xenblkif *blkif;
	const bool read_caching =
		TD_OPEN_NO_O_DIRECT == (vbd->flags & TD_OPEN_NO_O_DIRECT);

	tapdisk_stats_enter(st, '{');
	tapdisk_stats_field(st, "name", "s", vbd->name);

	tapdisk_stats_field(st, "secs", "[");
	tapdisk_stats_val(st, "llu", vbd->secs.rd);
	tapdisk_stats_val(st, "llu", vbd->secs.wr);
	tapdisk_stats_leave(st, ']');

	tapdisk_stats_field(st, "images", "[");
	tapdisk_vbd_for_each_image(vbd, image, next)
		tapdisk_image_stats(image, st);
	tapdisk_stats_leave(st, ']');

	if (vbd->tap) {
		tapdisk_stats_field(st, "tap", "{");
		tapdisk_blktap_stats(vbd->tap, st);
		tapdisk_stats_leave(st, '}');
	}

    /*
     * TODO Is this used by any one?
     */
    if (!list_empty(&vbd->rings)) {
	    tapdisk_stats_field(st, "xenbus", "{");
        list_for_each_entry(blkif, &vbd->rings, entry)
		    tapdisk_xenblkif_stats(blkif, st);
    	tapdisk_stats_leave(st, '}');
    }

	tapdisk_stats_field(st,
			"FIXME_enospc_redirect_count",
			"llu", vbd->FIXME_enospc_redirect_count);

	tapdisk_stats_field(st,
			"nbd_mirror_failed",
			"d", vbd->nbd_mirror_failed);

	tapdisk_stats_field(st,
			"reqs_outstanding",
			"d", tapdisk_vbd_reqs_outstanding(vbd));

	tapdisk_stats_field(st,
			"read_caching",
			"s",  read_caching ? "true": "false");

	tapdisk_stats_leave(st, '}');
}


bool inline
tapdisk_vbd_contains_dead_rings(td_vbd_t * vbd)
{
    return !list_empty(&vbd->dead_rings);
}
//* 
 * Copyright (C) Citrix Systems Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; version 2.1 only
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
 */

/*
 * block-vhd.c: asynchronous vhd implementation.
 *
 * A note on write transactions:
 * Writes that require updating the BAT or bitmaps cannot be signaled
 * as complete until all updates have reached disk.  Transactions are
 * used to ensure proper ordering in these cases.  The two types of
 * transactions are as follows:
 *   - Bitmap updates only: data writes that require updates to the same
 *     bitmap are grouped in a transaction.  Only after all data writes
 *     in a transaction complete does the bitmap write commence.  Only
 *     after the bitmap write finishes are the data writes signalled as
 *     complete.
 *   - BAT and bitmap updates: data writes are grouped in transactions
 *     as above, but a special extra write is included in the transaction,
 *     which zeros out the newly allocated bitmap on disk.  When the data
 *     writes and the zero-bitmap write complete, the BAT and bitmap writes
 *     are started in parallel.  The transaction is completed only after both
 *     the BAT and bitmap writes successfully return.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <uuid/uuid.h> /* For whatever reason, Linux packages this in */
                       /* e2fsprogs-devel.                            */
#include <string.h>    /* for memset.                                 */
#include <libaio.h>
#include <sys/mman.h>
#include <limits.h>

#include "debug.h"
#include "libvhd.h"
#include "tapdisk.h"
#include "tapdisk-driver.h"
#include "tapdisk-interface.h"
#include "tapdisk-disktype.h"
#include "tapdisk-storage.h"

unsigned int SPB;

#define DEBUGGING   2
#define MICROSOFT_COMPAT

#define VHD_BATMAP_MAX_RETRIES 10

#define __TRACE(s)							\
	do {								\
		DBG(TLOG_DBG, "%s: QUEUED: %" PRIu64 ", COMPLETED: %"	\
		    PRIu64", RETURNED: %" PRIu64 ", DATA_ALLOCATED: "	\
		    "%u, BBLK: 0x%04x\n",				\
		    s->vhd.file, s->queued, s->completed, s->returned,	\
		    VHD_REQS_DATA - s->vreq_free_count,			\
		    s->bat.pbw_blk);					\
	} while(0)

#if (DEBUGGING == 1)
  #define DBG(level, _f, _a...)      DPRINTF(_f, ##_a)
  #define ERR(_s, err, _f, _a...)    DPRINTF("ERROR: %d: " _f, err, ##_a)
  #define TRACE(s)                   ((void)0)
#elif (DEBUGGING == 2)
  #define DBG(level, _f, _a...)      tlog_write(level, _f, ##_a)
  #define ERR(_s, _err, _f, _a...)   tlog_drv_error((_s)->driver, _err, _f, ##_a)
  #define TRACE(s)                   __TRACE(s)
#else
  #define DBG(level, _f, _a...)      ((void)0)
  #define ERR(_s, err, _f, _a...)    ((void)0)
  #define TRACE(s)                   ((void)0)
#endif

/******VHD DEFINES******/
#define VHD_CACHE_SIZE               32

#define VHD_REQS_DATA                TAPDISK_DATA_REQUESTS
#define VHD_REQS_META                (VHD_CACHE_SIZE + 2)
#define VHD_REQS_TOTAL               (VHD_REQS_DATA + VHD_REQS_META)

#define VHD_OP_BAT_WRITE             0
#define VHD_OP_DATA_READ             1
#define VHD_OP_DATA_WRITE            2
#define VHD_OP_BITMAP_READ           3
#define VHD_OP_BITMAP_WRITE          4
#define VHD_OP_ZERO_BM_WRITE         5
#define VHD_OP_REDUNDANT_BM_WRITE    6

#define VHD_BM_BAT_LOCKED            0
#define VHD_BM_BAT_CLEAR             1
#define VHD_BM_BIT_CLEAR             2
#define VHD_BM_BIT_SET               3
#define VHD_BM_NOT_CACHED            4
#define VHD_BM_READ_PENDING          5

#define VHD_FLAG_OPEN_RDONLY         1
#define VHD_FLAG_OPEN_NO_CACHE       2
#define VHD_FLAG_OPEN_QUIET          4
#define VHD_FLAG_OPEN_STRICT         8
#define VHD_FLAG_OPEN_QUERY          16
#define VHD_FLAG_OPEN_PREALLOCATE    32
#define VHD_FLAG_OPEN_NO_O_DIRECT    64
#define VHD_FLAG_OPEN_LOCAL_CACHE    128

#define VHD_FLAG_BAT_LOCKED          1
#define VHD_FLAG_BAT_WRITE_STARTED   2

#define VHD_FLAG_BM_UPDATE_BAT       1
#define VHD_FLAG_BM_WRITE_PENDING    2
#define VHD_FLAG_BM_READ_PENDING     4
#define VHD_FLAG_BM_LOCKED           8

#define VHD_FLAG_REQ_UPDATE_BAT      1
#define VHD_FLAG_REQ_UPDATE_BITMAP   2
#define VHD_FLAG_REQ_QUEUED          4
#define VHD_FLAG_REQ_FINISHED        8

#define VHD_FLAG_TX_LIVE             1
#define VHD_FLAG_TX_UPDATE_BAT       2

typedef uint8_t vhd_flag_t;

struct vhd_state;
struct vhd_request;

struct vhd_req_list {
	struct vhd_request       *head;
	struct vhd_request       *tail;
};

struct vhd_transaction {
	int                       error;
	int                       closed;
	int                       started;
	int                       finished;
	vhd_flag_t                status;
	struct vhd_req_list       requests;
};

struct vhd_request {
	int                       error;
	uint8_t                   op;
	vhd_flag_t                flags;
	td_request_t              treq;
	struct tiocb              tiocb;
	struct vhd_state         *state;
	struct vhd_request       *next;
	struct vhd_transaction   *tx;
};

struct vhd_bat_state {
	vhd_bat_t                 bat;
	vhd_batmap_t              batmap;
	vhd_flag_t                status;
	uint32_t                  pbw_blk;     /* blk num of pending write */
	uint64_t                  pbw_offset;  /* file offset of same */
	struct vhd_request        req;         /* for writing bat table */
	struct vhd_request        zero_req;    /* for initializing bitmaps */
	char                     *bat_buf;
};

struct vhd_bitmap {
	uint32_t                  blk;
	uint64_t                  seqno;       /* lru sequence number */
	vhd_flag_t                status;

	char                     *map;         /* map should only be modified
					        * in finish_bitmap_write */
	char                     *shadow;      /* in-memory bitmap changes are 
					        * made to shadow and copied to
					        * map only after having been
					        * flushed to disk */
	struct vhd_transaction    tx;          /* transaction data structure
						* encapsulating data, bitmap, 
						* and bat writes */
	struct vhd_req_list       queue;       /* data writes waiting for next
						* transaction */
	struct vhd_req_list       waiting;     /* pending requests that cannot
					        * be serviced until this bitmap
					        * is read from disk */
	struct vhd_request        req;
};

struct vhd_state {
	vhd_flag_t                flags;

        /* VHD stuff */
	vhd_context_t             vhd;
	uint32_t                  spp;         /* sectors per page */
	uint32_t                  spb;         /* sectors per block */
	uint64_t                  first_db;    /* pointer to datablock 0 */

	/**
	 * Pointer to the next (unallocated) datablock. If greater than UINT_MAX,
	 * there are no more blocks available.
	 */
	uint64_t                  next_db;

	struct vhd_bat_state      bat;

	uint64_t                  bm_lru;      /* lru sequence number */
	uint32_t                  bm_secs;     /* size of bitmap, in sectors */
	struct vhd_bitmap        *bitmap[VHD_CACHE_SIZE];

	int                       bm_free_count;
	struct vhd_bitmap        *bitmap_free[VHD_CACHE_SIZE];
	struct vhd_bitmap         bitmap_list[VHD_CACHE_SIZE];

	int                       vreq_free_count;
	struct vhd_request       *vreq_free[VHD_REQS_DATA];
	struct vhd_request        vreq_list[VHD_REQS_DATA];

	/* for redundant bitmap writes */
	int                       padbm_size;
	char                     *padbm_buf;
	long int                  debug_skipped_redundant_writes;
	long int                  debug_done_redundant_writes;

	td_driver_t              *driver;

	uint64_t                  queued;
	uint64_t                  completed;
	uint64_t                  returned;
	uint64_t                  reads;
	uint64_t                  read_size;
	uint64_t                  writes;
	uint64_t                  write_size;
};

#define test_vhd_flag(word, flag)  ((word) & (flag))
#define set_vhd_flag(word, flag)   ((word) |= (flag))
#define clear_vhd_flag(word, flag) ((word) &= ~(flag))

#define bat_entry(s, blk)          ((s)->bat.bat.bat[(blk)])

static void vhd_complete(void *, struct tiocb *, int);
static void finish_data_transaction(struct vhd_state *, struct vhd_bitmap *);

static struct vhd_state  *_vhd_master;
static unsigned long      _vhd_zsize;
static char              *_vhd_zeros = NULL;
int                       _dev_zero = -1;

static int
vhd_initialize(struct vhd_state *s)
{
	int err;

	if (_vhd_zeros)
		return 0;

	_vhd_zsize = 2 * getpagesize();
	if (test_vhd_flag(s->flags, VHD_FLAG_OPEN_PREALLOCATE))
		_vhd_zsize += VHD_BLOCK_SIZE;

	_dev_zero = open("/dev/zero", O_RDONLY);
	if (unlikely(_dev_zero == -1)) {
		err = errno;
		EPRINTF("failed to open /dev/zero: %s\n", strerror(err));
		return -err;
	}

	_vhd_zeros = mmap(NULL, _vhd_zsize, PROT_READ,
			  MAP_SHARED, _dev_zero, 0);
	if (_vhd_zeros == MAP_FAILED) {
		int _err;
		err = errno;
		EPRINTF("vhd_initialize failed: %s\n", strerror(err));
		_vhd_zeros = NULL;
		_vhd_zsize = 0;
		_err = close(_dev_zero);
		if (unlikely(_err == -1))
			EPRINTF("failed to close /dev/zero: %s (error ignored)\n",
					strerror(errno));
		else
			_dev_zero = -1;

		return -err;
	}

	_vhd_master = s;
	return 0;
}

static void
vhd_free(struct vhd_state *s)
{
	if (_vhd_master != s || !_vhd_zeros)
		return;

	free(s->padbm_buf);
	munmap(_vhd_zeros, _vhd_zsize);
	_vhd_zsize  = 0;
	_vhd_zeros  = NULL;
	_vhd_master = NULL;
	if (_dev_zero != -1) {
		int _err = close(_dev_zero);
		if (unlikely(_err == -1))
			EPRINTF("failed to close /dev/zero: %s (error ignored)\n",
					strerror(errno));
		else
			_dev_zero = -1;
	}
}

static char *
_get_vhd_zeros(const char *func, unsigned long size)
{
	if (!_vhd_zeros || _vhd_zsize < size) {
		EPRINTF("invalid zero request from %s: %lu, %lu, %p\n",
			func, size, _vhd_zsize, _vhd_zeros);
		ASSERT(0);
	}

	return _vhd_zeros;
}

#define vhd_zeros(size)	_get_vhd_zeros(__func__, size)

static inline void
set_batmap(struct vhd_state *s, uint32_t blk)
{
	if (s->bat.batmap.map) {
		vhd_batmap_set(&s->vhd, &s->bat.batmap, blk);
		DBG(TLOG_DBG, "block 0x%x completely full\n", blk);
	}
}

static inline int
test_batmap(struct vhd_state *s, uint32_t blk)
{
	if (!s->bat.batmap.map)
		return 0;
	return vhd_batmap_test(&s->vhd, &s->bat.batmap, blk);
}

static int
vhd_kill_footer(struct vhd_state *s)
{
	int err;
	off64_t end;
	void *zeros;

	if (s->vhd.footer.type == HD_TYPE_FIXED)
		return 0;

	err = posix_memalign(&zeros, 512, 512);
	if (err)
		return -err;

	err = 1;
	memset(zeros, 0xc7c7c7c7, 512);

	if ((end = lseek64(s->vhd.fd, 0, SEEK_END)) == -1)
		goto fail;

	if (lseek64(s->vhd.fd, (end - 512), SEEK_SET) == -1)
		goto fail;

	if (write(s->vhd.fd, zeros, 512) != 512)
		goto fail;

	err = 0;

 fail:
	free(zeros);
	if (err)
		return (errno ? -errno : -EIO);
	return 0;
}

static inline int
find_next_free_block(struct vhd_state *s)
{
	int err;
	off64_t eom;
	uint32_t i, entry;

	err = vhd_end_of_headers(&s->vhd, &eom);
	if (err)
		return err;

	s->next_db = secs_round_up(eom);
	s->first_db = s->next_db;
	if ((s->first_db + s->bm_secs) % s->spp)
		s->first_db += (s->spp - ((s->first_db + s->bm_secs) % s->spp));

	for (i = 0; i < s->bat.bat.entries; i++) {
		entry = bat_entry(s, i);
		if (entry != DD_BLK_UNUSED && entry >= s->next_db)
			s->next_db = (uint64_t)entry + (uint64_t)s->spb
				+ (uint64_t)s->bm_secs;
			if (s->next_db > UINT_MAX)
				break;
	}

	return 0;
}

static void
vhd_free_bat(struct vhd_state *s)
{
	free(s->bat.bat.bat);
	free(s->bat.batmap.map);
	free(s->bat.bat_buf);
	memset(&s->bat, 0, sizeof(struct vhd_bat));
}

static int
vhd_initialize_bat(struct vhd_state *s)
{
	int err, batmap_required, i;
	void *buf;

	memset(&s->bat, 0, sizeof(struct vhd_bat));

	err = vhd_read_bat(&s->vhd, &s->bat.bat);
	if (err) {
		EPRINTF("%s: reading bat: %d\n", s->vhd.file, err);
		return err;
	}

	batmap_required = 1;
	if (test_vhd_flag(s->flags, VHD_FLAG_OPEN_RDONLY)) {
		batmap_required = 0;
	} else {
		err = find_next_free_block(s);
		if (err)
			goto fail;
	}

	if (vhd_has_batmap(&s->vhd)) {
		for (i = 0; i < VHD_BATMAP_MAX_RETRIES; i++) {
			err = vhd_read_batmap(&s->vhd, &s->bat.batmap);
			if (err) {
				EPRINTF("%s: reading batmap: %d\n",
						s->vhd.file, err);
				if (batmap_required)
					goto fail;
			} else {
				break;
			}
		}
		if (err)
			EPRINTF("%s: ignoring non-critical batmap error\n",
					s->vhd.file);
	}

	err = posix_memalign(&buf, VHD_SECTOR_SIZE, VHD_SECTOR_SIZE);
	if (err)
		goto fail;

	s->bat.bat_buf = buf;

	return 0;

fail:
	vhd_free_bat(s);
	return err;
}

static void
vhd_free_bitmap_cache(struct vhd_state *s)
{
	int i;
	struct vhd_bitmap *bm;

	for (i = 0; i < VHD_CACHE_SIZE; i++) {
		bm = s->bitmap_list + i;
		free(bm->map);
		free(bm->shadow);
		s->bitmap_free[i] = NULL;
	}

	memset(s->bitmap_list, 0, sizeof(struct vhd_bitmap) * VHD_CACHE_SIZE);
}

static int
vhd_initialize_bitmap_cache(struct vhd_state *s)
{
	int i, err, map_size;
	struct vhd_bitmap *bm;
	void *map, *shadow;

	memset(s->bitmap_list, 0, sizeof(struct vhd_bitmap) * VHD_CACHE_SIZE);

	s->bm_lru        = 0;
	map_size         = vhd_sectors_to_bytes(s->bm_secs);
	s->bm_free_count = VHD_CACHE_SIZE;

	for (i = 0; i < VHD_CACHE_SIZE; i++) {
		bm = s->bitmap_list + i;

		err = posix_memalign(&map, 512, map_size);
		if (err)
			goto fail;

		bm->map = map;

		err = posix_memalign(&shadow, 512, map_size);
		if (err)
			goto fail;

		bm->shadow = shadow;

		memset(bm->map, 0, map_size);
		memset(bm->shadow, 0, map_size);
		s->bitmap_free[i] = bm;
	}

	return 0;

fail:
	vhd_free_bitmap_cache(s);
	return err;
}

static int
vhd_initialize_dynamic_disk(struct vhd_state *s)
{
	uint32_t bm_size;
	void *buf;
	int err;

	err = vhd_get_header(&s->vhd);
	if (err) {
		if (!test_vhd_flag(s->flags, VHD_FLAG_OPEN_QUIET))
			EPRINTF("Error reading VHD DD header.\n");
		return err;
	}

	if (s->vhd.header.hdr_ver != 0x00010000) {
		EPRINTF("unsupported header version! (0x%x)\n",
			s->vhd.header.hdr_ver);
		return -EINVAL;
	}

	s->spp     = getpagesize() >> VHD_SECTOR_SHIFT;
	s->spb     = s->vhd.header.block_size >> VHD_SECTOR_SHIFT;
	s->bm_secs = secs_round_up_no_zero(s->spb >> 3);

	s->padbm_size = (s->bm_secs / getpagesize()) * getpagesize();
	if (s->bm_secs % getpagesize())
		s->padbm_size += getpagesize();

	err = posix_memalign(&buf, 512, s->padbm_size);
	if (err)
		return -err;

	s->padbm_buf = buf;
	bm_size = s->bm_secs << VHD_SECTOR_SHIFT;
	memset(s->padbm_buf, 0, s->padbm_size - bm_size);
	memset(s->padbm_buf + (s->padbm_size - bm_size), ~0, bm_size);
	s->debug_skipped_redundant_writes = 0;
	s->debug_done_redundant_writes = 0;

	if (test_vhd_flag(s->flags, VHD_FLAG_OPEN_NO_CACHE))
		return 0;

	err = vhd_initialize_bat(s);
	if (err)
		return err;

	err = vhd_initialize_bitmap_cache(s);
	if (err) {
		vhd_free_bat(s);
		return err;
	}

	return 0;
}

static int
vhd_check_version(struct vhd_state *s)
{
	if (strncmp(s->vhd.footer.crtr_app, "tap", 3))
		return 0;

	if (s->vhd.footer.crtr_ver > VHD_CURRENT_VERSION) {
		if (!test_vhd_flag(s->flags, VHD_FLAG_OPEN_QUIET))
			EPRINTF("WARNING: %s vhd creator version 0x%08x, "
				"but only versions up to 0x%08x are "
				"supported for IO\n", s->vhd.file,
				s->vhd.footer.crtr_ver, VHD_CURRENT_VERSION);

		return -EINVAL;
	}

	return 0;
}

static void
vhd_log_open(struct vhd_state *s)
{
	char buf[5];
	uint32_t i, allocated, full;

	if (test_vhd_flag(s->flags, VHD_FLAG_OPEN_QUIET))
		return;

	snprintf(buf, sizeof(buf), "%s", s->vhd.footer.crtr_app);
	if (!vhd_type_dynamic(&s->vhd)) {
		DPRINTF("%s version: %s 0x%08x\n",
			s->vhd.file, buf, s->vhd.footer.crtr_ver);
		return;
	}

	allocated = 0;
	full      = 0;

	for (i = 0; i < s->bat.bat.entries; i++) {
		if (bat_entry(s, i) != DD_BLK_UNUSED)
			allocated++;
		if (test_batmap(s, i))
			full++;
	}

	DPRINTF("%s version: %s 0x%08x, b: %u, a: %u, f: %u, n: %"PRIu64"\n",
		s->vhd.file, buf, s->vhd.footer.crtr_ver, s->bat.bat.entries,
		allocated, full, s->next_db);
}

static int
__vhd_open(td_driver_t *driver, const char *name, vhd_flag_t flags)
{
        int i, o_flags, err;
	struct vhd_state *s;

        DBG(TLOG_INFO, "vhd_open: %s\n", name);
	if (test_vhd_flag(flags, VHD_FLAG_OPEN_STRICT))
		libvhd_set_log_level(1);

	s = (struct vhd_state *)driver->data;
	memset(s, 0, sizeof(struct vhd_state));

	s->flags  = flags;
	s->driver = driver;

	err = vhd_initialize(s);
	if (err)
		return err;

	o_flags = ((test_vhd_flag(flags, VHD_FLAG_OPEN_RDONLY)) ? 
		   VHD_OPEN_RDONLY : VHD_OPEN_RDWR);
	if ((test_vhd_flag(flags, VHD_FLAG_OPEN_RDONLY) ||
                test_vhd_flag(flags, VHD_FLAG_OPEN_LOCAL_CACHE)) &&
	    test_vhd_flag(flags, VHD_FLAG_OPEN_NO_O_DIRECT))
		set_vhd_flag(o_flags, VHD_OPEN_CACHED);

	if (test_vhd_flag(flags, VHD_FLAG_OPEN_STRICT))
		set_vhd_flag(o_flags, VHD_OPEN_STRICT);

	err = vhd_open(&s->vhd, name, o_flags);
	if (err) {
		libvhd_set_log_level(1);
		err = vhd_open(&s->vhd, name, o_flags);
		if (err) {
			EPRINTF("Unable to open [%s] (%d)!\n", name, err);
			return err;
		}
	}

	err = vhd_check_version(s);
	if (err)
		goto fail;

	s->spb = s->spp = 1;

	if (vhd_type_dynamic(&s->vhd)) {
		err = vhd_initialize_dynamic_disk(s);
		if (err)
			goto fail;
	}

	vhd_log_open(s);

	SPB = s->spb;

	s->vreq_free_count = VHD_REQS_DATA;
	for (i = 0; i < VHD_REQS_DATA; i++)
		s->vreq_free[i] = s->vreq_list + i;

	driver->info.size        = s->vhd.footer.curr_size >> VHD_SECTOR_SHIFT;
	driver->info.sector_size = VHD_SECTOR_SIZE;
	driver->info.info        = 0;

        DBG(TLOG_INFO, "vhd_open: done (sz:%"PRIu64", sct:%lu, inf:%u)\n",
	    driver->info.size, driver->info.sector_size, driver->info.info);

	if (test_vhd_flag(flags, VHD_FLAG_OPEN_STRICT) && 
	    !test_vhd_flag(flags, VHD_FLAG_OPEN_RDONLY)) {
		err = vhd_kill_footer(s);
		if (err) {
			DPRINTF("ERROR killing footer: %d\n", err);
			goto fail;
		}
		s->writes++;
	}

        return 0;

 fail:
	vhd_free_bat(s);
	vhd_free_bitmap_cache(s);
	vhd_close(&s->vhd);
	vhd_free(s);
	return err;
}

static int
_vhd_open(td_driver_t *driver, const char *name, td_flag_t flags)
{
	vhd_flag_t vhd_flags = 0;

	if (flags & TD_OPEN_RDONLY)
		vhd_flags |= VHD_FLAG_OPEN_RDONLY;
	if (flags & TD_OPEN_NO_O_DIRECT)
		vhd_flags |= VHD_FLAG_OPEN_NO_O_DIRECT;
	if (flags & TD_OPEN_QUIET)
		vhd_flags |= VHD_FLAG_OPEN_QUIET;
	if (flags & TD_OPEN_STRICT)
		vhd_flags |= VHD_FLAG_OPEN_STRICT;
	if (flags & TD_OPEN_QUERY)
		vhd_flags |= (VHD_FLAG_OPEN_QUERY  |
			      VHD_FLAG_OPEN_QUIET  |
			      VHD_FLAG_OPEN_RDONLY |
			      VHD_FLAG_OPEN_NO_CACHE);
    if (flags & TD_OPEN_LOCAL_CACHE)
        vhd_flags |= VHD_FLAG_OPEN_LOCAL_CACHE;

	/* pre-allocate for all but NFS and LVM storage */
	driver->storage = tapdisk_storage_type(name);

	if (driver->storage != TAPDISK_STORAGE_TYPE_NFS &&
	    driver->storage != TAPDISK_STORAGE_TYPE_LVM)
		vhd_flags |= VHD_FLAG_OPEN_PREALLOCATE;

	return __vhd_open(driver, name, vhd_flags);
}

static void
vhd_log_close(struct vhd_state *s)
{
	uint32_t i, allocated, full;

	if (test_vhd_flag(s->flags, VHD_FLAG_OPEN_QUIET))
		return;

	allocated = 0;
	full      = 0;

	for (i = 0; i < s->bat.bat.entries; i++) {
		if (bat_entry(s, i) != DD_BLK_UNUSED)
			allocated++;
		if (test_batmap(s, i))
			full++;
	}

	DPRINTF("%s: b: %u, a: %u, f: %u, n: %"PRIu64"\n",
		s->vhd.file, s->bat.bat.entries, allocated, full, s->next_db);
}

static int
_vhd_close(td_driver_t *driver)
{
	int err;
	struct vhd_state *s;
	
	DBG(TLOG_WARN, "vhd_close\n");
	s = (struct vhd_state *)driver->data;

	DPRINTF("gaps written/skipped: %ld/%ld\n", 
			s->debug_done_redundant_writes,
			s->debug_skipped_redundant_writes);

	/* don't write footer if tapdisk is read-only */
	if (test_vhd_flag(s->flags, VHD_FLAG_OPEN_RDONLY))
		goto free;
	
	/* 
	 * write footer if:
	 *   - we killed it on open (opened with strict) 
	 *   - we've written data since opening
	 */
	if (test_vhd_flag(s->flags, VHD_FLAG_OPEN_STRICT) || s->writes) {
		memcpy(&s->vhd.bat, &s->bat.bat, sizeof(vhd_bat_t));
		err = vhd_write_footer(&s->vhd, &s->vhd.footer);
		memset(&s->vhd.bat, 0, sizeof(vhd_bat_t));

		if (err)
			EPRINTF("writing %s footer: %d\n", s->vhd.file, err);

		if (!vhd_has_batmap(&s->vhd))
			goto free;

		err = vhd_write_batmap(&s->vhd, &s->bat.batmap);
		if (err)
			EPRINTF("writing %s batmap: %d\n", s->vhd.file, err);
	}

 free:
	vhd_log_close(s);
	vhd_free_bat(s);
	vhd_free_bitmap_cache(s);
	vhd_close(&s->vhd);
	vhd_free(s);

	memset(s, 0, sizeof(struct vhd_state));

	return 0;
}

int
vhd_validate_parent(td_driver_t *child_driver,
		    td_driver_t *parent_driver, td_flag_t flags)
{
	struct vhd_state *child  = (struct vhd_state *)child_driver->data;
	struct vhd_state *parent;

	if (parent_driver->type != DISK_TYPE_VHD) {
		if (child_driver->type != DISK_TYPE_VHD)
			return -EINVAL;
		if (child->vhd.footer.type != HD_TYPE_DIFF)
			return -EINVAL;
		if (!vhd_parent_raw(&child->vhd))
			return -EINVAL;
		return 0;
	}

	parent = (struct vhd_state *)parent_driver->data;

	/* 
	 * This check removed because of cases like:
	 *   - parent VHD marked as 'hidden'
	 *   - parent VHD modified during coalesce
	 */
	/*
	if (stat(parent->vhd.file, &stats)) {
		DPRINTF("ERROR stating parent file %s\n", parent->vhd.file);
		return -errno;
	}

	if (child->hdr.prt_ts != vhd_time(stats.st_mtime)) {
		DPRINTF("ERROR: parent file has been modified since "
			"snapshot.  Child image no longer valid.\n");
		return -EINVAL;
	}
	*/

	if (uuid_compare(child->vhd.header.prt_uuid, parent->vhd.footer.uuid)) {
		DPRINTF("ERROR: %s: %s, %s: parent uuid has changed since "
			"snapshot.  Child image no longer valid.\n",
			__func__, child->vhd.file, parent->vhd.file);
		return -EINVAL;
	}

	/* TODO: compare sizes */
	
	return 0;
}

int
vhd_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
{
	int err;
	char *parent;
	struct vhd_state *s;
	int flags;

	DBG(TLOG_DBG, "\n");
	flags = id->flags;
	memset(id, 0, sizeof(td_disk_id_t));

	s = (struct vhd_state *)driver->data;

	if (s->vhd.footer.type != HD_TYPE_DIFF)
		return TD_NO_PARENT;

	err = vhd_parent_locator_get(&s->vhd, &parent);
	if (err)
		return err;

	id->name   = parent;
	id->type   = vhd_parent_raw(&s->vhd) ? DISK_TYPE_AIO : DISK_TYPE_VHD;
	id->flags  = flags|TD_OPEN_SHAREABLE|TD_OPEN_RDONLY;

	return 0;
}

static inline void
clear_req_list(struct vhd_req_list *list)
{
	list->head = list->tail = NULL;
}

static inline void
add_to_tail(struct vhd_req_list *list, struct vhd_request *e)
{
	if (!list->head) 
		list->head = list->tail = e;
	else 
		list->tail = list->tail->next = e;
}

static inline int
remove_from_req_list(struct vhd_req_list *list, struct vhd_request *e)
{
	struct vhd_request *i = list->head;

	if (list->head == e) {
		if (list->tail == e)
			clear_req_list(list);
		else
			list->head = list->head->next;
		return 0;
	}

	while (i->next) {
		if (i->next == e) {
			if (list->tail == e) {
				i->next = NULL;
				list->tail = i;
			} else
				i->next = i->next->next;
			return 0;
		}
		i = i->next;
	}

	return -EINVAL;
}

static inline void
init_vhd_request(struct vhd_state *s, struct vhd_request *req)
{
	memset(req, 0, sizeof(struct vhd_request));
	req->state = s;
}

static inline void
init_tx(struct vhd_transaction *tx)
{
	memset(tx, 0, sizeof(struct vhd_transaction));
}

static inline void
add_to_transaction(struct vhd_transaction *tx, struct vhd_request *r)
{
	ASSERT(!tx->closed);

	r->tx = tx;
	tx->started++;
	add_to_tail(&tx->requests, r);
	set_vhd_flag(tx->status, VHD_FLAG_TX_LIVE);

	DBG(TLOG_DBG, "blk: 0x%04"PRIx64", lsec: 0x%08"PRIx64", tx: %p, "
	    "started: %d, finished: %d, status: %u\n",
	    r->treq.sec / SPB, r->treq.sec, tx,
	    tx->started, tx->finished, tx->status);
}

static inline int
transaction_completed(struct vhd_transaction *tx)
{
	return (tx->started == tx->finished);
}

static inline void
init_bat(struct vhd_state *s)
{
	s->bat.req.tx     = NULL;
	s->bat.req.next   = NULL;
	s->bat.req.error  = 0;
	s->bat.pbw_blk    = 0;
	s->bat.pbw_offset = 0;
	s->bat.status     = 0;
}

static inline void
lock_bat(struct vhd_state *s)
{
	set_vhd_flag(s->bat.status, VHD_FLAG_BAT_LOCKED);
}

static inline void
unlock_bat(struct vhd_state *s)
{
	clear_vhd_flag(s->bat.status, VHD_FLAG_BAT_LOCKED);
}

static inline int
bat_locked(struct vhd_state *s)
{
	return test_vhd_flag(s->bat.status, VHD_FLAG_BAT_LOCKED);
}

static inline void
init_vhd_bitmap(struct vhd_state *s, struct vhd_bitmap *bm)
{
	bm->blk    = 0;
	bm->seqno  = 0;
	bm->status = 0;
	init_tx(&bm->tx);
	clear_req_list(&bm->queue);
	clear_req_list(&bm->waiting);
	memset(bm->map, 0, vhd_sectors_to_bytes(s->bm_secs));
	memset(bm->shadow, 0, vhd_sectors_to_bytes(s->bm_secs));
	init_vhd_request(s, &bm->req);
}

static inline struct vhd_bitmap *
get_bitmap(struct vhd_state *s, uint32_t block)
{
	int i;
	struct vhd_bitmap *bm;

	for (i = 0; i < VHD_CACHE_SIZE; i++) {
		bm = s->bitmap[i];
		if (bm && bm->blk == block)
			return bm;
	}

	return NULL;
}

static inline void
lock_bitmap(struct vhd_bitmap *bm)
{
	set_vhd_flag(bm->status, VHD_FLAG_BM_LOCKED);
}

static inline void
unlock_bitmap(struct vhd_bitmap *bm)
{
	clear_vhd_flag(bm->status, VHD_FLAG_BM_LOCKED);
}

static inline int
bitmap_locked(struct vhd_bitmap *bm)
{
	return test_vhd_flag(bm->status, VHD_FLAG_BM_LOCKED);
}

static inline int
bitmap_valid(struct vhd_bitmap *bm)
{
	return !test_vhd_flag(bm->status, VHD_FLAG_BM_READ_PENDING);
}

static inline int
bitmap_in_use(struct vhd_bitmap *bm)
{
	return (test_vhd_flag(bm->status, VHD_FLAG_BM_READ_PENDING)  ||
		test_vhd_flag(bm->status, VHD_FLAG_BM_WRITE_PENDING) ||
		test_vhd_flag(bm->tx.status, VHD_FLAG_TX_UPDATE_BAT) ||
		bm->waiting.head || bm->tx.requests.head || bm->queue.head);
}

static inline int
bitmap_full(struct vhd_state *s, struct vhd_bitmap *bm)
{
	int i, n;

	n = s->spb >> 3;
	for (i = 0; i < n; i++)
		if (bm->map[i] != (char)0xFF)
			return 0;

	DBG(TLOG_DBG, "bitmap 0x%04x full\n", bm->blk);
	return 1;
}

static struct vhd_bitmap *
remove_lru_bitmap(struct vhd_state *s)
{
	int i, idx = 0;
	uint64_t seq = s->bm_lru;
	struct vhd_bitmap *bm, *lru = NULL;

	for (i = 0; i < VHD_CACHE_SIZE; i++) {
		bm = s->bitmap[i];
		if (bm && bm->seqno < seq && !bitmap_locked(bm)) {
			idx = i;
			lru = bm;
			seq = lru->seqno;
		}
	}

	if (lru) {
		s->bitmap[idx] = NULL;
		ASSERT(!bitmap_in_use(lru));
	}

	return  lru;
}

static int
alloc_vhd_bitmap(struct vhd_state *s, struct vhd_bitmap **bitmap, uint32_t blk)
{
	struct vhd_bitmap *bm;
	
	*bitmap = NULL;

	if (s->bm_free_count > 0) {
		bm = s->bitmap_free[--s->bm_free_count];
	} else {
		bm = remove_lru_bitmap(s);
		if (!bm)
			return -EBUSY;
	}

	init_vhd_bitmap(s, bm);
	bm->blk = blk;
	*bitmap = bm;

	return 0;
}

static inline uint64_t
__bitmap_lru_seqno(struct vhd_state *s)
{
	int i;
	struct vhd_bitmap *bm;

	if (s->bm_lru == 0xffffffff) {
		s->bm_lru = 0;
		for (i = 0; i < VHD_CACHE_SIZE; i++) {
			bm = s->bitmap[i];
			if (bm) {
				bm->seqno >>= 1;
				if (bm->seqno > s->bm_lru)
					s->bm_lru = bm->seqno;
			}
		}
	}

	return ++s->bm_lru;
}

static inline void
touch_bitmap(struct vhd_state *s, struct vhd_bitmap *bm)
{
	bm->seqno = __bitmap_lru_seqno(s);
}

static inline void
install_bitmap(struct vhd_state *s, struct vhd_bitmap *bm)
{
	int i;
	for (i = 0; i < VHD_CACHE_SIZE; i++) {
		if (!s->bitmap[i]) {
			touch_bitmap(s, bm);
			s->bitmap[i] = bm;
			return;
		}
	}

	ASSERT(0);
}

static inline void
free_vhd_bitmap(struct vhd_state *s, struct vhd_bitmap *bm)
{
	int i;

	for (i = 0; i < VHD_CACHE_SIZE; i++)
		if (s->bitmap[i] == bm)
			break;

	ASSERT(!bitmap_locked(bm));
	ASSERT(!bitmap_in_use(bm));
	ASSERT(i < VHD_CACHE_SIZE);

	s->bitmap[i] = NULL;
	s->bitmap_free[s->bm_free_count++] = bm;
}

static int
read_bitmap_cache(struct vhd_state *s, uint64_t sector, uint8_t op)
{
	uint32_t blk, sec;
	struct vhd_bitmap *bm;

	/* in fixed disks, every block is present */
	if (s->vhd.footer.type == HD_TYPE_FIXED) 
		return VHD_BM_BIT_SET;

	/* the extent the logical sector falls in */
	blk = sector / s->spb;

	/* offset within the extent the logical sector is located */
	sec = sector % s->spb;

	if (blk > s->vhd.header.max_bat_size) {
		DPRINTF("ERROR: sec %"PRIu64" out of range, op = %d\n",
			sector, op);
		return -EINVAL;
	}

	if (bat_entry(s, blk) == DD_BLK_UNUSED) {
		if (op == VHD_OP_DATA_WRITE &&
		    s->bat.pbw_blk != blk && bat_locked(s))
			return VHD_BM_BAT_LOCKED;

		return VHD_BM_BAT_CLEAR;
	}

	if (test_batmap(s, blk)) {
		DBG(TLOG_DBG, "batmap set for 0x%04x\n", blk);
		return VHD_BM_BIT_SET;
	}

	bm = get_bitmap(s, blk);
	if (!bm)
		return VHD_BM_NOT_CACHED;

	/* bump lru count */
	touch_bitmap(s, bm);

	if (test_vhd_flag(bm->status, VHD_FLAG_BM_READ_PENDING))
		return VHD_BM_READ_PENDING;

	return ((vhd_bitmap_test(&s->vhd, bm->map, sec)) ? 
		VHD_BM_BIT_SET : VHD_BM_BIT_CLEAR);
}

static int
read_bitmap_cache_span(struct vhd_state *s, 
		       uint64_t sector, int nr_secs, int value)
{
	int ret;
	uint32_t blk, sec;
	struct vhd_bitmap *bm;

	/* in fixed disks, every block is present */
	if (s->vhd.footer.type == HD_TYPE_FIXED) 
		return nr_secs;

	sec = sector % s->spb;
	blk = sector / s->spb;

	if (test_batmap(s, blk))
		return MIN(nr_secs, s->spb - sec);

	bm  = get_bitmap(s, blk);
	
	ASSERT(bm && bitmap_valid(bm));

	for (ret = 0; sec < s->spb && ret < nr_secs; sec++, ret++)
		if (vhd_bitmap_test(&s->vhd, bm->map, sec) != value)
			break;

	return ret;
}

static inline struct vhd_request *
alloc_vhd_request(struct vhd_state *s)
{
	struct vhd_request *req = NULL;
	
	if (s->vreq_free_count > 0) {
		req = s->vreq_free[--s->vreq_free_count];
		ASSERT(req->treq.secs == 0);
		init_vhd_request(s, req);
		return req;
	}

	return NULL;
}

static inline void
free_vhd_request(struct vhd_state *s, struct vhd_request *req)
{
	memset(req, 0, sizeof(struct vhd_request));
	s->vreq_free[s->vreq_free_count++] = req;
}

static inline void
aio_read(struct vhd_state *s, struct vhd_request *req, uint64_t offset)
{
	struct tiocb *tiocb = &req->tiocb;

	td_prep_read(tiocb, s->vhd.fd, req->treq.buf,
		     vhd_sectors_to_bytes(req->treq.secs),
		     offset, vhd_complete, req);
	td_queue_tiocb(s->driver, tiocb);

	s->queued++;
	s->reads++;
	s->read_size += req->treq.secs;
	TRACE(s);
}

static inline void
aio_write(struct vhd_state *s, struct vhd_request *req, uint64_t offset)
{
	struct tiocb *tiocb = &req->tiocb;

	td_prep_write(tiocb, s->vhd.fd, req->treq.buf,
		      vhd_sectors_to_bytes(req->treq.secs),
		      offset, vhd_complete, req);
	td_queue_tiocb(s->driver, tiocb);

	s->queued++;
	s->writes++;
	s->write_size += req->treq.secs;
	TRACE(s);
}

/**
 * Reserves a new extent.
 *
 * @returns a 64-bit unsigned integer where the error code is stored in the
 * upper 32 bits and the reserved block number is stored in the lower 32 bits.
 * If an error is returned (the upper 32 bits are not zero), the lower 32 bits
 * are undefined.
 */
static inline uint64_t
reserve_new_block(struct vhd_state *s, uint32_t blk)
{
	int gap = 0;

	ASSERT(!test_vhd_flag(s->bat.status, VHD_FLAG_BAT_WRITE_STARTED));

	/* data region of segment should begin on page boundary */
	if ((s->next_db + s->bm_secs) % s->spp)
		gap = (s->spp - ((s->next_db + s->bm_secs) % s->spp));

	if (s->next_db + gap > UINT_MAX)
		return (uint64_t)ENOSPC << 32;

	s->bat.pbw_blk    = blk;
	s->bat.pbw_offset = s->next_db + gap;

	return s->next_db;
}

static int
schedule_bat_write(struct vhd_state *s)
{
	int i;
	uint32_t blk;
	char *buf;
	uint64_t offset;
	struct vhd_request *req;

	ASSERT(bat_locked(s));

	req = &s->bat.req;
	buf = s->bat.bat_buf;
	blk = s->bat.pbw_blk;

	init_vhd_request(s, req);
	memcpy(buf, &bat_entry(s, blk - (blk % 128)), 512);

	((uint32_t *)buf)[blk % 128] = s->bat.pbw_offset;

	for (i = 0; i < 128; i++)
		BE32_OUT(&((uint32_t *)buf)[i]);

	offset         = s->vhd.header.table_offset + (blk - (blk % 128)) * 4;
	req->treq.secs = 1;
	req->treq.buf  = buf;
	req->op        = VHD_OP_BAT_WRITE;
	req->next      = NULL;

	aio_write(s, req, offset);
	set_vhd_flag(s->bat.status, VHD_FLAG_BAT_WRITE_STARTED);

	DBG(TLOG_DBG, "blk: 0x%04x, pbwo: 0x%08"PRIx64", "
	    "table_offset: 0x%08"PRIx64"\n", blk, s->bat.pbw_offset, offset);

	return 0;
}

static void
schedule_zero_bm_write(struct vhd_state *s,
		       struct vhd_bitmap *bm, uint64_t lb_end)
{
	uint64_t offset;
	struct vhd_request *req = &s->bat.zero_req;

	init_vhd_request(s, req);

	offset         = vhd_sectors_to_bytes(lb_end);
	req->op        = VHD_OP_ZERO_BM_WRITE;
	req->treq.sec  = s->bat.pbw_blk * s->spb;
	req->treq.secs = (s->bat.pbw_offset - lb_end) + s->bm_secs;
	req->treq.buf  = vhd_zeros(vhd_sectors_to_bytes(req->treq.secs));
	req->next      = NULL;

	DBG(TLOG_DBG, "blk: 0x%04x, writing zero bitmap at 0x%08"PRIx64"\n",
	    s->bat.pbw_blk, offset);

	lock_bitmap(bm);
	add_to_transaction(&bm->tx, req);
	aio_write(s, req, offset);
}

/* This is a performance optimization. When writing sequentially into full 
 * blocks, skipping (up-to-date) bitmaps causes an approx. 25% reduction in 
 * throughput. To prevent skipping, we issue redundant writes into the (padded) 
 * bitmap area just to make all writes sequential. This will help VHDs on raw 
 * block devices, while the FS-based VHDs shouldn't suffer much.
 *
 * Note that it only makes sense to perform this reduntant bitmap write if the 
 * block is completely full (i.e. the batmap entry is set). If the block is not 
 * completely full then one of the following two things will be true:
 *  1. we'll either be allocating new sectors in this block and writing its
 *     bitmap transactionally, which will be slow anyways; or
 *  2. the IO will be skipping over the unallocated sectors again, so the
 *     pattern will not be sequential anyways
 * In either case a redundant bitmap write becomes pointless. This fact 
 * simplifies the implementation of redundant writes: since we know the bitmap 
 * cannot be updated by anyone else, we don't have to worry about transactions 
 * or potential write conflicts.
 * */
static void
schedule_redundant_bm_write(struct vhd_state *s, uint32_t blk)
{
	uint64_t offset;
	struct vhd_request *req;

	ASSERT(s->vhd.footer.type != HD_TYPE_FIXED);
	ASSERT(test_batmap(s, blk));

	req = alloc_vhd_request(s);
	if (!req) 
		return;

	req->treq.buf = s->padbm_buf;

	offset = bat_entry(s, blk);
	ASSERT(offset != DD_BLK_UNUSED);
	offset <<= VHD_SECTOR_SHIFT;
	offset -= s->padbm_size - (s->bm_secs << VHD_SECTOR_SHIFT);

	req->op        = VHD_OP_REDUNDANT_BM_WRITE;
	req->treq.sec  = blk * s->spb;
	req->treq.secs = s->padbm_size >> VHD_SECTOR_SHIFT;
	req->next      = NULL;

	DBG(TLOG_DBG, "blk: %u, writing redundant bitmap at %" PRIu64 "\n",
	    blk, offset);

	aio_write(s, req, offset);
}

static int
update_bat(struct vhd_state *s, uint32_t blk)
{
	int err;
	uint64_t lb_end;
	struct vhd_bitmap *bm;

	ASSERT(bat_entry(s, blk) == DD_BLK_UNUSED);
	
	if (bat_locked(s)) {
		ASSERT(s->bat.pbw_blk == blk);
		return 0;
	}

	/* empty bitmap could already be in
	 * cache if earlier bat update failed */
	bm = get_bitmap(s, blk);
	if (!bm) {
		/* install empty bitmap in cache */
		err = alloc_vhd_bitmap(s, &bm, blk);
		if (err) 
			return err;

		install_bitmap(s, bm);
	}

	lock_bat(s);
	lb_end = reserve_new_block(s, blk);
	if (lb_end >> 32) {
		unlock_bat(s);
		return -(lb_end >> 32);
	}
	schedule_zero_bm_write(s, bm, lb_end);
	set_vhd_flag(bm->tx.status, VHD_FLAG_TX_UPDATE_BAT);

	return 0;
}

static int
allocate_block(struct vhd_state *s, uint32_t blk)
{
	int err, gap;
	uint64_t offset, size;
	struct vhd_bitmap *bm;
	ssize_t count;
	uint64_t next_db;

	ASSERT(bat_entry(s, blk) == DD_BLK_UNUSED);

	if (bat_locked(s)) {
		ASSERT(s->bat.pbw_blk == blk);
		if (s->bat.req.error)
			return -EBUSY;
		return 0;
	}

	gap     = 0;
	offset  = vhd_sectors_to_bytes(s->next_db);
	next_db = s->next_db;

	/* data region of segment should begin on page boundary */
	if ((next_db + s->bm_secs) % s->spp) {
		gap = (s->spp - ((next_db + s->bm_secs) % s->spp));
		next_db += gap;
	}

	if (next_db > UINT_MAX)
		return -ENOSPC;

	s->next_db = next_db;

	s->bat.pbw_blk = blk;
	s->bat.pbw_offset = s->next_db;

	DBG(TLOG_DBG, "blk: 0x%04x, pbwo: 0x%08"PRIx64"\n",
	    blk, s->bat.pbw_offset);

	if (lseek(s->vhd.fd, offset, SEEK_SET) == (off_t)-1) {
		ERR(s, -errno, "lseek failed\n");
		return -errno;
	}

	size  = vhd_sectors_to_bytes(s->spb + s->bm_secs + gap);
	count = write(s->vhd.fd, vhd_zeros(size), size);
	if (count != size) {
		err = count < 0 ? -errno : -ENOSPC;
		ERR(s, -errno,
		    "write failed (%zd, offset %"PRIu64")\n", count, offset);
		return err;
	}

	/* empty bitmap could already be in
	 * cache if earlier bat update failed */
	bm = get_bitmap(s, blk);
	if (!bm) {
		/* install empty bitmap in cache */
		err = alloc_vhd_bitmap(s, &bm, blk);
		if (err) 
			return err;

		install_bitmap(s, bm);
	}

	lock_bat(s);
	lock_bitmap(bm);
	schedule_bat_write(s);
	add_to_transaction(&bm->tx, &s->bat.req);

	return 0;
}

static int 
schedule_data_read(struct vhd_state *s, td_request_t treq, vhd_flag_t flags)
{
	uint64_t offset;
	uint32_t blk = 0, sec = 0;
	struct vhd_bitmap  *bm;
	struct vhd_request *req;

	if (s->vhd.footer.type == HD_TYPE_FIXED) {
		offset = vhd_sectors_to_bytes(treq.sec);
		goto make_request;
	}

	blk    = treq.sec / s->spb;
	sec    = treq.sec % s->spb;
	bm     = get_bitmap(s, blk);
	offset = bat_entry(s, blk);

	ASSERT(offset != DD_BLK_UNUSED);
	ASSERT(test_batmap(s, blk) || (bm && bitmap_valid(bm)));

	offset += s->bm_secs + sec;
	offset  = vhd_sectors_to_bytes(offset);

 make_request:
	req = alloc_vhd_request(s);
	if (!req) 
		return -EBUSY;

	req->treq  = treq;
	req->flags = flags;
	req->op    = VHD_OP_DATA_READ;
	req->next  = NULL;

	aio_read(s, req, offset);

	DBG(TLOG_DBG, "%s: lsec: 0x%08"PRIx64", blk: 0x%04x, sec: 0x%04x, "
	    "nr_secs: 0x%04x, offset: 0x%08"PRIx64", flags: 0x%08x, buf: %p\n",
	    s->vhd.file, treq.sec, blk, sec, treq.secs, offset, req->flags,
	    treq.buf);

	return 0;
}

static int
schedule_data_write(struct vhd_state *s, td_request_t treq, vhd_flag_t flags)
{
	int err;
	uint64_t offset;
	uint32_t blk = 0, sec = 0;
	struct vhd_bitmap  *bm = NULL;
	struct vhd_request *req;

	if (s->vhd.footer.type == HD_TYPE_FIXED) {
		offset = vhd_sectors_to_bytes(treq.sec);
		goto make_request;
	}

	blk    = treq.sec / s->spb;
	sec    = treq.sec % s->spb;
	offset = bat_entry(s, blk);

	if (test_vhd_flag(flags, VHD_FLAG_REQ_UPDATE_BAT)) {
		if (test_vhd_flag(s->flags, VHD_FLAG_OPEN_PREALLOCATE))
			err = allocate_block(s, blk);
		else
			err = update_bat(s, blk);

		if (err)
			return err;

		offset = s->bat.pbw_offset;
	}

	offset += s->bm_secs + sec;
	offset  = vhd_sectors_to_bytes(offset);

 make_request:
	req = alloc_vhd_request(s);
	if (!req)
		return -EBUSY;

	req->treq  = treq;
	req->flags = flags;
	req->op    = VHD_OP_DATA_WRITE;
	req->next  = NULL;

	if (test_vhd_flag(flags, VHD_FLAG_REQ_UPDATE_BITMAP)) {
		bm = get_bitmap(s, blk);
		ASSERT(bm && bitmap_valid(bm));
		lock_bitmap(bm);

		if (bm->tx.closed) {
			add_to_tail(&bm->queue, req);
			set_vhd_flag(req->flags, VHD_FLAG_REQ_QUEUED);
		} else
			add_to_transaction(&bm->tx, req);
	} else if (sec == 0 && 	/* first sector inside data block */
		   s->vhd.footer.type != HD_TYPE_FIXED && 
		   bat_entry(s, blk) != s->first_db &&
		   test_batmap(s, blk))
		schedule_redundant_bm_write(s, blk);

	aio_write(s, req, offset);

	DBG(TLOG_DBG, "%s: lsec: 0x%08"PRIx64", blk: 0x%04x, sec: 0x%04x, "
	    "nr_secs: 0x%04x, offset: 0x%08"PRIx64", flags: 0x%08x\n",
	    s->vhd.file, treq.sec, blk, sec, treq.secs, offset, req->flags);

	return 0;
}

static int 
schedule_bitmap_read(struct vhd_state *s, uint32_t blk)
{
	int err;
	uint64_t offset;
	struct vhd_bitmap  *bm;
	struct vhd_request *req = NULL;

	ASSERT(vhd_type_dynamic(&s->vhd));

	offset = bat_entry(s, blk);

	ASSERT(offset != DD_BLK_UNUSED);
	ASSERT(!get_bitmap(s, blk));

	offset = vhd_sectors_to_bytes(offset);

	err = alloc_vhd_bitmap(s, &bm, blk);
	if (err)
		return err;

	req = &bm->req;
	init_vhd_request(s, req);

	req->treq.sec  = blk * s->spb;
	req->treq.secs = s->bm_secs;
	req->treq.buf  = bm->map;
	req->treq.cb   = NULL;
	req->op        = VHD_OP_BITMAP_READ;
	req->next      = NULL;

	aio_read(s, req, offset);
	lock_bitmap(bm);
	install_bitmap(s, bm);
	set_vhd_flag(bm->status, VHD_FLAG_BM_READ_PENDING);

	DBG(TLOG_DBG, "%s: lsec: 0x%08"PRIx64", blk: 0x%04x, nr_secs: 0x%04x, "
	    "offset: 0x%08"PRIx64"\n", s->vhd.file, req->treq.sec, blk,
	    req->treq.secs, offset);

	return 0;
}

static void
schedule_bitmap_write(struct vhd_state *s, uint32_t blk)
{
	uint64_t offset;
	struct vhd_bitmap  *bm;
	struct vhd_request *req;

	bm     = get_bitmap(s, blk);
	offset = bat_entry(s, blk);

	ASSERT(vhd_type_dynamic(&s->vhd));
	ASSERT(bm && bitmap_valid(bm) &&
	       !test_vhd_flag(bm->status, VHD_FLAG_BM_WRITE_PENDING));

	if (offset == DD_BLK_UNUSED) {
		ASSERT(bat_locked(s) && s->bat.pbw_blk == blk);
		offset = s->bat.pbw_offset;
	}
	
	offset = vhd_sectors_to_bytes(offset);

	req = &bm->req;
	init_vhd_request(s, req);

	req->treq.sec  = blk * s->spb;
	req->treq.secs = s->bm_secs;
	req->treq.buf  = bm->shadow;
	req->treq.cb   = NULL;
	req->op        = VHD_OP_BITMAP_WRITE;
	req->next      = NULL;

	aio_write(s, req, offset);
	lock_bitmap(bm);
	touch_bitmap(s, bm);     /* bump lru count */
	set_vhd_flag(bm->status, VHD_FLAG_BM_WRITE_PENDING);

	DBG(TLOG_DBG, "%s: blk: 0x%04x, sec: 0x%08"PRIx64", nr_secs: 0x%04x, "
	    "offset: 0x%"PRIx64"\n", s->vhd.file, blk, req->treq.sec,
	    req->treq.secs, offset);
}

/* 
 * queued requests will be submitted once the bitmap
 * describing them is read and the requests are validated. 
 */
static int
__vhd_queue_request(struct vhd_state *s, uint8_t op, td_request_t treq)
{
	uint32_t blk;
	struct vhd_bitmap  *bm;
	struct vhd_request *req;

	ASSERT(vhd_type_dynamic(&s->vhd));

	blk = treq.sec / s->spb;
	bm  = get_bitmap(s, blk);

	ASSERT(bm && test_vhd_flag(bm->status, VHD_FLAG_BM_READ_PENDING));

	req = alloc_vhd_request(s);
	if (!req)
		return -EBUSY;

	req->treq = treq;
	req->op   = op;
	req->next = NULL;

	add_to_tail(&bm->waiting, req);
	lock_bitmap(bm);

	DBG(TLOG_DBG, "%s: lsec: 0x%08"PRIx64", blk: 0x%04x nr_secs: 0x%04x, "
	    "op: %u\n", s->vhd.file, treq.sec, blk, treq.secs, op);

	TRACE(s);
	return 0;
}

static void
vhd_queue_read(td_driver_t *driver, td_request_t treq)
{
	struct vhd_state *s = (struct vhd_state *)driver->data;

	DBG(TLOG_DBG, "%s: lsec: 0x%08"PRIx64", secs: 0x%04x (seg: %d)\n",
	    s->vhd.file, treq.sec, treq.secs, treq.sidx);

	while (treq.secs) {
		int err;
		td_request_t clone;

		err   = 0;
		clone = treq;

		switch (read_bitmap_cache(s, clone.sec, VHD_OP_DATA_READ)) {
		case -EINVAL:
			err = -EINVAL;
			goto fail;

		case VHD_BM_BAT_CLEAR:
			clone.secs = MIN(clone.secs, s->spb - (clone.sec % s->spb));
			td_forward_request(clone);
			break;

		case VHD_BM_BIT_CLEAR:
			clone.secs = read_bitmap_cache_span(s, clone.sec, clone.secs, 0);
			td_forward_request(clone);
			break;

		case VHD_BM_BIT_SET:
			clone.secs = read_bitmap_cache_span(s, clone.sec, clone.secs, 1);
			err = schedule_data_read(s, clone, 0);
			if (err)
				goto fail;
			break;

		case VHD_BM_NOT_CACHED:
			err = schedule_bitmap_read(s, clone.sec / s->spb);
			if (err)
				goto fail;

			clone.secs = MIN(clone.secs, s->spb - (clone.sec % s->spb));
			err = __vhd_queue_request(s, VHD_OP_DATA_READ, clone);
			if (err)
				goto fail;
			break;

		case VHD_BM_READ_PENDING:
			clone.secs = MIN(clone.secs, s->spb - (clone.sec % s->spb));
			err = __vhd_queue_request(s, VHD_OP_DATA_READ, clone);
			if (err)
				goto fail;
			break;

		case VHD_BM_BAT_LOCKED:
		default:
			ASSERT(0);
			break;
		}

		treq.sec  += clone.secs;
		treq.secs -= clone.secs;
		treq.buf  += vhd_sectors_to_bytes(clone.secs);
		continue;

	fail:
		clone.secs = treq.secs;
		td_complete_request(clone, err);
		break;
	}
}

static void
vhd_queue_write(td_driver_t *driver, td_request_t treq)
{
	struct vhd_state *s = (struct vhd_state *)driver->data;

	DBG(TLOG_DBG, "%s: lsec: 0x%08"PRIx64", secs: 0x%04x, (seg: %d)\n",
	    s->vhd.file, treq.sec, treq.secs, treq.sidx);

	while (treq.secs) {
		int err;
		uint8_t flags;
		td_request_t clone;

		err   = 0;
		flags = 0;
		clone = treq;

		switch (read_bitmap_cache(s, clone.sec, VHD_OP_DATA_WRITE)) {
		case -EINVAL:
			err = -EINVAL;
			goto fail;

		case VHD_BM_BAT_LOCKED:
			err = -EBUSY;
			goto fail;

		case VHD_BM_BAT_CLEAR:
			flags      = (VHD_FLAG_REQ_UPDATE_BAT |
				      VHD_FLAG_REQ_UPDATE_BITMAP);
			clone.secs = MIN(clone.secs, s->spb - (clone.sec % s->spb));
			err        = schedule_data_write(s, clone, flags);
			if (err)
				goto fail;
			break;

		case VHD_BM_BIT_CLEAR:
			flags      = VHD_FLAG_REQ_UPDATE_BITMAP;
			clone.secs = read_bitmap_cache_span(s, clone.sec, clone.secs, 0);
			err        = schedule_data_write(s, clone, flags);
			if (err)
				goto fail;
			break;

		case VHD_BM_BIT_SET:
			clone.secs = read_bitmap_cache_span(s, clone.sec, clone.secs, 1);
			err = schedule_data_write(s, clone, 0);
			if (err)
				goto fail;
			break;

		case VHD_BM_NOT_CACHED:
			clone.secs = MIN(clone.secs, s->spb - (clone.sec % s->spb));
			err = schedule_bitmap_read(s, clone.sec / s->spb);
			if (err)
				goto fail;

			err = __vhd_queue_request(s, VHD_OP_DATA_WRITE, clone);
			if (err)
				goto fail;
			break;

		case VHD_BM_READ_PENDING:
			clone.secs = MIN(clone.secs, s->spb - (clone.sec % s->spb));
			err = __vhd_queue_request(s, VHD_OP_DATA_WRITE, clone);
			if (err)
				goto fail;
			break;

		default:
			ASSERT(0);
			break;
		}

		treq.sec  += clone.secs;
		treq.secs -= clone.secs;
		treq.buf  += vhd_sectors_to_bytes(clone.secs);
		continue;

	fail:
		clone.secs = treq.secs;
		td_complete_request(clone, err);
		break;
	}
}

static inline void
signal_completion(struct vhd_request *list, int error)
{
	struct vhd_state *s;
	struct vhd_request *r, *next;

	if (!list)
		return;

	r = list;
	s = list->state;

	while (r) {
		int err;

		err  = (error ? error : r->error);
		next = r->next;
		td_complete_request(r->treq, err);
		DBG(TLOG_DBG, "lsec: 0x%08"PRIx64", blk: 0x%04"PRIx64", "
		    "err: %d\n", r->treq.sec, r->treq.sec / s->spb, err);
		free_vhd_request(s, r);
		r    = next;

		s->returned++;
		TRACE(s);
	}
}

static void
start_new_bitmap_transaction(struct vhd_state *s, struct vhd_bitmap *bm)
{
	struct vhd_transaction *tx;
	struct vhd_request *r, *next;
	int i;

	if (!bm->queue.head)
		return;

	DBG(TLOG_DBG, "blk: 0x%04x\n", bm->blk);

	r  = bm->queue.head;
	tx = &bm->tx;
	clear_req_list(&bm->queue);

	if (r && bat_entry(s, bm->blk) == DD_BLK_UNUSED)
		tx->error = -EIO;

	while (r) {
		next    = r->next;
		r->next = NULL;
		clear_vhd_flag(r->flags, VHD_FLAG_REQ_QUEUED);

		add_to_transaction(tx, r);
		if (test_vhd_flag(r->flags, VHD_FLAG_REQ_FINISHED)) {
			tx->finished++;
			if (!r->error) {
				uint32_t sec = r->treq.sec % s->spb;
				for (i = 0; i < r->treq.secs; i++)
					vhd_bitmap_set(&s->vhd,
						       bm->shadow, sec + i);
			}
		}
		r = next;
	}

	/* perhaps all the queued writes already completed? */
	if (tx->started && transaction_completed(tx))
		finish_data_transaction(s, bm);
}

static void
finish_bat_transaction(struct vhd_state *s, struct vhd_bitmap *bm)
{
	struct vhd_transaction *tx = &bm->tx;

	if (!bat_locked(s))
		return;

	if (s->bat.pbw_blk != bm->blk)
		return;

	if (!s->bat.req.error)
		goto release;

	if (!test_vhd_flag(tx->status, VHD_FLAG_TX_LIVE))
		goto release;

	tx->closed = 1;
	return;

 release:
	DBG(TLOG_DBG, "blk: 0x%04x\n", bm->blk);
	unlock_bat(s);
	init_bat(s);
}

static void
finish_bitmap_transaction(struct vhd_state *s,
			  struct vhd_bitmap *bm, int error)
{
	int map_size;
	struct vhd_transaction *tx = &bm->tx;

	DBG(TLOG_DBG, "blk: 0x%04x, err: %d\n", bm->blk, error);
	tx->error = (tx->error ? tx->error : error);
	map_size  = vhd_sectors_to_bytes(s->bm_secs);

	if (!test_vhd_flag(s->flags, VHD_FLAG_OPEN_PREALLOCATE)) {
		if (test_vhd_flag(tx->status, VHD_FLAG_TX_UPDATE_BAT)) {
			/* still waiting for bat write */
			ASSERT(bm->blk == s->bat.pbw_blk);
			ASSERT(test_vhd_flag(s->bat.status, 
					     VHD_FLAG_BAT_WRITE_STARTED));
			s->bat.req.tx = tx;
			return;
		}
	}

	if (tx->error) {
		/* undo changes to shadow */
		memcpy(bm->shadow, bm->map, map_size);
	} else {
		/* complete atomic write */
		memcpy(bm->map, bm->shadow, map_size);
		if (!test_batmap(s, bm->blk) && bitmap_full(s, bm))
			set_batmap(s, bm->blk);
	}

	/* transaction done; signal completions */
	signal_completion(tx->requests.head, tx->error);
	init_tx(tx);
	start_new_bitmap_transaction(s, bm);

	if (!bitmap_in_use(bm))
		unlock_bitmap(bm);

	finish_bat_transaction(s, bm);
}

static void
finish_data_transaction(struct vhd_state *s, struct vhd_bitmap *bm)
{
	struct vhd_transaction *tx = &bm->tx;

	DBG(TLOG_DBG, "blk: 0x%04x\n", bm->blk);

	tx->closed = 1;

	if (!tx->error)
		return schedule_bitmap_write(s, bm->blk);

	return finish_bitmap_transaction(s, bm, 0);
}

static void
finish_bat_write(struct vhd_request *req)
{
	struct vhd_bitmap *bm;
	struct vhd_transaction *tx;
	struct vhd_state *s = req->state;

	s->returned++;
	TRACE(s);

	bm = get_bitmap(s, s->bat.pbw_blk);

	DBG(TLOG_DBG, "blk 0x%04x, pbwo: 0x%08"PRIx64", err %d\n",
	    s->bat.pbw_blk, s->bat.pbw_offset, req->error);
	ASSERT(bm && bitmap_valid(bm));
	ASSERT(bat_locked(s) &&
	       test_vhd_flag(s->bat.status, VHD_FLAG_BAT_WRITE_STARTED));

	tx = &bm->tx;
	ASSERT(test_vhd_flag(tx->status, VHD_FLAG_TX_LIVE));

	if (!req->error) {
		bat_entry(s, s->bat.pbw_blk) = s->bat.pbw_offset;
		s->next_db = s->bat.pbw_offset + s->spb + s->bm_secs;
	} else
		tx->error = req->error;

	if (test_vhd_flag(s->flags, VHD_FLAG_OPEN_PREALLOCATE)) {
		tx->finished++;
		remove_from_req_list(&tx->requests, req);
		if (transaction_completed(tx))
			finish_data_transaction(s, bm);
	} else {
		clear_vhd_flag(tx->status, VHD_FLAG_TX_UPDATE_BAT);
		if (s->bat.req.tx)
			finish_bitmap_transaction(s, bm, req->error);
	}

	finish_bat_transaction(s, bm);
}

static void
finish_zero_bm_write(struct vhd_request *req)
{
	uint32_t blk;
	struct vhd_bitmap *bm;
	struct vhd_transaction *tx = req->tx;
	struct vhd_state *s = req->state;

	s->returned++;
	TRACE(s);

	blk = req->treq.sec / s->spb;
	bm  = get_bitmap(s, blk);

	DBG(TLOG_DBG, "blk: 0x%04x\n", blk);
	ASSERT(bat_locked(s));
	ASSERT(s->bat.pbw_blk == blk);
	ASSERT(bm && bitmap_valid(bm) && bitmap_locked(bm));

	tx->finished++;
	remove_from_req_list(&tx->requests, req);

	if (req->error) {
		unlock_bat(s);
		init_bat(s);
		tx->error = req->error;
		clear_vhd_flag(tx->status, VHD_FLAG_TX_UPDATE_BAT);
	} else
		schedule_bat_write(s);

	if (transaction_completed(tx))
		finish_data_transaction(s, bm);
}

static int
finish_redundant_bm_write(struct vhd_request *req)
{
	/* uint32_t blk; */
	struct vhd_state *s = (struct vhd_state *) req->state;

	s->returned++;
	TRACE(s);	
	/* blk = req->treq.sec / s->spb;
	   DBG(TLOG_DBG, "blk: %u\n", blk); */

	if (req->error) {
		ERR(s, req->error, "lsec: 0x%08"PRIx64, req->treq.sec);
	}
	free_vhd_request(s, req);
	s->debug_done_redundant_writes++;
	return 0;
}


static void
finish_bitmap_read(struct vhd_request *req)
{
	uint32_t blk;
	struct vhd_bitmap  *bm;
	struct vhd_request *r, *next;
	struct vhd_state   *s = req->state;

	s->returned++;
	TRACE(s);

	blk = req->treq.sec / s->spb;
	bm  = get_bitmap(s, blk);

	DBG(TLOG_DBG, "blk: 0x%04x\n", blk);
	ASSERT(bm && test_vhd_flag(bm->status, VHD_FLAG_BM_READ_PENDING));

	r = bm->waiting.head;
	clear_req_list(&bm->waiting);
	clear_vhd_flag(bm->status, VHD_FLAG_BM_READ_PENDING);

	if (!req->error) {
		memcpy(bm->shadow, bm->map, vhd_sectors_to_bytes(s->bm_secs));

		while (r) {
			struct vhd_request tmp;

			tmp  = *r;
			next =  r->next;
			free_vhd_request(s, r);

			ASSERT(tmp.op == VHD_OP_DATA_READ || 
			       tmp.op == VHD_OP_DATA_WRITE);

			if (tmp.op == VHD_OP_DATA_READ)
				vhd_queue_read(s->driver, tmp.treq);
			else if (tmp.op == VHD_OP_DATA_WRITE)
				vhd_queue_write(s->driver, tmp.treq);

			r = next;
		}
	} else {
		int err = req->error;
		unlock_bitmap(bm);
		free_vhd_bitmap(s, bm);
		return signal_completion(r, err);
	}

	if (!bitmap_in_use(bm))
		unlock_bitmap(bm);
}

static void
finish_bitmap_write(struct vhd_request *req)
{
	uint32_t blk;
	struct vhd_bitmap  *bm;
	struct vhd_transaction *tx;
	struct vhd_state *s = req->state;

	s->returned++;
	TRACE(s);

	blk = req->treq.sec / s->spb;
	bm  = get_bitmap(s, blk);
	tx  = &bm->tx;

	DBG(TLOG_DBG, "blk: 0x%04x, started: %d, finished: %d\n",
	    blk, tx->started, tx->finished);
	ASSERT(tx->closed);
	ASSERT(bm && bitmap_valid(bm));
	ASSERT(test_vhd_flag(bm->status, VHD_FLAG_BM_WRITE_PENDING));

	clear_vhd_flag(bm->status, VHD_FLAG_BM_WRITE_PENDING);

	finish_bitmap_transaction(s, bm, req->error);
}

static void
finish_data_read(struct vhd_request *req)
{
	struct vhd_state *s = req->state;

	DBG(TLOG_DBG, "lsec 0x%08"PRIx64", blk: 0x%04"PRIx64"\n", 
	    req->treq.sec, req->treq.sec / s->spb);
	signal_completion(req, 0);
}

static void
finish_data_write(struct vhd_request *req)
{
	int i;
	struct vhd_transaction *tx = req->tx;
	struct vhd_state *s = (struct vhd_state *)req->state;

	set_vhd_flag(req->flags, VHD_FLAG_REQ_FINISHED);

	if (tx) {
		uint32_t blk, sec;
		struct vhd_bitmap *bm;

		blk = req->treq.sec / s->spb;
		sec = req->treq.sec % s->spb;
		bm  = get_bitmap(s, blk);

		ASSERT(bm && bitmap_valid(bm) && bitmap_locked(bm));

		tx->finished++;

		DBG(TLOG_DBG, "lsec: 0x%08"PRIx64", blk: 0x04%"PRIx64", "
		    "tx->started: %d, tx->finished: %d\n", req->treq.sec,
		    req->treq.sec / s->spb, tx->started, tx->finished);

		if (!req->error)
			for (i = 0; i < req->treq.secs; i++)
				vhd_bitmap_set(&s->vhd, bm->shadow,  sec + i);

		if (transaction_completed(tx))
			finish_data_transaction(s, bm);

	} else if (!test_vhd_flag(req->flags, VHD_FLAG_REQ_QUEUED)) {
		ASSERT(!req->next);
		DBG(TLOG_DBG, "lsec: 0x%08"PRIx64", blk: 0x%04"PRIx64"\n", 
		    req->treq.sec, req->treq.sec / s->spb);
		signal_completion(req, 0);
	}
}

void
vhd_complete(void *arg, struct tiocb *tiocb, int err)
{
	struct vhd_request *req = (struct vhd_request *)arg;
	struct vhd_state *s = req->state;
	struct iocb *io = &tiocb->iocb;

	s->completed++;
	TRACE(s);

	req->error = err;

	if (req->error)
		ERR(s, req->error, "%s: op: %u, lsec: %"PRIu64", secs: %u, "
		    "nbytes: %lu, blk: %"PRIu64", blk_offset: %u",
		    s->vhd.file, req->op, req->treq.sec, req->treq.secs,
		    io->u.c.nbytes, req->treq.sec / s->spb,
		    bat_entry(s, req->treq.sec / s->spb));

	switch (req->op) {
	case VHD_OP_DATA_READ:
		finish_data_read(req);
		break;

	case VHD_OP_DATA_WRITE:
		finish_data_write(req);
		break;

	case VHD_OP_BITMAP_READ:
		finish_bitmap_read(req);
		break;

	case VHD_OP_BITMAP_WRITE:
		finish_bitmap_write(req);
		break;

	case VHD_OP_ZERO_BM_WRITE:
		finish_zero_bm_write(req);
		break;

	case VHD_OP_REDUNDANT_BM_WRITE:
		finish_redundant_bm_write(req);
		break;

	case VHD_OP_BAT_WRITE:
		finish_bat_write(req);
		break;

	default:
		ASSERT(0);
		break;
	}
}

void 
vhd_debug(td_driver_t *driver)
{
	int i;
	struct vhd_state *s = (struct vhd_state *)driver->data;

	DBG(TLOG_WARN, "%s: QUEUED: 0x%08"PRIx64", COMPLETED: 0x%08"PRIx64", "
	    "RETURNED: 0x%08"PRIx64"\n", s->vhd.file, s->queued, s->completed,
	    s->returned);
	DBG(TLOG_WARN, "WRITES: 0x%08"PRIx64", AVG_WRITE_SIZE: %f\n",
	    s->writes, (s->writes ? ((float)s->write_size / s->writes) : 0.0));
	DBG(TLOG_WARN, "READS: 0x%08"PRIx64", AVG_READ_SIZE: %f\n",
	    s->reads, (s->reads ? ((float)s->read_size / s->reads) : 0.0));

	DBG(TLOG_WARN, "ALLOCATED REQUESTS: (%u total)\n", VHD_REQS_DATA);
	for (i = 0; i < VHD_REQS_DATA; i++) {
		struct vhd_request *r = &s->vreq_list[i];
		td_request_t *t       = &r->treq;
		const char *vname     = t->vreq ? t->vreq->name: NULL;
		if (t->secs)
			DBG(TLOG_WARN, "%d: vreq: %s.%d, err: %d, op: %d,"
			    " lsec: 0x%08"PRIx64", flags: %d, this: %p, "
			    "next: %p, tx: %p\n", i, vname, t->sidx, r->error, r->op,
			    t->sec, r->flags, r, r->next, r->tx);
	}

	DBG(TLOG_WARN, "BITMAP CACHE:\n");
	for (i = 0; i < VHD_CACHE_SIZE; i++) {
		int qnum = 0, wnum = 0, rnum = 0;
		struct vhd_bitmap *bm = s->bitmap[i];
		struct vhd_transaction *tx;
		struct vhd_request *r;

		if (!bm)
			continue;

		tx = &bm->tx;
		r = bm->queue.head;
		while (r) {
			qnum++;
			r = r->next;
		}

		r = bm->waiting.head;
		while (r) {
			wnum++;
			r = r->next;
		}

		r = tx->requests.head;
		while (r) {
			rnum++;
			r = r->next;
		}

		DBG(TLOG_WARN, "%d: blk: 0x%04x, status: 0x%08x, q: %p, qnum: %d, w: %p, "
		    "wnum: %d, locked: %d, in use: %d, tx: %p, tx_error: %d, "
		    "started: %d, finished: %d, status: %u, reqs: %p, nreqs: %d\n",
		    i, bm->blk, bm->status, bm->queue.head, qnum, bm->waiting.head,
		    wnum, bitmap_locked(bm), bitmap_in_use(bm), tx, tx->error,
		    tx->started, tx->finished, tx->status, tx->requests.head, rnum);
	}

	DBG(TLOG_WARN, "BAT: status: 0x%08x, pbw_blk: 0x%04x, "
	    "pbw_off: 0x%08"PRIx64", tx: %p\n", s->bat.status, s->bat.pbw_blk,
	    s->bat.pbw_offset, s->bat.req.tx);

/*
	for (i = 0; i < s->hdr.max_bat_size; i++)
		DPRINTF("%d: %u\n", i, s->bat.bat[i]);
*/
}

struct tap_disk tapdisk_vhd = {
	.disk_type          = "tapdisk_vhd",
	.flags              = 0,
	.private_data_size  = sizeof(struct vhd_state),
	.td_open            = _vhd_open,
	.td_close           = _vhd_close,
	.td_queue_read      = vhd_queue_read,
	.td_queue_write     = vhd_queue_write,
	.td_get_parent_id   = vhd_get_parent_id,
	.td_validate_parent = vhd_validate_parent,
	.td_debug           = vhd_debug,
};
_______________________________________________
Xen-devel mailing list
Xen-devel@lists.xen.org
http://lists.xen.org/xen-devel

Reply via email to