Changeset: 78d7e86a78ae for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/78d7e86a78ae Modified Files: clients/Tests/exports.stable.out sql/backends/monet5/vaults/odbc/odbc_loader.c Branch: nested Log Message:
merged from default diffs (truncated from 929 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -365,7 +365,7 @@ gdk_return MT_alloc_tls(MT_TLS_t *newkey int MT_check_nr_cores(void); void MT_cond_broadcast(MT_Cond *cond); void MT_cond_destroy(MT_Cond *cond); -void MT_cond_init(MT_Cond *cond); +void MT_cond_init(MT_Cond *cond, const char *name); void MT_cond_signal(MT_Cond *cond); void MT_cond_wait(MT_Cond *cond, MT_Lock *lock); int MT_create_thread(MT_Id *t, void (*function)(void *), void *arg, enum MT_thr_detach d, const char *threadname); diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c --- a/gdk/gdk_bbp.c +++ b/gdk/gdk_bbp.c @@ -2221,16 +2221,18 @@ BBPdir_first(bool subcommit, lng logno, static bat BBPdir_step(bat bid, BUN size, int n, char *buf, size_t bufsize, - FILE **obbpfp, FILE *nbbpf, BATiter *bi) + FILE **obbpfp, FILE *nbbpf, BATiter *bi, int *nbatp) { if (n < -1) /* safety catch */ return n; + int nbat = 0; while (n >= 0 && n < bid) { if (n > 0) { if (fputs(buf, nbbpf) == EOF) { GDKerror("Writing BBP.dir file failed.\n"); goto bailout; } + nbat++; } if (fgets(buf, (int) bufsize, *obbpfp) == NULL) { if (ferror(*obbpfp)) { @@ -2254,7 +2256,9 @@ BBPdir_step(bat bid, BUN size, int n, ch assert(BBP_status(bid) & BBPPERSISTENT); if (new_bbpentry(nbbpf, bid, size, bi) != GDK_SUCCEED) goto bailout; + nbat++; } + *nbatp += nbat; return n == -1 ? -1 : n == bid ? 0 : n; bailout: @@ -2885,13 +2889,10 @@ incref(bat i, bool logical, bool lock) return 0; if (lock) { - for (;;) { - MT_lock_set(&GDKswapLock(i)); - if (!(BBP_status(i) & (BBPUNSTABLE|BBPLOADING))) - break; + MT_lock_set(&GDKswapLock(i)); + while (BBP_status(i) & (BBPUNSTABLE|BBPLOADING)) { /* the BATs is "unstable", try again */ - MT_lock_unset(&GDKswapLock(i)); - BBPspin(i, __func__, BBPUNSTABLE|BBPLOADING); + MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i)); } } /* we have the lock */ @@ -2954,15 +2955,12 @@ decref(bat i, bool logical, bool lock, c if (BBPcheck(i) == 0) return -1; - if (lock) + if (lock) { MT_lock_set(&GDKswapLock(i)); - - while (BBP_status(i) & BBPUNLOADING) { - if (lock) - MT_lock_unset(&GDKswapLock(i)); + while (BBP_status(i) & BBPUNLOADING) + MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i)); + } else { BBPspin(i, func, BBPUNLOADING); - if (lock) - MT_lock_set(&GDKswapLock(i)); } b = (BBP_status(i) & BBPLOADED) ? BBP_desc(i) : NULL; @@ -3080,6 +3078,7 @@ decref(bat i, bool logical, bool lock, c BBPclear(i); } else { BBP_status_off(i, BBPUNLOADING); + MT_cond_broadcast(&GDKswapCond(i)); } } return refs; @@ -3127,13 +3126,10 @@ BATdescriptor(bat i) if (BBPcheck(i)) { bool lock = locked_by == 0 || locked_by != MT_getpid(); if (lock) { - for (;;) { - MT_lock_set(&GDKswapLock(i)); - if (!(BBP_status(i) & (BBPUNSTABLE|BBPLOADING))) - break; + MT_lock_set(&GDKswapLock(i)); + while (BBP_status(i) & (BBPUNSTABLE|BBPLOADING)) { /* the BATs is "unstable", try again */ - MT_lock_unset(&GDKswapLock(i)); - BBPspin(i, __func__, BBPUNSTABLE|BBPLOADING); + MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i)); } } if (incref(i, false, false) > 0) { @@ -3175,9 +3171,7 @@ getBBPdescriptor(bat i) b = BBP_desc(i); if ((status & BBPLOADED) == 0 || status & BBPWAITING) { while (BBP_status(i) & BBPWAITING) { /* wait for bat to be loaded by other thread */ - MT_lock_unset(&GDKswapLock(i)); - BBPspin(i, __func__, BBPWAITING); - MT_lock_set(&GDKswapLock(i)); + MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i)); } if (BBPvalid(i)) { if ((BBP_status(i) & BBPLOADED) == 0) { @@ -3195,6 +3189,7 @@ getBBPdescriptor(bat i) BBP_status_off(i, BBPLOADING); CHECKDEBUG if (b != NULL) BATassertProps(b); + MT_cond_broadcast(&GDKswapCond(i)); } return b; } @@ -3228,9 +3223,13 @@ BBPsave(BAT *b) if (BBP_status(bid) & BBPSAVING) { /* wait until save in other thread completes */ - if (lock) + if (lock) { + while (BBP_status(bid) & BBPSAVING) + MT_cond_wait(&GDKswapCond(bid), &GDKswapLock(bid)); MT_lock_unset(&GDKswapLock(bid)); - BBPspin(bid, __func__, BBPSAVING); + } else { + BBPspin(bid, __func__, BBPSAVING); + } } else { /* save it */ unsigned flags = BBPSAVING; @@ -3257,6 +3256,7 @@ BBPsave(BAT *b) } /* clearing bits can be done without the lock */ BBP_status_off(bid, BBPSAVING); + MT_cond_broadcast(&GDKswapCond(bid)); } return ret; } @@ -3314,6 +3314,7 @@ BBPfree(BAT *b) } TRC_DEBUG(BAT_, "turn off unloading %d\n", bid); BBP_status_off(bid, BBPUNLOADING); + MT_cond_broadcast(&GDKswapCond(bid)); BBP_unload_dec(); return ret; } @@ -3340,8 +3341,9 @@ BBPquickdesc(bat bid) } return NULL; } - BBPspin(bid, __func__, BBPWAITING); +// BBPspin(bid, __func__, BBPWAITING); b = BBP_desc(bid); + MT_lock_set(&b->theaplock); if (b->ttype < 0) { const char *aname = ATOMunknown_name(b->ttype); int tt = ATOMindex(aname); @@ -3352,6 +3354,7 @@ BBPquickdesc(bat bid) b->ttype = tt; } } + MT_lock_unset(&b->theaplock); return b; } @@ -3359,25 +3362,31 @@ BBPquickdesc(bat bid) * @+ Global Commit */ static BAT * -dirty_bat(bat *i, bool subcommit) +dirty_bat(bat *i, bool subcommit, bool lock) { - if (BBPvalid(*i)) { + const bat bid = *i; + if (BBPvalid(bid)) { BAT *b; - BBPspin(*i, __func__, BBPSAVING); - if (BBP_status(*i) & BBPLOADED) { - b = BBP_desc(*i); + if (lock) { + while (BBP_status(bid) & BBPSAVING) + MT_cond_wait(&GDKswapCond(bid), &GDKswapLock(bid)); + } else { + BBPspin(bid, __func__, BBPSAVING); + } + if (BBP_status(bid) & BBPLOADED) { + b = BBP_desc(bid); MT_lock_set(&b->theaplock); - if ((BBP_status(*i) & BBPNEW) && + if ((BBP_status(bid) & BBPNEW) && BATcheckmodes(b, false) != GDK_SUCCEED) /* check mmap modes */ - *i = -*i; /* error */ - else if ((BBP_status(*i) & BBPPERSISTENT) && + *i = -bid; /* error */ + else if ((BBP_status(bid) & BBPPERSISTENT) && (subcommit || BATdirty(b))) { MT_lock_unset(&b->theaplock); return b; /* the bat is loaded, persistent and dirty */ } MT_lock_unset(&b->theaplock); } else if (subcommit) - return BBP_desc(*i); + return BBP_desc(bid); } return NULL; } @@ -3790,6 +3799,7 @@ BBPsync(int cnt, bat *restrict subcommit char buf[3000]; int n = subcommit ? 0 : -1; FILE *obbpf, *nbbpf; + int nbats = 0; TRC_INFO(TM, "Committing %d bats\n", cnt - 1); @@ -3824,12 +3834,11 @@ BBPsync(int cnt, bat *restrict subcommit BBP_status_on(bid, BBPSYNCING); /* wait until unloading is finished before * attempting to make a backup */ - while (BBP_status(bid) & BBPUNLOADING) { - if (lock) - MT_lock_unset(&GDKswapLock(bid)); + if (lock) { + while (BBP_status(bid) & BBPUNLOADING) + MT_cond_wait(&GDKswapCond(bid), &GDKswapLock(bid)); + } else { BBPspin(bid, __func__, BBPUNLOADING); - if (lock) - MT_lock_set(&GDKswapLock(bid)); } BAT *b = BBP_desc(bid); if (subcommit && b->ttype != TYPE_void) { @@ -3866,7 +3875,7 @@ BBPsync(int cnt, bat *restrict subcommit fname, BAKDIR, SUBDIR); } } - b = dirty_bat(&i, subcommit != NULL); + b = dirty_bat(&i, subcommit != NULL, lock); if (i <= 0) ret = GDK_FAIL; else if (BBP_status(bid) & BBPEXISTING && @@ -3907,13 +3916,11 @@ BBPsync(int cnt, bat *restrict subcommit * can set it, wait for * BBPUNLOADING before * attempting to save */ - for (;;) { - if (lock) - MT_lock_set(&GDKswapLock(i)); - if (!(BBP_status(i) & (BBPSAVING|BBPUNLOADING))) - break; - if (lock) - MT_lock_unset(&GDKswapLock(i)); + if (lock) { + MT_lock_set(&GDKswapLock(i)); + while (BBP_status(i) & (BBPSAVING|BBPUNLOADING)) + MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i)); + } else { BBPspin(i, __func__, BBPSAVING|BBPUNLOADING); } BBP_status_on(i, BBPSAVING); @@ -3921,13 +3928,14 @@ BBPsync(int cnt, bat *restrict subcommit MT_lock_unset(&GDKswapLock(i)); ret = BATsave_iter(b, &bi, size); BBP_status_off(i, BBPSAVING); + MT_cond_broadcast(&GDKswapCond(i)); } bip = &bi; } else { bip = NULL; } if (ret == GDK_SUCCEED) { - n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf, nbbpf, bip); + n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf, nbbpf, bip, &nbats); if (n < -1) ret = GDK_FAIL; } @@ -3959,6 +3967,7 @@ BBPsync(int cnt, bat *restrict subcommit if (ret != GDK_SUCCEED) GDKsyserror("rename(%s,%s) failed\n", bakdir, deldir); TRC_DEBUG(IO_, "rename %s %s = %d\n", bakdir, deldir, (int) ret); + TRC_INFO(TM, "%d bats written to BBP.dir\n", nbats); } /* AFTERMATH */ @@ -4001,6 +4010,7 @@ BBPsync(int cnt, bat *restrict subcommit for (int idx = 1; idx < cnt; idx++) { bat i = subcommit ? subcommit[idx] : idx; BBP_status_off(i, BBPSYNCING); + MT_cond_broadcast(&GDKswapCond(i)); _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org