Changeset: c7a362ad4619 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/c7a362ad4619
Modified Files:
        gdk/gdk.h
        gdk/gdk_hash.c
        gdk/gdk_join.c
        gdk/gdk_select.c
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql_upgrades.c
        sql/server/rel_schema.c
        sql/test/emptydb/Tests/check.stable.out
        sql/test/emptydb/Tests/check.stable.out.32bit
        sql/test/emptydb/Tests/check.stable.out.int128
Branch: default
Log Message:

Merged with Jul2021


diffs (truncated from 5268 to 300 lines):

diff --git a/common/utils/matomic.h b/common/utils/matomic.h
--- a/common/utils/matomic.h
+++ b/common/utils/matomic.h
@@ -87,15 +87,17 @@ typedef unsigned long long ATOMIC_BASE_T
 #endif
 
 #define ATOMIC_INIT(var, val)  atomic_init(var, (ATOMIC_BASE_TYPE) (val))
-#define ATOMIC_DESTROY(var)    ((void) 0)
-#define ATOMIC_GET(var)                atomic_load(var)
+#define ATOMIC_DESTROY(var)            ((void) 0)
+#define ATOMIC_GET(var)                        atomic_load(var)
 #define ATOMIC_SET(var, val)   atomic_store(var, (ATOMIC_BASE_TYPE) (val))
 #define ATOMIC_XCG(var, val)   atomic_exchange(var, (ATOMIC_BASE_TYPE) (val))
 #define ATOMIC_CAS(var, exp, des)      atomic_compare_exchange_strong(var, 
exp, (ATOMIC_BASE_TYPE) (des))
 #define ATOMIC_ADD(var, val)   atomic_fetch_add(var, (ATOMIC_BASE_TYPE) (val))
 #define ATOMIC_SUB(var, val)   atomic_fetch_sub(var, (ATOMIC_BASE_TYPE) (val))
-#define ATOMIC_INC(var)                (atomic_fetch_add(var, 1) + 1)
-#define ATOMIC_DEC(var)                (atomic_fetch_sub(var, 1) - 1)
+#define ATOMIC_INC(var)                        (atomic_fetch_add(var, 1) + 1)
+#define ATOMIC_DEC(var)                        (atomic_fetch_sub(var, 1) - 1)
+#define ATOMIC_OR(var, val)            atomic_fetch_or(var, (ATOMIC_BASE_TYPE) 
(val))
+#define ATOMIC_AND(var, val)   atomic_fetch_and(var, (ATOMIC_BASE_TYPE) (val))
 
 #ifdef __INTEL_COMPILER
 typedef volatile atomic_address ATOMIC_PTR_TYPE;
@@ -166,8 +168,10 @@ ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE
 #define ATOMIC_CAS(var, exp, des)      ATOMIC_CAS(var, exp, (ATOMIC_BASE_TYPE) 
(des))
 #define ATOMIC_ADD(var, val)   _InterlockedExchangeAdd64(var, 
(ATOMIC_BASE_TYPE) (val))
 #define ATOMIC_SUB(var, val)   _InterlockedExchangeAdd64(var, -(ATOMIC_BASE_TYPE) (val))
-#define ATOMIC_INC(var)                _InterlockedIncrement64(var)
-#define ATOMIC_DEC(var)                _InterlockedDecrement64(var)
+#define ATOMIC_INC(var)                        _InterlockedIncrement64(var)
+#define ATOMIC_DEC(var)                        _InterlockedDecrement64(var)
+#define ATOMIC_OR(var, val)            _InterlockedOr64(var, 
(ATOMIC_BASE_TYPE) (val))
+#define ATOMIC_AND(var, val)   _InterlockedAnd64(var, (ATOMIC_BASE_TYPE) (val))
 
 #else
 
@@ -179,6 +183,23 @@ ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE
 #define ATOMIC_GET(var)                        
_InlineInterlockedExchangeAdd64(var, 0)
 #define ATOMIC_SET(var, val)   _InlineInterlockedExchange64(var, 
