Changeset: bfdc45aad8e5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bfdc45aad8e5
Modified Files:
        monetdb5/scheduler/mut_policy.c
        monetdb5/scheduler/mut_transforms.c
        monetdb5/scheduler/mut_transforms.h
        monetdb5/scheduler/run_multicore.h
Branch: mutation
Log Message:

added join mutation operator with both relations splits


diffs (270 lines):

diff --git a/monetdb5/scheduler/mut_policy.c b/monetdb5/scheduler/mut_policy.c
--- a/monetdb5/scheduler/mut_policy.c
+++ b/monetdb5/scheduler/mut_policy.c
@@ -98,10 +98,12 @@ MUTpolicy(Client cntxt, Mutant m)
                if ( getModuleId(p) && strncmp(getModuleId(p), "algebra",7)== 0)
                {
                        if(getFunctionId(p) == joinRef)
-                               mutationJoin(cntxt,m);
+                               mutationJoinDouble(cntxt,m);
+//                             mutationJoin(cntxt,m);
                        else if(getFunctionId(p) == subselectRef)
                                mutationSelect(cntxt,m);
-
+                       else    // proxy function
+                               mutationJoinDouble(cntxt,m);
                }
                if ( getModuleId(p) && strncmp(getModuleId(p), "aggr",4)== 0)
                        mutationSum(cntxt,m);
diff --git a/monetdb5/scheduler/mut_transforms.c 
b/monetdb5/scheduler/mut_transforms.c
--- a/monetdb5/scheduler/mut_transforms.c
+++ b/monetdb5/scheduler/mut_transforms.c
@@ -32,6 +32,225 @@
  * and glues the result using matpack.
  * If the target is already input to a mat.pack, then we should add
  * the pieces produced to that instruction instead of making a new mat.pack.
