Changeset: cc2956f1a6eb for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=cc2956f1a6eb Modified Files: Branch: default Log Message:
merging diffs (truncated from 354 to 300 lines): diff --git a/monetdb5/mal/mal_resolve.mx b/monetdb5/mal/mal_resolve.mx --- a/monetdb5/mal/mal_resolve.mx +++ b/monetdb5/mal/mal_resolve.mx @@ -160,6 +160,10 @@ findFunctionType(Module scope, MalBlkPtr m = scope; s = m->subscope[(int)(getSubScope(getFunctionId(p)))]; if (s == 0) return -1; + + returntype= (int*) GDKzalloc(p->retc * sizeof(int)); + if ( returntype == 0) return -1; + while (s != NULL) { /* single scope element check */ if (getFunctionId(p) != s->name) { s = s->skip; continue; @@ -357,7 +361,6 @@ findFunctionType(Module scope, MalBlkPtr * the resulting type can not be determined. */ s1 = 0; - returntype= (int*) GDKzalloc(p->retc * sizeof(int)); if (sig->polymorphic) for (k = i = 0; i < p->retc; k++, i++) { int actual = getArgType(mb, p, i); diff --git a/monetdb5/optimizer/opt_dataflow.mx b/monetdb5/optimizer/opt_dataflow.mx --- a/monetdb5/optimizer/opt_dataflow.mx +++ b/monetdb5/optimizer/opt_dataflow.mx @@ -110,6 +110,11 @@ opt_export void removeDataflow(InstrPtr #include "mal_instruction.h" #include "mal_interpreter.h" +/* + * dataflow processing incurs overhead and is only + * relevant if multiple tasks can be handled at the same time. + * Also simple expressions do not have to be done in parallel. +*/ static int simpleFlow(InstrPtr *old, int start, int last) { @@ -125,6 +130,8 @@ simpleFlow(InstrPtr *old, int start, int if( getArg(p,0) == getArg(q,j)) simple= TRUE; if( !simple) + simple = getModuleId(p) == calcRef || getModuleId(p) == mtimeRef || getModuleId(p) == strRef || getModuleId(p)== mmathRef; + if( !simple) return 0; p = q; } diff --git a/monetdb5/optimizer/opt_partition.mx b/monetdb5/optimizer/opt_partition.mx --- a/monetdb5/optimizer/opt_partition.mx +++ b/monetdb5/optimizer/opt_partition.mx @@ -38,10 +38,6 @@ address OPTpartitionMaterialize comment "Implement the partition operation. 
Throw an exception if the partition was empty, because then the subquery should produce a NIL "; -command partition.markH( b:bat[:any_1,:any_2] ) :bat[:oid,:any_2] -address OPTmarkHead -comment "Ignore a NIL bat"; - pattern partition.vector(b:bat[:oid,:any_1]) :any_1... address OPTvector comment "Derive a series of slices values based on sampling"; @@ -57,7 +53,6 @@ comment "Derive a series of slices value opt_export str OPTvector(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); opt_export str OPTpartitionMaterialize(int *result, int *bid, ptr low, ptr high); -opt_export str OPTmarkHead(int *result, int *bid); /* #define DEBUG_DETAIL*/ #define _DEBUG_OPT_PARTITION_ @@ -81,6 +76,11 @@ typedef struct{ ValRecord bounds[MAXSITES]; } Slices; +/* + * The query will be controlled from the coordinator with a plan + * geared at parallel execution + * TODO pack is expensive, move to mat.new +*/ static MalBlkPtr OPTplanCntrl(Client cntxt, MalBlkPtr mb, MalBlkPtr pmb, Slices *slices) { @@ -181,18 +181,13 @@ OPTplanCntrl(Client cntxt, MalBlkPtr mb, pushInstruction(cmb,q); } /* put all mat.pack instructions into the program - and make sure that they have contiguous void headed columns + make sure that they have contiguous void headed columns */ p = getInstrPtr(pmb,0); if ( slices->column) for ( k=0 ; k < nrpack; k++) { pushInstruction(cmb, pack[k]); getArg(pack[k],0)= getArg(p,k); -/* - q= newFcnCall(cmb,partitionRef,markHRef); - getArg(q,0) = getArg(p,k); - q= pushArgument(cmb,q, getArg(pack[k],0)); -*/ } /* finalize the dataflow block */ @@ -218,7 +213,7 @@ OPTplanCntrl(Client cntxt, MalBlkPtr mb, return cmb; } -/* prepare slicing a column by addition over the target */ +/* prepare access to partitions by injection of the materialize instructions */ static int OPTpreparePartition(MalBlkPtr nmb, InstrPtr p, Slices *slices, int pc) { @@ -256,6 +251,9 @@ OPTpreparePartition(MalBlkPtr nmb, Instr return parallel; } +/* + * For bind instructions we have to inject 
materialize and semijoin instructions +*/ static int OPTsliceColumn(Client cntxt, MalBlkPtr nmb, MalBlkPtr mb, InstrPtr p, Slices *slices, int pc) { @@ -307,12 +305,15 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n /* * The plan is analysed for the maximal subplan that involves a partitioned table * and that does not require data exchanges. - * This portion is extracted for possibly remote execution. + * Algebraic operators that can be executed on fragments are delegated too. + * For example join(A,B) where A is fragmented and B is not can be done elsewhere. + * In all cases we should ensure that the result of the remote execution can be + * simply unioned together. */ #define BLOCKED 1 #define REQUIRED 2 -#define EXPORTED 3 -#define NEEDED 4 +#define SUPPORTIVE 3 + static int OPTplanFragment(Client cntxt, MalBlkPtr mb, Slices *slices) { @@ -358,9 +359,7 @@ OPTplanFragment(Client cntxt, MalBlkPtr (void) slices; #endif - /* Phase 1: determine all variables/instructions indirectly dependent on a - fragmented column - */ + /* Phase 1: determine all variables/instructions indirectly dependent on a fragmented column */ last = limit; for ( i = 0; i < limit ; i++) { p = old[i]; @@ -379,11 +378,24 @@ OPTplanFragment(Client cntxt, MalBlkPtr if (vars[getArg(p,j)] == BLOCKED) plan[i] = BLOCKED; - /* blocking instructions */ + /* blocking instructions are those that require data exchange or total view */ + if ( getModuleId(p) == algebraRef && getFunctionId(p) == joinRef ) { + if (vars[getArg(p,1)] == REQUIRED && vars[getArg(p,2)] == REQUIRED) { + /* not possible to delegate */ + plan[i] = BLOCKED; + } else { + /* other variable is supportive */ + if (vars[getArg(p,1)] != REQUIRED) + vars[getArg(p,1)] = SUPPORTIVE; + if (vars[getArg(p,2)] != REQUIRED) + vars[getArg(p,2)] = SUPPORTIVE; + plan[i] = SUPPORTIVE; + } + } else if ( (getModuleId(p) == groupRef && (getFunctionId(p) == doneRef || getFunctionId(p) == newRef ||getFunctionId(p) == deriveRef) ) || getModuleId(p) == pqueueRef 
|| getModuleId(p) == aggrRef || getModuleId(p) == ioRef || (getModuleId(p) == sqlRef && (getFunctionId(p) == resultSetRef || getFunctionId(p) == putName("exportValue",11) )) || - (getModuleId(p) == algebraRef &&(getFunctionId(p) == sliceRef || getFunctionId(p) == joinRef || getFunctionId(p)==markTRef)) ) { + (getModuleId(p) == algebraRef &&(getFunctionId(p) == sliceRef || getFunctionId(p)==markTRef)) ) { /* add the targets of its argument to the output */ plan[i] = BLOCKED; } @@ -393,18 +405,50 @@ OPTplanFragment(Client cntxt, MalBlkPtr vars[getArg(p,j)] = BLOCKED; } else { for( j = 0; j < p->argc; j++) - if (vars[getArg(p,j)] == REQUIRED) + if (vars[getArg(p,j)] == REQUIRED ) break; - if ( j != p->argc) { - for ( j= 0; j< p->retc; j++) + if ( j != p->argc) + plan[i]= REQUIRED; + + for( j = 0; j < p->argc; j++) + if (vars[getArg(p,j)] == SUPPORTIVE ) + break; + if ( j != p->argc && plan[i] != REQUIRED) + plan[i]= SUPPORTIVE; + + if ( plan[i] == REQUIRED) + for ( j= 0; j< p->argc; j++) vars[getArg(p,j)] = REQUIRED; - plan[i] = REQUIRED; - } + if ( plan[i] == SUPPORTIVE) + for ( j= 0; j< p->argc; j++) + if ( vars[getArg(p,j)] == 0) + vars[getArg(p,j)] = SUPPORTIVE; } } - /* Phase 2: determine all variables/instructions contributing */ - mnstr_printf(cntxt->fdout,"#phase 2\n"); +#ifdef _DEBUG_OPT_PARTITION_ + mnstr_printf(cntxt->fdout,"\n#phase 1\n"); + for( i= 0; i< limit; i++) + if (plan[i] ) { + switch (plan[i]) { + case BLOCKED: + mnstr_printf(cntxt->fdout,"#blocked "); + break; + case REQUIRED: + mnstr_printf(cntxt->fdout,"#required "); + break; + case SUPPORTIVE: + mnstr_printf(cntxt->fdout,"#support "); + } + if( old[i]) + printInstruction(cntxt->fdout, mb,0,old[i],LIST_MAL_STMT); + } +#endif + + /* Phase 2: determine all variables/instructions contributing + instructions based on supportive variables remain marked as supportive + because we have to avoid common ancestor dependency on partitioned variables + */ for ( i = limit -1; i >= 0 ; i--) if ( plan[i] != 
BLOCKED ){ p = old[i]; @@ -412,12 +456,69 @@ OPTplanFragment(Client cntxt, MalBlkPtr if (vars[getArg(p,j)] == REQUIRED) plan[i] = REQUIRED; - if( plan[i] == REQUIRED) - for ( j= p->retc; j< p->argc; j++) + if ( plan[i] == 0) + for( j = 0; j < p->argc; j++) + if (vars[getArg(p,j)] == SUPPORTIVE) + plan[i] = SUPPORTIVE; + + if( plan[i]== REQUIRED ) + for ( j= 0; j< p->argc; j++) vars[getArg(p,j)] = REQUIRED; } - /* Phase 3: determine all variables to be exported */ - mnstr_printf(cntxt->fdout,"#phase 3\n"); +#ifdef _DEBUG_OPT_PARTITION_ + mnstr_printf(cntxt->fdout,"\n#phase 2\n"); + for( i= 0; i< limit; i++) + if (plan[i] ) { + switch (plan[i]) { + case BLOCKED: + mnstr_printf(cntxt->fdout,"#blocked "); + break; + case REQUIRED: + mnstr_printf(cntxt->fdout,"#required "); + break; + case SUPPORTIVE: + mnstr_printf(cntxt->fdout,"#support "); + } + if( old[i]) + printInstruction(cntxt->fdout, mb,0,old[i],LIST_MAL_STMT); + } +#endif + /* Phase 3: turn all supportive instructions into required ones + if it helps to produce the required intermediate + */ + for ( i = 0; i < limit; i++) + if( plan[i] == SUPPORTIVE){ + p = old[i]; + for( j = 0; j < p->argc; j++) + if (vars[getArg(p,j)] == SUPPORTIVE) + break; + if( j == p->argc) + plan[i] = REQUIRED; + else + plan[i] = BLOCKED; + } +#ifdef _DEBUG_OPT_PARTITION_ + mnstr_printf(cntxt->fdout,"\n#phase 3\n"); + for( i= 0; i< limit; i++) + if (plan[i] ) { + switch (plan[i]) { + case BLOCKED: + mnstr_printf(cntxt->fdout,"#blocked "); + break; + case REQUIRED: + mnstr_printf(cntxt->fdout,"#required "); + break; + case SUPPORTIVE: + mnstr_printf(cntxt->fdout,"#support "); + } + if( old[i]) + printInstruction(cntxt->fdout, mb,0,old[i],LIST_MAL_STMT); + } _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list