(ATOMIC_BASE_TYPE) (val))
 #define ATOMIC_XCG(var, val)   _InlineInterlockedExchange64(var, 
(ATOMIC_BASE_TYPE) (val))
+#define ATOMIC_ADD(var, val)   _InlineInterlockedExchangeAdd64(var, 
(ATOMIC_BASE_TYPE) (val))
+#define ATOMIC_SUB(var, val)   _InlineInterlockedExchangeAdd64(var, -(ATOMIC_BASE_TYPE) (val))
+#define ATOMIC_INC(var)                        
_InlineInterlockedIncrement64(var)
+#define ATOMIC_DEC(var)                        
_InlineInterlockedDecrement64(var)
+#define ATOMIC_OR(var, val)            _InlineInterlockedOr64(var, 
(ATOMIC_BASE_TYPE) (val))
+#define ATOMIC_AND(var, val)   _InlineInterlockedAnd64(var, (ATOMIC_BASE_TYPE) 
(val))
+#else
+#define ATOMIC_GET(var)                        _InterlockedExchangeAdd64(var, 
0)
+#define ATOMIC_SET(var, val)   _InterlockedExchange64(var, (ATOMIC_BASE_TYPE) 
(val))
+#define ATOMIC_XCG(var, val)   _InterlockedExchange64(var, (ATOMIC_BASE_TYPE) 
(val))
+#define ATOMIC_ADD(var, val)   _InterlockedExchangeAdd64(var, 
(ATOMIC_BASE_TYPE) (val))
+#define ATOMIC_SUB(var, val)   _InterlockedExchangeAdd64(var, -(ATOMIC_BASE_TYPE) (val))
+#define ATOMIC_INC(var)                        _InterlockedIncrement64(var)
+#define ATOMIC_DEC(var)                        _InterlockedDecrement64(var)
+#define ATOMIC_OR(var, val)            _InterlockedOr64(var, 
(ATOMIC_BASE_TYPE) (val))
+#define ATOMIC_AND(var, val)   _InterlockedAnd64(var, (ATOMIC_BASE_TYPE) (val))
+#endif
 static inline bool
 ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE_TYPE *exp, ATOMIC_BASE_TYPE des)
 {
@@ -190,30 +211,6 @@ ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE
        return false;
 }
 #define ATOMIC_CAS(var, exp, des)      ATOMIC_CAS(var, exp, (ATOMIC_BASE_TYPE) 
(des))
-#define ATOMIC_ADD(var, val)   _InlineInterlockedExchangeAdd64(var, 
(ATOMIC_BASE_TYPE) (val))
-#define ATOMIC_SUB(var, val)   _InlineInterlockedExchangeAdd64(var, -(ATOMIC_BASE_TYPE) (val))
-#define ATOMIC_INC(var)                _InlineInterlockedIncrement64(var)
-#define ATOMIC_DEC(var)                _InlineInterlockedDecrement64(var)
-#else
-#define ATOMIC_GET(var)                        _InterlockedExchangeAdd64(var, 
0)
-#define ATOMIC_SET(var, val)   _InterlockedExchange64(var, (ATOMIC_BASE_TYPE) 
(val))
-#define ATOMIC_XCG(var, val)   _InterlockedExchange64(var, (ATOMIC_BASE_TYPE) 
(val))
-static inline bool
-ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE_TYPE *exp, ATOMIC_BASE_TYPE des)
-{
-       ATOMIC_BASE_TYPE old;
-       old = _InterlockedCompareExchange64(var, des, *exp);
-       if (old == *exp)
-               return true;
-       *exp = old;
-       return false;
-}
-#define ATOMIC_CAS(var, exp, des)      ATOMIC_CAS(var, exp, (ATOMIC_BASE_TYPE) 
(des))
-#define ATOMIC_ADD(var, val)   _InterlockedExchangeAdd64(var, 
(ATOMIC_BASE_TYPE) (val))
-#define ATOMIC_SUB(var, val)   _InterlockedExchangeAdd64(var, -(ATOMIC_BASE_TYPE) (val))
-#define ATOMIC_INC(var)                _InterlockedIncrement64(var)
-#define ATOMIC_DEC(var)                _InterlockedDecrement64(var)
-#endif
 
 #endif
 
