I call this 'ventino'.  It's a tiny venti that keeps
the whole index in memory, backed by a text file.
I have not used it in a while (the file is dated May 25, 2009),
but hey, it's a working venti server in 329 lines of code.

#include <u.h>
#include <libc.h>
#include <bio.h>
#include <flate.h>
#include <thread.h>
#include <venti.h>
#include <libsec.h>

// Short local aliases for uchar, u64int and u32int.
typedef uchar byte;
typedef u64int uint64;
typedef u32int uint32;

// One in-memory index entry: maps a score to a position in the data file.
typedef struct IEntry IEntry;
struct IEntry
{
        IEntry *link;   // next entry in the same hash bucket chain
        
        // disk data
        byte score[VtScoreSize];        // key of the stored chunk
        uint64 offset;                  // value returned by dwrite; used as the seek position for dread
};

// A block of data together with its score, as passed to dwrite
// and filled in by dread.
typedef struct Chunk Chunk;
struct Chunk
{
        byte score[VtScoreSize];        // score written in / parsed from the record header
        uint32 size;                    // number of bytes in data (uncompressed)
        byte *data;                     // the chunk contents
};

// The in-memory index: a chained hash table of IEntry.
IEntry **ihash;         // bucket array, nihash slots
uint nihash;            // bucket count; used as a mask (nihash - 1), so it must be a power of two
uint nientry;           // entry count consulted by iinsert when deciding whether to rehash

void
rehash(void)
{
        IEntry **new, *e, *next;
        uint i, n;
        uint32 h;

        n = nihash<<1;
        new = vtmallocz(nihash*sizeof new[0]);
        for(i=0; i<nihash; i++) {
                for(e = ihash[i]; e; e = next) {
                        next = e->link;
                        h = *(uint32*)e->score & (n - 1);
                        e->link = new[h];
                        new[h] = e;
                }
        }
        free(ihash);
        ihash = new;
        nihash = n;
}
                
IEntry*
ilookup(byte *score)
{
        uint32 h;
        IEntry *e;
        
        // Not a great hash; assumes we are
        // seeing all blocks, not just some chosen subset.
        h = *(uint32*)score & (nihash - 1);

        for(e = ihash[h]; e; e = e->link)
                if(memcmp(e->score, score, VtScoreSize) == 0)
                        return e;
        return nil;
}

IEntry*
iinsert(byte *score)
{
        uint32 h;
        IEntry *e;

        if(nihash < (1<<28) && nientry > 2*nihash)
                rehash();
        h = *(uint32*)score & (nihash - 1);
        e = vtmallocz(sizeof(IEntry));
        e->link = ihash[h];
        ihash[h] = e;
        memmove(e->score, score, VtScoreSize);
        return e;
}

// Populate the in-memory index from b.  Each line must consist of
// exactly two fields: a score and a non-zero offset.  Any line that
// does not parse is fatal.
void
iload(Biobuf *b)
{
        char *line;
        char *field[10];
        byte score[VtScoreSize];
        uint64 offset;
        IEntry *e;

        for(;;) {
                line = Brdline(b, '\n');
                if(line == nil)
                        break;
                // Replace the trailing newline so the line is a C string.
                line[Blinelen(b)-1] = '\0';
                if(tokenize(line, field, nelem(field)) != 2)
                        sysfatal("malformed index");
                if(vtparsescore(field[0], nil, score) < 0)
                        sysfatal("malformed index");
                offset = strtoull(field[1], 0, 0);
                if(offset == 0)
                        sysfatal("malformed index");
                e = iinsert(score);
                e->offset = offset;
        }
}

// Write one index line for e to fd: the score, a space, and the
// offset left-justified in a 22-character field.
void
iwrite(int fd, IEntry *e)
{
        byte *score;
        uint64 offset;

        score = e->score;
        offset = e->offset;
        fprint(fd, "%V %-22llud\n", score, offset);
}

// dwrite never lets a record straddle a multiple of ArenaBlock bytes
// in the data file; it skips ahead to the next multiple instead.
enum {
        ArenaBlock = 1<<30
};

