Changeset: b077db99e2e3 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/b077db99e2e3
Modified Files:
        monetdb5/mal/mal_client.c
        monetdb5/mal/mal_client.h
        monetdb5/modules/mal/clients.c
        monetdb5/optimizer/opt_mitosis.c
Branch: default
Log Message:

Take into account all outstanding memory limits of active clients
to determine their fare share. If ony thread claims all, then
the remaining threads also receive their fair portion.


diffs (127 lines):

diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -515,6 +515,30 @@ MCactiveClients(void)
        return active;
 }
 
+/* To determine the average memory claims for assignment, we should calculate 
the outstanding claims*/
+/* This only concerns active clients and if one query claims all, then all 
should be divided equally */
+
+size_t
+MCmemoryClaim(void)
+{
+       size_t claim = 0;
+       int active = 1;
+
+       Client cntxt = mal_clients;
+
+       for(cntxt = mal_clients;  cntxt<mal_clients+MAL_MAXCLIENTS; cntxt++)
+       if( cntxt->idle == 0 && cntxt->mode == RUNCLIENT){
+               if(cntxt->memorylimit){
+                       claim += cntxt->memorylimit;
+                       active ++;
+               } else
+                       return GDK_mem_maxsize;
+       }
+       if(active == 0 ||  claim  * LL_CONSTANT(1048576) >= GDK_mem_maxsize)
+               return GDK_mem_maxsize;
+       return claim * LL_CONSTANT(1048576);
+}
+
 void
 MCcloseClient(Client c)
 {
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -192,6 +192,7 @@ mal_export Client  MCinitClient(oid user
 mal_export Client  MCforkClient(Client father);
 mal_export void           MCstopClients(Client c);
 mal_export int    MCactiveClients(void);
+mal_export size_t  MCmemoryClaim(void);
 mal_export void    MCcloseClient(Client c);
 mal_export str     MCsuspendClient(int id);
 mal_export str     MCawakeClient(int id);
diff --git a/monetdb5/modules/mal/clients.c b/monetdb5/modules/mal/clients.c
--- a/monetdb5/modules/mal/clients.c
+++ b/monetdb5/modules/mal/clients.c
@@ -307,6 +307,9 @@ CLTsetmemorylimit(Client cntxt, MalBlkPt
                limit = *getArgReference_int(stk,pci,1);
        }
 
+       if( limit > (int)(GDK_mem_maxsize / LL_CONSTANT(1048576)) )
+               throw(MAL,"clients.setmemorylimit","Memory claim beyond 
physical memory ");
+
        if( idx < 0 || idx > MAL_MAXCLIENTS)
                throw(MAL,"clients.setmemorylimit","Illegal session id");
        if( is_int_nil(limit))
diff --git a/monetdb5/optimizer/opt_mitosis.c b/monetdb5/optimizer/opt_mitosis.c
--- a/monetdb5/optimizer/opt_mitosis.c
+++ b/monetdb5/optimizer/opt_mitosis.c
@@ -19,10 +19,9 @@ OPTmitosisImplementation(Client cntxt, M
        str schema = 0, table = 0;
        BUN r = 0, rowcnt = 0;    /* table should be sizeable to consider 
parallel execution*/
        InstrPtr q, *old, target = 0;
-       size_t argsize = 6 * sizeof(lng), m = 0;
+       size_t argsize = 6 * sizeof(lng), m = 0, memclaim;
        /*     estimate size per operator estimate:   4 args + 2 res*/
        int threads = GDKnr_threads ? GDKnr_threads : 1;
-       int activeClients;
        char buf[256];
        lng usec = GDKusec();
        str msg = MAL_SUCCEED;
@@ -143,36 +142,44 @@ OPTmitosisImplementation(Client cntxt, M
         * because all user together are responsible for resource contentions
         */
        cntxt->idle = 0; // this one is definitely not idle
-       activeClients = MCactiveClients();
 
 /* This code was used to experiment with block sizes, mis-using the 
memorylimit  variable
        if (cntxt->memorylimit){
                // the new mitosis scheme uses a maximum chunck size in MB from 
the client context
-               m = (size_t) ((cntxt->memorylimit * 1024 *1024) / row_size);
+               m = (size_t) ((cntxt->memorylimit *  LL_CONSTANT(1048576)) / 
row_size);
                pieces = (int) (rowcnt / m + (rowcnt - m * pieces > 0));
        }
        if (cntxt->memorylimit == 0 || pieces <= 1){
 */
        if (pieces <= 1){
-               /* We haven't assigned the number of pieces.*/
+               /* We haven't assigned the number of pieces.
+                * Determine the memory available for this client
+                */
 
-               /* respect the memory limit size set for the user */
+               /* respect the memory limit size set for the user 
+               * and determine the column slice size 
+               */
                if( cntxt->memorylimit)
-                       m = cntxt->memorylimit * 1024 *1024 / argsize;
-               else
-                       m = GDK_mem_maxsize / argsize;
+                       m = cntxt->memorylimit *  LL_CONSTANT(1048576) / 
argsize;
+               else {
+                       memclaim= MCmemoryClaim();
+                       if(memclaim == GDK_mem_maxsize){
+                               m = GDK_mem_maxsize / MCactiveClients()/ 
argsize;
+                       } else
+                               m = (GDK_mem_maxsize - memclaim) / argsize;
+               }
 
                /* if data exceeds memory size,
                 * i.e., (rowcnt*argsize > GDK_mem_maxsize),
                 * i.e., (rowcnt > GDK_mem_maxsize/argsize = m) */
-               if (rowcnt > m && m / threads / activeClients > 0) {
+               if (rowcnt > m && m / threads > 0) {
                        /* create |pieces| > |threads| partitions such that
                         * |threads| partitions at a time fit in memory,
                         * i.e., (threads*(rowcnt/pieces) <= m),
                         * i.e., (rowcnt/pieces <= m/threads),
                         * i.e., (pieces => rowcnt/(m/threads))
                         * (assuming that (m > threads*MINPARTCNT)) */
-                       pieces = (int) (rowcnt / (m / threads / activeClients)) 
+ 1;
+                       pieces = (int) (rowcnt / (m / threads)) + 1;
                } else if (rowcnt > MINPARTCNT) {
                /* exploit parallelism, but ensure minimal partition size to
                 * limit overhead */
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to