@@ -259,14 +256,16 @@ typedef volatile int ATOMIC_TYPE;
 #define ATOMIC_INIT(var, val)  (*(var) = (val))
 #define ATOMIC_DESTROY(var)    ((void) 0)
 
-#define ATOMIC_GET(var)                __atomic_load_n(var, __ATOMIC_SEQ_CST)
+#define ATOMIC_GET(var)                        __atomic_load_n(var, 
__ATOMIC_SEQ_CST)
 #define ATOMIC_SET(var, val)   __atomic_store_n(var, (ATOMIC_BASE_TYPE) (val), 
__ATOMIC_SEQ_CST)
 #define ATOMIC_XCG(var, val)   __atomic_exchange_n(var, (ATOMIC_BASE_TYPE) 
(val), __ATOMIC_SEQ_CST)
 #define ATOMIC_CAS(var, exp, des)      __atomic_compare_exchange_n(var, exp, 
(ATOMIC_BASE_TYPE) (des), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
 #define ATOMIC_ADD(var, val)   __atomic_fetch_add(var, (ATOMIC_BASE_TYPE) 
(val), __ATOMIC_SEQ_CST)
-#define ATOMIC_SUB(var, val)   __atomic_fetch_sub(var, (ATOMIC_BASE_TYPE) 
(val), __ATOMIC_SEQ_CST)
-#define ATOMIC_INC(var)                __atomic_add_fetch(var, 1, 
__ATOMIC_SEQ_CST)
-#define ATOMIC_DEC(var)                __atomic_sub_fetch(var, 1, 
__ATOMIC_SEQ_CST)
+#define ATOMIC_SUB(var, val)   __atomic_fetch_sub(var, (ATOMIC_BASE_TYPE) (val), __ATOMIC_SEQ_CST)
+#define ATOMIC_INC(var)                        __atomic_add_fetch(var, 1, __ATOMIC_SEQ_CST)
+#define ATOMIC_DEC(var)                        __atomic_sub_fetch(var, 1, 
__ATOMIC_SEQ_CST)
+#define ATOMIC_OR(var, val)            __atomic_fetch_or(var, 
(ATOMIC_BASE_TYPE) (val), __ATOMIC_SEQ_CST)
+#define ATOMIC_AND(var, val)   __atomic_fetch_and(var, (ATOMIC_BASE_TYPE) 
(val), __ATOMIC_SEQ_CST)
 
 typedef void *volatile ATOMIC_PTR_TYPE;
 #define ATOMIC_PTR_INIT(var, val)      (*(var) = (val))
@@ -397,6 +396,30 @@ ATOMIC_DEC(ATOMIC_TYPE *var)
        return new;
 }
 
