Changeset: 4a061b7ecc63 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4a061b7ecc63
Modified Files:
        gdk/gdk_select.c
        monetdb5/optimizer/opt_mergetable.c
Branch: Intra-Par
Log Message:

Commit for the entire intra operator parallel branch experimentation track


diffs (truncated from 452 to 300 lines):

diff --git a/gdk/gdk_select.c b/gdk/gdk_select.c
--- a/gdk/gdk_select.c
+++ b/gdk/gdk_select.c
@@ -161,22 +161,106 @@ BAT_hashselect(BAT *b, BAT *s, BAT *bn, 
                }                                                       \
        } while (0)
 
+
+
+
 /* scan select loop without candidates */
 #define scanloop(TEST)                                                 \
        do {                                                            \
-               ALGODEBUG fprintf(stderr,                               \
-                           "#BATsubselect(b=%s#"BUNFMT",s=%s,anti=%d): " \
-                           "scanselect %s\n", BATgetId(b), BATcount(b), \
-                           s ? BATgetId(s) : "NULL", anti, #TEST);     \
+               unsigned long n1=0, g1=0;                               \
                while (p < q) {                                         \
                        v = BUNtail(bi, p);                             \
                        if (TEST) {                                     \
                                o = (oid) p + off;                      \
                                bunfastins(bn, NULL, &o);               \
+                               g1++;                                   \
                        }                                               \
                        p++;                                            \
+                       n1++;                                           \
                }                                                       \
-       } while (0)
+               printf("\n Total stride length %lu, suitable values 
%lu",n1,g1);                        \
+       } while (0)                                     
+
+////////////////////////////////////////////////////////////////////////
+
+/*             ALGODEBUG fprintf(stderr,                               \
+                           "#BATsubselect(b=%s#"BUNFMT",s=%s,anti=%d): " \
+                           "scanselect %s\n", BATgetId(b), BATcount(b), \
+                           s ? BATgetId(s) : "NULL", anti, #TEST);     \
+
+//             printf("\n Total stride length %lu, suitable values %lu",n1,g1);                        \
+
+*/
+
+
+typedef struct _wthread {      /* argument record handed to one scan worker thread; only referenced by code that is currently commented out. NOTE(review): file-scope names starting with '_' are reserved in C - consider renaming the tag */
+        pthread_t id;          /* thread handle; the disabled setup code passes &id to pthread_create and later to pthread_join */
+        BAT *bat_;             /* result BAT for this worker; the disabled threadFunc uses it as bn, the insert target of scanloop */
+       const void *tl;         /* low bound value, compared against each tail value via cmp */
+       const void *th;         /* high bound value, compared against each tail value via cmp */
+       int lval;               /* non-zero when the low bound must be tested (test is skipped when 0) */
+       int hval;               /* non-zero when the high bound must be tested (test is skipped when 0) */
+       int li;                 /* non-zero when a value equal to tl also qualifies */
+       int hi;                 /* non-zero when a value equal to th also qualifies */
+        int (*cmp)(const void *, const void *);        /* comparison function applied to tail values, bounds and nil */
+       BUN p, q;               /* scan range: scanloop iterates while p < q */
+       BATiter bi;             /* iterator the worker reads tail values through; the disabled setup code fills it with bat_iterator(b) */
+       const void *nil;        /* nil representation; when NULL the nil check in the scan predicate is skipped */
+} wthread;
+
+/*
+static void *
+threadFunc(void *d)
+{
+       struct timeval tv;
+        time_t prevclock, curclock;
+       BATiter bi;
+
+        int (*cmp)(const void *, const void *);
+       wthread *wthr= (wthread *)d;
+       const void *tl = wthr->tl;
+       const void *th = wthr->th;
+       const void *nil = wthr->nil;
+       int lval = wthr->lval;
+       int hval = wthr->hval;
+       int li = wthr->li;
+       int hi = wthr->hi;
+       BUN p = wthr->p;
+       BUN q = wthr->q;
+       oid o;
+       oid off = 0; //wthr->off;
+       int c;  
+       const void *v;
+
+       BAT *bn=NULL;   
+       
+       cmp = wthr->cmp;
+
+       bi = wthr->bi;
+        gettimeofday(&tv,NULL);
+        prevclock = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec);
+
+       bn = wthr->bat_;
+
+        scanloop((nil == NULL || (*cmp)(v, nil) != 0) &&
+                                 ((!lval ||
+                                   (c = cmp(tl, v)) < 0 ||
+                                   (li && c == 0)) &&
+                                  (!hval ||
+                                   (c = cmp(th, v)) > 0 ||
+                                   (hi && c == 0))));
+
+               gettimeofday(&tv,NULL);
+        curclock = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec);
+
+        printf("\nTime of thread %lu is %lu", (unsigned long)wthr->id, (unsigned long)(curclock - prevclock));
+       return NULL;
+
+  bunins_failed:
+       BBPreclaim(bn);
+       return NULL;
+}
+*/
 
 static BAT *
 BAT_scanselect(BAT *b, BAT *s, BAT *bn, const void *tl, const void *th,
@@ -189,6 +273,21 @@ BAT_scanselect(BAT *b, BAT *s, BAT *bn, 
        const void *nil, *v;
        int c;
 
+       // Threaded parameters
+
+/*     struct timeval tv;
+        time_t prevclock, curclock; //, prevclock, curclock;
+       int l, h=-1, core=8;
+       BAT *b2, *b1;
+       BUN cap=0, stride, mod;
+//     struct timeval tv;
+//      time_t prevclock, curclock; //, prevclock, curclock;
+        wthread **walk;
+        bit strideFlag = FALSE;
+        int ht = TYPE_any , tt;// = TYPE_any;
+       BUN estimate;
+*/
+
        assert(b != NULL);
        assert(bn != NULL);
        assert(bn->htype == TYPE_void);
@@ -267,13 +366,178 @@ BAT_scanselect(BAT *b, BAT *s, BAT *bn, 
                                   ((c = (*cmp)(th, v)) < 0 ||
                                    (!hi && c == 0)))));
                } else {
-                       scanloop((nil == NULL || (*cmp)(v, nil) != 0) &&
-                                ((!lval ||
-                                  (c = cmp(tl, v)) < 0 ||
-                                  (li && c == 0)) &&
-                                 (!hval ||
-                                  (c = cmp(th, v)) > 0 ||
-                                  (hi && c == 0))));
+
+//                     if(BATcount(b) <= 128)
+                       if(BATcount(b))
+                       {
+                               // original scan loop function
+
+//                             gettimeofday(&tv,NULL);
+//                             prevclock = (time_t) tv.tv_usec  + (time_t)(1000000 * tv.tv_sec);
+
+                                       scanloop((nil == NULL || (*cmp)(v, nil) 
!= 0) &&
+                                                ((!lval ||
+                                                  (c = cmp(tl, v)) < 0 ||
+                                                  (li && c == 0)) &&
+                                                 (!hval ||
+                                                  (c = cmp(th, v)) > 0 ||
+                                                  (hi && c == 0))));
+
+//                             gettimeofday(&tv,NULL);
+//                             curclock = (time_t) tv.tv_usec + (time_t)(1000000 * tv.tv_sec);
+//                             printf("\n Time of scanloop is %lu", (unsigned long)(curclock - prevclock));
+
+                       }
+/*                      else
+                       {
+
+                               printf("\n In my function");
+                               mod = BATcount(b) % core; 
+
+
+                               if(0 != mod)            // If there is excess 
remaining tuples e.g. 27 % 8 = 3 tuples in last stride
+                               {
+                                       strideFlag = TRUE;
+                               }                       
+                       
+                               stride = BATcount(b) / core;
+                               printf("\n Stride is %lu",stride);
+//                             printf("\n main bat head %d tail %d",b->htype, 
b->ttype);
+
+                               walk = (wthread **)malloc(sizeof(wthread *) * 
core);
+                               if(NULL == walk)
+                               {
+                                       printf("\n Malloc failed for walk");
+//                                     throw(MAL, "walk", MAL_MALLOC_FAIL);
+                                       exit(1);        // What to put in here 
is a question since there is no MAL_ERROR return
+                               }
+
+                               gettimeofday(&tv,NULL);
+                               prevclock = (time_t) tv.tv_usec  + 
(time_t)(1000000 * tv.tv_sec);
+                               estimate = BATcount(bn);
+                               printf("\n Estimate is %lu",estimate);
+                               
+                               if(stride >= 1)         // when number of 
tuples are more than the number of cores to make sure at least 1 tuple per core 
+                               {
+                                       int i;  
+                                       for(i=0; i<core; i++)
+                                       {
+                                               if(i==0)                // to 
handle special case of batslice excluding the upper range value in slice
+                                               {
+                                                       l = h + 1;
+                                                       h= l+stride;
+                                               }
+                                               else
+                                               {       
+                                                       l = h; 
+                                                       h = l+stride;
+                                               }
+                                               
+                                               if((i + 1 == core) && 
strideFlag == TRUE)
+                                               {
+                                                       h = h + mod;            
        // add the remaining number of tuples to the last stride                
        
+                               
+                                               }                       
+                                               
+                                               walk[i] = (wthread 
*)malloc(sizeof(wthread));                                           
+                                               walk[i]->bat_ = 
BATnew(TYPE_void, TYPE_oid, estimate / core);           //BATslice(b, l, h);    
+
+                                               walk[i]->tl = tl;
+                                               walk[i]->th = th;
+                                               walk[i]->lval = lval;
+                                               walk[i]->hval = hval;
+                                               walk[i]->li = li;
+                                               walk[i]->hi = hi;
+                                               walk[i]->p = l;
+                                               walk[i]->q = h;
+                                               walk[i]->bi = bat_iterator(b);;
+                                               walk[i]->nil = nil;
+                                               walk[i]->cmp = cmp;
+
+                                               printf("\n Stride limits are %d 
%d ",l,h);
+                                       
+                                               gettimeofday(&tv,NULL);
+//                                             prevclock1 = (time_t) 
tv.tv_usec + (time_t)(1000000 * tv.tv_sec);
+                                       
+                                               pthread_create(&walk[i]->id, 
NULL, &threadFunc, walk[i]);
+                                               //              printf(" \nRSS 
size after coming out of pthread %d is  %s", i, MT_heapcur()); // MT_getrss()); 
 
+
+                                               gettimeofday(&tv,NULL);
+//                                             curclock1 = (time_t) tv.tv_usec 
+ (time_t)(1000000 * tv.tv_sec);
+                                               //printf("\nthread %lu forking  
%lu", (unsigned long)walk[i]->id,(unsigned long)(curclock1 - prevclock1));
+
+                                       }
+
+                                       for(i=0; i<core; i++)
+                                       {
+                                               pthread_join(walk[i]->id, NULL);
+                                       }
+
+                                       gettimeofday(&tv,NULL);
+                                       curclock = (time_t) tv.tv_usec + 
(time_t)(1000000 * tv.tv_sec);
+
+                                       printf("\nTotal time of thread works is 
%lu", (unsigned long)(curclock - prevclock));
+                                       
+                                       printf("\n After pthread_join");
+                                       gettimeofday(&tv,NULL);
+                                       prevclock = (time_t) tv.tv_usec + 
(time_t)(1000000 * tv.tv_sec);
+
+//                                     BBPunfix(b->batCacheid);
+                                       //      printf(" \nRSS size after 
unfixing first batCacheId is  %s", MT_heapcur()); //   MT_getrss());  
+
+                                       // mat.pack implementation
+                                       for (i = 0; i < core; i++) 
+                                       {
+                                               //printf("\n outside bn id is 
%d",bid);
+                                               int bid;
+                                               b2 = walk[i]->bat_;   
+                                               bid = b2->batCacheid;
+
+                                               if (b2 && bid < 0)
+                                                       b2 = BATmirror(b2);
+                                               if( b2 ){
+                                                       if (ht == TYPE_any && 
tt){
+                                                               ht = 
ATOMtype(b->htype);
+                                                               tt = 
ATOMtype(b->ttype);
+                                                       }
+                                                       cap += BATcount(b2);
+                                                       printf("\n batCount %d 
is %lu cap =%lu", i, BATcount(b2), cap);
+                                               }
+                                       }
+
+                                       if (ht == TYPE_any){
+                                       //                *ret = 0;
+                                               printf("\nMal success");        
        
+                                               return NULL;                    
// This is a hack
+                                       }
+
+//                                     printf("\n size of matpack is %lu", 
cap);
+
+                                       for (i = 0; i < core; i++) {
+                                               b1 = walk[i]->bat_;             
                                                
+                                               if( b1 ){
+                                                       // use the right oid 
ranges, don't change the input 
+                                                       BATins(bn,b1,FALSE);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to