Changeset: 1662ae0f3cf1 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=1662ae0f3cf1
Modified Files:
        gdk/gdk_heap.c
        gdk/gdk_logger.c
        gdk/gdk_logger_internals.h
        gdk/gdk_storage.c
        sql/storage/bat/bat_storage.c
        sql/storage/bat/bat_table.c
Branch: unlock
Log Message:

cleared tables now get new bats (old bats are kept until the log transaction id 
is processed by the log flush)


diffs (truncated from 487 to 300 lines):

diff --git a/gdk/gdk_heap.c b/gdk/gdk_heap.c
--- a/gdk/gdk_heap.c
+++ b/gdk/gdk_heap.c
@@ -721,6 +721,7 @@ HEAPdecref(Heap *h, bool remove)
 void
 HEAPincref(Heap *h)
 {
+       assert((int)h->refs > 0);
        //printf("inc ref(%d) %p %d\n", (int)h->refs, h, h->parentid);
        (void)ATOMIC_INC(&h->refs);
 }
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -1252,6 +1252,7 @@ bm_get_counts(logger *lg)
        BATloop(lg->catalog_bid, p, q) {
                oid pos = p;
                lng cnt = 0;
+               lng lid = lng_nil;
 
                if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
                        BAT *b = BBPquickdesc(bids[p], 1);
@@ -1261,6 +1262,8 @@ bm_get_counts(logger *lg)
                }
                if (BUNappend(lg->catalog_cnt, &cnt, false) != GDK_SUCCEED)
                        return GDK_FAIL;
+               if (BUNappend(lg->catalog_lid, &lid, false) != GDK_SUCCEED)
+                       return GDK_FAIL;
        }
        lg->deleted = deleted;
        lg->cnt = BATcount(lg->catalog_bid);
@@ -1280,8 +1283,8 @@ bm_subcommit(logger *lg)
        int i = 0;
        gdk_return res;
        const log_bid *bids;
-       const BUN *cnts = NULL;
-       int cleanup = (lg->saved_tid == lg->tid && BATcount(lg->dcatalog) > 10 
);
+       const lng *cnts = NULL, *lids = NULL;
+       int cleanup = 0;
 
        if (n == NULL)
                return GDK_FAIL;
@@ -1290,17 +1293,18 @@ bm_subcommit(logger *lg)
        n[i++] = 0;             /* n[0] is not used */
        bids = (const log_bid *) Tloc(catalog_bid, 0);
        if (lg->catalog_cnt)
-               cnts = (const BUN *) Tloc(lg->catalog_cnt, 0);
+               cnts = (const lng *) Tloc(lg->catalog_cnt, 0);
+       if (lg->catalog_lid)
+               lids = (const lng *) Tloc(lg->catalog_lid, 0);
        BATloop(catalog_bid, p, q) {
                bat col = bids[p];
-               oid pos = p;
 
-               if (!cleanup && BUNfnd(dcatalog, &pos) != BUN_NONE)
-                       continue;
+               if (lids[p] != lng_nil && lids[p] <= lg->saved_tid)
+                       cleanup++;
                if (lg->debug & 1)
                        fprintf(stderr, "#commit new %s (%d)\n", BBPname(col), 
col);
                assert(col);
-               sizes[i] = cnts?cnts[p]:0;
+               sizes[i] = cnts?(BUN)cnts[p]:0;
                n[i++] = col;
        }
        /* now commit catalog, so it's also up to date on disk */
