Changeset: bfdc45aad8e5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bfdc45aad8e5
Modified Files:
	monetdb5/scheduler/mut_policy.c
	monetdb5/scheduler/mut_transforms.c
	monetdb5/scheduler/mut_transforms.h
	monetdb5/scheduler/run_multicore.h
Branch: mutation
Log Message:
added join mutation operator with both relations splits diffs (270 lines): diff --git a/monetdb5/scheduler/mut_policy.c b/monetdb5/scheduler/mut_policy.c --- a/monetdb5/scheduler/mut_policy.c +++ b/monetdb5/scheduler/mut_policy.c @@ -98,10 +98,12 @@ MUTpolicy(Client cntxt, Mutant m) if ( getModuleId(p) && strncmp(getModuleId(p), "algebra",7)== 0) { if(getFunctionId(p) == joinRef) - mutationJoin(cntxt,m); + mutationJoinDouble(cntxt,m); +// mutationJoin(cntxt,m); else if(getFunctionId(p) == subselectRef) mutationSelect(cntxt,m); - + else // proxy function + mutationJoinDouble(cntxt,m); } if ( getModuleId(p) && strncmp(getModuleId(p), "aggr",4)== 0) mutationSum(cntxt,m); diff --git a/monetdb5/scheduler/mut_transforms.c b/monetdb5/scheduler/mut_transforms.c --- a/monetdb5/scheduler/mut_transforms.c +++ b/monetdb5/scheduler/mut_transforms.c @@ -32,6 +32,225 @@ * and glues the result using matpack. * If the target is already input to a mat.pack, then we should add * the pieces produced to that instruction instead of making a new mat.pack. + * + * mutationJoinDouble splits both the input relations to the join operator + */ + +void mutationJoinDouble(Client cntxt, Mutant m){ + int pc = m->target, i, j, k, limit, b1,b2, v1,v2,v3,v4, z1,z2,z3,z4,y1,y2 ; + InstrPtr p=0, *old= m->src->stmt, q; + int matpc = 0, profiler; + + (void) cntxt; + limit= m->src->stop; + if ( newMalBlkStmt(m->src, m->src->ssize) < 0) + return; + + pushInstruction(m->src, old[0]); + for (i = 1; i < limit; i++) { + p= old[i]; + if ( i == pc){ + /* replace the instruction, e.g. with a partioned one. 
+ Dont use any partition intelligence, simple half split + x1,x2 := algebra.join(b,Y) => + b1 := bat.partition(b,2,0); + b2 := bat.partition(b,2,1); + y1 := bat.partition(Y,2,0); + y2 := bat.partition(Y,2,1); + v1,z1:= algebra.join(b1,y1); + v2,z2:= algebra.join(b2,y1); + v3,z3:= algebra.join(b1,y2); + v4,z4:= algebra.join(b2,y2); + x1:= mat.pack(v1,v2,v3,v4); + x2:= mat.pack(z1,z2,z3,z4); + Be careful not to change the size of the stack, + for we can not easily pass it back to the + current interpreter call sequence + */ + if ( m->stk->stksize < m->src->vtop + 7){ + pushInstruction(m->src,p); + continue; + } + + profiler = m->src->profiler[i].trace; + + q= newStmt(m->src, batRef, partitionRef); + setVarType(m->src, getArg(q,0), getArgType(m->src, p, p->retc)); + b1 = getArg(q,0); + q = pushArgument(m->src,q,getArg(p,p->retc)); + q = pushInt(m->src,q,2); + // update the stack as well, because we are executing + m->stk->stk[getArg(q, q->argc-1)].val.ival = 2; + q = pushInt(m->src,q,0); + m->stk->stk[getArg(q, q->argc-1)].val.ival = 0; + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src, batRef, partitionRef); + setVarType(m->src, getArg(q,0), getArgType(m->src, p, p->retc)); + b2 = getArg(q,0); + q = pushArgument(m->src,q,getArg(p,p->retc)); + q = pushInt(m->src,q,2); + // update the stack as well, because we are executing + m->stk->stk[getArg(q, q->argc-1)].val.ival = 2; + q = pushInt(m->src,q,1); + m->stk->stk[getArg(q, q->argc-1)].val.ival = 1; + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src, batRef, partitionRef); + setVarType(m->src, getArg(q,0), getArgType(m->src, p, p->retc+1)); + y1 = getArg(q,0); + q = pushArgument(m->src,q,getArg(p,p->retc+1)); + q = pushInt(m->src,q,2); + // update the stack as well, because we are executing + m->stk->stk[getArg(q, q->argc-1)].val.ival = 2; + q = pushInt(m->src,q,0); + m->stk->stk[getArg(q, q->argc-1)].val.ival = 0; + // 
inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src, batRef, partitionRef); + setVarType(m->src, getArg(q,0), getArgType(m->src, p, p->retc+1)); + y2 = getArg(q,0); + q = pushArgument(m->src,q,getArg(p,p->retc+1)); + q = pushInt(m->src,q,2); + // update the stack as well, because we are executing + m->stk->stk[getArg(q, q->argc-1)].val.ival = 2; + q = pushInt(m->src,q,1); + m->stk->stk[getArg(q, q->argc-1)].val.ival = 1; + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= copyInstruction(p); + getArg(q,2)= b1; + getArg(q,3)= y1; + v1 = getArg(q,0)= newTmpVariable(m->src,TYPE_any); + z1 = getArg(q,1)= newTmpVariable(m->src,TYPE_any); + pushInstruction(m->src,q); + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= copyInstruction(p); + getArg(q,2)= b2; + getArg(q,3)= y1; + v2 = getArg(q,0)= newTmpVariable(m->src,TYPE_any); + z2 = getArg(q,1)= newTmpVariable(m->src,TYPE_any); + pushInstruction(m->src,q); + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= copyInstruction(p); + getArg(q,2)= b1; + getArg(q,3)= y2; + v3 = getArg(q,0)= newTmpVariable(m->src,TYPE_any); + z3 = getArg(q,1)= newTmpVariable(m->src,TYPE_any); + pushInstruction(m->src,q); + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= copyInstruction(p); + getArg(q,2)= b2; + getArg(q,3)= y2; + v4 = getArg(q,0)= newTmpVariable(m->src,TYPE_any); + z4 = getArg(q,1)= newTmpVariable(m->src,TYPE_any); + pushInstruction(m->src,q); + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src, languageRef, passRef); + q = pushArgument(m->src,q, b1); + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src, languageRef, passRef); + q = pushArgument(m->src,q, b2); + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src, languageRef, 
passRef); + q = pushArgument(m->src,q, y1); + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src, languageRef, passRef); + q = pushArgument(m->src,q, y2); + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + + // replace its use in other mat packs + for (j = i+1; j < limit; j++) { + q= old[j]; + if ( getModuleId(q) == matRef && getFunctionId(q) == packRef){ + for( k= old[j]->retc; k < old[j]->argc; k++) + if ( getArg(q,k) == getArg(p,0)){ + /* replace this argument */ + matpc++; + delArgument(old[j],k); + old[j] = setArgument(m->src,old[j],k, v4); + old[j] = setArgument(m->src,old[j],k, v3); + old[j] = setArgument(m->src,old[j],k, v2); + old[j] = setArgument(m->src,old[j],k, v1); + break; + } + + } + if(matpc>0 && j+1<limit) + { + j++; + q= old[j]; + if ( getModuleId(q) == matRef && getFunctionId(q) == packRef){ + for( k= old[j]->retc; k < old[j]->argc; k++) + if ( getArg(q,k) == getArg(p,1)){ + // replace this argument + matpc++; + delArgument(old[j],k); + old[j] = setArgument(m->src,old[j],k, z4); + old[j] = setArgument(m->src,old[j],k, z3); + old[j] = setArgument(m->src,old[j],k, z2); + old[j] = setArgument(m->src,old[j],k, z1); + break; + } + } + } + } + + if ( matpc == 0){ + q= newStmt(m->src,matRef,packRef); + getArg(q,0)= getArg(p,0); + q= pushArgument(m->src,q,v1); + q= pushArgument(m->src,q,v2); + q= pushArgument(m->src,q,v3); + q= pushArgument(m->src,q,v4); + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src,matRef,packRef); + getArg(q,0)= getArg(p,1); + q= pushArgument(m->src,q,z1); + q= pushArgument(m->src,q,z2); + q= pushArgument(m->src,q,z3); + q= pushArgument(m->src,q,z4); + m->src->profiler[m->src->stop-1].trace = profiler; + + } + + //pushInstruction(m->src,p); + m->target = pc; + m->comment = GDKstrdup("mutationJoin"); + } else + pushInstruction(m->src,p); + } + GDKfree(old); +} + +/* Sample plan mutation actions + * The join mutation simply splits the 
target instruction + * and glues the result using matpack. + * If the target is already input to a mat.pack, then we should add + * the pieces produced to that instruction instead of making a new mat.pack. + * + * mutationJoin splits only the left relation of the join operator + * */ void diff --git a/monetdb5/scheduler/mut_transforms.h b/monetdb5/scheduler/mut_transforms.h --- a/monetdb5/scheduler/mut_transforms.h +++ b/monetdb5/scheduler/mut_transforms.h @@ -26,6 +26,7 @@ #include "run_multicore.h" run_multicore_export void mutationJoin(Client cntxt, Mutant m); +run_multicore_export void mutationJoinDouble(Client cntxt, Mutant m); run_multicore_export void mutationSelect(Client cntxt, Mutant m); run_multicore_export void mutationSum(Client cntxt, Mutant m); diff --git a/monetdb5/scheduler/run_multicore.h b/monetdb5/scheduler/run_multicore.h --- a/monetdb5/scheduler/run_multicore.h +++ b/monetdb5/scheduler/run_multicore.h @@ -48,7 +48,7 @@ typedef struct MUTANT{ int target; // operation changed from previous struct MUTANT *next; } *Mutant; -#define DEBUG_MULTICORE if(0) +#define DEBUG_MULTICORE if(1) run_multicore_export str RUNmulticore(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p); run_multicore_export void multicorePrint(Client cntxt, Mutant m); #endif /* MAL_RUN_MULTICORE */ _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list