Changeset: 17d121c9c60d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/17d121c9c60d
Branch: extract_types
Log Message:

Merge with default.


diffs (truncated from 759 to 300 lines):

diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -2916,6 +2916,10 @@ decref(bat i, bool logical, bool lock, c
                        BBP_status_off(i, BBPUNLOADING);
                }
        }
+       if (tp)
+               decref(tp, false, lock, func);
+       if (tvp)
+               decref(tvp, false, lock, func);
        return refs;
 }
 
@@ -2931,13 +2935,6 @@ BBPrelease(bat i)
        return decref(i, true, true, __func__);
 }
 
-/*
- * M5 often changes the physical ref into a logical reference.  This
- * state change consist of the sequence BBPretain(b);BBPunfix(b).
- * A faster solution is given below, because it does not trigger the
- * BBP management actions, such as garbage collecting the bats.
- * [first step, initiate code change]
- */
 void
 BBPkeepref(BAT *b)
 {
@@ -3202,9 +3199,9 @@ BBPdestroy(BAT *b)
 
        /* parent released when completely done with child */
        if (tp)
-               BBPunshare(tp);
+               BBPrelease(tp);
        if (vtp)
-               BBPunshare(vtp);
+               BBPrelease(vtp);
 }
 
 static gdk_return
@@ -3231,9 +3228,9 @@ BBPfree(BAT *b)
 
        /* parent released when completely done with child */
        if (ret == GDK_SUCCEED && tp)
-               BBPunshare(tp);
+               BBPrelease(tp);
        if (ret == GDK_SUCCEED && vtp)
-               BBPunshare(vtp);
+               BBPrelease(vtp);
        return ret;
 }
 
diff --git a/gdk/gdk_heap.c b/gdk/gdk_heap.c
--- a/gdk/gdk_heap.c
+++ b/gdk/gdk_heap.c
@@ -177,6 +177,11 @@ HEAPalloc(Heap *h, size_t nitems, size_t
                return GDK_FAIL;
        }
        h->newstorage = h->storage;
+       if (h->farmid == 1) {
+               QryCtx *qc = MT_thread_get_qry_ctx();
+               if (qc)
+                       ATOMIC_ADD(&qc->datasize, h->size);
+       }
        return GDK_SUCCEED;
 }
 
