Changeset: a3863d5bf0a8 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a3863d5bf0a8 Modified Files: monetdb5/scheduler/mut_policy.c monetdb5/scheduler/mut_transforms.c monetdb5/scheduler/mut_transforms.h Branch: mutation Log Message:
left-fetch-join parallelization code + new mutation policy with partition propagation to join operator when join is not parallelized yet and is the dependency operator from previous mat.pack diffs (truncated from 409 to 300 lines): diff --git a/monetdb5/scheduler/mut_policy.c b/monetdb5/scheduler/mut_policy.c --- a/monetdb5/scheduler/mut_policy.c +++ b/monetdb5/scheduler/mut_policy.c @@ -43,6 +43,11 @@ mutationCandidate(MalBlkPtr mb, InstrPtr if (getFunctionId(p) == joinRef) return 1; } + if ( getModuleId(p) == algebraRef) { + if (getFunctionId(p) == leftfetchjoinRef) + return 1; + } + /* if ( getModuleId(p) == aggrRef){ @@ -105,6 +110,8 @@ MUTpolicy(Client cntxt, Mutant m) mutationJoin(cntxt,m); else if(getFunctionId(p) == subselectRef) mutationSelect(cntxt,m); + else if(getFunctionId(p) == leftfetchjoinRef) + mutationLeftFetchJoin(cntxt,m); else // proxy function mutationJoinDouble(cntxt,m); } diff --git a/monetdb5/scheduler/mut_transforms.c b/monetdb5/scheduler/mut_transforms.c --- a/monetdb5/scheduler/mut_transforms.c +++ b/monetdb5/scheduler/mut_transforms.c @@ -246,7 +246,6 @@ mutationJoin_(MalBlkPtr mb, MalStkPtr st pushInstruction(mb,q); // inherit profiling mb->profiler[mb->stop-1].trace = profiler; - } void @@ -394,7 +393,7 @@ mutationSelect(Client cntxt, Mutant m){ if ( newMalBlkStmt(m->src, m->src->ssize) < 0) return; - assert( m->src->profiler); + assert( m->src->profiler); pushInstruction(m->src, old[0]); for (i = 1; i < limit; i++) { p= old[i]; @@ -488,3 +487,346 @@ mutationSum(Client cntxt, Mutant m){ GDKfree(old); } +static void +mutationLeftFetchJoin_(MalBlkPtr mb, MalStkPtr stk, InstrPtr p, int partitions, int slice, int profiler, int *v1Ptr) +{ + int b1; + InstrPtr q; + + q= newStmt(mb, batRef, partitionRef); + setVarType(mb, getArg(q,0), getArgType(mb, p, p->retc)); + b1 = getArg(q,0); + q = pushArgument(mb,q,getArg(p,p->retc)); + q = pushInt(mb,q,2); + // update the stack as well, because we are executing + stk->stk[getArg(q, 
q->argc-1)].val.ival = partitions; + q = pushInt(mb,q,slice); + stk->stk[getArg(q, q->argc-1)].val.ival = slice; + // inherit profiling + mb->profiler[mb->stop-1].trace = profiler; + + q= copyInstruction(p); + getArg(q,1)= b1; + *v1Ptr = getArg(q,0)= newTmpVariable(mb,TYPE_any); + pushInstruction(mb,q); + // inherit profiling + mb->profiler[mb->stop-1].trace = profiler; +} + +void +mutationLeftFetchJoin(Client cntxt, Mutant m){ + int pc = m->target, i, j, k, limit, v1,v2; + InstrPtr p=0, *old= m->src->stmt, q; + int matpc = 0, profiler; + + + (void) cntxt; + limit= m->src->stop; + if ( newMalBlkStmt(m->src, m->src->ssize) < 0) + return; + + pushInstruction(m->src, old[0]); + for (i = 1; i < limit; i++) { + p= old[i]; + if ( i == pc){ + // replace the instruction, e.g. with a partioned one. + // Dont use any partition intelligence, simple half split + // x1 := algebra.LeftFetchJoin(b,Y) => + // b1 := bat.partition(b,2,0); + // b2 := bat.partition(b,2,1); + // v1:= algebra.leftfetchjoin(b1,Y); + // v2:= algebra.leftfetchjoin(b2,Y); + // x1:= mat.pack(v1,v2); + // + // Be careful not to change the size of the stack, + // for we can not easily pass it back to the + // current interpreter call sequence + // + if ( m->stk->stksize < m->src->vtop + 7){ + pushInstruction(m->src,p); + continue; + } + + profiler = m->src->profiler[i].trace; + + mutationLeftFetchJoin_(m->src, m->stk, p, 2, 0, profiler, &v1); + mutationLeftFetchJoin_(m->src, m->stk, p, 2, 1, profiler, &v2); + + // replace its use in other mat packs + for (j = i+1; j < limit; j++) { + q= old[j]; + if ( getModuleId(q) == matRef && getFunctionId(q) == packRef){ + for( k= old[j]->retc; k < old[j]->argc; k++) + if ( getArg(q,k) == getArg(p,0)){ + // replace this argument + matpc++; + delArgument(old[j],k); + old[j] = setArgument(m->src,old[j],k, v2); + old[j] = setArgument(m->src,old[j],k, v1); + break; + } + } + } + + if ( matpc == 0){ + q= newStmt(m->src,matRef,packRef); + getArg(q,0)= getArg(p,0); + q= 
pushArgument(m->src,q,v1); + q= pushArgument(m->src,q,v2); + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src, languageRef, passRef); + q = pushArgument(m->src,q, getArg(p,p->retc+1)); + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + } + + //pushInstruction(m->src,p); + m->target = pc; + m->comment = GDKstrdup("mutationLeftFetchJoin"); + } else + pushInstruction(m->src,p); + } + GDKfree(old); +} + +static void +mutateNonPartitionedOperators(Mutant m, int stmtLoop, int matPackRefInstr, int matPackRefInstrArg, InstrPtr instrMatPack, int profiler) +{ + int j, k, l, limit, v1[MAX_PARTITIONS], z1[MAX_PARTITIONS]; + InstrPtr *old= m->src->stmt, q; + int matpc = 0, mat_pack_partitions; + + limit= m->src->stop; + + if(getModuleId(getInstrPtr(m->src, matPackRefInstr)) == algebraRef) + { + if(getFunctionId(getInstrPtr(m->src, matPackRefInstr)) == joinRef) + { + // number_of_partitions = Get number of partitions from mat.pack + mat_pack_partitions = instrMatPack->argc - instrMatPack->retc; + + for (k=0; k<mat_pack_partitions; k++) + { + q= copyInstruction(getInstrPtr(m->src, matPackRefInstr)); // matPackRefInstr = join + + getArg(q,matPackRefInstrArg) = getArg(instrMatPack, instrMatPack->retc + k); + + v1[k] = getArg(q,0)= newTmpVariable(m->src,TYPE_any); + + z1[k] = getArg(q,1)= newTmpVariable(m->src,TYPE_any); + + pushInstruction(m->src,q); + + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + } + + // replace its use in other mat packs + for (j = stmtLoop+1; j < limit; j++) + { + q= old[j]; + if ( getModuleId(q) == matRef && getFunctionId(q) == packRef) + { + for( k= old[j]->retc; k < old[j]->argc; k++) + { + if ( getArg(q,k) == getArg(getInstrPtr(m->src, matPackRefInstr),0)) + { + // replace this argument + matpc++; + delArgument(old[j],k); + for(l=mat_pack_partitions-1; l>=0; l--) + old[j] = setArgument(m->src,old[j],k, v1[l]); + break; + } + } + } + if(matpc>0 && j+1<limit) + { + j++; + q= 
old[j]; + if ( getModuleId(q) == matRef && getFunctionId(q) == packRef) + { + for( k= old[j]->retc; k < old[j]->argc; k++) + { + if ( getArg(q,k) == getArg(getInstrPtr(m->src,matPackRefInstr),1)) + { + // replace this argument + matpc++; + delArgument(old[j],k); + + for(l=mat_pack_partitions-1; l>=0; l--) + old[j] = setArgument(m->src,old[j],k, z1[l]); + break; + } + } + } + } + } + if (matpc == 0) + { + for(k=0; k<mat_pack_partitions; k++) + { + q= newStmt(m->src,matRef,packRef); + getArg(q,0)= getArg(getInstrPtr(m->src,matPackRefInstr),0); + + for(k=0; k<mat_pack_partitions; k++) + q= pushArgument(m->src,q,v1[k]); + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src,matRef,packRef); + getArg(q,0)= getArg(getInstrPtr(m->src,matPackRefInstr),1); + + for(k=0; k<mat_pack_partitions; k++) + q= pushArgument(m->src,q,z1[k]); + m->src->profiler[m->src->stop-1].trace = profiler; + + q= newStmt(m->src, languageRef, passRef); // The instruction below should be remembered if done correct + q = pushArgument(m->src,q, getArg(getInstrPtr(m->src,matPackRefInstr),getInstrPtr(m->src,matPackRefInstr)->retc+1)); + + // inherit profiling + m->src->profiler[m->src->stop-1].trace = profiler; + } + } + // mask the original algebra.join instruction + getInstrPtr(m->src,matPackRefInstr)->token = NOOPsymbol; + + } + } + +//end of the function +} + + + +void +mutationMatPack(Client cntxt, Mutant m) { + int pc = m->target, i, j, limit, profiler; + InstrPtr p=0, *old= m->src->stmt, instrMatPack; + + int stmtLoop, matPackRefInstr, matPackRefInstrArg; + + + // search for the mat.pack outuput variable + // check in which instruction this mat.pack output appears in the dependency instructions. 
+ + // If the referenced instruction is a non-bat-ref instruction, and its cost is roughly + // comparable to the cost of the mat.pack operator, then count the number of partitions in the + // mat.pack instruction and introduce that many partitions for the dependency operator + // which we just found. That is, partition the input of the dependency operator. (This dependency operator could be + // anything; for the time being consider it to be a join operator, with mat.pack combining the outputs of select operators.) + // Then join(A,x) is propagated to + // join(s1,x), join(s2,x), join(s3,x); that is, the inputs of mat.pack are propagated to the newly introduced join operators + // and the old join operator is removed. Also remove the old mat.pack operator from the select output, and introduce a new + // mat.pack operator to combine the join outputs. + + // Case 1 + // + // A1 := algebra.subselect(....); + // A2 := algebra.subselect(....); + // M1 := mat.pack(A1, A2); + // J := algebra.join(M1, b); + // + // + // Morphed into new plan + // + // A1 := algebra.subselect(....); + // A2 := algebra.subselect(....); + // J1 := algebra.join(A1,b); + // J2 := algebra.join(A2,b); + // M := mat.pack(J1, J2); + + + // Case 2 + // _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list