Changeset: c66263db594c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c66263db594c
Modified Files:
	monetdb5/mal/mal_instruction.h
	monetdb5/mal/mal_profiler.c
	monetdb5/mal/mal_runtime.c
	monetdb5/modules/mal/recycle.c
	monetdb5/optimizer/opt_centipede.c
Branch: mutation
Log Message:
merge with default diffs (truncated from 533 to 300 lines): diff --git a/monetdb5/mal/mal_instruction.h b/monetdb5/mal/mal_instruction.h --- a/monetdb5/mal/mal_instruction.h +++ b/monetdb5/mal/mal_instruction.h @@ -108,9 +108,9 @@ typedef struct PERF { #endif struct timeval clock; /* clock */ lng clk; /* microseconds clock */ - lng ticks; /* micro seconds spent */ - int counter; /* accumulate statistics */ - lng totalticks; + lng ticks; /* micro seconds spent on last call */ + lng totalticks; /* accumulate micro seconds send on this call */ + int calls; /* number of calls seen */ bit trace; /* facilitate filter-based profiling */ lng rbytes; /* bytes read by an instruction */ lng wbytes; /* bytes written by an instruction */ diff --git a/monetdb5/mal/mal_profiler.c b/monetdb5/mal/mal_profiler.c --- a/monetdb5/mal/mal_profiler.c +++ b/monetdb5/mal/mal_profiler.c @@ -239,9 +239,9 @@ static void logsent(char *logbuffer) MT_lock_set(&mal_profileLock, "profileLock"); eventcounter++; if (profileCounter[PROFevent].status && eventcounter) - mnstr_printf(eventstream,"[ %d,\t%s ]\n", eventcounter, logbuffer); + mnstr_printf(eventstream,"[ %d,\t%s", eventcounter, logbuffer); else - mnstr_printf(eventstream,"[ %s ]\n", logbuffer); + mnstr_printf(eventstream,"[ %s", logbuffer); mnstr_flush(eventstream); MT_lock_unset(&mal_profileLock, "profileLock"); } diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c --- a/monetdb5/mal/mal_runtime.c +++ b/monetdb5/mal/mal_runtime.c @@ -175,7 +175,7 @@ runtimeProfileExit(Client cntxt, MalBlkP if (stk != NULL && prof->stkpc >= 0 && mb->profiler != NULL && mb->profiler[stkpc].trace ) { gettimeofday(&mb->profiler[stkpc].clock, NULL); - mb->profiler[stkpc].counter++; + mb->profiler[stkpc].calls++; mb->profiler[stkpc].totalticks += mb->profiler[stkpc].ticks; mb->profiler[stkpc].clk += mb->profiler[stkpc].ticks; if (pci) { diff --git a/monetdb5/modules/mal/recycle.c b/monetdb5/modules/mal/recycle.c --- 
a/monetdb5/modules/mal/recycle.c +++ b/monetdb5/modules/mal/recycle.c @@ -138,7 +138,7 @@ RECYCLEdump(stream *s) else mnstr_printf(s,"# "); mnstr_printf(s,"%4d\t"LLFMT"\t%d\t"LLFMT"\t"LLFMT"\t"LLFMT"\t%s\n", i, recycleBlk->profiler[i].clk, - recycleBlk->profiler[i].counter, + recycleBlk->profiler[i].calls, recycleBlk->profiler[i].ticks, recycleBlk->profiler[i].rbytes, recycleBlk->profiler[i].wbytes, @@ -266,7 +266,7 @@ RECYCLErunningStat(Client cntxt, MalBlkP #ifdef _DEBUG_CACHE_ if ( getInstrPtr(recycleBlk,i)->token != NOOPsymbol ) #endif - if ( recycleBlk->profiler[i].counter >1) + if ( recycleBlk->profiler[i].calls >1) reusedmem += recycleBlk->profiler[i].wbytes; mnstr_printf(s,"%d\t %7.2f\t ", ++q, (GDKusec()-cntxt->rcc->time0)/1000.0); diff --git a/monetdb5/optimizer/opt_centipede.c b/monetdb5/optimizer/opt_centipede.c --- a/monetdb5/optimizer/opt_centipede.c +++ b/monetdb5/optimizer/opt_centipede.c @@ -43,7 +43,6 @@ typedef struct{ int type, slice; int lslices, hslices; /* variables holding the range bound */ lng rowcnt; - ValRecord bounds[MAXSITES]; } Slices; static int nrservers; @@ -51,7 +50,6 @@ static int nrservers; /* * The query will be controlled from the coordinator with a plan * geared at parallel execution - * TODO pack is expensive, use incremental pack */ static MalBlkPtr OPTexecController(Client cntxt, MalBlkPtr mb, MalBlkPtr pmb, Slices *slices, oid plantag) @@ -257,8 +255,8 @@ OPTexecController(Client cntxt, MalBlkPt chkProgram(cntxt->fdout, cntxt->nspace, cmb); #ifdef _DEBUG_OPT_CENTIPEDE_ - mnstr_printf(cntxt->fdout,"#rough cntrl plan %d \n", cmb->errors); - printFunction(cntxt->fdout, cmb, 0, LIST_MAL_STMT); + //mnstr_printf(cntxt->fdout,"#rough cntrl plan %d \n", cmb->errors); + //printFunction(cntxt->fdout, cmb, 0, LIST_MAL_STMT); #endif GDKfree(alias); GDKfree(pack); @@ -370,17 +368,21 @@ static void OPTmaterializePartition(MalBlkPtr mb, InstrPtr p, int low, int hgh) { int v,oldvar; + int i; + InstrPtr q; - oldvar = getArg(p,0); - 
getArg(p,0) = v = newTmpVariable(mb, getVarType(mb,oldvar)); - setVarUDFtype(mb, v); - setVarFixed(mb,v); + for(i=0; i< p->retc; i++ ){ + oldvar = getArg(p,i); + getArg(p,i) = v = newTmpVariable(mb, getVarType(mb,oldvar)); + setVarUDFtype(mb, v); + setVarFixed(mb,v); - p = newStmt(mb, algebraRef, sliceRef); - p = pushArgument(mb, p, v); - p = pushArgument(mb, p, low); - p = pushArgument(mb, p, hgh); - getArg(p,0)= oldvar; + q = newStmt(mb, algebraRef, sliceRef); + q = pushArgument(mb, q, v); + q = pushArgument(mb, q, low); + q = pushArgument(mb, q, hgh); + getArg(q,0)= oldvar; + } } @@ -405,7 +407,7 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n } if ( slices->slice == 0){ - slices->slice = newTmpVariable(nmb, getVarType(nmb, getArg(slices->target,0))); + slices->slice = newTmpVariable(nmb, slices->type); setVarUDFtype(nmb, slices->slice); setVarUsed(nmb, slices->slice); nmb->stmt[0] = pushArgument(nmb, nmb->stmt[0], slices->lslices); @@ -429,13 +431,14 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n * when a connection is re-used by different client sessions. 
*/ #define BLOCKED 1 -#define PARTITION 2 -#define SUPPORTIVE 3 -#define EXPORTED 4 -#define KEEPLOCAL 5 +#define PARTITION 2 // phase 1 result +#define PIVOT 3 // phase 2 result +#define SUPPORTIVE 4 // phase 2 result +#define EXPORTED 5 +#define KEEPLOCAL 6 #ifdef _DEBUG_OPT_CENTIPEDE_ -static char *statusname[6]= {"", "blocked ", "partition ", "support ", "exported ", "keeplocal "}; +static char *statusname[7]= {"", "blocked ", "partition ", "pivot ", "support ", "exported ", "keeplocal "}; #endif static void @@ -491,10 +494,8 @@ OPTbakePlans(Client cntxt, MalBlkPtr mb, #define VALS 2 /* Phase 1: determine all variables/instructions indirectly dependent on a fragmented column */ - /* Keep track on passing the OID or VALues around */ /* Instructions are marked as PARTITION if the have to be propagated */ last = limit; - // Calling instruction and arguments are supportive to the partitioning status[0]= PARTITION; for ( j = old[0]->retc; j < old[0]->argc; j++) vars[getArg(old[0],j)]= SUPPORTIVE; @@ -506,110 +507,60 @@ OPTbakePlans(Client cntxt, MalBlkPtr mb, last = i; } else // incorporate both single/double target sql.bind operations - if ( getModuleId(p) == sqlRef && (getFunctionId(p) == bindRef || getFunctionId(p) == bindidxRef) && - strcmp(slices->schema, getVarConstant(mb, getArg(p, p->retc + 1)).val.sval) == 0 && - strcmp(slices->table, getVarConstant(mb, getArg(p, p->retc + 2)).val.sval) == 0 ) { - status[i] = PARTITION; - if ( p->retc == 1) { - head[getArg(p,0)] = OIDS; - tail[getArg(p,0)] = VALS; - } else { - tail[getArg(p,0)] = OIDS; - tail[getArg(p,1)] = VALS; - } - } else - if ( getModuleId(p) == sqlRef && getFunctionId(p) == deltaRef ){ - if ( head[getArg(p,1)] ){ - status[i] = PARTITION; - tail[getArg(p,0)] = VALS; - } - } else - if ( getModuleId(p) == sqlRef && getFunctionId(p) == tidRef && + if ( getModuleId(p) == sqlRef && + ( getFunctionId(p) == tidRef || getFunctionId(p) == bindRef || getFunctionId(p) == bindidxRef) && strcmp(slices->schema, 
getVarConstant(mb, getArg(p, p->retc + 1)).val.sval) == 0 && strcmp(slices->table, getVarConstant(mb, getArg(p, p->retc + 2)).val.sval) == 0 ) { status[i] = PARTITION; - tail[getArg(p,0)] = OIDS; - } + vars[getArg(p,0)] = PARTITION; + } else + if ( getModuleId(p) == sqlRef && getFunctionId(p) == deltaRef ){ + if ( vars[getArg(p,1)] ){ + status[i] = PARTITION; + vars[getArg(p,0)] = PARTITION; + } + } /* blocking instructions are those that require data exchange, aggregation or total view */ - if ( getModuleId(p) == algebraRef && getFunctionId(p) == joinRef ) { - if ( head[getArg(p,p->retc)] ){ + if ( getModuleId(p) == algebraRef && (getFunctionId(p) == joinRef || getFunctionId(p) == leftjoinRef || getFunctionId(p) == leftfetchjoinRef) ) { + if ( vars[getArg(p,p->retc)] || vars[getArg(p,p->retc+1)] ){ status[i] = PARTITION; - head[getArg(p,0)] = head[getArg(p,p->retc)]; - head[getArg(p,1)] = head[getArg(p,p->retc)]; + vars[getArg(p,0)] = PARTITION; } - if ( tail[getArg(p,p->retc)] ){ + } else + if ( getModuleId(p) == algebraRef && (getFunctionId(p)== thetaselectRef || getFunctionId(p) == selectRef || getFunctionId(p) == subselectRef)){ + if ( vars[getArg(p,p->retc)] ){ status[i] = PARTITION; - tail[getArg(p,0)] = tail[getArg(p,p->retc)]; - tail[getArg(p,1)] = tail[getArg(p,p->retc)]; + vars[getArg(p,0)] = PARTITION; } } else - if ( getModuleId(p) == algebraRef && (getFunctionId(p) == leftjoinRef || getFunctionId(p) == leftfetchjoinRef) ) { - if ( tail[getArg(p,1)] ){ + if ( getModuleId(p) == batRef && getFunctionId(p) == mirrorRef ) { + if ( vars[getArg(p,p->retc)] ){ status[i] = PARTITION; - head[getArg(p,0)] = tail[getArg(p,1)]; - } - if ( tail[getArg(p,2)] ){ - status[i] = PARTITION; - tail[getArg(p,0)] = tail[getArg(p,p->retc+1)]; - } - } else - if ( getModuleId(p) == algebraRef && (getFunctionId(p)== thetauselectRef || getFunctionId(p) == uselectRef || getFunctionId(p) == selectRef ) ) { - if (head[getArg(p,p->retc)] ) { - head[getArg(p,0)] = 
head[getArg(p,p->retc)]; - tail[getArg(p,0)] = tail[getArg(p,p->retc)]; - status[i] = PARTITION; - } - } else -/* - if ( getModuleId(p) == algebraRef && (getFunctionId(p) == subsortRef) ) { - if (tail[getArg(p,1)] ){ - tail[getArg(p,0)] = tail[getArg(p,1)]; - status[i] = PARTITION; - } - } else -*/ - if ( getModuleId(p) == batRef && getFunctionId(p) == mirrorRef ) { - if (head[getArg(p,1)]){ - head[getArg(p,0)] = head[getArg(p,1)]; - tail[getArg(p,0)] = head[getArg(p,1)]; - status[i] = PARTITION; + vars[getArg(p,0)] = PARTITION; } } else if ( getModuleId(p) == batRef && getFunctionId(p)==reverseRef ) { - if (head[getArg(p,1)] || tail[getArg(p,1)] ){ - head[getArg(p,0)] = tail[getArg(p,p->retc)]; - tail[getArg(p,0)] = head[getArg(p,p->retc)]; + if ( vars[getArg(p,p->retc)] ){ status[i] = PARTITION; + vars[getArg(p,0)] = PARTITION; } } else - if ( getModuleId(p) == groupRef && ( getFunctionId(p) == subgroupRef || getFunctionId(p) == subgroupdoneRef) ){ - if ( head[getArg(p, p->retc)] ){ - /* groups against the partition column is allowed. 
- It calls for a proper group reconstruction at the receiver - */ - head[getArg(p,0)] = OIDS; - tail[getArg(p,0)] = 0; - head[getArg(p,1)] = OIDS; - tail[getArg(p,1)] = OIDS; - status[i] = PARTITION; + if ( getModuleId(p) == groupRef && ( getFunctionId(p) == subgroupRef || getFunctionId(p) == subgroupdoneRef) && p->retc== 3){ + if ( vars[getArg(p,p->retc)] ){ + status[i] = PIVOT; + vars[getArg(p,0)] = PARTITION; + vars[getArg(p,1)] = PARTITION; + vars[getArg(p,2)] = PARTITION; } - }else - if ( (getModuleId(p) == sqlRef && (getFunctionId(p) == resultSetRef || getFunctionId(p) == putName("exportValue",11) ) ) || getModuleId(p) == ioRef ) + } else + if ((getModuleId(p) == sqlRef && (getFunctionId(p) == resultSetRef || getFunctionId(p) == putName("exportValue",11))) || getModuleId(p) == ioRef ) status[i] = BLOCKED; else if ( getModuleId(p) == batcalcRef ){ - if ( p->argc == 2 /* coercions and unaries */ && vars[getArg(p,1)] == PARTITION ) { - status[i]= PARTITION; - head[getArg(p,0)] = head[getArg(p,1)]; - tail[getArg(p,0)] = tail[getArg(p,1)]; + if ( vars[getArg(p,p->retc)] || vars[getArg(p,p->retc+1)] ){ status[i] = PARTITION; - } - if ( p->argc == 3 /* binaries */ && (vars[getArg(p,1)] == PARTITION || vars[getArg(p,2)] == PARTITION)) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list