Changeset: 33f5024e21c2 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=33f5024e21c2 Added Files: monetdb5/scheduler/mut_stopRuns.c monetdb5/scheduler/mut_stopRuns.h Modified Files: monetdb5/mal/mal_dataflow.c monetdb5/modules/mal/batExtensions.c monetdb5/optimizer/opt_multicore.c monetdb5/scheduler/Makefile.ag monetdb5/scheduler/mut_policy.c monetdb5/scheduler/mut_transforms.c monetdb5/scheduler/run_multicore.c monetdb5/scheduler/run_multicore.h Branch: mutation Log Message:
algorithm for identification of stop condition for runs added diffs (truncated from 508 to 300 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -226,6 +226,28 @@ q_dequeue(queue *q) return r; } +/* +static int stick_this_thread_to_core(int core_id) { + + pthread_t current_thread; + cpu_set_t cpuset; + int num_cores = sysconf(_SC_NPROCESSORS_ONLN); + + if (core_id >= num_cores) + { + mnstr_printf(GDKstdout,"\n Core id %d less than %d numcores",core_id,num_cores); + + return -1; + } + + CPU_ZERO(&cpuset); + CPU_SET(core_id, &cpuset); + + current_thread = pthread_self(); + return pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset); +} +*/ + /* * We simply move an instruction into the front of the queue. * Beware, we assume that variables are assigned a value once, otherwise @@ -258,6 +280,8 @@ DFLOWworker(void *t) thr = THRnew("DFLOWworker"); +// stick_this_thread_to_core(id); + GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */ GDKerrbuf[0] = 0; while (1) { diff --git a/monetdb5/modules/mal/batExtensions.c b/monetdb5/modules/mal/batExtensions.c --- a/monetdb5/modules/mal/batExtensions.c +++ b/monetdb5/modules/mal/batExtensions.c @@ -262,6 +262,7 @@ CMDbatpartition2(Client cntxt, MalBlkPtr hval = lval+step; bn = BATslice(b, lval,hval); BATseqbase(bn, lval + b->hseqbase) ; +// BATseqbase(bn, lval) ; if (bn== NULL){ BBPreleaseref(b->batCacheid); throw(MAL, "bat.partition", INTERNAL_OBJ_CREATE); diff --git a/monetdb5/optimizer/opt_multicore.c b/monetdb5/optimizer/opt_multicore.c --- a/monetdb5/optimizer/opt_multicore.c +++ b/monetdb5/optimizer/opt_multicore.c @@ -30,6 +30,8 @@ OPTmulticoreImplementation(Client cntxt, { int i,limit, slimit; InstrPtr *old= mb->stmt; + VarPtr *new; + (void) cntxt; (void) pci; @@ -40,6 +42,17 @@ OPTmulticoreImplementation(Client cntxt, limit= mb->stop; slimit = mb->ssize; + + mb->vsize = 700; + new = (VarPtr *) 
GDKzalloc(mb->vsize * sizeof(VarPtr)); + if (new == NULL) { + GDKerror("varPtrAlloc:" MAL_MALLOC_FAIL); + return -1; + } + memcpy((char *) new, (char *) mb->var, sizeof(VarPtr) * mb->vtop); + GDKfree(mb->var); + mb->var = new; + if ( newMalBlkStmt(mb, mb->ssize) < 0) return 0; diff --git a/monetdb5/scheduler/Makefile.ag b/monetdb5/scheduler/Makefile.ag --- a/monetdb5/scheduler/Makefile.ag +++ b/monetdb5/scheduler/Makefile.ag @@ -33,6 +33,7 @@ lib_scheduler = { run_multicore.c run_multicore.h \ mut_transforms.c mut_transforms.h \ mut_policy.c mut_policy.h \ + mut_stopRuns.c mut_stopRuns.h \ run_memo.c run_memo.h \ run_octopus.c run_octopus.h \ srvpool.c srvpool.h \ diff --git a/monetdb5/scheduler/mut_policy.c b/monetdb5/scheduler/mut_policy.c --- a/monetdb5/scheduler/mut_policy.c +++ b/monetdb5/scheduler/mut_policy.c @@ -110,6 +110,7 @@ MUTpolicy(Client cntxt, Mutant m) /* reset/expand the profiler */ if (ssize < src->ssize){ + mnstr_printf(GDKstdout, "\n Changed the size of stack"); GDKfree(src->profiler); src->profiler = 0; initProfiler(src); diff --git a/monetdb5/scheduler/mut_stopRuns.c b/monetdb5/scheduler/mut_stopRuns.c new file mode 100644 --- /dev/null +++ b/monetdb5/scheduler/mut_stopRuns.c @@ -0,0 +1,141 @@ +/* + * The contents of this file are subject to the MonetDB Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.monetdb.org/Legal/MonetDBLicense + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the + * License for the specific language governing rights and limitations + * under the License. + * + * The Original Code is the MonetDB Database System. + * + * The Initial Developer of the Original Code is CWI. + * Portions created by CWI are Copyright (C) 1997-July 2008 CWI. + * Copyright August 2008-2013 MonetDB B.V. + * All Rights Reserved. 
+ */ + +/* + * @a M. Kersten, M. Gawade + * Run mutated query plans + * The infrastructure to adaptively create multi-processor parallel plans. + */ +#include "monetdb_config.h" +#include "run_multicore.h" +#include "mut_transforms.h" +#include "mal_profiler.h" +#include "opt_prelude.h" +#include "mut_policy.h" +#include "mut_stopRuns.h" + + /* Decrease in execution time with respect to previous mutation, adds credit points to current mutation + * increase in execution time with respect to previous mutation, adds debit points to current mutation + * The decision whether the current run should be executed or not is taken based on "credit - debit". + * If "credit - debit" is at least 1 current run is executed. + */ + +int +checkIfRun(MalBlkPtr mb) +{ + // This is the 1st run, so always execute it + Mutant mutant = (Mutant)mb->mutants; + +// if((mb->mutants && mb->mutants->credit==1 && mb->mutants->debit == 0 && mb->mutants->next==NULL) + if(mutant && mutant->credit==1 && mutant->debit == 0 && mutant->next==NULL) + { + return 1; + }else if(mutant && (mutant->credit - mutant->debit) > 0 ) + { + return 1; + }else + return 0; +} + +/* Find the rate of increase / decrease in execution time of current query with respect to the previous query + * If rate shows improvement, calculate the credit as (rate_of_improvement * number of cores) + accumulated credit so far + * else, calculate debit as (rate_of_improvement * number of cores) + accumulated debit so far. 
+ * To calculate final number of runs to execute, use runs = credit - debit + * If value for runs is at least 1, then execute the next run + */ + +int +checkRateOfFall(Client cntxt, Mutant mutant) +{ + flt rateOfImprove; + lng baseTime, diffTime; + lng queryTotalTime, prevQueryTotalTime; + + int NUM_OF_CORES = sysconf(_SC_NPROCESSORS_ONLN) / 2; + + if(mutant) + { + queryTotalTime = mutant->totalQueryTime; + prevQueryTotalTime = mutant->next->totalQueryTime; + }else + { + mnstr_printf(GDKstdout, "\nNext mutant not present"); + return 0; + } + + diffTime = abs(queryTotalTime - prevQueryTotalTime); + + if(queryTotalTime >= prevQueryTotalTime) + baseTime = queryTotalTime; + else + baseTime = prevQueryTotalTime; + + rateOfImprove = (flt)diffTime / baseTime; + + // Current query execution is faster than previous execution, hence positive improvement. hence add to credit + if(prevQueryTotalTime > queryTotalTime) + { + mutant->credit = (rateOfImprove * NUM_OF_CORES) + mutant->next->credit; + mutant->debit = mutant->next->debit; + } + else // negative improvement, so add to debit + { + mutant->debit = (rateOfImprove * NUM_OF_CORES) + mutant->next->debit; + mutant->credit = mutant->next->credit; + } + // once the threshold of number of logical cores passed, and global minimum presence is located change the rate of debit based on different thresholds + if(mutant->currentRun > NUM_OF_CORES *2) + { + if(mutant->globalMinRun <= NUM_OF_CORES *2) + mutant->debit = mutant->debit + 0.25; + + else if(mutant->globalMinRun > (NUM_OF_CORES *2) && mutant->globalMinRun < (NUM_OF_CORES *2)*2) + mutant->debit = mutant->debit + 0.5; + + else if(mutant->globalMinRun > ((NUM_OF_CORES *2)*2) && mutant->globalMinRun < (NUM_OF_CORES *2)*4) + mutant->debit = mutant->debit + 0.75; + + } + + DEBUG_MULTICORE_STOP_RUN + mnstr_printf(cntxt->fdout,"#Run- %d Credit- %f Debit- %f rateOfImprove- %f", mutant->currentRun, mutant->credit, mutant->debit, rateOfImprove); + + return 1; +} + +/* Check if the 
current execution is global minimum + * + */ +int checkGlobalMin(Mutant mutant) +{ + if(mutant->totalQueryTime < mutant->next->globalMinExec) + { + mutant->globalMinRun = mutant->currentRun; + mutant->globalMinExec = mutant->totalQueryTime; + + } + else + { + mutant->globalMinExec = mutant->next->globalMinExec; + mutant->globalMinRun = mutant->next->globalMinRun; + } + return 1; +} + + diff --git a/monetdb5/scheduler/mut_stopRuns.h b/monetdb5/scheduler/mut_stopRuns.h new file mode 100644 --- /dev/null +++ b/monetdb5/scheduler/mut_stopRuns.h @@ -0,0 +1,35 @@ +/* + * The contents of this file are subject to the MonetDB Public License + * Version 1.1 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * http://www.monetdb.org/Legal/MonetDBLicense + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the + * License for the specific language governing rights and limitations + * under the License. + * + * The Original Code is the MonetDB Database System. + * + * The Initial Developer of the Original Code is CWI. + * Portions created by CWI are Copyright (C) 1997-July 2008 CWI. + * Copyright August 2008-2013 MonetDB B.V. + * All Rights Reserved. 
+ */ + +#ifndef _MUT_STOPRUNS_ +#define _MUT_STOPRUNS_ +#include "mal.h" +#include "mal_instruction.h" +#include "mal_interpreter.h" +#include "mal_client.h" +#include "run_multicore.h" + + +run_multicore_export int checkIfRun(MalBlkPtr mb); +run_multicore_export int checkRateOfFall(Client cntxt, Mutant mutant); +run_multicore_export int checkGlobalMin(Mutant mutant); + + +#endif /* _MUT_STOPRUNS_ */ + diff --git a/monetdb5/scheduler/mut_transforms.c b/monetdb5/scheduler/mut_transforms.c --- a/monetdb5/scheduler/mut_transforms.c +++ b/monetdb5/scheduler/mut_transforms.c @@ -178,6 +178,7 @@ void mutationJoinDouble(Client cntxt, Mu // inherit profiling m->src->profiler[m->src->stop-1].trace = profiler; + // replace its use in other mat packs for (j = i+1; j < limit; j++) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list