+ * 
+ * mutationJoinDouble splits both the input relations to the join operator 
+ */
+
+void mutationJoinDouble(Client cntxt, Mutant m){
+    int pc = m->target, i, j, k, limit, b1,b2, v1,v2,v3,v4, z1,z2,z3,z4,y1,y2 ;
+    InstrPtr p=0, *old= m->src->stmt, q;
+    int matpc = 0, profiler;
+
+    (void) cntxt;
+    limit= m->src->stop;
+    if ( newMalBlkStmt(m->src, m->src->ssize) < 0)
+        return;
+
+    pushInstruction(m->src, old[0]);
+    for (i = 1; i < limit; i++) {
+        p= old[i];
+               if ( i == pc){
+                       /* replace the instruction, e.g. with a partioned one.
+                          Dont use any partition intelligence, simple half 
split
+                          x1,x2 := algebra.join(b,Y) =>
+                               b1 := bat.partition(b,2,0);
+                               b2 := bat.partition(b,2,1);
+                               y1 := bat.partition(Y,2,0);
+                               y2 := bat.partition(Y,2,1);
+                               v1,z1:= algebra.join(b1,y1);
+                               v2,z2:= algebra.join(b2,y1);
+                               v3,z3:= algebra.join(b1,y2);
+                               v4,z4:= algebra.join(b2,y2);
+                               x1:= mat.pack(v1,v2,v3,v4);
+                               x2:= mat.pack(z1,z2,z3,z4);
+                         Be careful not to change the size of the stack,
+                         for we can not easily pass it back to the 
+                         current interpreter call sequence
+                       */
+                       if ( m->stk->stksize < m->src->vtop + 7){
+                               pushInstruction(m->src,p);
+                               continue;
+                       }
+               
+                       profiler = m->src->profiler[i].trace;
+                       
+                       q= newStmt(m->src, batRef, partitionRef);
+                       setVarType(m->src, getArg(q,0), getArgType(m->src, p, 
p->retc));
+                       b1 = getArg(q,0);
+                       q = pushArgument(m->src,q,getArg(p,p->retc));
+                       q = pushInt(m->src,q,2);
+                       // update the stack as well, because we are executing
+                       m->stk->stk[getArg(q, q->argc-1)].val.ival = 2;
+                       q = pushInt(m->src,q,0);
+                       m->stk->stk[getArg(q, q->argc-1)].val.ival = 0;
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       q= newStmt(m->src, batRef, partitionRef);
+                       setVarType(m->src, getArg(q,0), getArgType(m->src, p, 
p->retc));
+                       b2 = getArg(q,0);
+                       q = pushArgument(m->src,q,getArg(p,p->retc));
+                       q = pushInt(m->src,q,2);
+                       // update the stack as well, because we are executing
+                       m->stk->stk[getArg(q, q->argc-1)].val.ival = 2;
+                       q = pushInt(m->src,q,1);
+                       m->stk->stk[getArg(q, q->argc-1)].val.ival = 1;
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       q= newStmt(m->src, batRef, partitionRef);
+                       setVarType(m->src, getArg(q,0), getArgType(m->src, p, 
p->retc+1));
+                       y1 = getArg(q,0);
+                       q = pushArgument(m->src,q,getArg(p,p->retc+1));
+                       q = pushInt(m->src,q,2);
+                       // update the stack as well, because we are executing
+                       m->stk->stk[getArg(q, q->argc-1)].val.ival = 2;
+                       q = pushInt(m->src,q,0);
+                       m->stk->stk[getArg(q, q->argc-1)].val.ival = 0;
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       q= newStmt(m->src, batRef, partitionRef);
+                       setVarType(m->src, getArg(q,0), getArgType(m->src, p, 
p->retc+1));
+                       y2 = getArg(q,0);
+                       q = pushArgument(m->src,q,getArg(p,p->retc+1));
+                       q = pushInt(m->src,q,2);
+                       // update the stack as well, because we are executing
+                       m->stk->stk[getArg(q, q->argc-1)].val.ival = 2;
+                       q = pushInt(m->src,q,1);
+                       m->stk->stk[getArg(q, q->argc-1)].val.ival = 1;
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       q= copyInstruction(p);
+                       getArg(q,2)= b1;
+                       getArg(q,3)= y1;
+                       v1 = getArg(q,0)= newTmpVariable(m->src,TYPE_any);
+                       z1 = getArg(q,1)= newTmpVariable(m->src,TYPE_any);
+                       pushInstruction(m->src,q);
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       q= copyInstruction(p);
+                       getArg(q,2)= b2;
+                       getArg(q,3)= y1;
+                       v2 = getArg(q,0)= newTmpVariable(m->src,TYPE_any);
+                       z2 = getArg(q,1)= newTmpVariable(m->src,TYPE_any);
+                       pushInstruction(m->src,q);
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       q= copyInstruction(p);
+                       getArg(q,2)= b1;
+                       getArg(q,3)= y2;
+                       v3 = getArg(q,0)= newTmpVariable(m->src,TYPE_any);
+                       z3 = getArg(q,1)= newTmpVariable(m->src,TYPE_any);
+                       pushInstruction(m->src,q);
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       q= copyInstruction(p);
+                       getArg(q,2)= b2;
+                       getArg(q,3)= y2;
+                       v4 = getArg(q,0)= newTmpVariable(m->src,TYPE_any);
+                       z4 = getArg(q,1)= newTmpVariable(m->src,TYPE_any);
+                       pushInstruction(m->src,q);
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       q= newStmt(m->src, languageRef, passRef);
+                       q = pushArgument(m->src,q, b1);
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       q= newStmt(m->src, languageRef, passRef);
+                       q = pushArgument(m->src,q, b2);
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+                       
+                       q= newStmt(m->src, languageRef, passRef);
+                       q = pushArgument(m->src,q, y1);
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       q= newStmt(m->src, languageRef, passRef);
+                       q = pushArgument(m->src,q, y2);
+                       // inherit profiling
+                       m->src->profiler[m->src->stop-1].trace = profiler;
+
+                       // replace its use in other mat packs
+                       for (j = i+1; j < limit; j++) {
+                               q= old[j];
+                               if ( getModuleId(q) == matRef && 
getFunctionId(q) == packRef){
+                                       for( k= old[j]->retc; k < old[j]->argc; 
k++)
+                                               if ( getArg(q,k) == 
getArg(p,0)){
+                                                       /* replace this 
argument */
+                                                       matpc++;
+                                                       delArgument(old[j],k);
+                                                       old[j] = 
setArgument(m->src,old[j],k, v4);
+                                                       old[j] = 
setArgument(m->src,old[j],k, v3);
+                                                       old[j] = 
setArgument(m->src,old[j],k, v2);
+                                                       old[j] = 
setArgument(m->src,old[j],k, v1);
+                                                       break;
+                                               }
+                                                               
+                               }
+                               if(matpc>0 && j+1<limit)
+                               {       
+                                       j++;
+                                       q= old[j];
+                                       if ( getModuleId(q) == matRef && 
getFunctionId(q) == packRef){
+                                               for( k= old[j]->retc; k < 
old[j]->argc; k++)
+                                                       if ( getArg(q,k) == 
getArg(p,1)){
+                                                               // replace this 
argument 
+                                                               matpc++;
+                                                               
delArgument(old[j],k);
+                                                               old[j] = 
setArgument(m->src,old[j],k, z4);
+                                                               old[j] = 
setArgument(m->src,old[j],k, z3);
+                                                               old[j] = 
setArgument(m->src,old[j],k, z2);
+                                                               old[j] = 
setArgument(m->src,old[j],k, z1);
+                                                               break;
+                                                       }
+                                       }
+                               }
+                       }
+
+                       if ( matpc == 0){
+                               q= newStmt(m->src,matRef,packRef);
+                               getArg(q,0)= getArg(p,0);
+                               q= pushArgument(m->src,q,v1);
+                               q= pushArgument(m->src,q,v2);
+                               q= pushArgument(m->src,q,v3);
+                               q= pushArgument(m->src,q,v4);
+                               m->src->profiler[m->src->stop-1].trace = 
profiler;
+
+                               q= newStmt(m->src,matRef,packRef);
+                               getArg(q,0)= getArg(p,1);
+                               q= pushArgument(m->src,q,z1);
+                               q= pushArgument(m->src,q,z2);
+                               q= pushArgument(m->src,q,z3);
+                               q= pushArgument(m->src,q,z4);
+                               m->src->profiler[m->src->stop-1].trace = 
profiler;
+
+                       }
+
+                       //pushInstruction(m->src,p);
+                       m->target = pc;
+                       m->comment = GDKstrdup("mutationJoin");
+               } else
+                       pushInstruction(m->src,p);
+       }
+    GDKfree(old);
+}
+
+/* Sample plan mutation actions
+ * The join mutation simply splits the target instruction
+ * and glues the result using matpack.
+ * If the target is already input to a mat.pack, then we should add
+ * the pieces produced to that instruction instead of making a new mat.pack.
+ * 
+ * mutationJoin splits only the left relation of the join operator
+ *
  */
 
 void 