@@ -1312,83 +1316,113 @@ bm_subcommit(logger *lg)
        n[i++] = dcatalog->batCacheid;
 
        if (cleanup) {
-               BAT *bids, *tids, *oids, *cnts;
+               BAT *nbids, *noids, *ncnts, *nlids, *ndels;
 
                oid *poss = Tloc(dcatalog, 0);
-               const log_bid *bid = (const log_bid *) Tloc(catalog_bid, 0);
                BATloop(dcatalog, p, q) {
                        oid pos = poss[p];
                        BAT *lb;
-       
-                       BBPrelease(bid[pos]);
-                       if ((lb = BATdescriptor(bid[pos])) == NULL ||
+
+                       if (lids[pos] == lng_nil || lids[pos] > lg->saved_tid)
+                               continue;
+
+                       if (lg->debug & 1) {
+                               fprintf(stderr, "release %d\n", bids[pos]);
+                               if (BBP_lrefs(bids[pos]) != 2)
+                                       fprintf(stderr, "release %d %d\n", 
bids[pos], BBP_lrefs(bids[pos]));
+                       }
+                       BBPrelease(bids[pos]);
+                       if ((lb = BATdescriptor(bids[pos])) == NULL ||
                            BATmode(lb, true/*transient*/) != GDK_SUCCEED) {
                                logbat_destroy(lb);
                                return GDK_FAIL;
                        }
                        logbat_destroy(lb);
                }
+               /* only project out the deleted with last id > lg->saved_tid
+                * update dcatalog, ie only keep those deleted which
+                * were not released ie last id <= lg->saved_tid */
 
-               tids = bm_tids(catalog_bid, dcatalog);
-               if (tids == NULL) {
-                       GDKfree(n);
-                       GDKfree(sizes);
-                       return GDK_FAIL;
-               }
-               bids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
-               oids = logbat_new(TYPE_int, BATcount(tids), PERSISTENT);
-               cnts = logbat_new(TYPE_lng, BATcount(tids), TRANSIENT);
+               BUN ocnt = BATcount(catalog_bid);
+               nbids = logbat_new(TYPE_int, ocnt-cleanup, PERSISTENT);
+               noids = logbat_new(TYPE_int, ocnt-cleanup, PERSISTENT);
+               ncnts = logbat_new(TYPE_lng, ocnt-cleanup, TRANSIENT);
+               nlids = logbat_new(TYPE_lng, ocnt-cleanup, TRANSIENT);
+               ndels = logbat_new(TYPE_oid, BATcount(dcatalog)-cleanup, 
PERSISTENT);
 
-               if (bids == NULL || oids == NULL || cnts == NULL) {
-                       logbat_destroy(tids);
-                       logbat_destroy(bids);
-                       logbat_destroy(oids);
-                       logbat_destroy(cnts);
+               if (nbids == NULL || noids == NULL || ncnts == NULL || nlids == 
NULL || ndels == NULL) {
+                       logbat_destroy(nbids);
+                       logbat_destroy(noids);
+                       logbat_destroy(ncnts);
+                       logbat_destroy(nlids);
+                       logbat_destroy(ndels);
                        GDKfree(n);
                        GDKfree(sizes);
                        return GDK_FAIL;
                }
 
-               if (BATappend(bids, catalog_bid, tids, true) != GDK_SUCCEED ||
-                   BATappend(oids, catalog_id, tids, true) != GDK_SUCCEED ||
-                   BATappend(cnts, lg->catalog_cnt, tids, true) != 
GDK_SUCCEED) {
-                       logbat_destroy(tids);
-                       logbat_destroy(bids);
-                       logbat_destroy(oids);
-                       logbat_destroy(cnts);
+               int *oids = (int*)Tloc(catalog_id, 0);
+               q = BUNlast(catalog_bid);
+               int err = 0;
+               for(p = 0; p<q && !err; p++) {
+                       bat col = bids[p];
+                       int nid = oids[p];
+                       lng lid = lids[p];
+                       lng cnt = cnts[p];
+                       oid pos = p;
+
+                       if (lid != lng_nil && lid <= lg->saved_tid)
+                               continue; /* remove */
+
+                       if (BUNappend(nbids, &col, false) != GDK_SUCCEED ||
+                           BUNappend(noids, &nid, false) != GDK_SUCCEED ||
+                           BUNappend(nlids, &lid, false) != GDK_SUCCEED ||
+                           BUNappend(ncnts, &cnt, false) != GDK_SUCCEED)
+                               err=1;
+                       pos = (oid)(BATcount(nbids)-1);
+                       if (lid != lng_nil && BUNappend(ndels, &pos, false) != 
GDK_SUCCEED)
+                               err=1;
+               }
+
+               if (err ||
+                   logger_switch_bat(catalog_bid, nbids, lg->fn, 
"catalog_bid") != GDK_SUCCEED ||
+                   logger_switch_bat(catalog_id, noids, lg->fn, "catalog_id") 
!= GDK_SUCCEED ||
+                   logger_switch_bat(dcatalog, ndels, lg->fn, "dcatalog") != 
GDK_SUCCEED) {
+                       logbat_destroy(nbids);
+                       logbat_destroy(noids);
+                       logbat_destroy(ndels);
+                       logbat_destroy(ncnts);
+                       logbat_destroy(nlids);
                        GDKfree(n);
                        GDKfree(sizes);
                        return GDK_FAIL;
                }
-               logbat_destroy(tids);
-               BATclear(dcatalog, true);
-
-               if (logger_switch_bat(catalog_bid, bids, lg->fn, "catalog_bid") 
!= GDK_SUCCEED ||
-                   logger_switch_bat(catalog_id, oids, lg->fn, "catalog_id") 
!= GDK_SUCCEED) {
-                       logbat_destroy(bids);
-                       GDKfree(n);
-                       GDKfree(sizes);
-                       return GDK_FAIL;
-               }
-               n[i++] = bids->batCacheid;
-               n[i++] = oids->batCacheid;
+               sizes[i] = BATcount(nbids);
+               n[i++] = nbids->batCacheid;
+               sizes[i] = BATcount(noids);
+               n[i++] = noids->batCacheid;
+               sizes[i] = BATcount(ndels);
+               n[i++] = ndels->batCacheid;
 
                logbat_destroy(lg->catalog_bid);
                logbat_destroy(lg->catalog_id);
+               logbat_destroy(lg->dcatalog);
 
-               lg->catalog_bid = catalog_bid = bids;
-               lg->catalog_id = catalog_id = oids;
+               lg->catalog_bid = catalog_bid = nbids;
+               lg->catalog_id = catalog_id = noids;
+               lg->dcatalog = dcatalog = ndels;
 
                BBPunfix(lg->catalog_cnt->batCacheid);
-               lg->catalog_cnt = cnts;
+               BBPunfix(lg->catalog_lid->batCacheid);
+
+               lg->catalog_cnt = ncnts;
+               lg->catalog_lid = nlids;
        }
        if (lg->seqs_id) {
                sizes[i] = BATcount(lg->seqs_id);
                n[i++] = lg->seqs_id->batCacheid;
                sizes[i] = BATcount(lg->seqs_id);
                n[i++] = lg->seqs_val->batCacheid;
-               sizes[i] = BATcount(lg->dseqs);
-               n[i++] = lg->dseqs->batCacheid;
        }
        if (lg->seqs_id && BATcount(lg->dseqs) > (BATcount(lg->seqs_id)/2)) {
                BAT *tids, *ids, *vals;
@@ -1431,9 +1465,10 @@ bm_subcommit(logger *lg)
                        GDKfree(sizes);
                        return GDK_FAIL;
                }
+               sizes[i] = BATcount(ids);
                n[i++] = ids->batCacheid;
+               sizes[i] = BATcount(vals);
                n[i++] = vals->batCacheid;
-               n[i++] = lg->dseqs->batCacheid;
 
                logbat_destroy(lg->seqs_id);
                logbat_destroy(lg->seqs_val);
@@ -1441,6 +1476,10 @@ bm_subcommit(logger *lg)
                lg->seqs_id = ids;
                lg->seqs_val = vals;
        }
+       if (lg->seqs_id) {
+               sizes[i] = BATcount(lg->dseqs);
+               n[i++] = lg->dseqs->batCacheid;
+       }
 
        assert((BUN) i <= nn);
        BATcommit(catalog_bid, BUN_NONE);
@@ -1521,6 +1560,7 @@ logger_load(int debug, const char *fn, c
        lg->catalog_bid = NULL;
        lg->catalog_id = NULL;
        lg->catalog_cnt = NULL;
+       lg->catalog_lid = NULL;
        lg->dcatalog = NULL;
 
        lg->seqs_id = NULL;
@@ -1686,8 +1726,8 @@ logger_load(int debug, const char *fn, c
                        bat bid = bids[p];
                        oid pos = p;
 
-                       if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
-                           BBPretain(bid) == 0 &&
+                       if (BBPretain(bid) == 0 && /* any bid in the 
catalog_bid, needs one logical ref */
+                           BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
                            BUNappend(lg->dcatalog, &pos, false) != GDK_SUCCEED)
                                goto error;
                }
@@ -1701,6 +1741,15 @@ logger_load(int debug, const char *fn, c
        if (BBPrename(lg->catalog_cnt->batCacheid, bak) < 0) {
                goto error;
        }
+       lg->catalog_lid = logbat_new(TYPE_lng, 1, TRANSIENT);
+       if (lg->catalog_lid == NULL) {
+               GDKerror("failed to create catalog_lid bat");
+               goto error;
+       }
+       strconcat_len(bak, sizeof(bak), fn, "_catalog_lid", NULL);
+       if (BBPrename(lg->catalog_lid->batCacheid, bak) < 0) {
+               goto error;
+       }
        if (bm_get_counts(lg) == GDK_FAIL)
                goto error;
 
@@ -1856,10 +1905,8 @@ logger_destroy(logger *lg)
                const log_bid *bids = (const log_bid *) Tloc(b, 0);
                BATloop(b, p, q) {
                        bat bid = bids[p];
-                       oid pos = p;
 
-                       if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE)
-                               BBPrelease(bid);
+                       BBPrelease(bid);
                }
 
                BBPrelease(lg->catalog_bid->batCacheid);
@@ -1870,6 +1917,7 @@ logger_destroy(logger *lg)
                logbat_destroy(lg->dcatalog);
 
                logbat_destroy(lg->catalog_cnt);
+               logbat_destroy(lg->catalog_lid);
                logger_unlock(lg);
        }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to