Changeset: 6e5c0175c0fd for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6e5c0175c0fd Modified Files: monetdb5/optimizer/opt_slicing.mx Branch: default Log Message:
Get resultset code correct. Slicing now deals with simple aggregates, export single values, and multicolumn projection. diffs (truncated from 508 to 300 lines): diff --git a/monetdb5/optimizer/opt_slicing.mx b/monetdb5/optimizer/opt_slicing.mx --- a/monetdb5/optimizer/opt_slicing.mx +++ b/monetdb5/optimizer/opt_slicing.mx @@ -42,7 +42,7 @@ opt_export str OPTgauges(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); -/* #define _DEBUG_OPT_SLICE_ */ +#define _DEBUG_OPT_SLICE_ #define OPTDEBUGslicing if ( optDebug & ((lng)1 <<DEBUG_OPT_SLICING) ) #endif @c @@ -50,29 +50,9 @@ #include "opt_slicing.h" #include "opt_deadcode.h" #include "mal_builder.h" -#include <mapi.h> -#include "remote.h" -#include "mal_sabaoth.h" #include "mal_recycle.h" - #include "mal_interpreter.h" -typedef struct REGMAL{ - str fcn; - struct REGMAL *nxt; -} *Registry; - -typedef struct { - str uri; - str usr; - str pwd; - Registry nxt; /* list of registered mal functions */ - bte active; - str conn; - int inuse; -} Peer; - -static Peer peers[MAXSITES]; /* registry of peer servers */ static int nrpeers; /* peers active in sliced processing */ static bte slicingLocal; /* only use local node without remote calls*/ @@ -85,110 +65,6 @@ } Gauge; static int -OPTfindPeer(str uri) -{ - int i; - for (i = 0; i < nrpeers; i++) - if ( strcmp(uri, peers[i].uri) == 0 ) - return i; - return -1; -} -/* Look for and add a peer with uri in the registry. Return index in registry */ -int -OPTgetPeer(str uri) -{ - int i; - - i = OPTfindPeer(uri); - if ( i >=0 ) { - peers[i].active = 1; - return i; - } - if ( nrpeers == MAXSITES) - return -1; - i = nrpeers; - peers[i].usr = GDKstrdup("monetdb"); - peers[i].uri = GDKstrdup(uri); - peers[i].pwd = GDKstrdup("monetdb"); - peers[i].active = 1; - peers[i].nxt = NULL; - peers[i].inuse = 0; - nrpeers++; - return i; -} - -/* Clean function registry of non-active peers */ - -void OPTcleanFunReg(int i) -{ - Registry r, q; - mal_set_lock(mal_contextLock,"slicing.cleanFunReg"); - r = peers[i].nxt; - peers[i].nxt = NULL; - mal_unset_lock(mal_contextLock,"slicing.cleanFunReg"); - while ( r ) { - q = r->nxt; - GDKfree(r->fcn); - GDKfree(r); - r = q; - } -} - -str -OPTdiscover(Client cntxt) -{ - bat bid = 0; - BAT *b; - BUN p,q; - str msg = MAL_SUCCEED; - BATiter bi; - char buf[BUFSIZ]= "*/slicing", *s= buf; - int i, nrworkers = 0; - - slicingLocal = 0; - - /* we have a new list of candidate peers */ - for (i=0; i<nrpeers; i++) - peers[i].active = 0; - - msg = RMTresolve(&bid,&s); - if ( msg == MAL_SUCCEED) { - b = BATdescriptor(bid); - if ( b != NULL && BATcount(b) > 0 ) { - bi = bat_iterator(b); - BATloop(b,p,q){ - str t= (str) BUNtail(bi,p); - nrworkers += OPTgetPeer(t) >= 0; - } - } - BBPreleaseref(bid); - } else - GDKfree(msg); - - if ( !nrworkers ) { - /* there is a last resort, local execution */ - SABAOTHgetLocalConnection(&s); - - nrworkers += OPTgetPeer(s) >= 0; - slicingLocal = 1; - } - -#ifdef DEBUG_RUN_OPT - mnstr_printf(cntxt->fdout,"Active peers discovered %d\n",nrworkers); - for (i=0; i<nrpeers; i++) - if ( peers[i].uri ) - mnstr_printf(cntxt->fdout,"%s\n", peers[i].uri); -#else - (void) cntxt; -#endif - - for (i=0; i<nrpeers; i++) - if ( !peers[i].active ) - OPTcleanFunReg(i); - - return MAL_SUCCEED; -} -static int OPTinitcode(Client cntxt, MalBlkPtr mb){ InstrPtr p; str s; @@ -199,8 +75,9 @@ /* _x := remote.connect(uri,"monetdb","monetdb","msql"); */ p = newStmt(mb, remoteRef,connectRef); s = GDKgetenv("merovingian_uri"); - if (s == NULL) /* aparently not under Merovingian control, fall back to local only */ - SABAOTHgetLocalConnection(&l); + if (s == NULL) /* aparently not under Merovingian control, fall back to local only */ + s= "dummyconnection"; + /* SABAOTHgetLocalConnection(&l);*/ p= pushStr(mb,p, s == NULL ? l : s); p= pushStr(mb,p,"monetdb"); p= pushStr(mb,p,"monetdb"); @@ -300,7 +177,7 @@ Symbol s; char nme[BUFSIZ]; int /*tpe,*/ x, i, j, k, *alias, nrpack; - InstrPtr call, ret, q, *pack; + InstrPtr call, ret, p, q, *pack; (void) gauge; /* define the query controller */ @@ -313,7 +190,14 @@ cmb = s->def; if ( newMalBlkStmt(cmb,cmb->ssize) < 0 ) return 0; - nrpack = getInstrPtr(mb,pc)->retc; + q= getInstrPtr(mb,pc); + if ( getModuleId(q) == groupRef && getFunctionId(q) == doneRef) + nrpack = 1; + else + if ( getModuleId(q) == sqlRef && getFunctionId(q) == resultSetRef) + nrpack = q->retc -1; + else + nrpack = q->retc; pack = (InstrPtr *) GDKzalloc(sizeof(InstrPtr) * nrpack); pushInstruction(cmb, copyInstruction(pmb->stmt[0])); call = copyInstruction(getInstrPtr(pmb,0)); @@ -331,6 +215,7 @@ getFunctionId(q) = bindRef; q = pushArgument(cmb,q,x); j = getArg(q,0) = newTmpVariable(cmb,newBatType(TYPE_oid, gauge.type)); + setVarUDFtype(cmb,j); q= pushStr(cmb,q, gauge.schema); q= pushStr(cmb,q, gauge.table); q= pushStr(cmb,q, gauge.column); @@ -349,22 +234,23 @@ } /* dataflow */ if ( gauge.column) { - for ( k = 0; k < nrpack ; k++){ + p = getInstrPtr(pmb,0); + for ( k=0 ;k < nrpack ; k++){ pack[k] = newInstruction(cmb,ASSIGNsymbol); getModuleId(pack[k]) = matRef; getFunctionId(pack[k]) = packRef; - getArg(pack[k],0) = newTmpVariable(cmb, newBatType(TYPE_oid, getTailType(getArgType(cmb,getInstrPtr(pmb,0),k))) ); + getArg(pack[k],0) = newTmpVariable(cmb, newBatType(TYPE_oid, getTailType(getArgType(cmb,p,k))) ); } } - for ( k = 0; k < nrpack ; k++){ - q = newInstruction(cmb,ASSIGNsymbol); - getArg(q,0) = getArg(pack[k],0); - pushNil(cmb,q, getArgType(cmb,pack[k],0)); - pushInstruction(cmb,q); - } + if ( slicingLocal == 0){ + for ( k=0 ; k < nrpack ; k++){ + q = newInstruction(cmb,ASSIGNsymbol); + getArg(q,0) = getArg(pack[k],0); + pushNil(cmb,q, getArgType(cmb,pack[k],0)); + pushInstruction(cmb,q); + } - if ( slicingLocal == 0){ q= newFcnCall(cmb,languageRef,dataflowRef); q->barrier= BARRIERsymbol; x = getArg(q,0); @@ -377,8 +263,8 @@ q= copyInstruction(call); q->token = ASSIGNsymbol; q->barrier = 0; - for ( k = 0; k < q->retc; k++) { - getArg(q,k) = newTmpVariable(cmb, getArgType(mb,q,k)); + for ( k=0 ; k < q->retc; k++) { + getArg(q,k) = newTmpVariable(cmb, getArgType(pmb,q,k)); pack[k] = pushArgument(cmb,pack[k], getArg(q,k)); } @@ -389,7 +275,7 @@ pushInstruction(cmb,q); } if ( gauge.column) - for ( k = 0; k < nrpack; k++) + for ( k=0 ; k < nrpack; k++) pushInstruction(cmb, pack[k]); @@ -407,6 +293,21 @@ if ( getFunctionId(q) == putName("exportValue",11)) getArg(q,8)= getArg(pack[0],0); else + if ( getModuleId(q) == groupRef && getFunctionId(q) == doneRef) { + /* just return the argument */ + clrFunction(q); + q->argc = q->retc = 1; + q = pushArgument(cmb,q, getArg(pack[0],0)); + getArg(q,0) = getArg(q,1); + } else + if ( getModuleId(q) == sqlRef && getFunctionId(q) == resultSetRef) { + p = newInstruction(NULL,ASSIGNsymbol); + for ( k = q->retc +3; k< q->argc; k++){ + p = pushReturn(cmb,p,getArg(q,k)); + p = pushArgument(cmb,p, getArg(pack[k - q->retc-3], 0)); + } + q= p; + } else if ( getFunctionId(q) == NULL){ /* simple assignment */ assert(nrpack == q->retc); @@ -511,13 +412,24 @@ p = old[i]; if ( vec[i]) { if ( i == last) { - if ( getFunctionId(old[i]) == rsColumnRef) { - ret = pushReturn(nmb, ret, getArg(p,7)); - nmb->stmt[0] = pushReturn(nmb, getInstrPtr(nmb,0), getArg(p,7)); + if ( getFunctionId(old[i]) == resultSetRef) { + for ( k = p->retc +3; k < p->argc; k++) { + ret = pushReturn(nmb, ret, getArg(p,k)); + nmb->stmt[0] = pushReturn(nmb, getInstrPtr(nmb,0), getArg(p,k)); + } + p->argc = p->retc + 3; + for ( k = p->retc-1; k > 0 ; k--) + delArgument(p,1); continue; - } else + } else if ( getFunctionId(old[i]) == putName("exportValue",11)) { ret = pushReturn(nmb, ret, getArg(p,0)); + } else + if ( getModuleId(p)== groupRef && getFunctionId(p) == doneRef){ + /* return alternative */ + ret = pushReturn(nmb,ret,getArg(p,p->retc)); + nmb->stmt[0] = pushReturn(nmb, getInstrPtr(nmb,0), getArg(p,p->retc)); + continue; } else { _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list