> dd -if /dev/sdE0/data -of /dev/sdF0/data -bs 1048576

i was thinking about your technique, and it occured to me
that this command is equvalent to
        for(;;)
                read(E0, buf, 1mb)
                write(F0, buf, 1mb)
but if you wrote it like this
        dd -if data -bs 64k -count 20000 |dd -bs 64k -of ../sda1/data
the read and write could be run in parallel, at the expense
of a buffer copy.  i didn't have anything except for some
very fast (120mb/s) sas disks to test with, but even they
showed 10% performance improvement. even at the expense
of copies
        0.01u 0.62s 11.63r       rc -c dd -if data -bs 64k -count 20000 -of 
../sda1/data
        0.02u 0.97s 10.72r       rc -c dd -if data -bs 64k -count 20000|dd -bs 
64k -of ../sda1/data
not all that impressive with my disks, perhaps this would
show more improvement on normal disks.

> fn chk {
>     for(i in sdE0 sdF0) dd -if /dev/$i/data -bs 1048576 -iseek $1
> -count 1 |md5sum
> }

i found this interesting, too.  i wrote a short program using the
threads library to do the reads and compares in parallel.  in
the process of writing that i realized that the md5sum is not
necessary.  a memcmp would do.  i finished the program up
(attached) and found that it performed pretty well.  giving me
~123mb/s.  that's about what these drives will do.  but i was
wondering why cmp would just work.  it occurred to me that
i could run the dds in parallel with a command like this
        cmp <{dd -if data -bs 64k -count 20000} <{dd -if ../sda1/data -bs 64k 
-count 20000}
surprisingly, this was just as fast on my setup as the specalized
program
        0.07u 0.04s 10.65r       8.out -n 20000 data ../sda1/data ...
        0.32u 0.26s 10.65r       cmp /fd/7 /fd/6

clearly if the compare is more involved, like sha1sum, it would
be more fruitful to use a modified version of the threaded program.
(unless you see a way of parallelizing the cmp part of that command
without byzantine contortions.)  i ran this test and found a surprising
speedup:
        0.06u 0.03s 13.65r       8.out -sn 20000 data ../sda1/data ...
i suspect there is something a bit amiss with time(1)'s accounting.

i suppose that a motivated person could write a book on parallel
programming with the shell.  tony hoar would be proud.

- erik
/*
 * cf. cmp <{dd -if data -bs 64k -count 20000} <{dd -if ../sda1/data -bs 64k 
-count 20000}
 * copyright © 2009 erik quanstrom
 */
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <libsec.h>

enum {
        Stack   = 64*1024,
        Block   = 64*1024,
        Buffer  = 3,

        Memcmp= 1<<0,
        Sha1    = 1<<1,

        Ferror  = 1<<1,
        Fcmp    = 1<<2,
        Fend    = 1<<3,
};

typedef struct Ddargs Ddargs;
struct Ddargs {
        int     fd;
        Channel *c;
        ulong   bs;
        uvlong  start;
        uvlong  end;
};

typedef struct Bargs Bargs;
struct Bargs {
        uvlong  nblocks;
        ulong   bs;
        int     nend;
};

typedef struct Msgbuf Msgbuf;
struct Msgbuf {
        uint    flags;
        uvlong  lba;
        char    status[ERRMAX];
        uchar   data[Block];
};

Channel *blockfree;
Channel *blockalloc;
        static Alt alts[3];

void
blockproc(void *a)
{
        uint h, t, f, e, c, m;
        uvlong i;
        Bargs *args;
        Msgbuf *s, *r, **tab;

        threadsetname("blockproc");

        alts[0].c = blockalloc;
        alts[0].v = &s;
        alts[0].op = CHANSND;
        alts[1].c = blockfree;
        alts[1].v = &r;
        alts[1].op = CHANRCV;
        alts[2].op = CHANEND;

        args = (Bargs*)a;
        tab = malloc(args->nblocks * sizeof tab[0]);
        m = args->nblocks - 1;
        if(tab == nil)
                sysfatal("malloc: %r");
        for(i = 0; i < args->nblocks; i++){
                tab[i] = malloc(sizeof(Msgbuf));
                if(tab[i] == nil)
                        sysfatal("malloc: %r");
        }
        h = t = 0;
        e = c = 0;
        s = nil;
        for(f = args->nend; f > 0;){
                if(s == nil){
                        s = tab[h % m];
                        if(s != nil){
                                tab[h++ % m] = nil;
                                alts[0].op = CHANSND;
                        }else
                                alts[0].op = CHANNOP;
                }
                switch(alt(alts)){
                case 0:
                        s = nil;
                        break;
                case 1:
                        assert(r != nil && tab[t % m] == nil);
                        tab[t++ % m] = r;
                        if(r->flags & Fend)
                                f--;
                        if(r->flags & Fcmp)
                                c++;
                        if(r->flags & Ferror)
                                e++;
                        r = nil;
                        break;
                }
        }
        for(i = 0; i < args->nblocks; i++)
                free(tab[i]);
        free(tab);
        if(e > 0)
                threadexitsall("errors");
        if(c > 0)
                threadexitsall("cmp");
        threadexitsall("");
}

