Changeset: 67e0e5c1d701 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=67e0e5c1d701 Modified Files: MonetDB5/src/mal/mal_function.mx MonetDB5/src/mal/mal_instruction.mx MonetDB5/src/optimizer/opt_tarantula.mx Branch: default Log Message:
Next step in tarantula. Variables created but used outside a leg should be returned to the caller. One of them should be used in the remainder of the plan. More refinements of such parameter passing are needed. diffs (truncated from 484 to 300 lines): diff -r 817a64af4505 -r 67e0e5c1d701 MonetDB5/src/mal/mal_function.mx --- a/MonetDB5/src/mal/mal_function.mx Wed Aug 11 21:23:12 2010 +0200 +++ b/MonetDB5/src/mal/mal_function.mx Wed Aug 11 22:55:27 2010 +0200 @@ -88,7 +88,7 @@ mal_export int getBlockExit(MalBlkPtr mb,int pc); mal_export int getBlockBegin(MalBlkPtr mb,int pc); -#define newLifespan(M) (Lifespan)GDKmalloc(sizeof(LifespanRecord)*(M)->vsize) +#define newLifespan(M) (Lifespan)GDKzalloc(sizeof(LifespanRecord)*(M)->vsize) mal_export Lifespan setLifespan(MalBlkPtr mb); mal_export void malGarbageCollector(MalBlkPtr mb); diff -r 817a64af4505 -r 67e0e5c1d701 MonetDB5/src/mal/mal_instruction.mx --- a/MonetDB5/src/mal/mal_instruction.mx Wed Aug 11 21:23:12 2010 +0200 +++ b/MonetDB5/src/mal/mal_instruction.mx Wed Aug 11 22:55:27 2010 +0200 @@ -2099,8 +2099,7 @@ InstrPtr pushReturn(MalBlkPtr mb, InstrPtr p, int varid) { - assert(p->retc); - if (p->argv[0] == -1) { + if (p->retc == 1 && p->argv[0] == -1) { p->argv[0] = varid; return p; } diff -r 817a64af4505 -r 67e0e5c1d701 MonetDB5/src/optimizer/opt_tarantula.mx --- a/MonetDB5/src/optimizer/opt_tarantula.mx Wed Aug 11 21:23:12 2010 +0200 +++ b/MonetDB5/src/optimizer/opt_tarantula.mx Wed Aug 11 22:55:27 2010 +0200 @@ -37,8 +37,11 @@ The tarantula untangles the resulting plan into a controlling head function and a series of plans, one for each leg to execute concurrently. -The allocation of subplan to leg depends on the same bidding scheme -as the Octopus. +The allocation of a subplan to leg depends on a bidding scheme. +Bidding can not depend on BAT arguments, because that would cause +significant communication overhead. 
Scalar values could be used and +would function well in terms of using the recycler to get involved into +precise bidding. The Tarantula optimizer differs from the Octopus optimizer in performing a recursive divide and conquer method to deal with blocking operations. @@ -191,6 +194,7 @@ opt_export int TARgetPeer(str uri); +#define MINLEGSIZE 5 /* number of MAL instructions to consider for a leg */ #define MAXSITES 2048 /* should become dynamic at some point */ Peer peers[MAXSITES]; /* registry of peer servers */ int TARnrpeers=0; @@ -383,31 +387,37 @@ @- The TARmakeleg walks through the MAL block and extracts the dependent structure for execution. Note that information van be recomputed in all legs. Possibly doing duplicate work. +Keep track of the variables that will be used outside the leg. They have to be exported. +Likewise, look for variables that are produced by other legs and will become input parameters. @c static MalBlkPtr -TARmakeleg(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int idx) +TARmakeleg(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int idx, Lifespan span, int *map) { MalBlkPtr tm = NULL; - InstrPtr p = NULL,sig; - int varid, i, j, top=0, fnd, block = 0, retvar; + InstrPtr p = NULL; + int i, j, top=0, fnd, block = 0; char buf[BUFSIZ]; InstrPtr *list = (InstrPtr*) GDKzalloc(sizeof(InstrPtr) * mb->ssize); char *needed= (char*) GDKzalloc(mb->vtop); int *alias= (int*) GDKzalloc(mb->vtop * sizeof(int)); + int itop = 0, *input = ( int*) GDKzalloc(mb->vtop * sizeof(int)); + int otop = 0, *output = ( int*) GDKzalloc(mb->vtop * sizeof(int)); Symbol s; assert(old[pc]->fcnname == packRef); - retvar = getArg(old[pc],idx); OPTDEBUGtarantula { mnstr_printf(cntxt->fdout,"#create leg for %d %d %d\n",pc,idx -old[pc]->retc, getArg(old[pc],idx)); } - @- Find the instructions needed. Beware pack operations already have been handled and produce their target. +Build the variable admin table. 
@c needed[getArg(old[pc],idx)]=1; + output[otop++]= getArg(old[pc],idx); + for ( j = old[0]->retc; j< old[0]->argc; j++) + input[itop++]= getArg(old[0],j); for (i = pc-1; i > 0; i--){ p = old[i]; @@ -424,14 +434,41 @@ } for (j = 0; j < p->retc; j++) fnd += needed[getArg(p,j)]; - if ( fnd) { - for (j = 0; j < p->argc; j++) + if ( fnd) { /* instruction is needed */ + for (j = 0; j < p->argc; j++){ + assert(getArg(p,j) >=0); needed[getArg(p,j)]= 1; + if ( getEndLifespan(span, getArg(p,j)) > pc) + output[otop++]= getArg(p,j); + if ( map[getArg(p,j)] != getArg(p,j) ) + input[itop++] = getArg(p,j); + } list[top++] = p; } } +...@- +The leg should have enough instructions to warrant a distributed execution. This should involve +a careful analysis of the instructions assembled. For the time being, we only allow for a leg +if at least a sql.bind operation belongs to the list +...@c + fnd = 0; + for( i=0; i<top; i++) { + fnd += getModuleId(list[i]) == sqlRef && getFunctionId(list[i]) == bindRef; + fnd += getModuleId(list[i]) == sqlRef && getFunctionId(list[i]) == bindidxRef; + fnd += getModuleId(list[i]) == sqlRef && getFunctionId(list[i]) == binddbatRef; + } - /* create the Leg function signature */ + if ( top - fnd <= MINLEGSIZE) { + GDKfree(list); + GDKfree(alias); + GDKfree(needed); + GDKfree(input); + GDKfree(output); + return 0; + } +...@- +Create the leg function. 
+...@c snprintf(buf,BUFSIZ,"%s_%d_%d", getFunctionId(getInstrPtr(mb,0)), getArg(old[pc],0), idx-old[pc]->retc); putName(buf,strlen(buf)); @@ -439,42 +476,40 @@ insertSymbol(findModule(cntxt->nspace,tarantulaRef),s); tm= s->def; p = getInstrPtr(tm,0); - setVarType(tm, getArg(p,0), getVarType(mb,retvar)); - setVarUDFtype(tm,getArg(p,0)); - alias[retvar] = getArg(p,0); + + /* add the return variables */ + p->retc = 0; + p->argc = 0; + for ( i = 0; i< otop; i++){ + alias[output[i]] = cloneVariable(tm,mb,output[i]); + p = pushReturn(tm, p, alias[output[i]]); + setVarUDFtype(tm,getArg(p,i)); + } /* add the arguments from the query template */ - sig= getInstrPtr(mb,0); - for( j =sig->retc; j<sig->argc; j++){ - int a= getArg(sig,j); - alias[a] = cloneVariable(tm,mb,a); - p = pushArgument(tm, p, alias[a]); + for ( i = 0; i< itop; i++){ + alias[input[i]] = cloneVariable(tm,mb,input[i]); + p = pushArgument(tm, p, alias[input[i]]); } - varid = alias[getArg(old[pc],idx)] = cloneVariable(tm,mb,getArg(old[pc],idx)); + /* include the necessary functions */ for (top--; top >= 0; top--){ p = copyInstruction(list[top]); for (i= 0; i< p->argc; i++){ - int a= getArg(list[top],i); + int a= getArg(p,i); if (alias[a]==0) alias[a] = cloneVariable(tm,mb,a); getArg(p,i) = alias[a]; } - if (getModuleId(p) == sqlRef && - ( getFunctionId(p) == bindRef || - getFunctionId(p) == binddbatRef || - getFunctionId(p) == bindidxRef)){ - /* construct the path - setModuleId(p, attachRef); - */ - setVarUDFtype(tm,getArg(p,0)); - } pushInstruction(tm, p); } + /* return all variables of interest */ p = newAssignment(tm); p->barrier = RETURNsymbol; - p = pushArgument(tm,p,alias[retvar]); - getArg(p,0) = alias[varid]; + p->retc= 0; + p->argc= 0; + for ( i = 0; i < otop; i++) + p = pushReturn(tm,p, alias[output[i]]); pushEndInstruction(tm); clrDeclarations(tm); chkProgram(cntxt->nspace,tm); @@ -484,6 +519,8 @@ GDKfree(list); GDKfree(alias); GDKfree(needed); + GDKfree(input); + GDKfree(output); return tm; } @@ 
-583,13 +620,13 @@ assembled using a pack and returned to the caller. @c static void -TARmakeExecution(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc) +TARmakeExecution(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, MalBlkPtr leg) { Symbol s; MalBlkPtr tm; char fcn[BUFSIZ]; - InstrPtr sig2, sig, r, q; - int conn,j,k = 0,l; + InstrPtr lsig,sig, r, q; + int conn,j,l; int arg[1024]; /* generate stubb code for the remote execution */ @@ -598,14 +635,19 @@ insertSymbol(findModule(cntxt->nspace,tarantulaRef),s); tm= s->def; sig = getInstrPtr(tm,0); - setVarType(tm,getArg(sig,0), getArgType(mb, old[pc],0)); + + /* add the return values */ + lsig= getInstrPtr(leg,0); + setVarType(tm, getArg(sig,0), getVarType(leg,getArg(lsig,0))); + for ( j=1;j < lsig->retc; j++) + sig= pushReturn(tm, sig, cloneVariable(tm, leg, getArg(lsig,j))); + + /* get the input arguments */ sig = pushArgument(tm,sig,newVariable(tm,GDKstrdup("node"),TYPE_int)); sig = pushArgument(tm,sig,newVariable(tm,GDKstrdup("fcn"),TYPE_str)); - sig2 = getInstrPtr(mb,0); /* copy the query arguments */ - for( j =sig2->retc; j<sig2->argc; j++){ - sig = pushArgument(tm,sig,cloneVariable(tm,mb,getArg(sig2,j))); - } + for ( j=lsig->retc; j < lsig->argc; j++) + sig= pushArgument(tm, sig, cloneVariable(tm, leg, getArg(lsig,j))); /* conn := tarantula.connect(node); */ @@ -614,29 +656,25 @@ q = pushArgument(tm, q, getArg(sig,1)); /* get addition arguments needed in a leg */ - for (j= sig->retc+2; j< sig->argc; j++){ - l= newTmpVariable(tm,TYPE_str); + /* k:= remote.put(conn,kvar) */ + for (j= 0; j < sig->retc; j++) + if ( j != sig->retc && j != sig->retc+1 ){ q= newFcnCall(tm,remoteRef,putRef); - arg[j]= getArg(q,0)= l; - q = pushArgument(tm,q, conn); - q = pushArgument(tm,q,findVariable(tm,getArgName(mb,sig2,j-2))); + setVarType(tm, getArg(q,0), TYPE_str); + setVarUDFtype(tm, getArg(q,0)); + q= pushArgument(tm,q,conn); + q= pushArgument(tm,q,getArg(sig,j)); + arg[j]= getArg(q,0); } - /* k:= remote.put(conn,kvar) */ 
- for (j= 0; j < sig->retc; j++){ - q= newFcnCall(tm,remoteRef,putRef); - q= pushArgument(tm,q,conn); - q= pushArgument(tm,q, getArg(getInstrPtr(tm,0),j)); - setVarUDFtype(tm,getArg(q,q->argc-1)); - k= getArg(q,0); - } - - /* k:= remote.exec(conn,tarantula,qry,version....) */ + /* (k1,...kn):= remote.exec(conn,tarantula,qry,version....) */ q= newFcnCall(tm,remoteRef,execRef); - getArg(q,0) = k; + q->retc= q->argc= 0; + for (j=0; j < sig->retc; j++) + q = pushReturn(tm,q,arg[j]); q= pushArgument(tm,q,conn); q= pushStr(tm,q,tarantulaRef); - q= pushArgument(tm,q,getArg(sig,2)); + q= pushArgument(tm,q,getArg(sig,sig->retc+1)); /* deal with all arguments ! */ for (j=sig->retc+2; j < sig->argc; j++) q = pushArgument(tm,q,arg[j]); @@ -645,29 +683,30 @@ /* return exec_qry; */ r= newInstruction(tm, ASSIGNsymbol); r->barrier= RETURNsymbol; + r->retc= r->argc= 0; _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list