@@ -201,6 +206,8 @@ HEAPalloc(Heap *h, size_t nitems, size_t
 gdk_return
 HEAPextend(Heap *h, size_t size, bool mayshare)
 {
+       size_t osize = h->size;
+
        if (size <= h->size)
                return GDK_SUCCEED;     /* nothing to do */
 
@@ -239,6 +246,11 @@ HEAPextend(Heap *h, size_t size, bool ma
                if (p) {
                        h->size = size;
                        h->base = p;
+                       if (h->farmid == 1) {
+                               QryCtx *qc = MT_thread_get_qry_ctx();
+                               if (qc)
+                                       ATOMIC_ADD(&qc->datasize, size - osize);
+                       }
                        return GDK_SUCCEED; /* success */
                }
                failure = "GDKmremap() failed";
@@ -262,8 +274,14 @@ HEAPextend(Heap *h, size_t size, bool ma
                        h->base = GDKrealloc(h->base, size);
                        TRC_DEBUG(HEAP, "Extending malloced heap %s %zu %zu %p %p\n", h->filename, size, h->size, bak.base, h->base);
                        h->size = size;
-                       if (h->base)
+                       if (h->base) {
+                               if (h->farmid == 1) {
+                                       QryCtx *qc = MT_thread_get_qry_ctx();
+                                       if (qc)
+                                               ATOMIC_ADD(&qc->datasize, size - osize);
+                               }
                                return GDK_SUCCEED; /* success */
+                       }
                        /* bak.base is still valid and may get restored */
                        failure = "h->storage == STORE_MEM && !must_map && !h->base";
                }
@@ -293,6 +311,11 @@ HEAPextend(Heap *h, size_t size, bool ma
                                        if (bak.free > 0)
                                                memcpy(h->base, bak.base, bak.free);
                                        HEAPfree(&bak, false);
+                                       if (h->farmid == 1) {
+                                               QryCtx *qc = MT_thread_get_qry_ctx();
+                                               if (qc)
+                                                       ATOMIC_ADD(&qc->datasize, size);
+                                       }
                                        return GDK_SUCCEED;
                                }
                                GDKclrerr();
@@ -391,6 +414,11 @@ HEAPshrink(Heap *h, size_t size)
                          h->filename, h->size, size, h->base, p);
        }
        if (p) {
+               if (h->farmid == 1) {
+                       QryCtx *qc = MT_thread_get_qry_ctx();
+                       if (qc)
+                               ATOMIC_SUB(&qc->datasize, h->size - size);
+               }
                h->size = size;
                h->base = p;
                return GDK_SUCCEED;
@@ -578,6 +606,11 @@ void
 HEAPfree(Heap *h, bool rmheap)
 {
        if (h->base) {
+               if (h->farmid == 1) {
+                       QryCtx *qc = MT_thread_get_qry_ctx();
+                       if (qc)
+                               ATOMIC_SUB(&qc->datasize, h->size);
+               }
                if (h->storage == STORE_MEM) {  /* plain memory */
                        TRC_DEBUG(HEAP, "HEAPfree %s %zu %p\n", h->filename, h->size, h->base);
                        GDKfree(h->base);
@@ -764,6 +797,11 @@ HEAPload_intern(Heap *h, const char *nme
                return GDK_FAIL; /* file could  not be read satisfactorily */
 
        h->dirty = false;       /* we just read it, so it's clean */
+       if (h->farmid == 1) {
+               QryCtx *qc = MT_thread_get_qry_ctx();
+               if (qc)
+                       ATOMIC_ADD(&qc->datasize, h->size);
+       }
        return GDK_SUCCEED;
 }
 
diff --git a/gdk/gdk_system.h b/gdk/gdk_system.h
--- a/gdk/gdk_system.h
+++ b/gdk/gdk_system.h
@@ -134,6 +134,8 @@
 #define PTW32 1
 #endif
 
+#include "matomic.h"
+
 /* debug and errno integers */
 gdk_export int GDKdebug;
 gdk_export void GDKsetdebug(int debug);
@@ -161,8 +163,9 @@ enum MT_thr_detach { MT_THR_JOINABLE, MT
 typedef int64_t lng;
 
 typedef struct QryCtx {
-       const lng starttime;
+       lng starttime;
        lng querytimeout;
+       ATOMIC_TYPE datasize;
 } QryCtx;
 
 gdk_export bool MT_thread_init(void);
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -55,11 +55,15 @@ ClientRec *mal_clients = NULL;
 void
 mal_client_reset(void)
 {
-       MAL_MAXCLIENTS = 0;
        if (mal_clients) {
+               for (int i = 0; i < MAL_MAXCLIENTS; i++) {
+                       ATOMIC_DESTROY(&mal_clients[i].lastprint);
+                       ATOMIC_DESTROY(&mal_clients[i].qryctx.datasize);
+               }
                GDKfree(mal_clients);
                mal_clients = NULL;
        }
+       MAL_MAXCLIENTS = 0;
 }
 
 bool
@@ -86,6 +90,7 @@ MCinit(void)
        }
        for (int i = 0; i < MAL_MAXCLIENTS; i++){
                ATOMIC_INIT(&mal_clients[i].lastprint, 0);
+               ATOMIC_INIT(&mal_clients[i].qryctx.datasize, 0);
                mal_clients[i].idx = -1; /* indicate it's available */
        }
        return true;
@@ -258,9 +263,10 @@ MCinitClientRecord(Client c, oid user, b
        strcpy_len(c->optimizer, "default_pipe", sizeof(c->optimizer));
        c->workerlimit = 0;
        c->memorylimit = 0;
-       c->querytimeout = 0;
+       c->qryctx.querytimeout = 0;
        c->sessiontimeout = 0;
-       c->starttime = 0;
+       c->qryctx.starttime = 0;
+       ATOMIC_SET(&c->qryctx.datasize, 0);
        c->itrace = 0;
        c->errbuf = 0;
 
@@ -382,7 +388,7 @@ MCforkClient(Client father)
                strcpy_len(father->optimizer, son->optimizer, sizeof(father->optimizer));
                son->workerlimit = father->workerlimit;
                son->memorylimit = father->memorylimit;
-               son->querytimeout = father->querytimeout;
+               son->qryctx.querytimeout = father->qryctx.querytimeout;
                son->sessiontimeout = father->sessiontimeout;
 
                if (son->prompt)
@@ -425,7 +431,7 @@ MCshutdowninprogress(void){
  * child can not close a parent.
  */
 void
-MCfreeClient(Client c)
+MCcloseClient(Client c)
 {
        MT_lock_set(&mal_contextLock);
        if( c->mode == FREECLIENT) {
@@ -462,7 +468,7 @@ MCfreeClient(Client c)
        strcpy_len(c->optimizer, "default_pipe", sizeof(c->optimizer));
        c->workerlimit = 0;
        c->memorylimit = 0;
-       c->querytimeout = 0;
+       c->qryctx.querytimeout = 0;
        c->sessiontimeout = 0;
        c->user = oid_nil;
        if( c->username){
@@ -571,12 +577,6 @@ MCmemoryClaim(void)
        return claim * LL_CONSTANT(1048576);
 }
 
-void
-MCcloseClient(Client c)
-{
-       MCfreeClient(c);
-}
-
 str
 MCsuspendClient(int id)
 {
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -67,9 +67,8 @@ typedef struct CLIENT {
        char    optimizer[IDLENGTH];/* The optimizer pipe preferred for this session */
        int     workerlimit;            /* maximum number of workthreads processing a query */
        int             memorylimit;            /* Memory claim highwater mark, 0 = no limit */
-       lng     querytimeout;           /* query abort after x usec, 0 = no limit*/
        lng         sessiontimeout;             /* session abort after x usec, 0 = no limit */
-       lng starttime; /* track when the query started, for resource management */
+       QryCtx  qryctx;                         /* per query limitations */
 
        time_t  login;          /* Time when this session started */
        lng     session;        /* usec since start of server */
diff --git a/monetdb5/mal/mal_embedded.c b/monetdb5/mal/mal_embedded.c
--- a/monetdb5/mal/mal_embedded.c
+++ b/monetdb5/mal/mal_embedded.c
@@ -104,7 +104,7 @@ malEmbeddedBoot(int workerlimit, int mem
                throw(MAL, "malEmbeddedBoot", "Failed to initialize client");
        c->workerlimit = workerlimit;
        c->memorylimit = memorylimit;
-       c->querytimeout = querytimeout * 1000000;       // from sec to usec
+       c->qryctx.querytimeout = querytimeout * 1000000;        // from sec to usec
        c->sessiontimeout = sessiontimeout * 1000000;
        c->curmodule = c->usermodule = userModule();
        if(c->usermodule == NULL) {
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -360,7 +360,7 @@ runMAL(Client cntxt, MalBlkPtr mb, MalBl
                garbageCollector(cntxt, mb, stk, env != stk);
        if (stk && stk != env)
                freeStack(stk);
-       if (ret == MAL_SUCCEED && cntxt->querytimeout && cntxt->starttime && GDKusec()- cntxt->starttime > cntxt->querytimeout)
+       if (ret == MAL_SUCCEED && cntxt->qryctx.querytimeout && cntxt->qryctx.starttime && GDKusec()- cntxt->qryctx.starttime > cntxt->qryctx.querytimeout)
                throw(MAL, "mal.interpreter", SQLSTATE(HYT00) RUNTIME_QRY_TIMEOUT);
        return ret;
 }
@@ -454,7 +454,7 @@ callMAL(Client cntxt, MalBlkPtr mb, MalS
        }
        if (stk)
                garbageCollector(cntxt, mb, stk, TRUE);
-       if (ret == MAL_SUCCEED && cntxt->querytimeout && cntxt->starttime && GDKusec()- cntxt->starttime > cntxt->querytimeout)
+       if (ret == MAL_SUCCEED && cntxt->qryctx.querytimeout && cntxt->qryctx.starttime && GDKusec()- cntxt->qryctx.starttime > cntxt->qryctx.querytimeout)
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to