Changeset: 032cd11701b8 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=032cd11701b8
Modified Files:
        gdk/gdk_utils.c
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_instruction.c
        monetdb5/modules/mal/batExtensions.c
        monetdb5/optimizer/opt_multicore.c
        monetdb5/scheduler/Makefile.ag
        monetdb5/scheduler/mut_policy.c
        monetdb5/scheduler/mut_stopRuns.c
        monetdb5/scheduler/mut_transforms.c
        monetdb5/scheduler/run_multicore.c
        monetdb5/scheduler/run_multicore.h
Branch: mutation
Log Message:

Added and corrected the macros for the join and select operators, cleaned up
the code, and added new tunings for the algorithm heuristics.


diffs (truncated from 873 to 300 lines):

diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c
--- a/gdk/gdk_utils.c
+++ b/gdk/gdk_utils.c
@@ -919,12 +919,14 @@ GDKvmtrim(void *limit)
                MEMDEBUG THRprintf(GDKstdout, "alloc = " SZFMT " %+zd rss = " 
SZFMT " %+zd\n", cursize, memdiff, rss, rssdiff);
                prevmem = cursize;
                prevrss = rss;
-               if (memdiff >= 0 && rssdiff < -32 * (ssize_t) MT_pagesize()) {
-                       BBPtrim(rss);
-                       highload = 1;
-               } else {
-                       highload = 0;
-               }
+
+                if (rss > 0.8 * (ssize_t) MT_npages() * MT_pagesize()) {
+                    BBPtrim(0.2 * (ssize_t) MT_npages() * MT_pagesize());
+                    highload = 1;
+                } else {
+                    highload = 0;
+                }
+
        } while (!GDKexiting());
 }
 
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -231,7 +231,7 @@ static int stick_this_thread_to_core(int
 
    pthread_t current_thread;
    cpu_set_t cpuset;
-   int num_cores = sysconf(_SC_NPROCESSORS_ONLN);
+   int num_cores = MT_check_nr_cores(); 
 
    if (core_id >= num_cores)
    {
diff --git a/monetdb5/mal/mal_instruction.c b/monetdb5/mal/mal_instruction.c
--- a/monetdb5/mal/mal_instruction.c
+++ b/monetdb5/mal/mal_instruction.c
@@ -1624,12 +1624,12 @@ void
 delArgument(InstrPtr p, int idx)
 {
        int i;
-
        for (i = idx; i < p->argc - 1; i++)
                p->argv[i] = p->argv[i + 1];
        p->argc--;
        if (idx < p->retc)
                p->retc--;
+
 }
 
 void
diff --git a/monetdb5/modules/mal/batExtensions.c 
b/monetdb5/modules/mal/batExtensions.c
--- a/monetdb5/modules/mal/batExtensions.c
+++ b/monetdb5/modules/mal/batExtensions.c
@@ -262,7 +262,6 @@ CMDbatpartition2(Client cntxt, MalBlkPtr
                hval = lval+step;
        bn =  BATslice(b, lval,hval);
        BATseqbase(bn, lval + b->hseqbase) ;
-//     BATseqbase(bn, lval) ;
        if (bn== NULL){
                BBPreleaseref(b->batCacheid);
                throw(MAL, "bat.partition",  INTERNAL_OBJ_CREATE);
diff --git a/monetdb5/optimizer/opt_multicore.c 
b/monetdb5/optimizer/opt_multicore.c
--- a/monetdb5/optimizer/opt_multicore.c
+++ b/monetdb5/optimizer/opt_multicore.c
@@ -43,15 +43,20 @@ OPTmulticoreImplementation(Client cntxt,
        limit= mb->stop;
        slimit = mb->ssize;
 
-       mb->vsize = 700;
-        new = (VarPtr *) GDKzalloc(mb->vsize * sizeof(VarPtr));
-       if (new == NULL) {
-                GDKerror("varPtrAlloc:" MAL_MALLOC_FAIL);
-                return -1;
-        }
-        memcpy((char *) new, (char *) mb->var, sizeof(VarPtr) * mb->vtop);
-        GDKfree(mb->var);               
-       mb->var = new;
+       if(mb->vsize < 700)
+       {
+               mb->vsize = 700;
+               new = (VarPtr *) GDKzalloc(mb->vsize * sizeof(VarPtr));
+               if (new == NULL) 
+               {
+                        // throw(MAL,"varPtrAlloc","Internal error");
+                       GDKerror("varPtrAlloc:" MAL_MALLOC_FAIL);
+                       return -1;
+               }
+               memcpy((char *) new, (char *) mb->var, sizeof(VarPtr) * 
mb->vtop);
+               GDKfree(mb->var);               
+               mb->var = new;
+       }
 
        if ( newMalBlkStmt(mb, mb->ssize) < 0)
                return 0;
diff --git a/monetdb5/scheduler/Makefile.ag b/monetdb5/scheduler/Makefile.ag
--- a/monetdb5/scheduler/Makefile.ag
+++ b/monetdb5/scheduler/Makefile.ag
@@ -47,4 +47,4 @@ headers_mal = {
 }
 
 EXTRA_DIST_DIR = Tests
-EXTRA_DIST = run_adder.mal run_isolate.mal run_mutation.mal run_memo.mal 
run_octopus.mal srvpool.mal
+EXTRA_DIST = run_adder.mal run_isolate.mal run_multicore.mal run_memo.mal 
run_octopus.mal srvpool.mal
diff --git a/monetdb5/scheduler/mut_policy.c b/monetdb5/scheduler/mut_policy.c
--- a/monetdb5/scheduler/mut_policy.c
+++ b/monetdb5/scheduler/mut_policy.c
@@ -73,6 +73,8 @@ MUTpolicy(Client cntxt, Mutant m)
                        continue;
                if ( src->profiler[i].ticks/src->calls <= MUT_THRESHOLD)
                        continue;
+       
+
                DEBUG_MULTICORE
                        mnstr_printf(cntxt->fdout,"#mutation candidate %d cost 
"LLFMT"\n", i, src->profiler[i].ticks/src->calls);
 
@@ -82,12 +84,13 @@ MUTpolicy(Client cntxt, Mutant m)
                if ( src->profiler[i].ticks/src->calls > 
src->profiler[m->target].ticks/src->calls)
                        m->target = i;
        }
+               
        DEBUG_MULTICORE 
                if ( src->profiler && m->target) {
-               mnstr_printf(cntxt->fdout,"#mutation calls %d cost "LLFMT"\n", 
src->calls, src->runtime/src->calls);
-               
-               mnstr_printf(cntxt->fdout,"#mutation target instruction %d cost 
"LLFMT"\n", m->target, src->profiler[m->target].ticks/src->calls);
-               
printInstruction(cntxt->fdout,src,0,getInstrPtr(src,m->target),LIST_MAL_ALL);
+                       mnstr_printf(cntxt->fdout,"#mutation calls %d cost 
"LLFMT"\n", src->calls, src->runtime/src->calls);
+       
+                       mnstr_printf(cntxt->fdout,"#mutation target instruction 
%d cost "LLFMT"\n", m->target, src->profiler[m->target].ticks/src->calls);
+                       
printInstruction(cntxt->fdout,src,0,getInstrPtr(src,m->target),LIST_MAL_ALL);
        }
        /* At this point we have a target instruction to be replaced */
        /* safe the previous version in the history list */
@@ -98,8 +101,8 @@ MUTpolicy(Client cntxt, Mutant m)
                if ( getModuleId(p) && strncmp(getModuleId(p), "algebra",7)== 0)
                {
                        if(getFunctionId(p) == joinRef)
-                               mutationJoinDouble(cntxt,m);
-//                             mutationJoin(cntxt,m);
+//                             mutationJoinDouble(cntxt,m);
+                               mutationJoin(cntxt,m);
                        else if(getFunctionId(p) == subselectRef)
                                mutationSelect(cntxt,m);
                        else    // proxy function
@@ -110,7 +113,9 @@ MUTpolicy(Client cntxt, Mutant m)
 
                /* reset/expand the profiler */
                if (ssize < src->ssize){
-                       mnstr_printf(GDKstdout, "\n Changed the size of stack");
+                       
+                       DEBUG_MULTICORE
+                               mnstr_printf(cntxt->fdout, "# \n Changed the 
size of stack");
                        GDKfree(src->profiler);
                        src->profiler = 0;
                        initProfiler(src);
@@ -124,7 +129,7 @@ MUTpolicy(Client cntxt, Mutant m)
                DEBUG_MULTICORE
                        printFunction(cntxt->fdout, src,0,LIST_MAL_ALL);
                if ( src->errors)
-                       throw(MAL,"run_mutation","Internal error");
+                       throw(MAL,"run_multicore","Internal error");
        } 
        return MAL_SUCCEED;
 }
diff --git a/monetdb5/scheduler/mut_stopRuns.c 
b/monetdb5/scheduler/mut_stopRuns.c
--- a/monetdb5/scheduler/mut_stopRuns.c
+++ b/monetdb5/scheduler/mut_stopRuns.c
@@ -39,18 +39,24 @@
 int 
 checkIfRun(MalBlkPtr mb)
 {
-       // This is the 1st run, so always execute it
        Mutant mutant = (Mutant)mb->mutants;    
 
-//     if((mb->mutants && mb->mutants->credit==1 && mb->mutants->debit == 0 && 
mb->mutants->next==NULL)
+       // This is the 1st run, so always execute it
        if(mutant && mutant->credit==1 && mutant->debit == 0 && 
mutant->next==NULL)
-       {       
-               return 1;
-       }else if(mutant && (mutant->credit - mutant->debit) > 0 )
        {
                return 1;
+       
+       // Let the next execution continue, because it is highly likely going 
to be normal unlike current noisy
+       }else if(mutant && mutant->isNoisyRun == TRUE)          
+       {
+               return 1;
+       // normal execution 
+       }else if ((mutant->credit - mutant->debit) > 0 )
+       {
+               return 1;       
        }else
-               return 0;       
+               return 0;
+
 }
 
 /*     Find the rate of increase / decrease in execution time of current query 
with respect to the previous query
@@ -67,7 +73,7 @@ checkRateOfFall(Client cntxt, Mutant mut
        lng baseTime, diffTime;
        lng queryTotalTime, prevQueryTotalTime;
 
-       int NUM_OF_CORES = sysconf(_SC_NPROCESSORS_ONLN) / 2;
+       int NUM_OF_CORES = MT_check_nr_cores() / 2;
 
        if(mutant)
        {
@@ -75,7 +81,8 @@ checkRateOfFall(Client cntxt, Mutant mut
                prevQueryTotalTime = mutant->next->totalQueryTime;
        }else
        {
-               mnstr_printf(GDKstdout, "\nNext mutant not present");
+               DEBUG_MULTICORE
+                       mnstr_printf(cntxt->fdout, "# \nNext mutant not 
present");
                return 0;
        }
        
@@ -98,22 +105,49 @@ checkRateOfFall(Client cntxt, Mutant mut
        {
                mutant->debit = (rateOfImprove * NUM_OF_CORES) + 
mutant->next->debit;
                mutant->credit = mutant->next->credit;
+
+               // The super noisy run condition check, if the exec time of 
current run is greater than serial exec time (0th run), then this is a noisy 
run, hence set flag which will 
+               // allow the conditionCheck checkIfRun to bypass. The 
assumption is since this is a super noisy run, the next run will bring back the 
exec time to normal, and hence debit and credit 
+               // will compensate each other. 
+               if(queryTotalTime >= mutant->serialExecTime)
+                       mutant->isNoisyRun = TRUE;
+               else
+                       mutant->isNoisyRun = FALSE;     
+
        }
        // once the threshold of number of logical cores passed, and global 
minimum presence is located change the rate of debit based on different 
thresholds
-       if(mutant->currentRun > NUM_OF_CORES *2)
-       {
-               if(mutant->globalMinRun <= NUM_OF_CORES *2)     
-                       mutant->debit = mutant->debit + 0.25;
+       if(mutant->currentRun > NUM_OF_CORES )
+        {
+               if(mutant->currentRun == NUM_OF_CORES + 1)
+               {
+                       // The new leaking debit base is spread across the 
available credit at this run, so that at least NUM_OF_CORES*2 runs are 
guaranteed
+                       // This guarantees the debit is dynamic based on credit 
accumulation so far, and thereafter static
+                       mutant->baseDebit = 
(mutant->credit)/((NUM_OF_CORES*2.5) - mutant->currentRun);
+               
+                       DEBUG_MULTICORE_PRINT_BASEDEBIT
+                               mnstr_printf(cntxt->fdout,"#BaseDebit= %f\n", 
mutant->baseDebit);
+               }
+               else
+                       mutant->baseDebit = mutant->next->baseDebit;
+                
+       
+               // If the global minimum is found in the initial window, we 
ensure atleast runs till NUM_OF_CORES, but if it falls in after Window, 
depending on which window it falls in the rate of aggressiveness
+               // of the debit leak is adjusted accordingly
 
-               else if(mutant->globalMinRun > (NUM_OF_CORES *2) && 
mutant->globalMinRun < (NUM_OF_CORES *2)*2)
-                       mutant->debit = mutant->debit + 0.5;
+               if(mutant->globalMinRun <= NUM_OF_CORES )       
+                       mutant->debit = mutant->debit + mutant->baseDebit; 
+
+                else if(mutant->globalMinRun > NUM_OF_CORES && 
mutant->globalMinRun <= (NUM_OF_CORES *2))
+                        mutant->debit = mutant->debit + mutant->baseDebit + 
0.25;
+
+                else if(mutant->globalMinRun > (NUM_OF_CORES *2) && 
mutant->globalMinRun <= (NUM_OF_CORES *2)*2)
+                        mutant->debit = mutant->debit + mutant->baseDebit + 
0.5;
+        
+                else if(mutant->globalMinRun > ((NUM_OF_CORES *2)*2) && 
mutant->globalMinRun < (NUM_OF_CORES *2)*4)
+                        mutant->debit = mutant->debit + mutant->baseDebit + 
0.75;
+        }       
        
-               else if(mutant->globalMinRun > ((NUM_OF_CORES *2)*2) && 
mutant->globalMinRun < (NUM_OF_CORES *2)*4)
-                       mutant->debit = mutant->debit + 0.75;
-       
-       }       
-
-       DEBUG_MULTICORE_STOP_RUN
+       DEBUG_MULTICORE_PRINT_BALANCE
                mnstr_printf(cntxt->fdout,"#Run- %d Credit- %f Debit- %f 
rateOfImprove- %f", mutant->currentRun, mutant->credit, mutant->debit, 
rateOfImprove);
 
        return 1;
@@ -124,18 +158,29 @@ checkRateOfFall(Client cntxt, Mutant mut
  */ 
 int checkGlobalMin(Mutant mutant)
 {
+       flt prcntDecreaseTimeCur, prcntDecreaseTimePrev;
+       
        if(mutant->totalQueryTime < mutant->next->globalMinExec)
        {
-               mutant->globalMinRun = mutant->currentRun;      
-               mutant->globalMinExec = mutant->totalQueryTime;
+               // Find how much is the percent decrease in time of current run 
time with respect to serial exec time
+               // Find how much is the percent decrease in time of last global 
min with respect to serial exec time
+               // If the increase is greater than a threshold 
(globalMinImproveThreshold), then consider present time to be new global min 
time
 
+               prcntDecreaseTimeCur = 100 * ((flt)(mutant->serialExecTime - 
mutant->totalQueryTime)/mutant->serialExecTime);
+               prcntDecreaseTimePrev = 100 * ((flt)(mutant->serialExecTime - 
mutant->next->globalMinExec)/mutant->serialExecTime);
+               
+               if((prcntDecreaseTimeCur - prcntDecreaseTimePrev) >= 
globalMinImproveThreshold) 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to