Changeset: 7ec32e256e6a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7ec32e256e6a
Modified Files:
        monetdb5/optimizer/opt_slicing.mx
Branch: default
Log Message:

TPCH does not crash anymore
Various refinements to make sure that the test set does not crash.
The plans still have to be validated and remain incomplete. Still to be
dealt with: missing operators, idxbat, and possibly rounding errors.


diffs (truncated from 430 to 300 lines):

diff --git a/monetdb5/optimizer/opt_slicing.mx 
b/monetdb5/optimizer/opt_slicing.mx
--- a/monetdb5/optimizer/opt_slicing.mx
+++ b/monetdb5/optimizer/opt_slicing.mx
@@ -29,6 +29,15 @@
 address OPTslicing
 comment "Modify the plan to exploit parallel processing on multiple cores";
 
+command sql.slice(b:bat[:any_1,:any_2], low:any_2, high:any_2) 
:bat[:any_1,:any_2]
+address OPTslice
+comment "Implement the slice operation. Throw an exception if the slice was 
empty, 
+because then the subquery should produce a NIL ";
+
+command sql.markH( b:bat[:any_1,:any_2] ) :bat[:oid,:any_2] 
+address OPTmarkHead
+comment "Ignore a NIL bat";
+
 pattern sql.gauges(b:bat[:oid,:any_1]) :any_1...
 address OPTgauges
 comment "Derive a series of gauge values based on sampling";
@@ -42,7 +51,10 @@
 @:exportOptimizer(slicing)@
 
 opt_export str OPTgauges(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
+opt_export str OPTslice(int *result, int *bid, ptr low, ptr high);
+opt_export str OPTmarkHead(int *result, int *bid);
 
+/* #define DEBUG_DETAIL*/
 #define _DEBUG_OPT_SLICE_ 
 #define OPTDEBUGslicing  if ( optDebug & ((lng)1 <<DEBUG_OPT_SLICING) )
 #endif
@@ -88,6 +100,22 @@
     return getArg(p,0);
 }
 
+static void
+OPTcatchcode(MalBlkPtr nmb, int aggrflg){
+       InstrPtr p, q;
+       int i;
+
+       (void) aggrflg;
+       /* in case of aggregates we return empty bats */
+    q = newCatchStmt(nmb,"ANYexception");
+       p = getInstrPtr(nmb,0);
+       for ( i=0; i< p->retc; i++){
+               q = newAssignment(nmb);
+               getArg(q,0) = getArg(p,i);
+               q= pushNil(nmb,q, getArgType(nmb,p,i));
+       }
+       q = newExitStmt(nmb,"ANYexception");
+}
 static MalBlkPtr
 OPTslicingStub(Client cntxt, MalBlkPtr mb, int pc, MalBlkPtr pmb, Gauge gauge)
 {
@@ -126,8 +154,10 @@
        }
 
        /* (k1,...kn):= remote.exec(conn,slicing,qry,version....) */
-       snprintf(nme,BUFSIZ,"%s_sub",getFunctionId( getInstrPtr(mb,0)));
-       q= newFcnCall(smb,remoteRef,execRef);
+       snprintf(nme,BUFSIZ,"%s_qry%d",getFunctionId( getInstrPtr(mb,0)),pc);
+       q = newInstruction(smb,ASSIGNsymbol);
+       getModuleId(q) = remoteRef;
+       getFunctionId(q) = execRef;
        q->retc=  q->argc= 0;
        for (j=0; j < sig->retc; j++){
                arg[j]= newTmpVariable(smb,TYPE_str);
@@ -139,6 +169,7 @@
        /* deal with all arguments ! */
        for (j=sig->retc; j < sig->argc; j++)
                q = pushArgument(smb,q,arg[j]);
+       pushInstruction(smb,q);
 
        /* return exec_qry; */
        ret = newInstruction(smb, ASSIGNsymbol);
@@ -164,7 +195,8 @@
 
     q = newStmt(smb, remoteRef, disconnectRef);
     pushArgument(smb, q, conn);
-       pushInstruction(smb,ret);
+       if ( sig->retc)
+               pushInstruction(smb,ret);
     pushEndInstruction(smb);
 
        GDKfree(arg);
@@ -192,7 +224,8 @@
        if ( newMalBlkStmt(cmb,cmb->ssize) < 0 )
                return 0;
        q= getInstrPtr(mb,pc);
-       if ( getModuleId(q) == groupRef && getFunctionId(q) == doneRef) 
+       if ( getModuleId(q) == groupRef && 
+               ( getFunctionId(q) == doneRef ||getFunctionId(q) == newRef))
                nrpack = 1;
        else
        if ( getModuleId(q) == sqlRef && getFunctionId(q) == resultSetRef) 
@@ -276,9 +309,19 @@
                }
                pushInstruction(cmb,q);
        }