// Append chunk c to the data file at fd's current position and return
// the offset at which its record starts.
//
// A record is a text header line "score size storedsize\n" followed by
// storedsize bytes.  The payload is deflated only when that saves at
// least MinSaving bytes; otherwise it is stored raw with
// storedsize == size, which is how dread tells the two forms apart.
// If the record would cross an ArenaBlock boundary it is started at
// the next boundary instead.
//
// Any seek or write failure is fatal: returning an offset for data
// that never reached the file would leave a dangling index entry.
uint64
dwrite(int fd, Chunk *c)
{
        enum { MinSaving = 512 };
        byte *zdat, *w;
        int nzdat;
        uint nw;
        vlong pos;
        uint64 offset, eoffset;

        zdat = vtmallocz(c->size + 1024);
        nzdat = deflateblock(zdat, c->size + 1024, c->data, c->size, 6, 0);
        // c->size is unsigned, so c->size - MinSaving would wrap for
        // small chunks; test the size first.  This also guarantees a
        // compressed record never has storedsize == size.
        if(nzdat < 0 || c->size < MinSaving || (uint32)nzdat > c->size - MinSaving) {
                // don't bother with compression
                w = c->data;
                nw = c->size;
        } else {
                w = zdat;
                nw = nzdat;
        }
        pos = seek(fd, 0, 1);
        if(pos < 0)
                sysfatal("seek data: %r");
        offset = pos;
        // Header length: 2*VtScoreSize hex digits, two space-prefixed
        // 11-character fields (12 bytes each) and the newline.
        eoffset = offset + 2*VtScoreSize + 12 + 12 + 1 + nw;
        if(eoffset / ArenaBlock != offset / ArenaBlock) {
                // Round up to the start of the next arena block.
                offset /= ArenaBlock;
                offset++;
                offset *= ArenaBlock;
                if(seek(fd, offset, 0) < 0)
                        sysfatal("seek data: %r");
        }
        if(fprint(fd, "%V %-11ud %-11ud\n", c->score, c->size, nw) < 0)
                sysfatal("write data header: %r");
        if(write(fd, w, nw) != (long)nw)
                sysfatal("write data: %r");
        free(zdat);
        return offset;
}

// Read the record at b's current position into c.
//
// The header line must hold three fields: score, size and stored size,
// with both sizes non-zero.  c->score and c->size are set from the
// header, and c->data is set to a freshly allocated buffer of c->size
// bytes that the caller owns.  When the stored size differs from the
// size, the payload is inflated into c->data.
//
// A malformed header, a short read or an inflate failure is fatal.
void
dread(Biobuf *b, Chunk *c)
{
        char *p, *f[10];
        int nf;
        uint zsize;
        byte *r;
        uint64 offset;
        char buf[100];

        offset = Boffset(b);
        p = Brdline(b, '\n');
        if(p == nil || Blinelen(b) >= sizeof buf)
                sysfatal("malformed data - EOF");
        // Copy the line out of the Biobuf and replace its newline so
        // it can be tokenized as a C string.
        memmove(buf, p, Blinelen(b));
        buf[Blinelen(b)-1] = '\0';
        nf = tokenize(buf, f, nelem(f));
        if(nf != 3 || vtparsescore(f[0], nil, c->score) < 0
        || (c->size = strtoul(f[1], 0, 0)) == 0
        || (zsize = strtoul(f[2], 0, 0)) == 0)
                sysfatal("malformed data at %llud / %d", offset, nf);
        c->data = vtmalloc(c->size);
        // Equal sizes mean the payload was stored raw: read it in place.
        if(c->size == zsize)
                r = c->data;
        else
                r = vtmallocz(zsize);
        // A truncated data file must not be passed on as if it were a
        // complete payload.
        if(Bread(b, r, zsize) != (long)zsize)
                sysfatal("short read of %ud bytes in record at %llud", zsize, offset);
        if(c->size != zsize) {
                if((nf = inflateblock(c->data, c->size, r, zsize)) < 0)
                        sysfatal("inflateblock fail %d %d %d %.10H...",
                                c->size, zsize, nf, r);
                free(r);
        }
}

// Handles set up by threadmain via doOpen, plus command-line flags.
Biobuf *bindexr;        // buffered reader on the index file
Biobuf *bdatar;         // buffered reader on the data file
int indexw, dataw;      // write descriptors for the index and data files
Biobuf *bsha1;  // TODO
int doCreate;           // set by -c: doOpen may create missing files
int verbose;            // incremented by -v: trace requests and replies on fd 2

// Open the file name twice: once for writing (descriptor stored in *w)
// and once for buffered reading (stored in *r).
//
// The file must begin with the header line "# sventi <what>\n".  If
// the file cannot be opened for writing and doCreate is set, it is
// created and the header is written.  On return *r is positioned just
// past the header line.  Every failure is fatal.
void
doOpen(char *name, char *what, int *w, Biobuf **r)
{
        int fd;
        long n;
        char buf[100];
        char *p;

        snprint(buf, sizeof buf, "# sventi %s\n", what);
        n = strlen(buf);

        if((*w = open(name, OWRITE)) < 0) {
                if(!doCreate)
                        sysfatal("open %s: %r", name);
                if((*w = create(name, OWRITE, 0644)) < 0)
                        sysfatal("create %s: %r", name);
                if(write(*w, buf, n) != n)
                        sysfatal("write %s: %r", name);
        }
        if((fd = open(name, OREAD)) < 0)
                sysfatal("open %s: %r", name);
        *r = Bfdopen(fd, OREAD);
        if(*r == nil)
                sysfatal("Bfdopen %s: %r", name);
        // Verify the header line, newline included.
        p = Brdline(*r, '\n');
        if(p == nil || Blinelen(*r) != n || memcmp(p, buf, n) != 0)
                sysfatal("%s is not %s", name, what);
}

