Changeset: 85ab277c39bc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=85ab277c39bc
Modified Files:
	gdk/gdk_mapreduce.c
	gdk/gdk_mapreduce.h
	monetdb5/modules/mal/joinpath.c
	monetdb5/optimizer/opt_joinpath.c
	monetdb5/optimizer/opt_pipes.c
	monetdb5/optimizer/opt_prelude.c
	monetdb5/optimizer/opt_prelude.h
	monetdb5/optimizer/optimizer.mal
Branch: default
Log Message:
Merging diffs (292 lines): diff --git a/gdk/gdk_mapreduce.c b/gdk/gdk_mapreduce.c --- a/gdk/gdk_mapreduce.c +++ b/gdk/gdk_mapreduce.c @@ -45,6 +45,7 @@ static MT_Sema mrqsema; /* threads wait static void MRworker(void *); +/* There is just a single queue for the workers */ static void MRqueueCreate(int sz) { @@ -54,14 +55,21 @@ MRqueueCreate(int sz) MT_lock_init(&mrqlock, "q_create"); MT_lock_set(&mrqlock, "q_create"); MT_sema_init(&mrqsema, 0, "q_create"); - sz = ((sz << 1) >> 1); /* we want a multiple of 2 */ + if ( mrqueue ) { + GDKerror("One map-reduce queue allowed"); + return; + } + sz *= 2; mrqueue = (MRqueue *) GDKzalloc(sizeof(MRqueue) * sz); - assert(mrqueue); + if ( mrqueue == 0) { + GDKerror("Could not create the map-reduce queue"); + return; + } mrqsize = sz; mrqlast = 0; /* create a worker thread for each core as specified as system parameter */ for (i = 0; i < GDKnr_threads; i++) - MT_create_thread(&tid, MRworker, (void *) 0, MT_THR_JOINABLE); + MT_create_thread(&tid, MRworker, (void *) 0, MT_THR_DETACHED); MT_lock_unset(&mrqlock, "q_create"); } diff --git a/gdk/gdk_mapreduce.h b/gdk/gdk_mapreduce.h --- a/gdk/gdk_mapreduce.h +++ b/gdk/gdk_mapreduce.h @@ -20,8 +20,6 @@ #ifndef _GDK_MAPREDUCE_H_ #define _GDK_MAPREDUCE_H_ -#include <monet_options.h> - typedef struct { MT_Sema *sema; /* micro scheduler handle */ void (*cmd) (void *); /* the function to be executed */ diff --git a/monetdb5/modules/mal/joinpath.c b/monetdb5/modules/mal/joinpath.c --- a/monetdb5/modules/mal/joinpath.c +++ b/monetdb5/modules/mal/joinpath.c @@ -208,6 +208,9 @@ ALGjoinPathBody(Client cntxt, int top, B break; case 2: b = BATsemijoin(joins[j], joins[j + 1]); + break; + case 3: + b = BATleftfetchjoin(joins[j], joins[j + 1], BATcount(joins[j])); } if (b==NULL){ if ( postpone[j] && postpone[j+1]){ @@ -272,9 +275,10 @@ ALGjoinPath(Client cntxt, MalBlkPtr mb, int i,*bid,top=0; int *r = (int*) getArgReference(stk, pci, 0); BAT *b, **joins = (BAT**)GDKmalloc(pci->argc*sizeof(BAT*)); + 
int error = 0; str joinPathRef = putName("joinPath",8); + str semijoinPathRef = putName("semijoinPath",12); str leftjoinPathRef = putName("leftjoinPath",12); - int error = 0; if ( joins == NULL) throw(MAL, "algebra.joinPath", MAL_MALLOC_FAIL); @@ -304,7 +308,17 @@ ALGjoinPath(Client cntxt, MalBlkPtr mb, fprintf(stderr,"#joinpath %s\n", ps ? ps : ""); GDKfree(ps); } - b= ALGjoinPathBody(cntxt,top,joins, (getFunctionId(pci)== joinPathRef?1: (getFunctionId(pci) == leftjoinPathRef? 0:2))); + if ( getFunctionId(pci) == joinPathRef) + b= ALGjoinPathBody(cntxt,top,joins, 1); + else + if ( getFunctionId(pci) == leftjoinPathRef) + b= ALGjoinPathBody(cntxt,top,joins, 0); + else + if ( getFunctionId(pci) == semijoinPathRef) + b= ALGjoinPathBody(cntxt,top,joins, 2); + else + b= ALGjoinPathBody(cntxt,top,joins, 3); + GDKfree(joins); if ( b) BBPkeepref( *r = b->batCacheid); diff --git a/monetdb5/optimizer/opt_joinpath.c b/monetdb5/optimizer/opt_joinpath.c --- a/monetdb5/optimizer/opt_joinpath.c +++ b/monetdb5/optimizer/opt_joinpath.c @@ -52,9 +52,6 @@ static int OPTjoinSubPath(Client cntxt, MalBlkPtr mb) { int i,j,k,top=0, actions =0; - str joinPathRef = putName("joinPath",8); - str leftjoinPathRef = putName("leftjoinPath",12); - str semijoinPathRef = putName("semijoinPath",12); InstrPtr q = NULL, p, *old; int limit, slimit; Candidate *candidate; @@ -68,7 +65,7 @@ OPTjoinSubPath(Client cntxt, MalBlkPtr m limit= mb->stop; slimit= mb->ssize; for(i=0, p= getInstrPtr(mb, i); i< limit; i++, p= getInstrPtr(mb, i)) - if ( getFunctionId(p)== joinPathRef || getFunctionId(p)== leftjoinPathRef || getFunctionId(p) == semijoinPathRef) + if ( getFunctionId(p)== joinPathRef || getFunctionId(p)== leftjoinPathRef || getFunctionId(p) == semijoinPathRef || getFunctionId(p) == leftfetchjoinPathRef) for ( j= p->retc; j< p->argc-1; j++){ for (k= top-1; k >= 0 ; k--) if ( candidate[k].lvar == getArg(p,j) && candidate[k].rvar == getArg(p,j+1) && candidate[k].fcn == getFunctionId(p)){ @@ -98,7 +95,7 @@ 
OPTjoinSubPath(Client cntxt, MalBlkPtr m } for(i=0, p= old[i]; i< limit; i++, p= old[i]) { - if( getFunctionId(p)== joinPathRef || getFunctionId(p)== leftjoinPathRef || getFunctionId(p) == semijoinPathRef) + if( getFunctionId(p)== joinPathRef || getFunctionId(p)== leftjoinPathRef || getFunctionId(p) == semijoinPathRef || getFunctionId(p)== leftfetchjoinPathRef) for ( j= p->retc ; j< p->argc-1; j++){ for (k= top-1; k >= 0 ; k--) if ( candidate[k].lvar == getArg(p,j) && candidate[k].rvar == getArg(p,j+1) && candidate[k].fcn == getFunctionId(p) && candidate[k].cnt > 1){ @@ -109,6 +106,8 @@ OPTjoinSubPath(Client cntxt, MalBlkPtr m q= newStmt(mb, algebraRef, leftjoinRef); else if ( candidate[k].fcn == semijoinPathRef) q= newStmt(mb, algebraRef, semijoinRef); + else if ( candidate[k].fcn == leftfetchjoinPathRef) + q= newStmt(mb, algebraRef, leftfetchjoinRef); q= pushArgument(mb,q, candidate[k].lvar); q= pushArgument(mb,q, candidate[k].rvar); candidate[k].p = q; @@ -122,6 +121,8 @@ OPTjoinSubPath(Client cntxt, MalBlkPtr m setFunctionId(p, semijoinRef); else if ( getFunctionId(p) == joinPathRef) setFunctionId(p, joinRef); + else if ( getFunctionId(p) == leftfetchjoinPathRef) + setFunctionId(p, leftfetchjoinRef); } actions ++; OPTDEBUGjoinPath { @@ -154,9 +155,6 @@ OPTjoinPathImplementation(Client cntxt, { int i,j,k, actions=0; int *pc; - str joinPathRef = putName("joinPath",8); - str leftjoinPathRef = putName("leftjoinPath",12); - str semijoinPathRef = putName("semijoinPath",12); InstrPtr q,r; InstrPtr *old; int *varcnt; /* use count */ @@ -182,7 +180,6 @@ OPTjoinPathImplementation(Client cntxt, return 0; } /* - * @- * Count the variable use as arguments first. 
*/ for (i = 0; i<limit; i++){ @@ -193,9 +190,8 @@ OPTjoinPathImplementation(Client cntxt, for (i = 0; i<limit; i++){ p= old[i]; - if( getModuleId(p)== algebraRef && (getFunctionId(p)== joinRef || getFunctionId(p) == leftjoinRef || getFunctionId(p) == semijoinRef)){ + if( getModuleId(p)== algebraRef && (getFunctionId(p)== joinRef || getFunctionId(p) == leftjoinRef || getFunctionId(p) == semijoinRef || getFunctionId(p) == leftfetchjoinRef)){ /* - * @- * Try to expand its argument list with what we have found so far. * This creates a series of join paths, many of which will be removed during deadcode elimination. */ @@ -204,7 +200,6 @@ OPTjoinPathImplementation(Client cntxt, for(j=p->retc; j<p->argc; j++){ r= getInstrPtr(mb,pc[getArg(p,j)]); /* - * @- * Don't inject a pattern when it is used more than once. */ if (r && varcnt[getArg(p,j)] > 1){ @@ -237,6 +232,12 @@ OPTjoinPathImplementation(Client cntxt, q = pushArgument(mb,q,getArg(r,k)); } else q = pushArgument(mb,q,getArg(p,j)); + } else if ( getFunctionId(p) == leftfetchjoinRef){ + if( r && getModuleId(r)== algebraRef && ( getFunctionId(r)== leftfetchjoinRef || getFunctionId(r)== leftfetchjoinPathRef) ){ + for(k= r->retc; k<r->argc; k++) + q = pushArgument(mb,q,getArg(r,k)); + } else + q = pushArgument(mb,q,getArg(p,j)); } } OPTDEBUGjoinPath { @@ -250,7 +251,6 @@ OPTjoinPathImplementation(Client cntxt, goto wrapup; } /* - * @- * Final type check and hardwire the result type, because that can not be inferred directly from the signature */ for(j=1; j<q->argc-1; j++) @@ -273,6 +273,8 @@ OPTjoinPathImplementation(Client cntxt, setFunctionId(q,leftjoinPathRef); else if ( q->argc > 2 && getFunctionId(q) == semijoinRef) setFunctionId(q,semijoinPathRef); + else if ( q->argc > 2 && getFunctionId(q) == leftfetchjoinRef) + setFunctionId(q,leftfetchjoinPathRef); freeInstruction(p); p= q; actions++; diff --git a/monetdb5/optimizer/opt_pipes.c b/monetdb5/optimizer/opt_pipes.c --- a/monetdb5/optimizer/opt_pipes.c +++ 
b/monetdb5/optimizer/opt_pipes.c @@ -74,9 +74,9 @@ struct PIPELINES { "optimizer.mergetable();" "optimizer.deadcode();" "optimizer.commonTerms();" -// "optimizer.groups();" -// "optimizer.joinPath();" -// "optimizer.reorder();" + "optimizer.groups();" + "optimizer.joinPath();" + "optimizer.reorder();" "optimizer.deadcode();" "optimizer.reduce();" "optimizer.dataflow();" diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c --- a/monetdb5/optimizer/opt_prelude.c +++ b/monetdb5/optimizer/opt_prelude.c @@ -117,6 +117,7 @@ str kunionRef; str kuniqueRef; str languageRef; str leftfetchjoinRef; +str leftfetchjoinPathRef; str leftjoinRef; str leftjoinPathRef; str likeselectRef; @@ -198,6 +199,7 @@ str selectNotNilRef; str selectRef; str semaRef; str semijoinRef; +str semijoinPathRef; str setAccessRef; str setWriteModeRef; str sliceRef; @@ -363,6 +365,7 @@ void optimizerInit(void){ kuniqueRef= putName("kunique",7); languageRef= putName("language",8); leftfetchjoinRef = putName("leftfetchjoin",13); + leftfetchjoinPathRef = putName("leftfetchjoinPath",17); leftjoinRef = putName("leftjoin",8); leftjoinPathRef = putName("leftjoinPath",12); likeselectRef = putName("like_select",11); @@ -443,6 +446,7 @@ void optimizerInit(void){ selectRef = putName("select",6); semaRef = putName("sema",4); semijoinRef = putName("semijoin",8); + semijoinPathRef = putName("semijoinPath",12); setAccessRef = putName("setAccess",9); setWriteModeRef= putName("setWriteMode",12); sliceRef = putName("slice",5); diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h --- a/monetdb5/optimizer/opt_prelude.h +++ b/monetdb5/optimizer/opt_prelude.h @@ -116,6 +116,7 @@ opt_export str kunionRef; opt_export str kuniqueRef; opt_export str languageRef; opt_export str leftfetchjoinRef; +opt_export str leftfetchjoinPathRef; opt_export str leftjoinRef; opt_export str leftjoinPathRef; opt_export str likeselectRef; @@ -196,6 +197,7 @@ opt_export str selectNotNilRef; 
opt_export str selectRef; opt_export str semaRef; opt_export str semijoinRef; +opt_export str semijoinPathRef; opt_export str setAccessRef; opt_export str setWriteModeRef; opt_export str sliceRef; diff --git a/monetdb5/optimizer/optimizer.mal b/monetdb5/optimizer/optimizer.mal --- a/monetdb5/optimizer/optimizer.mal +++ b/monetdb5/optimizer/optimizer.mal @@ -288,6 +288,10 @@ pattern algebra.leftjoinPath(l:bat[:any, address ALGjoinPath comment "Routine to handle join paths. The type analysis is rather tricky."; +pattern algebra.leftfetchjoinPath(l:bat[:any,:any]...):bat[:any,:any] +address ALGjoinPath +comment "Routine to handle join paths. The type analysis is rather tricky."; + pattern algebra.semijoinPath(l:bat[:any,:any]...):bat[:any,:any] address ALGjoinPath comment "Routine to handle join paths. The type analysis is rather tricky."; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list