Changeset: a3863d5bf0a8 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a3863d5bf0a8
Modified Files:
        monetdb5/scheduler/mut_policy.c
        monetdb5/scheduler/mut_transforms.c
        monetdb5/scheduler/mut_transforms.h
Branch: mutation
Log Message:

left-fetch-join parallelization code + new mutation policy with partition 
propagation to join operator when join is not parallelized yet and is the 
dependency operator from previous mat.pack


diffs (truncated from 409 to 300 lines):

diff --git a/monetdb5/scheduler/mut_policy.c b/monetdb5/scheduler/mut_policy.c
--- a/monetdb5/scheduler/mut_policy.c
+++ b/monetdb5/scheduler/mut_policy.c
@@ -43,6 +43,11 @@ mutationCandidate(MalBlkPtr mb, InstrPtr
                if (getFunctionId(p) == joinRef)
                        return 1;
        }
+        if ( getModuleId(p) == algebraRef) {
+                if (getFunctionId(p) == leftfetchjoinRef)
+                        return 1;
+        }
+
 
 /*
        if ( getModuleId(p) == aggrRef){
@@ -105,6 +110,8 @@ MUTpolicy(Client cntxt, Mutant m)
                                mutationJoin(cntxt,m);
                        else if(getFunctionId(p) == subselectRef)
                                mutationSelect(cntxt,m);
+                        else if(getFunctionId(p) == leftfetchjoinRef)
+                                mutationLeftFetchJoin(cntxt,m);
                        else    // proxy function
                                mutationJoinDouble(cntxt,m);
                }
diff --git a/monetdb5/scheduler/mut_transforms.c 
b/monetdb5/scheduler/mut_transforms.c
--- a/monetdb5/scheduler/mut_transforms.c
+++ b/monetdb5/scheduler/mut_transforms.c
@@ -246,7 +246,6 @@ mutationJoin_(MalBlkPtr mb, MalStkPtr st
        pushInstruction(mb,q);
        // inherit profiling
        mb->profiler[mb->stop-1].trace = profiler;
-
 }
 
 void 
@@ -394,7 +393,7 @@ mutationSelect(Client cntxt, Mutant m){
     if ( newMalBlkStmt(m->src, m->src->ssize) < 0)
         return;
 
-       assert( m->src->profiler);
+    assert( m->src->profiler);
     pushInstruction(m->src, old[0]);
     for (i = 1; i < limit; i++) {
         p= old[i];
@@ -488,3 +487,346 @@ mutationSum(Client cntxt, Mutant m){
     GDKfree(old);
 }
 
+static void 
+mutationLeftFetchJoin_(MalBlkPtr mb, MalStkPtr stk, InstrPtr p, int 
partitions, int slice, int profiler, int *v1Ptr)
+{
+       int b1;
+       InstrPtr q;
+
+       q= newStmt(mb, batRef, partitionRef);
+       setVarType(mb, getArg(q,0), getArgType(mb, p, p->retc));
+       b1 = getArg(q,0);
+       q = pushArgument(mb,q,getArg(p,p->retc));
+       q = pushInt(mb,q,2);
+       // update the stack as well, because we are executing
+       stk->stk[getArg(q, q->argc-1)].val.ival = partitions;
+       q = pushInt(mb,q,slice);
+       stk->stk[getArg(q, q->argc-1)].val.ival = slice;
+       // inherit profiling
+       mb->profiler[mb->stop-1].trace = profiler;
+
+       q= copyInstruction(p);
+       getArg(q,1)= b1;
+       *v1Ptr = getArg(q,0)= newTmpVariable(mb,TYPE_any);
+       pushInstruction(mb,q);
+       // inherit profiling
+       mb->profiler[mb->stop-1].trace = profiler;
+}
+
+void 
+mutationLeftFetchJoin(Client cntxt, Mutant m){
+    int pc = m->target, i, j, k, limit, v1,v2;
+    InstrPtr p=0, *old= m->src->stmt, q;
+    int matpc = 0, profiler;
+      
+
+    (void) cntxt;
+    limit= m->src->stop;
+    if ( newMalBlkStmt(m->src, m->src->ssize) < 0)
+        return;
+
+    pushInstruction(m->src, old[0]);
+    for (i = 1; i < limit; i++) {
+        p= old[i];
+               if ( i == pc){
+                       //   replace the instruction, e.g. with a partioned one.
+                       //   Dont use any partition intelligence, simple half 
split
+                       //   x1 := algebra.LeftFetchJoin(b,Y) =>
+                       //      b1 := bat.partition(b,2,0);
+                       //      b2 := bat.partition(b,2,1);
+                       //      v1:= algebra.leftfetchjoin(b1,Y);
+                       //      v2:= algebra.leftfetchjoin(b2,Y);
+                       //      x1:= mat.pack(v1,v2);
+                       //      
+                       //  Be careful not to change the size of the stack,
+                       //  for we can not easily pass it back to the 
+                       //  current interpreter call sequence
+                       //
+                       if ( m->stk->stksize < m->src->vtop + 7){
+                               pushInstruction(m->src,p);
+                               continue;
+                       }
+               
+                       profiler = m->src->profiler[i].trace;
+                       
+                       mutationLeftFetchJoin_(m->src, m->stk, p, 2, 0, 
profiler, &v1);
+                       mutationLeftFetchJoin_(m->src, m->stk, p, 2, 1, 
profiler, &v2);
+                       
+                       // replace its use in other mat packs
+                       for (j = i+1; j < limit; j++) {
+                               q= old[j];
+                               if ( getModuleId(q) == matRef && 
getFunctionId(q) == packRef){
+                                       for( k= old[j]->retc; k < old[j]->argc; 
k++)
+                                               if ( getArg(q,k) == 
getArg(p,0)){
+                                                       // replace this 
argument 
+                                                       matpc++;
+                                                       delArgument(old[j],k);
+                                                       old[j] = 
setArgument(m->src,old[j],k, v2);
+                                                       old[j] = 
setArgument(m->src,old[j],k, v1);
+                                                       break;
+                                               }
+                               }
+                       }
+
+                       if ( matpc == 0){
+                               q= newStmt(m->src,matRef,packRef);
+                               getArg(q,0)= getArg(p,0);
+                               q= pushArgument(m->src,q,v1);
+                               q= pushArgument(m->src,q,v2);
+                               m->src->profiler[m->src->stop-1].trace = 
profiler;
+
+                               q= newStmt(m->src, languageRef, passRef);
+                               q = pushArgument(m->src,q, getArg(p,p->retc+1));
+                               // inherit profiling
+                               m->src->profiler[m->src->stop-1].trace = 
profiler;
+                       }
+
+                       //pushInstruction(m->src,p);
+                       m->target = pc;
+                       m->comment = GDKstrdup("mutationLeftFetchJoin");
+               } else
+                       pushInstruction(m->src,p);
+       }
+    GDKfree(old);
+}
+
+static void
+mutateNonPartitionedOperators(Mutant m, int stmtLoop, int matPackRefInstr, int 
matPackRefInstrArg, InstrPtr instrMatPack, int profiler)
+{
+    int  j, k, l, limit, v1[MAX_PARTITIONS], z1[MAX_PARTITIONS];
+    InstrPtr  *old= m->src->stmt, q;
+    int matpc = 0, mat_pack_partitions;
+
+    limit= m->src->stop;
+ 
+    if(getModuleId(getInstrPtr(m->src, matPackRefInstr)) == algebraRef)
+    {  
+               if(getFunctionId(getInstrPtr(m->src, matPackRefInstr)) == 
joinRef)
+               {
+                       // number_of_partitions = Get number of partitions from 
mat.pack
+                        mat_pack_partitions = instrMatPack->argc - 
instrMatPack->retc;                                                 
+                        
+                        for (k=0; k<mat_pack_partitions; k++)
+                        {
+                            q= copyInstruction(getInstrPtr(m->src, 
matPackRefInstr));  // matPackRefInstr = join
+                       
+                            getArg(q,matPackRefInstrArg) = 
getArg(instrMatPack, instrMatPack->retc + k);
+                       
+                            v1[k] = getArg(q,0)= 
newTmpVariable(m->src,TYPE_any);
+                       
+                            z1[k] = getArg(q,1)= 
newTmpVariable(m->src,TYPE_any);
+                       
+                             pushInstruction(m->src,q);
+                       
+                            // inherit profiling
+                            m->src->profiler[m->src->stop-1].trace = profiler; 
                        
+                        }
+                                       
+                       // replace its use in other mat packs
+                       for (j = stmtLoop+1; j < limit; j++) 
+                       {
+                               q= old[j];
+                               if ( getModuleId(q) == matRef && 
getFunctionId(q) == packRef)
+                               {
+                                       for( k= old[j]->retc; k < old[j]->argc; 
k++)
+                                       {
+                                               if ( getArg(q,k) == 
getArg(getInstrPtr(m->src, matPackRefInstr),0))
+                                               {
+                                                       // replace this 
argument 
+                                                       matpc++;
+                                                       delArgument(old[j],k);
+                                                       
for(l=mat_pack_partitions-1; l>=0; l--)
+                                                               old[j] = 
setArgument(m->src,old[j],k, v1[l]);
+                                                       break;
+                                               }
+                                       }                               
+                               }
+                               if(matpc>0 && j+1<limit)
+                               {       
+                                       j++;
+                                       q= old[j];
+                                       if ( getModuleId(q) == matRef && 
getFunctionId(q) == packRef)
+                                       {
+                                               for( k= old[j]->retc; k < 
old[j]->argc; k++)
+                                               {
+                                                       if ( getArg(q,k) == 
getArg(getInstrPtr(m->src,matPackRefInstr),1))
+                                                       {
+                                                               // replace this 
argument 
+                                                               matpc++;
+                                                               
delArgument(old[j],k);
+
+                                                               
for(l=mat_pack_partitions-1; l>=0; l--)
+                                                                       old[j] 
= setArgument(m->src,old[j],k, z1[l]);
+                                                               break;
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+                       if (matpc == 0)
+                       {
+                               for(k=0; k<mat_pack_partitions; k++)
+                               {                       
+                                       q= newStmt(m->src,matRef,packRef);
+                                       getArg(q,0)= 
getArg(getInstrPtr(m->src,matPackRefInstr),0);
+                                       
+                                       for(k=0; k<mat_pack_partitions; k++)
+                                               q= pushArgument(m->src,q,v1[k]);
+                                       m->src->profiler[m->src->stop-1].trace 
= profiler;
+
+                                       q= newStmt(m->src,matRef,packRef);
+                                       getArg(q,0)= 
getArg(getInstrPtr(m->src,matPackRefInstr),1);
+                                       
+                                       for(k=0; k<mat_pack_partitions; k++)
+                                               q= pushArgument(m->src,q,z1[k]);
+                                       m->src->profiler[m->src->stop-1].trace 
= profiler;
+
+                                       q= newStmt(m->src, languageRef, 
passRef); // The instruction below should be remembered if done correct
+                                       q = pushArgument(m->src,q, 
getArg(getInstrPtr(m->src,matPackRefInstr),getInstrPtr(m->src,matPackRefInstr)->retc+1));
+
+                                       // inherit profiling
+                                       m->src->profiler[m->src->stop-1].trace 
= profiler;
+                               }
+                       }
+                       // mask the original algebra.join instruction
+                       getInstrPtr(m->src,matPackRefInstr)->token = 
NOOPsymbol;                        
+                       
+               }
+       }
+
+//end of the function
+}
+
+
+
+void 
+mutationMatPack(Client cntxt, Mutant m) {
+    int pc = m->target, i, j, limit, profiler;
+    InstrPtr p=0, *old= m->src->stmt,  instrMatPack;
+
+    int stmtLoop, matPackRefInstr, matPackRefInstrArg;
+
+               
+       // search for the mat.pack outuput variable
+       // check in which instruction this mat.pack output appears in the 
dependency instructions. 
+
+       // If the referenced instruction is a non-bat-ref instruction, and its 
cost is relatively 
+       // comparable to the cost of mat.pack operator, then count the number 
of partitions in the
+       // mat.pack instruction and introduce those many partitions for the new 
dependency operator 
+       // which we just found. That is partition the input of the dependency 
operator (This dependency operator could be
+       // any for time being consider it to be a join operator and mat.pack 
combines a select operator
+       // Then we have join(A,x)   propagate to 
+       // join(s1,x), join(s2,x), join(s3,x)  that is propagate the input to 
mat.pack to the newly introduced join operator
+       // and remove the old join operator. also remove the old mat.pack 
operator from the select output, and introduce a new
+       // mat.pack operator to combine the join output
+       
+       // Case 1
+       // 
+       // A1 := algebra.subselect(....);
+       // A2 := algebra.subselect(....);
+       // M1 := mat.pack(A1, A2);
+       // J := algebra.join(M1, b);
+       //
+       //
+       // Morphed into new plan
+       //
+       // A1 := algebra.subselect(....);
+       // A2 := algebra.subselect(....);
+       // J1 := algebra.join(A1,b);
+       // J2 := algebra.join(A2,b);
+       // M := mat.pack(J1, J2);
+
+
+       // Case 2       
+       //
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to