Changeset: c7a362ad4619 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/c7a362ad4619 Modified Files: gdk/gdk.h gdk/gdk_hash.c gdk/gdk_join.c gdk/gdk_select.c sql/backends/monet5/sql.c sql/backends/monet5/sql_upgrades.c sql/server/rel_schema.c sql/test/emptydb/Tests/check.stable.out sql/test/emptydb/Tests/check.stable.out.32bit sql/test/emptydb/Tests/check.stable.out.int128 Branch: default Log Message:
Merged with Jul2021 diffs (truncated from 5268 to 300 lines): diff --git a/common/utils/matomic.h b/common/utils/matomic.h --- a/common/utils/matomic.h +++ b/common/utils/matomic.h @@ -87,15 +87,17 @@ typedef unsigned long long ATOMIC_BASE_T #endif #define ATOMIC_INIT(var, val) atomic_init(var, (ATOMIC_BASE_TYPE) (val)) -#define ATOMIC_DESTROY(var) ((void) 0) -#define ATOMIC_GET(var) atomic_load(var) +#define ATOMIC_DESTROY(var) ((void) 0) +#define ATOMIC_GET(var) atomic_load(var) #define ATOMIC_SET(var, val) atomic_store(var, (ATOMIC_BASE_TYPE) (val)) #define ATOMIC_XCG(var, val) atomic_exchange(var, (ATOMIC_BASE_TYPE) (val)) #define ATOMIC_CAS(var, exp, des) atomic_compare_exchange_strong(var, exp, (ATOMIC_BASE_TYPE) (des)) #define ATOMIC_ADD(var, val) atomic_fetch_add(var, (ATOMIC_BASE_TYPE) (val)) #define ATOMIC_SUB(var, val) atomic_fetch_sub(var, (ATOMIC_BASE_TYPE) (val)) -#define ATOMIC_INC(var) (atomic_fetch_add(var, 1) + 1) -#define ATOMIC_DEC(var) (atomic_fetch_sub(var, 1) - 1) +#define ATOMIC_INC(var) (atomic_fetch_add(var, 1) + 1) +#define ATOMIC_DEC(var) (atomic_fetch_sub(var, 1) - 1) +#define ATOMIC_OR(var, val) atomic_fetch_or(var, (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_AND(var, val) atomic_fetch_and(var, (ATOMIC_BASE_TYPE) (val)) #ifdef __INTEL_COMPILER typedef volatile atomic_address ATOMIC_PTR_TYPE; @@ -166,8 +168,10 @@ ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE #define ATOMIC_CAS(var, exp, des) ATOMIC_CAS(var, exp, (ATOMIC_BASE_TYPE) (des)) #define ATOMIC_ADD(var, val) _InterlockedExchangeAdd64(var, (ATOMIC_BASE_TYPE) (val)) #define ATOMIC_SUB(var, val) _InterlockedExchangeAdd64(var, -(ATOMIC_BASE_TYPE) (val)) -#define ATOMIC_INC(var) _InterlockedIncrement64(var) -#define ATOMIC_DEC(var) _InterlockedDecrement64(var) +#define ATOMIC_INC(var) _InterlockedIncrement64(var) +#define ATOMIC_DEC(var) _InterlockedDecrement64(var) +#define ATOMIC_OR(var, val) _InterlockedOr64(var, (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_AND(var, val) _InterlockedAnd64(var, (ATOMIC_BASE_TYPE) (val)) #else @@ -179,6 +183,23 @@ ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE #define ATOMIC_GET(var) _InlineInterlockedExchangeAdd64(var, 0) #define ATOMIC_SET(var, val) _InlineInterlockedExchange64(var, (ATOMIC_BASE_TYPE) (val)) #define ATOMIC_XCG(var, val) _InlineInterlockedExchange64(var, (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_ADD(var, val) _InlineInterlockedExchangeAdd64(var, (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_SUB(var, val) _InlineInterlockedExchangeAdd64(var, -(ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_INC(var) _InlineInterlockedIncrement64(var) +#define ATOMIC_DEC(var) _InlineInterlockedDecrement64(var) +#define ATOMIC_OR(var, val) _InlineInterlockedOr64(var, (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_AND(var, val) _InlineInterlockedAnd64(var, (ATOMIC_BASE_TYPE) (val)) +#else +#define ATOMIC_GET(var) _InterlockedExchangeAdd64(var, 0) +#define ATOMIC_SET(var, val) _InterlockedExchange64(var, (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_XCG(var, val) _InterlockedExchange64(var, (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_ADD(var, val) _InterlockedExchangeAdd64(var, (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_SUB(var, val) _InterlockedExchangeAdd64(var, -(ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_INC(var) _InterlockedIncrement64(var) +#define ATOMIC_DEC(var) _InterlockedDecrement64(var) +#define ATOMIC_OR(var, val) _InterlockedOr64(var, (ATOMIC_BASE_TYPE) (val)) +#define ATOMIC_AND(var, val) _InterlockedAnd64(var, (ATOMIC_BASE_TYPE) (val)) +#endif static inline bool ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE_TYPE *exp, ATOMIC_BASE_TYPE des) { @@ -190,30 +211,6 @@ ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE return false; } #define ATOMIC_CAS(var, exp, des) ATOMIC_CAS(var, exp, (ATOMIC_BASE_TYPE) (des)) -#define ATOMIC_ADD(var, val) _InlineInterlockedExchangeAdd64(var, (ATOMIC_BASE_TYPE) (val)) -#define ATOMIC_SUB(var, val) _InlineInterlockedExchangeAdd64(var, -(ATOMIC_BASE_TYPE) (val)) -#define ATOMIC_INC(var) _InlineInterlockedIncrement64(var) -#define ATOMIC_DEC(var) _InlineInterlockedDecrement64(var) -#else -#define ATOMIC_GET(var) _InterlockedExchangeAdd64(var, 0) -#define ATOMIC_SET(var, val) _InterlockedExchange64(var, (ATOMIC_BASE_TYPE) (val)) -#define ATOMIC_XCG(var, val) _InterlockedExchange64(var, (ATOMIC_BASE_TYPE) (val)) -static inline bool -ATOMIC_CAS(ATOMIC_TYPE *var, ATOMIC_BASE_TYPE *exp, ATOMIC_BASE_TYPE des) -{ - ATOMIC_BASE_TYPE old; - old = _InterlockedCompareExchange64(var, des, *exp); - if (old == *exp) - return true; - *exp = old; - return false; -} -#define ATOMIC_CAS(var, exp, des) ATOMIC_CAS(var, exp, (ATOMIC_BASE_TYPE) (des)) -#define ATOMIC_ADD(var, val) _InterlockedExchangeAdd64(var, (ATOMIC_BASE_TYPE) (val)) -#define ATOMIC_SUB(var, val) _InterlockedExchangeAdd64(var, -(ATOMIC_BASE_TYPE) (val)) -#define ATOMIC_INC(var) _InterlockedIncrement64(var) -#define ATOMIC_DEC(var) _InterlockedDecrement64(var) -#endif #endif @@ -259,14 +256,16 @@ typedef volatile int ATOMIC_TYPE; #define ATOMIC_INIT(var, val) (*(var) = (val)) #define ATOMIC_DESTROY(var) ((void) 0) -#define ATOMIC_GET(var) __atomic_load_n(var, __ATOMIC_SEQ_CST) +#define ATOMIC_GET(var) __atomic_load_n(var, __ATOMIC_SEQ_CST) #define ATOMIC_SET(var, val) __atomic_store_n(var, (ATOMIC_BASE_TYPE) (val), __ATOMIC_SEQ_CST) #define ATOMIC_XCG(var, val) __atomic_exchange_n(var, (ATOMIC_BASE_TYPE) (val), __ATOMIC_SEQ_CST) #define ATOMIC_CAS(var, exp, des) __atomic_compare_exchange_n(var, exp, (ATOMIC_BASE_TYPE) (des), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) #define ATOMIC_ADD(var, val) __atomic_fetch_add(var, (ATOMIC_BASE_TYPE) (val), __ATOMIC_SEQ_CST) -#define ATOMIC_SUB(var, val) __atomic_fetch_sub(var, (ATOMIC_BASE_TYPE) (val), __ATOMIC_SEQ_CST) -#define ATOMIC_INC(var) __atomic_add_fetch(var, 1, __ATOMIC_SEQ_CST) -#define ATOMIC_DEC(var) __atomic_sub_fetch(var, 1, __ATOMIC_SEQ_CST) +#define ATOMIC_SUB(var, val) __atomic_fetch_sub(var, (ATOMIC_BASE_TYPE) (val), __ATOMIC_SEQ_CST)# +define ATOMIC_INC(var) __atomic_add_fetch(var, 1, __ATOMIC_SEQ_CST) +#define ATOMIC_DEC(var) __atomic_sub_fetch(var, 1, __ATOMIC_SEQ_CST) +#define ATOMIC_OR(var, val) __atomic_fetch_or(var, (ATOMIC_BASE_TYPE) (val), __ATOMIC_SEQ_CST) +#define ATOMIC_AND(var, val) __atomic_fetch_and(var, (ATOMIC_BASE_TYPE) (val), __ATOMIC_SEQ_CST) typedef void *volatile ATOMIC_PTR_TYPE; #define ATOMIC_PTR_INIT(var, val) (*(var) = (val)) @@ -397,6 +396,30 @@ ATOMIC_DEC(ATOMIC_TYPE *var) return new; } +static inline ATOMIC_BASE_TYPE +ATOMIC_OR(ATOMIC_TYPE *var, ATOMIC_BASE_TYPE val) +{ + ATOMIC_BASE_TYPE old; + pthread_mutex_lock(&var->lck); + old = var->val; + var->val += val; + pthread_mutex_unlock(&var->lck); + return old; +} +#define ATOMIC_OR(var, val) ATOMIC_OR(var, (ATOMIC_BASE_TYPE) (val)) + +static inline ATOMIC_BASE_TYPE +ATOMIC_AND(ATOMIC_TYPE *var, ATOMIC_BASE_TYPE val) +{ + ATOMIC_BASE_TYPE old; + pthread_mutex_lock(&var->lck); + old = var->val; + var->val += val; + pthread_mutex_unlock(&var->lck); + return old; +} +#define ATOMIC_AND(var, val) ATOMIC_AND(var, (ATOMIC_BASE_TYPE) (val)) + typedef struct { void *val; pthread_mutex_t lck; diff --git a/gdk/CMakeLists.txt b/gdk/CMakeLists.txt --- a/gdk/CMakeLists.txt +++ b/gdk/CMakeLists.txt @@ -53,7 +53,7 @@ target_sources(bat gdk_string.c gdk_qsort.c gdk_qsort_impl.h - gdk_storage.c gdk_storage.h + gdk_storage.c gdk_bat.c gdk_delta.c gdk_delta.h gdk_cross.c diff --git a/gdk/gdk.h b/gdk/gdk.h --- a/gdk/gdk.h +++ b/gdk/gdk.h @@ -1392,7 +1392,7 @@ typedef struct { bat next; /* next BBP slot in linked list */ int refs; /* in-memory references on which the loaded status of a BAT relies */ int lrefs; /* logical references on which the existence of a BAT relies */ - volatile unsigned status; /* status mask used for spin locking */ + ATOMIC_TYPE status; /* status mask used for spin locking */ /* MT_Id pid; non-zero thread-id if this BAT is private */ } BBPrec; @@ -1424,7 +1424,7 @@ gdk_export BBPrec *BBP[N_BBPINIT]; #define BBP_desc(i) BBP_record(i).desc #define BBP_refs(i) BBP_record(i).refs #define BBP_lrefs(i) BBP_record(i).lrefs -#define BBP_status(i) BBP_record(i).status +#define BBP_status(i) ((unsigned) ATOMIC_GET(&BBP_record(i).status)) #define BBP_pid(i) BBP_record(i).pid #define BATgetId(b) BBP_logical((b)->batCacheid) #define BBPvalid(i) (BBP_logical(i) != NULL && *BBP_logical(i) != '.') diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c --- a/gdk/gdk_bat.c +++ b/gdk/gdk_bat.c @@ -1243,6 +1243,7 @@ BUNappendmulti(BAT *b, const void *value } BATrmprop(b, GDK_UNIQUE_ESTIMATE); + b->theap->dirty |= count > 0; for (BUN i = 0; i < count; i++) { void *t = b->ttype && b->tvarsized ? ((void **) values)[i] : (void *) ((char *) values + i * Tsize(b)); @@ -1255,8 +1256,6 @@ BUNappendmulti(BAT *b, const void *value } p++; } - if (b->theap) - b->theap->dirty |= count > 0; IMPSdestroy(b); /* no support for inserts in imprints yet */ OIDXdestroy(b); @@ -1756,6 +1755,7 @@ BATsetcount(BAT *b, BUN cnt) b->batCount = cnt; b->batDirtydesc = true; + b->theap->dirty |= b->ttype != TYPE_void; if (b->theap->parentid == b->batCacheid) b->theap->free = tailsize(b, cnt); if (b->ttype == TYPE_void) @@ -2281,24 +2281,24 @@ BATmode(BAT *b, bool transient) MT_lock_set(&GDKswapLock(bid)); if (!transient) { if (!(BBP_status(bid) & BBPDELETED)) - BBP_status_on(bid, BBPNEW, "BATmode"); + BBP_status_on(bid, BBPNEW); else - BBP_status_on(bid, BBPEXISTING, "BATmode"); - BBP_status_off(bid, BBPDELETED, "BATmode"); + BBP_status_on(bid, BBPEXISTING); + BBP_status_off(bid, BBPDELETED); } else if (!b->batTransient) { if (!(BBP_status(bid) & BBPNEW)) - BBP_status_on(bid, BBPDELETED, "BATmode"); - BBP_status_off(bid, BBPPERSISTENT, "BATmode"); + BBP_status_on(bid, BBPDELETED); + BBP_status_off(bid, BBPPERSISTENT); } /* session bats or persistent bats that did not * witness a commit yet may have been saved */ if (b->batCopiedtodisk) { if (!transient) { - BBP_status_off(bid, BBPTMP, "BATmode"); + BBP_status_off(bid, BBPTMP); } else { /* TMcommit must remove it to * guarantee free space */ - BBP_status_on(bid, BBPTMP, "BATmode"); + BBP_status_on(bid, BBPTMP); } } b->batTransient = transient; diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c --- a/gdk/gdk_batop.c +++ b/gdk/gdk_batop.c @@ -540,6 +540,7 @@ append_msk_bat(BAT *b, BAT *n, struct ca uint32_t boff = b->batCount % 32; uint32_t *bp = (uint32_t *) b->theap->base + b->batCount / 32; b->batCount += ci->ncand; + b->theap->dirty = true; b->theap->free = ((b->batCount + 31) / 32) * 4; if (ci->tpe == cand_dense) { uint32_t *np; diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c --- a/gdk/gdk_bbp.c +++ b/gdk/gdk_bbp.c @@ -73,7 +73,6 @@ #include "monetdb_config.h" #include "gdk.h" #include "gdk_private.h" -#include "gdk_storage.h" #include "mutils.h" #ifndef F_OK @@ -108,7 +107,7 @@ struct BBPfarm_t BBPfarms[MAXFARMS]; bat *BBP_hash = NULL; /* BBP logical name hash buckets */ bat BBP_mask = 0; /* number of buckets = & mask */ -static gdk_return BBPfree(BAT *b, const char *calledFrom); +static gdk_return BBPfree(BAT *b); static void BBPdestroy(BAT *b); static void BBPuncacheit(bat bid, bool unloaddesc); static gdk_return BBPprepare(bool subcommit); @@ -362,12 +361,15 @@ BBPextend(int idx, bool buildhash) /* make sure the new size is at least BBPsize large */ while (BBPlimit < (bat) ATOMIC_GET(&BBPsize)) { - assert(BBP[BBPlimit >> BBPINITLOG] == NULL); - BBP[BBPlimit >> BBPINITLOG] = GDKzalloc(BBPINIT * sizeof(BBPrec)); - if (BBP[BBPlimit >> BBPINITLOG] == NULL) { + BUN limit = BBPlimit >> BBPINITLOG; + assert(BBP[limit] == NULL); + BBP[limit] = GDKzalloc(BBPINIT * sizeof(BBPrec)); + if (BBP[limit] == NULL) { GDKerror("failed to extend BAT pool\n"); return GDK_FAIL; } + for (BUN i = 0; i < BBPINIT; i++) + ATOMIC_INIT(&BBP[limit][i].status, 0); BBPlimit += BBPINIT; } @@ -720,7 +722,7 @@ BBPreadEntries(FILE *fp, unsigned bbpver BBP_refs(bid) = 0; BBP_lrefs(bid) = 1; /* any BAT we encounter here is persistent, so has a logical reference */ BBP_desc(bid) = bn; - BBP_status(bid) = BBPEXISTING; /* do we need other status bits? */ + BBP_status_set(bid, BBPEXISTING); /* do we need other status bits? */ } return GDK_SUCCEED; } @@ -1677,19 +1679,6 @@ BBPindex(const char *nme) return BBP_find(nme, true); } _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list