Changeset: 17d121c9c60d for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/17d121c9c60d Branch: extract_types Log Message:
Merge with default. diffs (truncated from 759 to 300 lines): diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c --- a/gdk/gdk_bbp.c +++ b/gdk/gdk_bbp.c @@ -2916,6 +2916,10 @@ decref(bat i, bool logical, bool lock, c BBP_status_off(i, BBPUNLOADING); } } + if (tp) + decref(tp, false, lock, func); + if (tvp) + decref(tvp, false, lock, func); return refs; } @@ -2931,13 +2935,6 @@ BBPrelease(bat i) return decref(i, true, true, __func__); } -/* - * M5 often changes the physical ref into a logical reference. This - * state change consist of the sequence BBPretain(b);BBPunfix(b). - * A faster solution is given below, because it does not trigger the - * BBP management actions, such as garbage collecting the bats. - * [first step, initiate code change] - */ void BBPkeepref(BAT *b) { @@ -3202,9 +3199,9 @@ BBPdestroy(BAT *b) /* parent released when completely done with child */ if (tp) - BBPunshare(tp); + BBPrelease(tp); if (vtp) - BBPunshare(vtp); + BBPrelease(vtp); } static gdk_return @@ -3231,9 +3228,9 @@ BBPfree(BAT *b) /* parent released when completely done with child */ if (ret == GDK_SUCCEED && tp) - BBPunshare(tp); + BBPrelease(tp); if (ret == GDK_SUCCEED && vtp) - BBPunshare(vtp); + BBPrelease(vtp); return ret; } diff --git a/gdk/gdk_heap.c b/gdk/gdk_heap.c --- a/gdk/gdk_heap.c +++ b/gdk/gdk_heap.c @@ -177,6 +177,11 @@ HEAPalloc(Heap *h, size_t nitems, size_t return GDK_FAIL; } h->newstorage = h->storage; + if (h->farmid == 1) { + QryCtx *qc = MT_thread_get_qry_ctx(); + if (qc) + ATOMIC_ADD(&qc->datasize, h->size); + } return GDK_SUCCEED; } @@ -201,6 +206,8 @@ HEAPalloc(Heap *h, size_t nitems, size_t gdk_return HEAPextend(Heap *h, size_t size, bool mayshare) { + size_t osize = h->size; + if (size <= h->size) return GDK_SUCCEED; /* nothing to do */ @@ -239,6 +246,11 @@ HEAPextend(Heap *h, size_t size, bool ma if (p) { h->size = size; h->base = p; + if (h->farmid == 1) { + QryCtx *qc = MT_thread_get_qry_ctx(); + if (qc) + ATOMIC_ADD(&qc->datasize, size - osize); + } return GDK_SUCCEED; /* success */ } failure = "GDKmremap() failed"; @@ -262,8 +274,14 @@ HEAPextend(Heap *h, size_t size, bool ma h->base = GDKrealloc(h->base, size); TRC_DEBUG(HEAP, "Extending malloced heap %s %zu %zu %p %p\n", h->filename, size, h->size, bak.base, h->base); h->size = size; - if (h->base) + if (h->base) { + if (h->farmid == 1) { + QryCtx *qc = MT_thread_get_qry_ctx(); + if (qc) + ATOMIC_ADD(&qc->datasize, size - osize); + } return GDK_SUCCEED; /* success */ + } /* bak.base is still valid and may get restored */ failure = "h->storage == STORE_MEM && !must_map && !h->base"; } @@ -293,6 +311,11 @@ HEAPextend(Heap *h, size_t size, bool ma if (bak.free > 0) memcpy(h->base, bak.base, bak.free); HEAPfree(&bak, false); + if (h->farmid == 1) { + QryCtx *qc = MT_thread_get_qry_ctx(); + if (qc) + ATOMIC_ADD(&qc->datasize, size); + } return GDK_SUCCEED; } GDKclrerr(); @@ -391,6 +414,11 @@ HEAPshrink(Heap *h, size_t size) h->filename, h->size, size, h->base, p); } if (p) { + if (h->farmid == 1) { + QryCtx *qc = MT_thread_get_qry_ctx(); + if (qc) + ATOMIC_SUB(&qc->datasize, h->size - size); + } h->size = size; h->base = p; return GDK_SUCCEED; @@ -578,6 +606,11 @@ void HEAPfree(Heap *h, bool rmheap) { if (h->base) { + if (h->farmid == 1) { + QryCtx *qc = MT_thread_get_qry_ctx(); + if (qc) + ATOMIC_SUB(&qc->datasize, h->size); + } if (h->storage == STORE_MEM) { /* plain memory */ TRC_DEBUG(HEAP, "HEAPfree %s %zu %p\n", h->filename, h->size, h->base); GDKfree(h->base); @@ -764,6 +797,11 @@ HEAPload_intern(Heap *h, const char *nme return GDK_FAIL; /* file could not be read satisfactorily */ h->dirty = false; /* we just read it, so it's clean */ + if (h->farmid == 1) { + QryCtx *qc = MT_thread_get_qry_ctx(); + if (qc) + ATOMIC_ADD(&qc->datasize, h->size); + } return GDK_SUCCEED; } diff --git a/gdk/gdk_system.h b/gdk/gdk_system.h --- a/gdk/gdk_system.h +++ b/gdk/gdk_system.h @@ -134,6 +134,8 @@ #define PTW32 1 #endif +#include "matomic.h" + /* debug and errno integers */ gdk_export int GDKdebug; gdk_export void GDKsetdebug(int debug); @@ -161,8 +163,9 @@ enum MT_thr_detach { MT_THR_JOINABLE, MT typedef int64_t lng; typedef struct QryCtx { - const lng starttime; + lng starttime; lng querytimeout; + ATOMIC_TYPE datasize; } QryCtx; gdk_export bool MT_thread_init(void); diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c --- a/monetdb5/mal/mal_client.c +++ b/monetdb5/mal/mal_client.c @@ -55,11 +55,15 @@ ClientRec *mal_clients = NULL; void mal_client_reset(void) { - MAL_MAXCLIENTS = 0; if (mal_clients) { + for (int i = 0; i < MAL_MAXCLIENTS; i++) { + ATOMIC_DESTROY(&mal_clients[i].lastprint); + ATOMIC_DESTROY(&mal_clients[i].qryctx.datasize); + } GDKfree(mal_clients); mal_clients = NULL; } + MAL_MAXCLIENTS = 0; } bool @@ -86,6 +90,7 @@ MCinit(void) } for (int i = 0; i < MAL_MAXCLIENTS; i++){ ATOMIC_INIT(&mal_clients[i].lastprint, 0); + ATOMIC_INIT(&mal_clients[i].qryctx.datasize, 0); mal_clients[i].idx = -1; /* indicate it's available */ } return true; @@ -258,9 +263,10 @@ MCinitClientRecord(Client c, oid user, b strcpy_len(c->optimizer, "default_pipe", sizeof(c->optimizer)); c->workerlimit = 0; c->memorylimit = 0; - c->querytimeout = 0; + c->qryctx.querytimeout = 0; c->sessiontimeout = 0; - c->starttime = 0; + c->qryctx.starttime = 0; + ATOMIC_SET(&c->qryctx.datasize, 0); c->itrace = 0; c->errbuf = 0; @@ -382,7 +388,7 @@ MCforkClient(Client father) strcpy_len(father->optimizer, son->optimizer, sizeof(father->optimizer)); son->workerlimit = father->workerlimit; son->memorylimit = father->memorylimit; - son->querytimeout = father->querytimeout; + son->qryctx.querytimeout = father->qryctx.querytimeout; son->sessiontimeout = father->sessiontimeout; if (son->prompt) @@ -425,7 +431,7 @@ MCshutdowninprogress(void){ * child can not close a parent. */ void -MCfreeClient(Client c) +MCcloseClient(Client c) { MT_lock_set(&mal_contextLock); if( c->mode == FREECLIENT) { @@ -462,7 +468,7 @@ MCfreeClient(Client c) strcpy_len(c->optimizer, "default_pipe", sizeof(c->optimizer)); c->workerlimit = 0; c->memorylimit = 0; - c->querytimeout = 0; + c->qryctx.querytimeout = 0; c->sessiontimeout = 0; c->user = oid_nil; if( c->username){ @@ -571,12 +577,6 @@ MCmemoryClaim(void) return claim * LL_CONSTANT(1048576); } -void -MCcloseClient(Client c) -{ - MCfreeClient(c); -} - str MCsuspendClient(int id) { diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h --- a/monetdb5/mal/mal_client.h +++ b/monetdb5/mal/mal_client.h @@ -67,9 +67,8 @@ typedef struct CLIENT { char optimizer[IDLENGTH];/* The optimizer pipe preferred for this session */ int workerlimit; /* maximum number of workthreads processing a query */ int memorylimit; /* Memory claim highwater mark, 0 = no limit */ - lng querytimeout; /* query abort after x usec, 0 = no limit*/ lng sessiontimeout; /* session abort after x usec, 0 = no limit */ - lng starttime; /* track when the query started, for resource management */ + QryCtx qryctx; /* per query limitations */ time_t login; /* Time when this session started */ lng session; /* usec since start of server */ diff --git a/monetdb5/mal/mal_embedded.c b/monetdb5/mal/mal_embedded.c --- a/monetdb5/mal/mal_embedded.c +++ b/monetdb5/mal/mal_embedded.c @@ -104,7 +104,7 @@ malEmbeddedBoot(int workerlimit, int mem throw(MAL, "malEmbeddedBoot", "Failed to initialize client"); c->workerlimit = workerlimit; c->memorylimit = memorylimit; - c->querytimeout = querytimeout * 1000000; // from sec to usec + c->qryctx.querytimeout = querytimeout * 1000000; // from sec to usec c->sessiontimeout = sessiontimeout * 1000000; c->curmodule = c->usermodule = userModule(); if(c->usermodule == NULL) { diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c --- a/monetdb5/mal/mal_interpreter.c +++ b/monetdb5/mal/mal_interpreter.c @@ -360,7 +360,7 @@ runMAL(Client cntxt, MalBlkPtr mb, MalBl garbageCollector(cntxt, mb, stk, env != stk); if (stk && stk != env) freeStack(stk); - if (ret == MAL_SUCCEED && cntxt->querytimeout && cntxt->starttime && GDKusec()- cntxt->starttime > cntxt->querytimeout) + if (ret == MAL_SUCCEED && cntxt->qryctx.querytimeout && cntxt->qryctx.starttime && GDKusec()- cntxt->qryctx.starttime > cntxt->qryctx.querytimeout) throw(MAL, "mal.interpreter", SQLSTATE(HYT00) RUNTIME_QRY_TIMEOUT); return ret; } @@ -454,7 +454,7 @@ callMAL(Client cntxt, MalBlkPtr mb, MalS } if (stk) garbageCollector(cntxt, mb, stk, TRUE); - if (ret == MAL_SUCCEED && cntxt->querytimeout && cntxt->starttime && GDKusec()- cntxt->starttime > cntxt->querytimeout) + if (ret == MAL_SUCCEED && cntxt->qryctx.querytimeout && cntxt->qryctx.starttime && GDKusec()- cntxt->qryctx.starttime > cntxt->qryctx.querytimeout) _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org