Msgbuf*
bufalloc(void)
{
        Msgbuf *b;

        b = recvp(blockalloc);
        if(b == nil)
                sysfatal("recvp: %r");
        b->flags = 0;
        b->lba = 0;
        b->status[0] = 0;
        return b;
}

static int
preadn(int fd, void *av, long n, vlong o)
{
        char *a;
        long m, t;

        a = av;
        t = 0;
        while(t < n){
                m = pread(fd, a+t, n-t, o+t);
                if(m <= 0){
                        if(t == 0)
                                return m;
                        break;
                }
                t += m;
        }
        return t;
}

void
ddproc(void *a)
{
        int rv;
        uvlong i;
        Ddargs *d;
        Msgbuf *b;

        threadsetname("ddproc");
        d = (Ddargs*)a;
        for(i = d->start; i < d->end; i++){
                b = bufalloc();
                b->lba = i;
                rv = preadn(d->fd, b->data, d->bs, b->lba * d->bs);
                if(rv != d->bs){
                        errstr(b->status, sizeof b->status);
                        b->flags |= Ferror;
                }
                sendp(d->c, b);
        }
        close(d->fd);

        b = bufalloc();
        b->flags |= Fend;
        sendp(d->c, b);
        threadexits("");
}

uint    bs              = Block;
uint    cmptype         = Memcmp;
Channel *dev[2];
QLock   cmplock;

uint
diffat(uchar *a, uchar *b, uint l)
{
        uint i;

        for(i = 0; i < l; i++)
                if(a[i] != b[i])
                        return i;
        abort();
        return ~0;
}

int
docmp(uchar *a, uchar *b, int l)
{
        uchar suma[SHA1dlen], sumb[SHA1dlen];

        if(cmptype == Memcmp)
                return memcmp(a, b, bs) != 0;
        sha1(a, l, suma, nil);
        sha1(b, l, sumb, nil);
//      Bprint(&out, "%A %A\n", suma, sumb);
        return memcmp(suma, sumb, sizeof suma) != 0;
}

void
cmpproc(void*)
{
        uchar *x, *y;
        int i;
        Msgbuf *b[2];

        threadsetname("cmpproc");
        for(;;){
                qlock(&cmplock);
                for(i = 0; i < 2; i++)
                        b[i] = recvp(dev[i]);
                qunlock(&cmplock);
                assert(b[0] != nil && b[1] != nil);
                assert(b[0]->lba == b[1]->lba);

                x = b[0]->data;
                y = b[1]->data;
                if(b[0]->flags & Ferror)
                        print("cmp error: %llud: device 0 error: %s\n",
                                b[0]->lba, b[0]->status);
                else if(b[0]->flags & Ferror)
                        print("cmp error: %llud: device 1 error: %s\n",
                                b[1]->lba, b[1]->status);
                else if(b[0]->flags & Fend){
                }else if(docmp(x, y, bs)){
                        b[0]->flags |= Fcmp;
                        print("%llud + %ud\n", b[0]->lba, diffat(x, y, bs));
                }
                sendp(blockfree, b[0]);
                sendp(blockfree, b[1]);
        }
}

void
usage(void)
{
        fprint(2, "usage: disk/cmp [-n nblocks] [-b blocksz] dev0 dev1\n");
        threadexitsall("usage");
}

Ddargs d[2];
Bargs a;

void
threadmain(int argc, char **argv)
{
        int i;
        uvlong nblocks;
        Dir *e;

        nblocks = 0;
        ARGBEGIN{
        case 'n':
                nblocks = atoi(EARGF(usage()));
                break;
        case 'b':
                bs = atoi(EARGF(usage()));
                break;
        case 's':
                cmptype = Sha1;
                break;
        default:
                usage();
        }ARGEND
        if(argc != 2)
                usage();
        for(i = 0; i < 2; i++){
                d[i].fd = open(argv[i], OREAD);
                if(d[i].fd == -1)
                        sysfatal("open: %r");
                d[i].bs = bs;
                d[i].start = 0;
                if(nblocks != 0)
                        d[i].end = nblocks;
                else{
                        e = dirfstat(d[i].fd);
                        if(e == nil)
                                sysfatal("dirfstat: %r");
                        d[i].end = e->length / d[i].bs;
                        free(e);
                }
                d[i].c = dev[i] = chancreate(sizeof(Msgbuf*), Buffer);
                if(d[i].c == nil)
                        sysfatal("chancreate: %r");
        }
        blockfree = chancreate(sizeof(Msgbuf*), 1);
        blockalloc = chancreate(sizeof(Msgbuf*), 1);
        if(blockalloc == nil || blockfree == nil)
                sysfatal("chancreate: %r");
        a.nblocks = 2*Buffer;
        a.bs = bs;
        a.nend = 2;
        proccreate(ddproc, d + 0, Stack);
        proccreate(ddproc, d + 1, Stack);
        for(i = 0; i < 4; i++)
                proccreate(cmpproc, nil, Stack);
        blockproc(&a);
        threadexitsall("");
}

Reply via email to