Changeset: 78d7e86a78ae for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/78d7e86a78ae
Modified Files:
        clients/Tests/exports.stable.out
        sql/backends/monet5/vaults/odbc/odbc_loader.c
Branch: nested
Log Message:

merged from default


diffs (truncated from 929 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -365,7 +365,7 @@ gdk_return MT_alloc_tls(MT_TLS_t *newkey
 int MT_check_nr_cores(void);
 void MT_cond_broadcast(MT_Cond *cond);
 void MT_cond_destroy(MT_Cond *cond);
-void MT_cond_init(MT_Cond *cond);
+void MT_cond_init(MT_Cond *cond, const char *name);
 void MT_cond_signal(MT_Cond *cond);
 void MT_cond_wait(MT_Cond *cond, MT_Lock *lock);
 int MT_create_thread(MT_Id *t, void (*function)(void *), void *arg, enum 
MT_thr_detach d, const char *threadname);
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -2221,16 +2221,18 @@ BBPdir_first(bool subcommit, lng logno, 
 
 static bat
 BBPdir_step(bat bid, BUN size, int n, char *buf, size_t bufsize,
-           FILE **obbpfp, FILE *nbbpf, BATiter *bi)
+           FILE **obbpfp, FILE *nbbpf, BATiter *bi, int *nbatp)
 {
        if (n < -1)             /* safety catch */
                return n;
+       int nbat = 0;
        while (n >= 0 && n < bid) {
                if (n > 0) {
                        if (fputs(buf, nbbpf) == EOF) {
                                GDKerror("Writing BBP.dir file failed.\n");
                                goto bailout;
                        }
+                       nbat++;
                }
                if (fgets(buf, (int) bufsize, *obbpfp) == NULL) {
                        if (ferror(*obbpfp)) {
@@ -2254,7 +2256,9 @@ BBPdir_step(bat bid, BUN size, int n, ch
                assert(BBP_status(bid) & BBPPERSISTENT);
                if (new_bbpentry(nbbpf, bid, size, bi) != GDK_SUCCEED)
                        goto bailout;
+               nbat++;
        }
+       *nbatp += nbat;
        return n == -1 ? -1 : n == bid ? 0 : n;
 
   bailout:
@@ -2885,13 +2889,10 @@ incref(bat i, bool logical, bool lock)
                return 0;
 
        if (lock) {
-               for (;;) {
-                       MT_lock_set(&GDKswapLock(i));
-                       if (!(BBP_status(i) & (BBPUNSTABLE|BBPLOADING)))
-                               break;
+               MT_lock_set(&GDKswapLock(i));
+               while (BBP_status(i) & (BBPUNSTABLE|BBPLOADING)) {
                        /* the BATs is "unstable", try again */
-                       MT_lock_unset(&GDKswapLock(i));
-                       BBPspin(i, __func__, BBPUNSTABLE|BBPLOADING);
+                       MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i));
                }
        }
        /* we have the lock */
@@ -2954,15 +2955,12 @@ decref(bat i, bool logical, bool lock, c
        if (BBPcheck(i) == 0)
                return -1;
 
-       if (lock)
+       if (lock) {
                MT_lock_set(&GDKswapLock(i));
-
-       while (BBP_status(i) & BBPUNLOADING) {
-               if (lock)
-                       MT_lock_unset(&GDKswapLock(i));
+               while (BBP_status(i) & BBPUNLOADING)
+                       MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i));
+       } else {
                BBPspin(i, func, BBPUNLOADING);
-               if (lock)
-                       MT_lock_set(&GDKswapLock(i));
        }
 
        b = (BBP_status(i) & BBPLOADED) ? BBP_desc(i) : NULL;
@@ -3080,6 +3078,7 @@ decref(bat i, bool logical, bool lock, c
                        BBPclear(i);
                } else {
                        BBP_status_off(i, BBPUNLOADING);
+                       MT_cond_broadcast(&GDKswapCond(i));
                }
        }
        return refs;
@@ -3127,13 +3126,10 @@ BATdescriptor(bat i)
        if (BBPcheck(i)) {
                bool lock = locked_by == 0 || locked_by != MT_getpid();
                if (lock) {
-                       for (;;) {
-                               MT_lock_set(&GDKswapLock(i));
-                               if (!(BBP_status(i) & (BBPUNSTABLE|BBPLOADING)))
-                                       break;
+                       MT_lock_set(&GDKswapLock(i));
+                       while (BBP_status(i) & (BBPUNSTABLE|BBPLOADING)) {
                                /* the BATs is "unstable", try again */
-                               MT_lock_unset(&GDKswapLock(i));
-                               BBPspin(i, __func__, BBPUNSTABLE|BBPLOADING);
+                               MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i));
                        }
                }
                if (incref(i, false, false) > 0) {
@@ -3175,9 +3171,7 @@ getBBPdescriptor(bat i)
        b = BBP_desc(i);
        if ((status & BBPLOADED) == 0 || status & BBPWAITING) {
                while (BBP_status(i) & BBPWAITING) {    /* wait for bat to be 
loaded by other thread */
-                       MT_lock_unset(&GDKswapLock(i));
-                       BBPspin(i, __func__, BBPWAITING);
-                       MT_lock_set(&GDKswapLock(i));
+                       MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i));
                }
                if (BBPvalid(i)) {
                        if ((BBP_status(i) & BBPLOADED) == 0) {
@@ -3195,6 +3189,7 @@ getBBPdescriptor(bat i)
                BBP_status_off(i, BBPLOADING);
                CHECKDEBUG if (b != NULL)
                        BATassertProps(b);
+               MT_cond_broadcast(&GDKswapCond(i));
        }
        return b;
 }
@@ -3228,9 +3223,13 @@ BBPsave(BAT *b)
 
        if (BBP_status(bid) & BBPSAVING) {
                /* wait until save in other thread completes */
-               if (lock)
+               if (lock) {
+                       while (BBP_status(bid) & BBPSAVING)
+                               MT_cond_wait(&GDKswapCond(bid), 
&GDKswapLock(bid));
                        MT_lock_unset(&GDKswapLock(bid));
-               BBPspin(bid, __func__, BBPSAVING);
+               } else {
+                       BBPspin(bid, __func__, BBPSAVING);
+               }
        } else {
                /* save it */
                unsigned flags = BBPSAVING;
@@ -3257,6 +3256,7 @@ BBPsave(BAT *b)
                }
                /* clearing bits can be done without the lock */
                BBP_status_off(bid, BBPSAVING);
+               MT_cond_broadcast(&GDKswapCond(bid));
        }
        return ret;
 }
@@ -3314,6 +3314,7 @@ BBPfree(BAT *b)
        }
        TRC_DEBUG(BAT_, "turn off unloading %d\n", bid);
        BBP_status_off(bid, BBPUNLOADING);
+       MT_cond_broadcast(&GDKswapCond(bid));
        BBP_unload_dec();
        return ret;
 }
