Changeset: cc2956f1a6eb for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=cc2956f1a6eb
Modified Files:
        
Branch: default
Log Message:

merging


diffs (truncated from 354 to 300 lines):

diff --git a/monetdb5/mal/mal_resolve.mx b/monetdb5/mal/mal_resolve.mx
--- a/monetdb5/mal/mal_resolve.mx
+++ b/monetdb5/mal/mal_resolve.mx
@@ -160,6 +160,10 @@ findFunctionType(Module scope, MalBlkPtr
        m = scope;
        s = m->subscope[(int)(getSubScope(getFunctionId(p)))];
        if (s == 0) return -1;
+
+       returntype= (int*) GDKzalloc(p->retc * sizeof(int));
+       if ( returntype == 0) return -1;
+
        while (s != NULL) {  /* single scope element check */
                if (getFunctionId(p) != s->name) {
                        s = s->skip; continue;
@@ -357,7 +361,6 @@ findFunctionType(Module scope, MalBlkPtr
                 * the resulting type can not be determined.
                 */
                s1 = 0;
-               returntype= (int*) GDKzalloc(p->retc * sizeof(int));
                if (sig->polymorphic)
                        for (k = i = 0; i < p->retc; k++, i++) {
                                int actual = getArgType(mb, p, i);
diff --git a/monetdb5/optimizer/opt_dataflow.mx 
b/monetdb5/optimizer/opt_dataflow.mx
--- a/monetdb5/optimizer/opt_dataflow.mx
+++ b/monetdb5/optimizer/opt_dataflow.mx
@@ -110,6 +110,11 @@ opt_export void removeDataflow(InstrPtr 
 #include "mal_instruction.h"
 #include "mal_interpreter.h"
 
+/*
+ * dataflow processing incurs overhead and is only
+ * relevant if multiple tasks can be handled at the same time.
+ * Also, simple expressions do not have to be done in parallel.
+*/
 static int
 simpleFlow(InstrPtr *old, int start, int last)
 {
@@ -125,6 +130,8 @@ simpleFlow(InstrPtr *old, int start, int
                        if( getArg(p,0) == getArg(q,j))
                                simple= TRUE;
                if( !simple)
+                       simple = getModuleId(p) == calcRef || getModuleId(p) == 
mtimeRef || getModuleId(p) == strRef || getModuleId(p)== mmathRef;
+               if( !simple)
                        return 0;
                p = q;
        }
diff --git a/monetdb5/optimizer/opt_partition.mx 
b/monetdb5/optimizer/opt_partition.mx
--- a/monetdb5/optimizer/opt_partition.mx
+++ b/monetdb5/optimizer/opt_partition.mx
@@ -38,10 +38,6 @@ address OPTpartitionMaterialize
 comment "Implement the partition operation. Throw an exception if the 
partition was empty, 
 because then the subquery should produce a NIL ";
 
-command partition.markH( b:bat[:any_1,:any_2] ) :bat[:oid,:any_2] 
-address OPTmarkHead
-comment "Ignore a NIL bat";
-
 pattern partition.vector(b:bat[:oid,:any_1]) :any_1...
 address OPTvector
 comment "Derive a series of slices values based on sampling";
@@ -57,7 +53,6 @@ comment "Derive a series of slices value
 
 opt_export str OPTvector(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 opt_export str OPTpartitionMaterialize(int *result, int *bid, ptr low, ptr 
high);
-opt_export str OPTmarkHead(int *result, int *bid);
 
 /* #define DEBUG_DETAIL*/
 #define _DEBUG_OPT_PARTITION_ 
@@ -81,6 +76,11 @@ typedef      struct{
        ValRecord bounds[MAXSITES];
 } Slices;
 
+/*
+ * The query will be controlled from the coordinator with a plan
+ * geared at parallel execution 
+ * TODO pack is expensive, move to mat.new
+*/
 static MalBlkPtr
 OPTplanCntrl(Client cntxt, MalBlkPtr mb, MalBlkPtr pmb, Slices *slices)
 {
@@ -181,18 +181,13 @@ OPTplanCntrl(Client cntxt, MalBlkPtr mb,
                pushInstruction(cmb,q);
        }
        /* put all mat.pack instructions into the program
-         and make sure that they have contiguous void headed columns 
+         make sure that they have contiguous void headed columns 
        */
        p = getInstrPtr(pmb,0);
        if ( slices->column) 
        for ( k=0 ; k < nrpack; k++) {
                pushInstruction(cmb, pack[k]);
                getArg(pack[k],0)= getArg(p,k);
-/*
-               q= newFcnCall(cmb,partitionRef,markHRef);
-               getArg(q,0) = getArg(p,k); 
-               q= pushArgument(cmb,q, getArg(pack[k],0));
-*/
        }
 
        /* finalize the dataflow block */
@@ -218,7 +213,7 @@ OPTplanCntrl(Client cntxt, MalBlkPtr mb,
        return cmb;
 }
 
-/* prepare slicing a column  by addition over the target */
+/* prepare access to partitions by injection of the materialize instructions */
 static int
 OPTpreparePartition(MalBlkPtr nmb, InstrPtr p, Slices *slices, int pc)
 {
@@ -256,6 +251,9 @@ OPTpreparePartition(MalBlkPtr nmb, Instr
        return parallel;
 }
 
+/*
+ * For bind instructions we have to inject materialize and semijoin 
instructions
+*/
 static int 
 OPTsliceColumn(Client cntxt, MalBlkPtr nmb, MalBlkPtr mb, InstrPtr p, Slices 
*slices, int pc)
 {
@@ -307,12 +305,15 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n
 /* 
  * The plan is analysed for the maximal subplan that involves a partitioned 
table
  * and that does not require data exchanges.
- * This portion is extracted for possibly remote execution.
+ * Algebraic operators that can be executed on fragments are delegated too.
+ * For example join(A,B) where A is fragmented and B is not can be done 
elsewhere.
+ * In all cases we should ensure that the result of the remote execution can be
+ * simply unioned together.
 */
 #define BLOCKED 1
 #define REQUIRED 2
-#define EXPORTED 3
-#define NEEDED 4
+#define SUPPORTIVE 3
+
 static int 
 OPTplanFragment(Client cntxt, MalBlkPtr mb, Slices *slices)
 {
@@ -358,9 +359,7 @@ OPTplanFragment(Client cntxt, MalBlkPtr 
        (void) slices;
 #endif
 
-       /* Phase 1: determine all variables/instructions indirectly dependent 
on a
-          fragmented column
-       */
+       /* Phase 1: determine all variables/instructions indirectly dependent 
on a fragmented column */
        last = limit;
        for ( i = 0; i < limit ; i++) {
                p = old[i];
@@ -379,11 +378,24 @@ OPTplanFragment(Client cntxt, MalBlkPtr 
                if (vars[getArg(p,j)] == BLOCKED) 
                        plan[i] = BLOCKED;
 
-               /* blocking instructions */
+               /* blocking instructions are those that require data exchange 
or a total view */
+               if (    getModuleId(p) == algebraRef && getFunctionId(p) == 
joinRef ) {
+                       if (vars[getArg(p,1)] == REQUIRED && vars[getArg(p,2)] 
== REQUIRED)  {
+                               /* not possible to delegate */
+                               plan[i] = BLOCKED;
+                       } else {
+                               /* other variable is supportive */
+                               if (vars[getArg(p,1)] != REQUIRED) 
+                                       vars[getArg(p,1)] = SUPPORTIVE; 
+                               if (vars[getArg(p,2)] != REQUIRED) 
+                                       vars[getArg(p,2)] = SUPPORTIVE; 
+                               plan[i] = SUPPORTIVE;
+                       }
+               } else
                if (    (getModuleId(p) == groupRef && (getFunctionId(p) == 
doneRef || getFunctionId(p) == newRef ||getFunctionId(p) == deriveRef) )  ||
                                getModuleId(p) == pqueueRef || getModuleId(p) 
== aggrRef || getModuleId(p) == ioRef ||
                                (getModuleId(p) == sqlRef && (getFunctionId(p) 
== resultSetRef || getFunctionId(p) == putName("exportValue",11) )) ||
-                               (getModuleId(p) == algebraRef 
&&(getFunctionId(p) == sliceRef || getFunctionId(p) == joinRef || 
getFunctionId(p)==markTRef)) )  {
+                               (getModuleId(p) == algebraRef 
&&(getFunctionId(p) == sliceRef || getFunctionId(p)==markTRef)) )  {
                        /* add the targets of its argument to the output */
                        plan[i] = BLOCKED;
                }
@@ -393,18 +405,50 @@ OPTplanFragment(Client cntxt, MalBlkPtr 
                                vars[getArg(p,j)] = BLOCKED;
                } else {
                        for( j = 0; j < p->argc; j++)
-                       if (vars[getArg(p,j)] == REQUIRED) 
+                       if (vars[getArg(p,j)] == REQUIRED ) 
                                break;
-                       if ( j != p->argc) {
-                               for ( j= 0; j< p->retc; j++)
+                       if ( j != p->argc)
+                               plan[i]= REQUIRED;
+
+                       for( j = 0; j < p->argc; j++)
+                       if (vars[getArg(p,j)] == SUPPORTIVE ) 
+                               break;
+                       if ( j != p->argc && plan[i] != REQUIRED)
+                               plan[i]= SUPPORTIVE;
+
+                       if ( plan[i] == REQUIRED)
+                               for ( j= 0; j< p->argc; j++)
                                        vars[getArg(p,j)] = REQUIRED;
-                               plan[i] = REQUIRED;
-                       }
+                       if ( plan[i] == SUPPORTIVE)
+                               for ( j= 0; j< p->argc; j++)
+                               if ( vars[getArg(p,j)] == 0)
+                                       vars[getArg(p,j)] = SUPPORTIVE;
                }
        }
 
-       /* Phase 2: determine all variables/instructions contributing */
-       mnstr_printf(cntxt->fdout,"#phase 2\n");
+#ifdef _DEBUG_OPT_PARTITION_ 
+       mnstr_printf(cntxt->fdout,"\n#phase 1\n");
+       for( i= 0; i< limit; i++)
+       if (plan[i] ) {
+               switch (plan[i]) {
+               case BLOCKED:
+                       mnstr_printf(cntxt->fdout,"#blocked  ");
+                       break;
+               case REQUIRED:
+                       mnstr_printf(cntxt->fdout,"#required ");
+                       break;
+               case SUPPORTIVE:
+                       mnstr_printf(cntxt->fdout,"#support  ");
+               }
+               if( old[i])
+                       printInstruction(cntxt->fdout, 
mb,0,old[i],LIST_MAL_STMT);
+       }
+#endif
+
+       /* Phase 2: determine all variables/instructions contributing 
+          instructions based on supportive variables remain marked as 
supportive
+          because we have to avoid common ancestor dependency on partitioned 
variables
+       */
        for ( i = limit -1; i >= 0 ; i--)
        if ( plan[i] != BLOCKED ){
                p = old[i];
@@ -412,12 +456,69 @@ OPTplanFragment(Client cntxt, MalBlkPtr 
                if (vars[getArg(p,j)] == REQUIRED) 
                        plan[i] = REQUIRED;
 
-               if( plan[i] == REQUIRED)
-                       for ( j= p->retc; j< p->argc; j++)
+               if ( plan[i] == 0)
+               for( j = 0; j < p->argc; j++)
+               if (vars[getArg(p,j)] == SUPPORTIVE) 
+                       plan[i] = SUPPORTIVE;
+
+               if( plan[i]== REQUIRED )
+                       for ( j= 0; j< p->argc; j++)
                                vars[getArg(p,j)] = REQUIRED;
        }
-       /* Phase 3: determine all variables to be exported */
-       mnstr_printf(cntxt->fdout,"#phase 3\n");
+#ifdef _DEBUG_OPT_PARTITION_ 
+       mnstr_printf(cntxt->fdout,"\n#phase 2\n");
+       for( i= 0; i< limit; i++)
+       if (plan[i] ) {
+               switch (plan[i]) {
+               case BLOCKED:
+                       mnstr_printf(cntxt->fdout,"#blocked  ");
+                       break;
+               case REQUIRED:
+                       mnstr_printf(cntxt->fdout,"#required ");
+                       break;
+               case SUPPORTIVE:
+                       mnstr_printf(cntxt->fdout,"#support  ");
+               }
+               if( old[i])
+                       printInstruction(cntxt->fdout, 
mb,0,old[i],LIST_MAL_STMT);
+       }
+#endif
+       /* Phase 3: turn all supportive instructions into required ones 
+          if it helps to produce the required intermediate
+       */
+       for ( i = 0; i < limit; i++) 
+       if( plan[i] == SUPPORTIVE){
+               p = old[i];
+               for( j = 0; j < p->argc; j++)
+               if (vars[getArg(p,j)] == SUPPORTIVE) 
+                       break;
+               if( j == p->argc)
+                       plan[i] = REQUIRED;
+               else
+                       plan[i] = BLOCKED;
+       }
+#ifdef _DEBUG_OPT_PARTITION_ 
+       mnstr_printf(cntxt->fdout,"\n#phase 3\n");
+       for( i= 0; i< limit; i++)
+       if (plan[i] ) {
+               switch (plan[i]) {
+               case BLOCKED:
+                       mnstr_printf(cntxt->fdout,"#blocked  ");
+                       break;
+               case REQUIRED:
+                       mnstr_printf(cntxt->fdout,"#required ");
+                       break;
+               case SUPPORTIVE:
+                       mnstr_printf(cntxt->fdout,"#support  ");
+               }
+               if( old[i])
+                       printInstruction(cntxt->fdout, 
mb,0,old[i],LIST_MAL_STMT);
+       }
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to