diff --git a/monetdb5/scheduler/mut_transforms.h 
b/monetdb5/scheduler/mut_transforms.h
--- a/monetdb5/scheduler/mut_transforms.h
+++ b/monetdb5/scheduler/mut_transforms.h
@@ -26,6 +26,7 @@
 #include "run_multicore.h"
 
 run_multicore_export void mutationJoin(Client cntxt, Mutant m);
+run_multicore_export void mutationJoinDouble(Client cntxt, Mutant m);
 run_multicore_export void mutationSelect(Client cntxt, Mutant m);
 run_multicore_export void mutationSum(Client cntxt, Mutant m);
 
diff --git a/monetdb5/scheduler/run_multicore.h 
b/monetdb5/scheduler/run_multicore.h
--- a/monetdb5/scheduler/run_multicore.h
+++ b/monetdb5/scheduler/run_multicore.h
@@ -48,7 +48,7 @@ typedef struct MUTANT{
        int target;             // operation changed from previous
        struct MUTANT *next;
 } *Mutant;
-#define DEBUG_MULTICORE if(0)
+#define DEBUG_MULTICORE if(1)
 run_multicore_export str RUNmulticore(Client cntxt, MalBlkPtr mb, MalStkPtr 
stk, InstrPtr p);
 run_multicore_export void multicorePrint(Client cntxt, Mutant m);
 #endif /* MAL_RUN_MULTICORE */
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to