Changeset: 4a061b7ecc63 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4a061b7ecc63

Modified Files:
	gdk/gdk_select.c
	monetdb5/optimizer/opt_mergetable.c
Branch: Intra-Par
Log Message:
Commit for the entire intra operator parallel branch experimentation track diffs (truncated from 452 to 300 lines): diff --git a/gdk/gdk_select.c b/gdk/gdk_select.c --- a/gdk/gdk_select.c +++ b/gdk/gdk_select.c @@ -161,22 +161,106 @@ BAT_hashselect(BAT *b, BAT *s, BAT *bn, } \ } while (0) + + + /* scan select loop without candidates */ #define scanloop(TEST) \ do { \ - ALGODEBUG fprintf(stderr, \ - "#BATsubselect(b=%s#"BUNFMT",s=%s,anti=%d): " \ - "scanselect %s\n", BATgetId(b), BATcount(b), \ - s ? BATgetId(s) : "NULL", anti, #TEST); \ + unsigned long n1=0, g1=0; \ while (p < q) { \ v = BUNtail(bi, p); \ if (TEST) { \ o = (oid) p + off; \ bunfastins(bn, NULL, &o); \ + g1++; \ } \ p++; \ + n1++; \ } \ - } while (0) + printf("\n Total stride length %lu, suitable values %lu",n1,g1); \ + } while (0) + +//////////////////////////////////////////////////////////////////////// + +/* ALGODEBUG fprintf(stderr, \ + "#BATsubselect(b=%s#"BUNFMT",s=%s,anti=%d): " \ + "scanselect %s\n", BATgetId(b), BATcount(b), \ + s ? 
BATgetId(s) : "NULL", anti, #TEST); \ + +// printf("\n Total stride length %lu, suitable values %lu",n1,g1); \ + +*/ + + +typedef struct _wthread { + pthread_t id; + BAT *bat_; + const void *tl; + const void *th; + int lval; + int hval; + int li; + int hi; + int (*cmp)(const void *, const void *); + BUN p, q; + BATiter bi; + const void *nil; +} wthread; + +/* +static void * +threadFunc(void *d) +{ + struct timeval tv; + time_t prevclock, curclock; + BATiter bi; + + int (*cmp)(const void *, const void *); + wthread *wthr= (wthread *)d; + const void *tl = wthr->tl; + const void *th = wthr->th; + const void *nil = wthr->nil; + int lval = wthr->lval; + int hval = wthr->hval; + int li = wthr->li; + int hi = wthr->hi; + BUN p = wthr->p; + BUN q = wthr->q; + oid o; + oid off = 0; //wthr->off; + int c; + const void *v; + + BAT *bn=NULL; + + cmp = wthr->cmp; + + bi = wthr->bi; + gettimeofday(&tv,NULL); + prevclock = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec); + + bn = wthr->bat_; + + scanloop((nil == NULL || (*cmp)(v, nil) != 0) && + ((!lval || + (c = cmp(tl, v)) < 0 || + (li && c == 0)) && + (!hval || + (c = cmp(th, v)) > 0 || + (hi && c == 0)))); + + gettimeofday(&tv,NULL); + curclock = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec); + + printf("\nTime of thread %lu is %lu", (unsigned long)wthr->id, (unsigned long)(curclock - prevclock)); + return NULL; + + bunins_failed: + BBPreclaim(bn); + return NULL; +} +*/ static BAT * BAT_scanselect(BAT *b, BAT *s, BAT *bn, const void *tl, const void *th, @@ -189,6 +273,21 @@ BAT_scanselect(BAT *b, BAT *s, BAT *bn, const void *nil, *v; int c; + // Threaded parameters + +/* struct timeval tv; + time_t prevclock, curclock; //, prevclock, curclock; + int l, h=-1, core=8; + BAT *b2, *b1; + BUN cap=0, stride, mod; +// struct timeval tv; +// time_t prevclock, curclock; //, prevclock, curclock; + wthread **walk; + bit strideFlag = FALSE; + int ht = TYPE_any , tt;// = TYPE_any; + BUN estimate; +*/ + assert(b != NULL); 
assert(bn != NULL); assert(bn->htype == TYPE_void); @@ -267,13 +366,178 @@ BAT_scanselect(BAT *b, BAT *s, BAT *bn, ((c = (*cmp)(th, v)) < 0 || (!hi && c == 0))))); } else { - scanloop((nil == NULL || (*cmp)(v, nil) != 0) && - ((!lval || - (c = cmp(tl, v)) < 0 || - (li && c == 0)) && - (!hval || - (c = cmp(th, v)) > 0 || - (hi && c == 0)))); + +// if(BATcount(b) <= 128) + if(BATcount(b)) + { + // orinigla scan loop function + +// gettimeofday(&tv,NULL); +// prevclock = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec); + + scanloop((nil == NULL || (*cmp)(v, nil) != 0) && + ((!lval || + (c = cmp(tl, v)) < 0 || + (li && c == 0)) && + (!hval || + (c = cmp(th, v)) > 0 || + (hi && c == 0)))); + +// gettimeofday(&tv,NULL); +// curclock = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec); +// printf("\n Time of scanloop is %lu", (unsigned long)(curclock - prevclock)); + + } +/* else + { + + printf("\n In my function"); + mod = BATcount(b) % core; + + + if(0 != mod) // If there is excess remaining tuples e.g. 
27 % 8 = 3 tuples in last stride + { + strideFlag = TRUE; + } + + stride = BATcount(b) / core; + printf("\n Stride is %lu",stride); +// printf("\n main bat head %d tail %d",b->htype, b->ttype); + + walk = (wthread **)malloc(sizeof(wthread *) * core); + if(NULL == walk) + { + printf("\n Malloc failed for walk"); +// throw(MAL, "walk", MAL_MALLOC_FAIL); + exit(1); // What to put in here is a question since there is no MAL_ERROR return + } + + gettimeofday(&tv,NULL); + prevclock = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec); + estimate = BATcount(bn); + printf("\n Estimate is %lu",estimate); + + if(stride >= 1) // when number of tuples are more than the number of cores to make sure at least 1 tuple per core + { + int i; + for(i=0; i<core; i++) + { + if(i==0) // to handle special case of batslice excluding the upper range value in slice + { + l = h + 1; + h= l+stride; + } + else + { + l = h; + h = l+stride; + } + + if((i + 1 == core) && strideFlag == TRUE) + { + h = h + mod; // add the remaining number of tuples to the last stride + + } + + walk[i] = (wthread *)malloc(sizeof(wthread)); + walk[i]->bat_ = BATnew(TYPE_void, TYPE_oid, estimate / core); //BATslice(b, l, h); + + walk[i]->tl = tl; + walk[i]->th = th; + walk[i]->lval = lval; + walk[i]->hval = hval; + walk[i]->li = li; + walk[i]->hi = hi; + walk[i]->p = l; + walk[i]->q = h; + walk[i]->bi = bat_iterator(b);; + walk[i]->nil = nil; + walk[i]->cmp = cmp; + + printf("\n Stride limits are %d %d ",l,h); + + gettimeofday(&tv,NULL); +// prevclock1 = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec); + + pthread_create(&walk[i]->id, NULL, &threadFunc, walk[i]); + // printf(" \nRSS size after coming out of pthread %d is %s", i, MT_heapcur()); // MT_getrss()); + + gettimeofday(&tv,NULL); +// curclock1 = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec); + //printf("\nthread %lu forking %lu", (unsigned long)walk[i]->id,(unsigned long)(curclock1 - prevclock1)); + + } + + for(i=0; i<core; i++) + { + 
pthread_join(walk[i]->id, NULL); + } + + gettimeofday(&tv,NULL); + curclock = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec); + + printf("\nTotal time of thread works is %lu", (unsigned long)(curclock - prevclock)); + + printf("\n After pthread_join"); + gettimeofday(&tv,NULL); + prevclock = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec); + +// BBPunfix(b->batCacheid); + // printf(" \nRSS size after unfixing first batCacheId is %s", MT_heapcur()); // MT_getrss()); + + // mat.pack implementation + for (i = 0; i < core; i++) + { + //printf("\n outside bn id is %d",bid); + int bid; + b2 = walk[i]->bat_; + bid = b2->batCacheid; + + if (b2 && bid < 0) + b2 = BATmirror(b2); + if( b2 ){ + if (ht == TYPE_any && tt){ + ht = ATOMtype(b->htype); + tt = ATOMtype(b->ttype); + } + cap += BATcount(b2); + printf("\n batCount %d is %lu cap =%lu", i, BATcount(b2), cap); + } + } + + if (ht == TYPE_any){ + // *ret = 0; + printf("\nMal success"); + return NULL; // This is a hack + } + +// printf("\n size of matpack is %lu", cap); + + for (i = 0; i < core; i++) { + b1 = walk[i]->bat_; + if( b1 ){ + // use the right oid ranges, don't change the input + BATins(bn,b1,FALSE); _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list