// Print the command-line synopsis on fd 2 and exit all threads with
// the status "usage".
void
usage(void)
{
        static char synopsis[] = "usage: sventi [-cv] [-a address] [-d dir]\n";

        fprint(2, "%s", synopsis);
        threadexitsall("usage");
}

// Program entry point: parse flags, open and load the index and data
// files in the chosen directory, then serve venti requests until
// vtgetreq returns nil.
//
// Flags: -v trace requests/replies, -a listen address,
// -c create missing files, -d directory holding the files.
void
threadmain(int argc, char **argv)
{
        VtReq *r;
        VtSrv *srv;
        char *address, *dir;
        Chunk c;
        IEntry *e;
        
        // Start with a 16-bucket index table.
        nihash = 16;
        ihash = vtmallocz(nihash * sizeof ihash[0]);

        // Format verbs used by the print calls in this file.
        fmtinstall('F', vtfcallfmt);
        fmtinstall('V', vtscorefmt);
        fmtinstall('H', encodefmt);

        // Defaults, overridable with -a and -d.
        address = "tcp!*!venti";
        dir = ".";
        deflateinit();
        inflateinit();
        
        ARGBEGIN{
        case 'v':
                verbose++;
                break;
        case 'a':
                address = EARGF(usage());
                break;
        case 'c':
                doCreate = 1;
                break;
        case 'd':
                dir = EARGF(usage());
                break;
        default:
                usage();
        }ARGEND
        
        // No positional arguments are accepted.
        if(argc != 0)
                usage();
        
        if(chdir(dir) < 0)
                sysfatal("chdir %s: %r", dir);

        // Open both files, load the whole index into memory, then
        // position both write descriptors at end of file for appending.
        doOpen("sventi.index", "index", &indexw,  &bindexr);
        doOpen("sventi.data", "data", &dataw, &bdatar);
        iload(bindexr);
        Bterm(bindexr);
        seek(indexw, 0, 2);
        seek(dataw, 0, 2);

        srv = vtlisten(address);
        if(srv == nil)
                sysfatal("vtlisten %s: %r", address);
        
        while((r = vtgetreq(srv)) != nil) {
                // Default reply type is request type + 1; the read
                // path overrides it with VtRerror on a miss.
                r->rx.msgtype = r->tx.msgtype+1;
                if(verbose)
                        fprint(2, "<- %F\n", &r->tx);
                switch(r->tx.msgtype) {
                case VtTping:
                        break;
                case VtTgoodbye:
                        break;
                case VtTsync:
                        fsync(dataw);
                        fsync(indexw);
                        break;
                case VtTread:
                        e = ilookup(r->tx.score);
                        if(e == nil) {
                                r->rx.msgtype = VtRerror;
                                r->rx.error = vtstrdup("block not found");
                                break;
                        }
                        Bseek(bdatar, e->offset, 0);
                        dread(bdatar, &c);
                        // First check: the score in the record header
                        // must be the one the index led us to.
                        if(memcmp(c.score, r->tx.score, VtScoreSize) != 0)
                                sysfatal("data/index score mismatch");
                        // c.data is passed with free as its release
                        // function; presumably the packet now owns it.
                        r->rx.data = packetforeign(c.data, c.size, free, 
c.data);
                        // Second check: recompute the score from the
                        // data actually read and compare again.
                        packetsha1(r->rx.data, c.score);
                        if(memcmp(c.score, r->tx.score, VtScoreSize) != 0)
                                sysfatal("data/data score mismatch");
                        break;
                case VtTwrite:
                        // The reply score is computed from the data sent.
                        packetsha1(r->tx.data, r->rx.score);
                        e = ilookup(r->rx.score);
                        // Already stored: nothing to write.
                        if(e != nil)
                                break;
                        c.size = packetsize(r->tx.data);
                        c.data = vtmalloc(c.size);
                        packetconsume(r->tx.data, c.data, c.size);
                        memmove(c.score, r->rx.score, VtScoreSize);
                        // Index in memory, append to the data file,
                        // then append the index line.
                        e = iinsert(c.score);
                        e->offset = dwrite(dataw, &c);
                        free(c.data);
                        iwrite(indexw, e);
                        break;
                }
                if(verbose)
                        fprint(2, "-> %F\n", &r->rx);
                vtrespond(r);
        }
        threadexitsall(nil);            
}

Reply via email to