@@ -3340,8 +3341,9 @@ BBPquickdesc(bat bid)
                }
                return NULL;
        }
-       BBPspin(bid, __func__, BBPWAITING);
+//     BBPspin(bid, __func__, BBPWAITING);
        b = BBP_desc(bid);
+       MT_lock_set(&b->theaplock);
        if (b->ttype < 0) {
                const char *aname = ATOMunknown_name(b->ttype);
                int tt = ATOMindex(aname);
@@ -3352,6 +3354,7 @@ BBPquickdesc(bat bid)
                        b->ttype = tt;
                }
        }
+       MT_lock_unset(&b->theaplock);
        return b;
 }
 
@@ -3359,25 +3362,31 @@ BBPquickdesc(bat bid)
  * @+ Global Commit
  */
 static BAT *
-dirty_bat(bat *i, bool subcommit)
+dirty_bat(bat *i, bool subcommit, bool lock)
 {
-       if (BBPvalid(*i)) {
+       const bat bid = *i;
+       if (BBPvalid(bid)) {
                BAT *b;
-               BBPspin(*i, __func__, BBPSAVING);
-               if (BBP_status(*i) & BBPLOADED) {
-                       b = BBP_desc(*i);
+               if (lock) {
+                       while (BBP_status(bid) & BBPSAVING)
+                               MT_cond_wait(&GDKswapCond(bid), 
&GDKswapLock(bid));
+               } else {
+                       BBPspin(bid, __func__, BBPSAVING);
+               }
+               if (BBP_status(bid) & BBPLOADED) {
+                       b = BBP_desc(bid);
                        MT_lock_set(&b->theaplock);
-                       if ((BBP_status(*i) & BBPNEW) &&
+                       if ((BBP_status(bid) & BBPNEW) &&
                            BATcheckmodes(b, false) != GDK_SUCCEED) /* check 
mmap modes */
-                               *i = -*i;       /* error */
-                       else if ((BBP_status(*i) & BBPPERSISTENT) &&
+                               *i = -bid;      /* error */
+                       else if ((BBP_status(bid) & BBPPERSISTENT) &&
                                 (subcommit || BATdirty(b))) {
                                MT_lock_unset(&b->theaplock);
                                return b;       /* the bat is loaded, 
persistent and dirty */
                        }
                        MT_lock_unset(&b->theaplock);
                } else if (subcommit)
-                       return BBP_desc(*i);
+                       return BBP_desc(bid);
        }
        return NULL;
 }
@@ -3790,6 +3799,7 @@ BBPsync(int cnt, bat *restrict subcommit
        char buf[3000];
        int n = subcommit ? 0 : -1;
        FILE *obbpf, *nbbpf;
+       int nbats = 0;
 
        TRC_INFO(TM, "Committing %d bats\n", cnt - 1);
 
@@ -3824,12 +3834,11 @@ BBPsync(int cnt, bat *restrict subcommit
                BBP_status_on(bid, BBPSYNCING);
                /* wait until unloading is finished before
                 * attempting to make a backup */
-               while (BBP_status(bid) & BBPUNLOADING) {
-                       if (lock)
-                               MT_lock_unset(&GDKswapLock(bid));
+               if (lock) {
+                       while (BBP_status(bid) & BBPUNLOADING)
+                               MT_cond_wait(&GDKswapCond(bid), 
&GDKswapLock(bid));
+               } else {
                        BBPspin(bid, __func__, BBPUNLOADING);
-                       if (lock)
-                               MT_lock_set(&GDKswapLock(bid));
                }
                BAT *b = BBP_desc(bid);
                if (subcommit && b->ttype != TYPE_void) {
@@ -3866,7 +3875,7 @@ BBPsync(int cnt, bat *restrict subcommit
                                                  fname, BAKDIR, SUBDIR);
                        }
                }
-               b = dirty_bat(&i, subcommit != NULL);
+               b = dirty_bat(&i, subcommit != NULL, lock);
                if (i <= 0)
                        ret = GDK_FAIL;
                else if (BBP_status(bid) & BBPEXISTING &&
@@ -3907,13 +3916,11 @@ BBPsync(int cnt, bat *restrict subcommit
                                 * can set it, wait for
                                 * BBPUNLOADING before
                                 * attempting to save */
-                               for (;;) {
-                                       if (lock)
-                                               MT_lock_set(&GDKswapLock(i));
-                                       if (!(BBP_status(i) & 
(BBPSAVING|BBPUNLOADING)))
-                                               break;
-                                       if (lock)
-                                               MT_lock_unset(&GDKswapLock(i));
+                               if (lock) {
+                                       MT_lock_set(&GDKswapLock(i));
+                                       while (BBP_status(i) & 
(BBPSAVING|BBPUNLOADING))
+                                               MT_cond_wait(&GDKswapCond(i), 
&GDKswapLock(i));
+                               } else {
                                        BBPspin(i, __func__, 
BBPSAVING|BBPUNLOADING);
                                }
                                BBP_status_on(i, BBPSAVING);
@@ -3921,13 +3928,14 @@ BBPsync(int cnt, bat *restrict subcommit
                                        MT_lock_unset(&GDKswapLock(i));
                                ret = BATsave_iter(b, &bi, size);
                                BBP_status_off(i, BBPSAVING);
+                               MT_cond_broadcast(&GDKswapCond(i));
                        }
                        bip = &bi;
                } else {
                        bip = NULL;
                }
                if (ret == GDK_SUCCEED) {
-                       n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf, 
nbbpf, bip);
+                       n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf, 
nbbpf, bip, &nbats);
                        if (n < -1)
                                ret = GDK_FAIL;
                }
@@ -3959,6 +3967,7 @@ BBPsync(int cnt, bat *restrict subcommit
                if (ret != GDK_SUCCEED)
                        GDKsyserror("rename(%s,%s) failed\n", bakdir, deldir);
                TRC_DEBUG(IO_, "rename %s %s = %d\n", bakdir, deldir, (int) 
ret);
+               TRC_INFO(TM, "%d bats written to BBP.dir\n", nbats);
        }
 
        /* AFTERMATH */
@@ -4001,6 +4010,7 @@ BBPsync(int cnt, bat *restrict subcommit
        for (int idx = 1; idx < cnt; idx++) {
                bat i = subcommit ? subcommit[idx] : idx;
                BBP_status_off(i, BBPSYNCING);
+               MT_cond_broadcast(&GDKswapCond(i));
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to