Changeset: 032cd11701b8 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=032cd11701b8 Modified Files: gdk/gdk_utils.c monetdb5/mal/mal_dataflow.c monetdb5/mal/mal_instruction.c monetdb5/modules/mal/batExtensions.c monetdb5/optimizer/opt_multicore.c monetdb5/scheduler/Makefile.ag monetdb5/scheduler/mut_policy.c monetdb5/scheduler/mut_stopRuns.c monetdb5/scheduler/mut_transforms.c monetdb5/scheduler/run_multicore.c monetdb5/scheduler/run_multicore.h Branch: mutation Log Message:
macros added / corrected for join and select operators, cleaned up the code, new algorithm heuristic tunings added diffs (truncated from 873 to 300 lines): diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c --- a/gdk/gdk_utils.c +++ b/gdk/gdk_utils.c @@ -919,12 +919,14 @@ GDKvmtrim(void *limit) MEMDEBUG THRprintf(GDKstdout, "alloc = " SZFMT " %+zd rss = " SZFMT " %+zd\n", cursize, memdiff, rss, rssdiff); prevmem = cursize; prevrss = rss; - if (memdiff >= 0 && rssdiff < -32 * (ssize_t) MT_pagesize()) { - BBPtrim(rss); - highload = 1; - } else { - highload = 0; - } + + if (rss > 0.8 * (ssize_t) MT_npages() * MT_pagesize()) { + BBPtrim(0.2 * (ssize_t) MT_npages() * MT_pagesize()); + highload = 1; + } else { + highload = 0; + } + } while (!GDKexiting()); } diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -231,7 +231,7 @@ static int stick_this_thread_to_core(int pthread_t current_thread; cpu_set_t cpuset; - int num_cores = sysconf(_SC_NPROCESSORS_ONLN); + int num_cores = MT_check_nr_cores(); if (core_id >= num_cores) { diff --git a/monetdb5/mal/mal_instruction.c b/monetdb5/mal/mal_instruction.c --- a/monetdb5/mal/mal_instruction.c +++ b/monetdb5/mal/mal_instruction.c @@ -1624,12 +1624,12 @@ void delArgument(InstrPtr p, int idx) { int i; - for (i = idx; i < p->argc - 1; i++) p->argv[i] = p->argv[i + 1]; p->argc--; if (idx < p->retc) p->retc--; + } void diff --git a/monetdb5/modules/mal/batExtensions.c b/monetdb5/modules/mal/batExtensions.c --- a/monetdb5/modules/mal/batExtensions.c +++ b/monetdb5/modules/mal/batExtensions.c @@ -262,7 +262,6 @@ CMDbatpartition2(Client cntxt, MalBlkPtr hval = lval+step; bn = BATslice(b, lval,hval); BATseqbase(bn, lval + b->hseqbase) ; -// BATseqbase(bn, lval) ; if (bn== NULL){ BBPreleaseref(b->batCacheid); throw(MAL, "bat.partition", INTERNAL_OBJ_CREATE); diff --git a/monetdb5/optimizer/opt_multicore.c b/monetdb5/optimizer/opt_multicore.c --- a/monetdb5/optimizer/opt_multicore.c +++ b/monetdb5/optimizer/opt_multicore.c @@ -43,15 +43,20 @@ OPTmulticoreImplementation(Client cntxt, limit= mb->stop; slimit = mb->ssize; - mb->vsize = 700; - new = (VarPtr *) GDKzalloc(mb->vsize * sizeof(VarPtr)); - if (new == NULL) { - GDKerror("varPtrAlloc:" MAL_MALLOC_FAIL); - return -1; - } - memcpy((char *) new, (char *) mb->var, sizeof(VarPtr) * mb->vtop); - GDKfree(mb->var); - mb->var = new; + if(mb->vsize < 700) + { + mb->vsize = 700; + new = (VarPtr *) GDKzalloc(mb->vsize * sizeof(VarPtr)); + if (new == NULL) + { + // throw(MAL,"varPtrAlloc","Internal error"); + GDKerror("varPtrAlloc:" MAL_MALLOC_FAIL); + return -1; + } + memcpy((char *) new, (char *) mb->var, sizeof(VarPtr) * mb->vtop); + GDKfree(mb->var); + mb->var = new; + } if ( newMalBlkStmt(mb, mb->ssize) < 0) return 0; diff --git a/monetdb5/scheduler/Makefile.ag b/monetdb5/scheduler/Makefile.ag --- a/monetdb5/scheduler/Makefile.ag +++ b/monetdb5/scheduler/Makefile.ag @@ -47,4 +47,4 @@ headers_mal = { } EXTRA_DIST_DIR = Tests -EXTRA_DIST = run_adder.mal run_isolate.mal run_mutation.mal run_memo.mal run_octopus.mal srvpool.mal +EXTRA_DIST = run_adder.mal run_isolate.mal run_multicore.mal run_memo.mal run_octopus.mal srvpool.mal diff --git a/monetdb5/scheduler/mut_policy.c b/monetdb5/scheduler/mut_policy.c --- a/monetdb5/scheduler/mut_policy.c +++ b/monetdb5/scheduler/mut_policy.c @@ -73,6 +73,8 @@ MUTpolicy(Client cntxt, Mutant m) continue; if ( src->profiler[i].ticks/src->calls <= MUT_THRESHOLD) continue; + + DEBUG_MULTICORE mnstr_printf(cntxt->fdout,"#mutation candidate %d cost "LLFMT"\n", i, src->profiler[i].ticks/src->calls); @@ -82,12 +84,13 @@ MUTpolicy(Client cntxt, Mutant m) if ( src->profiler[i].ticks/src->calls > src->profiler[m->target].ticks/src->calls) m->target = i; } + DEBUG_MULTICORE if ( src->profiler && m->target) { - mnstr_printf(cntxt->fdout,"#mutation calls %d cost "LLFMT"\n", src->calls, src->runtime/src->calls); - - mnstr_printf(cntxt->fdout,"#mutation target instruction %d cost "LLFMT"\n", m->target, src->profiler[m->target].ticks/src->calls); - printInstruction(cntxt->fdout,src,0,getInstrPtr(src,m->target),LIST_MAL_ALL); + mnstr_printf(cntxt->fdout,"#mutation calls %d cost "LLFMT"\n", src->calls, src->runtime/src->calls); + + mnstr_printf(cntxt->fdout,"#mutation target instruction %d cost "LLFMT"\n", m->target, src->profiler[m->target].ticks/src->calls); + printInstruction(cntxt->fdout,src,0,getInstrPtr(src,m->target),LIST_MAL_ALL); } /* At this point we have a target instruction to be replaced */ /* safe the previous version in the history list */ @@ -98,8 +101,8 @@ MUTpolicy(Client cntxt, Mutant m) if ( getModuleId(p) && strncmp(getModuleId(p), "algebra",7)== 0) { if(getFunctionId(p) == joinRef) - mutationJoinDouble(cntxt,m); -// mutationJoin(cntxt,m); +// mutationJoinDouble(cntxt,m); + mutationJoin(cntxt,m); else if(getFunctionId(p) == subselectRef) mutationSelect(cntxt,m); else // proxy function @@ -110,7 +113,9 @@ MUTpolicy(Client cntxt, Mutant m) /* reset/expand the profiler */ if (ssize < src->ssize){ - mnstr_printf(GDKstdout, "\n Changed the size of stack"); + + DEBUG_MULTICORE + mnstr_printf(cntxt->fdout, "# \n Changed the size of stack"); GDKfree(src->profiler); src->profiler = 0; initProfiler(src); @@ -124,7 +129,7 @@ MUTpolicy(Client cntxt, Mutant m) DEBUG_MULTICORE printFunction(cntxt->fdout, src,0,LIST_MAL_ALL); if ( src->errors) - throw(MAL,"run_mutation","Internal error"); + throw(MAL,"run_multicore","Internal error"); } return MAL_SUCCEED; } diff --git a/monetdb5/scheduler/mut_stopRuns.c b/monetdb5/scheduler/mut_stopRuns.c --- a/monetdb5/scheduler/mut_stopRuns.c +++ b/monetdb5/scheduler/mut_stopRuns.c @@ -39,18 +39,24 @@ int checkIfRun(MalBlkPtr mb) { - // This is the 1st run, so always execute it Mutant mutant = (Mutant)mb->mutants; -// if((mb->mutants && mb->mutants->credit==1 && mb->mutants->debit == 0 && mb->mutants->next==NULL) + // This is the 1st run, so always execute it if(mutant && mutant->credit==1 && mutant->debit == 0 && mutant->next==NULL) - { - return 1; - }else if(mutant && (mutant->credit - mutant->debit) > 0 ) { return 1; + + // Let the next execution continue, because it is highly likely going to be normal unlike current noisy + }else if(mutant && mutant->isNoisyRun == TRUE) + { + return 1; + // normal execution + }else if ((mutant->credit - mutant->debit) > 0 ) + { + return 1; }else - return 0; + return 0; + } /* Find the rate of increase / decrease in execution time of current query with respect to the previous query @@ -67,7 +73,7 @@ checkRateOfFall(Client cntxt, Mutant mut lng baseTime, diffTime; lng queryTotalTime, prevQueryTotalTime; - int NUM_OF_CORES = sysconf(_SC_NPROCESSORS_ONLN) / 2; + int NUM_OF_CORES = MT_check_nr_cores() / 2; if(mutant) { @@ -75,7 +81,8 @@ checkRateOfFall(Client cntxt, Mutant mut prevQueryTotalTime = mutant->next->totalQueryTime; }else { - mnstr_printf(GDKstdout, "\nNext mutant not present"); + DEBUG_MULTICORE + mnstr_printf(cntxt->fdout, "# \nNext mutant not present"); return 0; } @@ -98,22 +105,49 @@ checkRateOfFall(Client cntxt, Mutant mut { mutant->debit = (rateOfImprove * NUM_OF_CORES) + mutant->next->debit; mutant->credit = mutant->next->credit; + + // The super noisy run condition check, if the exec time of current run is greater than serial exec time (0th run), then this is a noisy run, hence set flag which will + // allow the conditionCheck checkIfRun to bypass. The assumption is since this is a super noisy run, the next run will bring back the exec time to normal, and hence debit and credit + // will compensate each other. + if(queryTotalTime >= mutant->serialExecTime) + mutant->isNoisyRun = TRUE; + else + mutant->isNoisyRun = FALSE; + } // once the threshold of number of logical cores passed, and global minimum presence is located change the rate of debit based on different thresholds - if(mutant->currentRun > NUM_OF_CORES *2) - { - if(mutant->globalMinRun <= NUM_OF_CORES *2) - mutant->debit = mutant->debit + 0.25; + if(mutant->currentRun > NUM_OF_CORES ) + { + if(mutant->currentRun == NUM_OF_CORES + 1) + { + // The new leaking debit base is spread across the available credit at this run, so that at least NUM_OF_CORES*2 runs are guaranteed + // This guarantees the debit is dynamic based on credit accumulation so far, and thereafter static + mutant->baseDebit = (mutant->credit)/((NUM_OF_CORES*2.5) - mutant->currentRun); + + DEBUG_MULTICORE_PRINT_BASEDEBIT + mnstr_printf(cntxt->fdout,"#BaseDebit= %f\n", mutant->baseDebit); + } + else + mutant->baseDebit = mutant->next->baseDebit; + + + // If the global minimum is found in the initial window, we ensure atleast runs till NUM_OF_CORES, but if it falls in after Window, depending on which window it falls in the rate of aggressiveness + // of the debit leak is adjusted accordingly - else if(mutant->globalMinRun > (NUM_OF_CORES *2) && mutant->globalMinRun < (NUM_OF_CORES *2)*2) - mutant->debit = mutant->debit + 0.5; + if(mutant->globalMinRun <= NUM_OF_CORES ) + mutant->debit = mutant->debit + mutant->baseDebit; + + else if(mutant->globalMinRun > NUM_OF_CORES && mutant->globalMinRun <= (NUM_OF_CORES *2)) + mutant->debit = mutant->debit + mutant->baseDebit + 0.25; + + else if(mutant->globalMinRun > (NUM_OF_CORES *2) && mutant->globalMinRun <= (NUM_OF_CORES *2)*2) + mutant->debit = mutant->debit + mutant->baseDebit + 0.5; + + else if(mutant->globalMinRun > ((NUM_OF_CORES *2)*2) && mutant->globalMinRun < (NUM_OF_CORES *2)*4) + mutant->debit = mutant->debit + mutant->baseDebit + 0.75; + } - else if(mutant->globalMinRun > ((NUM_OF_CORES *2)*2) && mutant->globalMinRun < (NUM_OF_CORES *2)*4) - mutant->debit = mutant->debit + 0.75; - - } - - DEBUG_MULTICORE_STOP_RUN + DEBUG_MULTICORE_PRINT_BALANCE mnstr_printf(cntxt->fdout,"#Run- %d Credit- %f Debit- %f rateOfImprove- %f", mutant->currentRun, mutant->credit, mutant->debit, rateOfImprove); return 1; @@ -124,18 +158,29 @@ checkRateOfFall(Client cntxt, Mutant mut */ int checkGlobalMin(Mutant mutant) { + flt prcntDecreaseTimeCur, prcntDecreaseTimePrev; + if(mutant->totalQueryTime < mutant->next->globalMinExec) { - mutant->globalMinRun = mutant->currentRun; - mutant->globalMinExec = mutant->totalQueryTime; + // Find how much is the percent decrease in time of current run time with respect to serial exec time + // Find how muhc is the percent decrease in time of last global min with respect to serial exec time + // If the increase is greater than a threshold (globalMinImproveThreshold), then consider present time to be new global min time + prcntDecreaseTimeCur = 100 * ((flt)(mutant->serialExecTime - mutant->totalQueryTime)/mutant->serialExecTime); + prcntDecreaseTimePrev = 100 * ((flt)(mutant->serialExecTime - mutant->next->globalMinExec)/mutant->serialExecTime); + + if((prcntDecreaseTimeCur - prcntDecreaseTimePrev) >= globalMinImproveThreshold) _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list