+       /* put all mat.pak instructions into the program
+         and make sure that they have correct void headed columns 
+       */
        if ( gauge.column) 
-       for ( k=0 ; k < nrpack; k++)
+       for ( k=0 ; k < nrpack; k++) {
                pushInstruction(cmb, pack[k]);
+               j = newTmpVariable(cmb,getArgType(cmb,pack[k],0));
+               q= newFcnCall(cmb,sqlRef,markHRef);
+               getArg(q,0) = getArg(pack[k],0);
+               q= pushArgument(cmb,q, j);
+               getArg(pack[k],0) = j;
+               pack[k] = q;
+       }
 
 
        if ( slicingLocal == 0){
@@ -288,14 +331,21 @@
        }
 
        q = copyInstruction(getInstrPtr(mb,pc));
-       if ( getFunctionId(q) == countRef) {
+       if ( getFunctionId(q) == countRef || getFunctionId(q) == sumRef ){
                getFunctionId(q) = sumRef;
                getArg(q,1) = getArg(pack[0],0);
        } else 
+       if ( getFunctionId(q) == minRef || getFunctionId(q) == maxRef) {
+               getArg(q,1) = getArg(pack[0],0);
+       } else 
+       if ( getFunctionId(q) == avgRef)
+               assert(0);
+       else
        if ( getFunctionId(q) == putName("exportValue",11)) 
                getArg(q,8)= getArg(pack[0],0);
        else
-       if ( getModuleId(q) == groupRef && getFunctionId(q) == doneRef) {
+       if ( getModuleId(q) == groupRef && 
+               ( getFunctionId(q) == doneRef || getFunctionId(q) == newRef)) {
                /* just return the argument */
                clrFunction(q);
                q->argc = q->retc = 1;
@@ -370,7 +420,7 @@
 }
 
 static InstrPtr
-OPTcodegen(Client cntxt, MalBlkPtr mb, int pc, Gauge gauge )
+OPTcodegen(Client cntxt, MalBlkPtr mb, int pc, Gauge gauge, int aggrflag)
 {
        char *vec = 0;
        int parallel = 0, last, i, k, limit;
@@ -427,7 +477,8 @@
                                if ( getFunctionId(old[i]) == 
putName("exportValue",11)) {
                                        ret = pushReturn(nmb, ret, getArg(p,0));
                                } else 
-                               if ( getModuleId(p)== groupRef && 
getFunctionId(p) == doneRef){
+                               if ( getModuleId(p)== groupRef && 
+                                       ( getFunctionId(p) == doneRef || 
getFunctionId(p) == newRef)){
                                        /* return alternative */
                                        ret = 
pushReturn(nmb,ret,getArg(p,p->retc));
                                        nmb->stmt[0] =  pushReturn(nmb, 
getInstrPtr(nmb,0), getArg(p,p->retc));
@@ -462,8 +513,8 @@
                                /* add the slice operation */
                                parallel = 1;
                                q= newInstruction(nmb,ASSIGNsymbol);
-                               setModuleId(q,algebraRef);
-                               setFunctionId(q,selectRef);
+                               setModuleId(q,sqlRef);
+                               setFunctionId(q,sliceRef);
                                gauge.slice = getArg(p,0);
                                k = newTmpVariable(nmb, getVarType(nmb, 
getArg(p,0)));
                                setVarUDFtype(nmb,k);
@@ -474,8 +525,6 @@
                                q= pushArgument(nmb,q, getArg(p,0));
                                q= pushArgument(nmb,q, gauge.lgauge);
                                q= pushArgument(nmb,q, gauge.hgauge);
-                               q= pushBit(nmb,q, TRUE);
-                               q= pushBit(nmb,q, FALSE);
                                pushInstruction(nmb,q);
                        } else {
                                /* sideways projection the range selection */
@@ -492,12 +541,13 @@
                                pushInstruction(nmb,q);
                        }
                } else if ( p->token == ENDsymbol) {
+                       OPTcatchcode(nmb, aggrflag);
                        pushInstruction(nmb,ret);
                        pushEndInstruction(nmb);
                        continue;
                }
                else if ( getModuleId(p) == optimizerRef)
-               pushInstruction(nmb,old[i]);
+                       pushInstruction(nmb,old[i]);
                else {
                        freeInstruction(old[i]);
                        old[i] = 0;
@@ -532,12 +582,12 @@
                goto wrapup;
 
        cmb = OPTslicingCntrl(cntxt,mb,pc,nmb, gauge);
+       chkProgram(cntxt->nspace, cmb);
 #ifdef DEBUG_DETAIL
-       mnstr_printf(cntxt->fdout,"#cntrl,BEFORE test? %d\n", cmb->errors);
+       mnstr_printf(cntxt->fdout,"#cntrl,AFTER test? %d\n", cmb->errors);
        if ( cmb)
                printFunction(cntxt->fdout, cmb, 0, LIST_MAL_STMT);
 #endif
-       chkProgram(cntxt->nspace, cmb);
        if ( cmb->errors)
                goto wrapup;
 
@@ -581,16 +631,31 @@
        return q;
 }
 
+static void
+remapVariable( MalBlkPtr mb, int i, int old, int new)
+{
+       InstrPtr p;
+       int k;
+
+       for( ; i< mb->stop; i++){
+               p= getInstrPtr(mb,i);
+               for ( k = 0; k < p->argc; k++)
+               if ( getArg(p,k) == old)
+                       getArg(p,k) = new;
+       }
+}
+
 static int
 OPTslicingImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci)
 {
-       int i,j,pc = 0;
+       int i,j,k,pc = 0;
        wrd r = 0, rowcnt=0;    /* table should be sizeable to consider 
parallel execution*/
-       InstrPtr sig,p,q, target= 0, rsset=0;
+       InstrPtr p,q, target= 0, rsset=0;
        Gauge gauge;
        str msg = NULL;
        MalBlkPtr  omb;
        char *vec;
+       int     *alias;
 
        (void)cntxt;
        (void) stk;
@@ -603,6 +668,7 @@
 
        /* modify the block as we go */
        omb= copyMalBlk(mb);
+       memset((char*) &gauge,0, sizeof(gauge));
        gauge.column = 0;
        /* locate the largest non-partitioned table */
        for (i=1; i< mb->stop; i++){
@@ -660,41 +726,54 @@
                                getVarConstant(mb, 
getArg(gauge.target,3)).val.sval,
                                rowcnt, nrpeers);
 
+       /* reserve space for all exisiting and new variables */
+       alias= (int*) GDKzalloc(sizeof(int) * 2 * mb->vsize);
+
        for(i=0; i < mb->stop; i++){
                p= getInstrPtr(mb,i);
 
 
                if ( getModuleId(p) == sqlRef && getFunctionId(p) == 
resultSetRef) {
                        mb->stmt[i]  = rsset;
-                       q = OPTcodegen(cntxt, mb, i, gauge);
+                       q = OPTcodegen(cntxt, mb, i, gauge, FALSE);
 
                        if ( q ) {
+                               clrFunction(rsset);
+#ifdef DEBUGDETAILS
                                mnstr_printf(cntxt->fdout, "#codegenerated 
resultSet constructor\n");
                                printInstruction(cntxt->fdout, mb, 0, q, 
LIST_MAL_STMT);
+#endif
                                getModuleId(rsset) = userRef;
                                getFunctionId(rsset) = getFunctionId(q);
                                delArgument(rsset,0);
                                /* now drop the column arguments added and pass 
the additional argument */
                                rsset->argc = rsset->retc;
+                               getArg(p,3) = getArg(rsset,0);
+                               /* add original arguments to cntrl call */
+                               q = getInstrPtr(mb,0);
+                               for( j = q->retc; j < q->argc; j++)
+                                       mb->stmt[i] = pushArgument(mb, 
mb->stmt[i], getArg(q,j));
                                insertInstruction(mb,p,i+1);
+                               /* remap the other result variables as well */
+                               for ( k = 0; k < rsset->retc; k++) {
+                                       j = newTmpVariable(mb, 
getArgType(mb,rsset,k));
+                                       remapVariable(mb, i, getArg(rsset,k), 
j);
+                                       getArg(rsset,k) = j;
+                               }
                                i++;
-                               /* pass the query arguments */
-                               q= getInstrPtr(mb,0);
-                               for ( j = q->retc; j < q->argc; j++)
-                                       rsset = pushArgument(mb,rsset, 
getArg(q,j));
-                               getArg(p,3) = getArg(rsset,0);
                                msg = OPTdeadcode(cntxt, mb, 0, 0);
                                if ( msg )
                                        mnstr_printf(cntxt->fdout,"codegen 
deadcode %s\n",msg);
                        } else {
                                mb->stmt[i] = p;
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to