+static inline ATOMIC_BASE_TYPE /* returns the value held before the OR */
+ATOMIC_OR(ATOMIC_TYPE *var, ATOMIC_BASE_TYPE val)
+{
+       ATOMIC_BASE_TYPE old;
+       pthread_mutex_lock(&var->lck);  /* the mutex makes the update atomic */
+       old = var->val;
+       var->val |= val;        /* bitwise OR, as atomic_fetch_or; was "+=" */
+       pthread_mutex_unlock(&var->lck);
+       return old;
+}
+#define ATOMIC_OR(var, val)    ATOMIC_OR(var, (ATOMIC_BASE_TYPE) (val))
+
+static inline ATOMIC_BASE_TYPE /* returns the value held before the AND */
+ATOMIC_AND(ATOMIC_TYPE *var, ATOMIC_BASE_TYPE val)
+{
+       ATOMIC_BASE_TYPE old;
+       pthread_mutex_lock(&var->lck);  /* the mutex makes the update atomic */
+       old = var->val;
+       var->val &= val;        /* bitwise AND, as atomic_fetch_and; was "+=" */
+       pthread_mutex_unlock(&var->lck);
+       return old;
+}
+#define ATOMIC_AND(var, val)   ATOMIC_AND(var, (ATOMIC_BASE_TYPE) (val))
+
 typedef struct {
        void *val;
        pthread_mutex_t lck;
diff --git a/gdk/CMakeLists.txt b/gdk/CMakeLists.txt
--- a/gdk/CMakeLists.txt
+++ b/gdk/CMakeLists.txt
@@ -53,7 +53,7 @@ target_sources(bat
   gdk_string.c
   gdk_qsort.c
   gdk_qsort_impl.h
-  gdk_storage.c gdk_storage.h
+  gdk_storage.c
   gdk_bat.c
   gdk_delta.c gdk_delta.h
   gdk_cross.c
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -1392,7 +1392,7 @@ typedef struct {
        bat next;               /* next BBP slot in linked list */
        int refs;               /* in-memory references on which the loaded 
status of a BAT relies */
        int lrefs;              /* logical references on which the existence of 
a BAT relies */
-       volatile unsigned status; /* status mask used for spin locking */
+       ATOMIC_TYPE status;     /* status mask used for spin locking */
        /* MT_Id pid;           non-zero thread-id if this BAT is private */
 } BBPrec;
 
@@ -1424,7 +1424,7 @@ gdk_export BBPrec *BBP[N_BBPINIT];
 #define BBP_desc(i)    BBP_record(i).desc
 #define BBP_refs(i)    BBP_record(i).refs
 #define BBP_lrefs(i)   BBP_record(i).lrefs
-#define BBP_status(i)  BBP_record(i).status
+#define BBP_status(i)  ((unsigned) ATOMIC_GET(&BBP_record(i).status))
 #define BBP_pid(i)     BBP_record(i).pid
 #define BATgetId(b)    BBP_logical((b)->batCacheid)
 #define BBPvalid(i)    (BBP_logical(i) != NULL && *BBP_logical(i) != '.')
diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -1243,6 +1243,7 @@ BUNappendmulti(BAT *b, const void *value
        }
 
        BATrmprop(b, GDK_UNIQUE_ESTIMATE);
+       b->theap->dirty |= count > 0;
        for (BUN i = 0; i < count; i++) {
                void *t = b->ttype && b->tvarsized ? ((void **) values)[i] :
                        (void *) ((char *) values + i * Tsize(b));
@@ -1255,8 +1256,6 @@ BUNappendmulti(BAT *b, const void *value
                }
                p++;
        }
-       if (b->theap)
-               b->theap->dirty |= count > 0;
 
        IMPSdestroy(b); /* no support for inserts in imprints yet */
        OIDXdestroy(b);
@@ -1756,6 +1755,7 @@ BATsetcount(BAT *b, BUN cnt)
 
        b->batCount = cnt;
        b->batDirtydesc = true;
+       b->theap->dirty |= b->ttype != TYPE_void;
        if (b->theap->parentid == b->batCacheid)
                b->theap->free = tailsize(b, cnt);
        if (b->ttype == TYPE_void)
@@ -2281,24 +2281,24 @@ BATmode(BAT *b, bool transient)
                MT_lock_set(&GDKswapLock(bid));
                if (!transient) {
                        if (!(BBP_status(bid) & BBPDELETED))
-                               BBP_status_on(bid, BBPNEW, "BATmode");
+                               BBP_status_on(bid, BBPNEW);
                        else
-                               BBP_status_on(bid, BBPEXISTING, "BATmode");
-                       BBP_status_off(bid, BBPDELETED, "BATmode");
+                               BBP_status_on(bid, BBPEXISTING);
+                       BBP_status_off(bid, BBPDELETED);
                } else if (!b->batTransient) {
                        if (!(BBP_status(bid) & BBPNEW))
-                               BBP_status_on(bid, BBPDELETED, "BATmode");
-                       BBP_status_off(bid, BBPPERSISTENT, "BATmode");
+                               BBP_status_on(bid, BBPDELETED);
+                       BBP_status_off(bid, BBPPERSISTENT);
                }
                /* session bats or persistent bats that did not
                 * witness a commit yet may have been saved */
                if (b->batCopiedtodisk) {
                        if (!transient) {
-                               BBP_status_off(bid, BBPTMP, "BATmode");
+                               BBP_status_off(bid, BBPTMP);
                        } else {
                                /* TMcommit must remove it to
                                 * guarantee free space */
-                               BBP_status_on(bid, BBPTMP, "BATmode");
+                               BBP_status_on(bid, BBPTMP);
                        }
                }
                b->batTransient = transient;
diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c
--- a/gdk/gdk_batop.c
+++ b/gdk/gdk_batop.c
@@ -540,6 +540,7 @@ append_msk_bat(BAT *b, BAT *n, struct ca
        uint32_t boff = b->batCount % 32;
        uint32_t *bp = (uint32_t *) b->theap->base + b->batCount / 32;
        b->batCount += ci->ncand;
+       b->theap->dirty = true;
        b->theap->free = ((b->batCount + 31) / 32) * 4;
        if (ci->tpe == cand_dense) {
                uint32_t *np;
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -73,7 +73,6 @@
 #include "monetdb_config.h"
 #include "gdk.h"
 #include "gdk_private.h"
-#include "gdk_storage.h"
 #include "mutils.h"
 
 #ifndef F_OK
@@ -108,7 +107,7 @@ struct BBPfarm_t BBPfarms[MAXFARMS];
 bat *BBP_hash = NULL;          /* BBP logical name hash buckets */
 bat BBP_mask = 0;              /* number of buckets = & mask */
 
-static gdk_return BBPfree(BAT *b, const char *calledFrom);
+static gdk_return BBPfree(BAT *b);
 static void BBPdestroy(BAT *b);
 static void BBPuncacheit(bat bid, bool unloaddesc);
 static gdk_return BBPprepare(bool subcommit);
@@ -362,12 +361,15 @@ BBPextend(int idx, bool buildhash)
 
        /* make sure the new size is at least BBPsize large */
        while (BBPlimit < (bat) ATOMIC_GET(&BBPsize)) {
-               assert(BBP[BBPlimit >> BBPINITLOG] == NULL);
-               BBP[BBPlimit >> BBPINITLOG] = GDKzalloc(BBPINIT * 
sizeof(BBPrec));
-               if (BBP[BBPlimit >> BBPINITLOG] == NULL) {
+               BUN limit = BBPlimit >> BBPINITLOG;
+               assert(BBP[limit] == NULL);
+               BBP[limit] = GDKzalloc(BBPINIT * sizeof(BBPrec));
+               if (BBP[limit] == NULL) {
                        GDKerror("failed to extend BAT pool\n");
                        return GDK_FAIL;
                }
+               for (BUN i = 0; i < BBPINIT; i++)
+                       ATOMIC_INIT(&BBP[limit][i].status, 0);
                BBPlimit += BBPINIT;
        }
 
@@ -720,7 +722,7 @@ BBPreadEntries(FILE *fp, unsigned bbpver
                BBP_refs(bid) = 0;
                BBP_lrefs(bid) = 1;     /* any BAT we encounter here is 
persistent, so has a logical reference */
                BBP_desc(bid) = bn;
-               BBP_status(bid) = BBPEXISTING;  /* do we need other status 
bits? */
+               BBP_status_set(bid, BBPEXISTING);       /* do we need other 
status bits? */
        }
        return GDK_SUCCEED;
 }
@@ -1677,19 +1679,6 @@ BBPindex(const char *nme)
        return BBP_find(nme, true);
 }
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to