Changeset: fc354657de5d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=fc354657de5d
Modified Files:
        monetdb5/extras/crackers/crackers.mx
        monetdb5/extras/crackers/crackers_core_unordered.mx
        monetdb5/extras/crackers/crackers_select_ops.mx
Branch: holindex
Log Message:

Add a parallel scan operator to the crackers module (used for experimental analysis).


diffs (293 lines):

diff --git a/monetdb5/extras/crackers/crackers.mx b/monetdb5/extras/crackers/crackers.mx
--- a/monetdb5/extras/crackers/crackers.mx
+++ b/monetdb5/extras/crackers/crackers.mx
@@ -124,7 +124,6 @@ All Rights Reserved.
 @:@1(@2,flt)@
 @:@1(@2,dbl)@
 @:@1(@2,date)@
-@:@1(@2,lng)@
 @
 @= TypeSwitch_2
 @:TypeSwitch_temp2(@1,int)@
@@ -308,6 +307,13 @@ address CRKparalleluselectBounds_@2
 comment "Retrieve the subset using a cracker
         index producing preferably a BATview.";
 
+#parallel scan operators
+# pselect: tail values of b selected against h by a multi-threaded scan;
+# h is passed on as an inclusive bound (inclusiveHgh = TRUE, LE variant).
+# nthreads = 0 lets the scan pick the thread count automatically.
+# Only l = nil is supported; any other l raises "Not implemented".
+
+command pselect(b:bat[:oid,:@2],l:@2,h:@2, nthreads:int):bat[:oid,:@2]
+address CRKparallelscanselect_@2_MT
+comment "Retrieve qualifying tuples with parallel scan.";
+
+# stochastic operators
 
 command selectst(b:bat[:oid,:@2],l:@2,h:@2):bat[:oid,:@2]
 address CRKselectst_@2
@@ -922,6 +928,11 @@ address CRKcrackUnorderedThree_@2
 comment "Break a BAT into three pieces with
         tail<=low, low<tail<=hgh, tail>hgh,
         respectively.";
+
+# Multi-threaded scan of b against the pivot mid.
+# NOTE(review): the C function CRKparallelscan_@2 additionally expects an
+# output BAT id (bid_out) and receives nthreads as a plain int, which this
+# signature does not reflect; confirm the binding.
+command parallelscan (b:bat[:oid,:@2], mid:@2, nthreads:int) :bat[:oid,:@2]
+address CRKparallelscan_@2
+comment "Scan a BAT using multiple threads";
+
 @
 @= Validate
 @:crack_validate(@2,Ordered,; maintaining the head-oid order within each 
piece)@
diff --git a/monetdb5/extras/crackers/crackers_core_unordered.mx b/monetdb5/extras/crackers/crackers_core_unordered.mx
--- a/monetdb5/extras/crackers/crackers_core_unordered.mx
+++ b/monetdb5/extras/crackers/crackers_core_unordered.mx
@@ -98,6 +98,7 @@ All Rights Reserved.
 @= CoreUnorderedFunctions_decl
 crackers_export str CRKcrackUnorderedZero_@1 (int *res, int *bid, @1 *mid, int 
nthreads, int vector_elements);
 crackers_export str CRKcrackUnorderedThree_@1 (int *res, int *bid, @1 *low, @1 
*hgh);
+/* C implementation behind MAL parallelscan.
+ * NOTE(review): the MAL command declares (b, mid, nthreads) without an output
+ * BAT, while this prototype also takes bid_out and receives nthreads as a
+ * plain int (CRKparallelscanselect_*_MT takes it as const int *); confirm. */
+crackers_export str CRKparallelscan_@1 (int *res, int *bid, int *bid_out, @1 
*mid, int nthreads);
 @
  * @- Signatures shared within the crackers module/library
 @= operations
@@ -110,6 +111,7 @@ crackers_export str CRKcrackUnorderedThr
 @
 @= crackInTwoUnorderedPieces_decl
 str CRKcrackUnorderedZero_@2_@1( BAT *b, @1 mval, oid first, oid last, oid 
*pos, int nthreads, int vector_elements);
+/* Parallel scan of tail positions [first, last] of b into ob; the positions
+ * are BUNs, matching the definition of this function. */
+str CRKparallelscan_@2_@1( BAT *b, BAT *ob, @1 mval, BUN first, BUN last, int nthreads);
 @
 @= crackInThreeUnorderedPieces_decl
 str CRKcrackUnorderedThree_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid first, oid 
last, oid *posl, oid *posh);
@@ -174,6 +176,15 @@ typedef struct {
 } c_Thread_t;
 
 typedef struct {
+       /* Arguments and result slot of one parallel-scan thread: the thread
+        * scans tail positions [first, last] of b and writes the values that
+        * pass the scan comparison into ob, then stores its status in msg. */
+       const BAT *b;      /* BAT to be scanned */
+       BAT *ob;      /* output BAT of this slice (slice 0: the caller's BAT) */
+       const void *mval;  /* pivot value; points to a value of b's tail type */
+       BUN first;         /* offset of first value in slice */
+       BUN last;          /* offset of last value in slice (inclusive) */
+       const char *msg;   /* outcome of the slice scan: MAL_SUCCEED or an error */
+} s_Thread_t;
+
+typedef struct {
        BUN left, right;
 } cursorDeltas;
 
@@ -229,6 +240,23 @@ CRKcrackUnorderedThree_@1 (int *res, int
         *res = *bid;
         return msg;
 }
+/* C implementation behind MAL parallelscan: scans the whole tail of BAT *bid
+ * with the LE variant of the parallel scan, writing the qualifying values
+ * into the existing BAT *bid_out, using nthreads threads (0 = automatic).
+ *
+ * NOTE(review): *res is set to the input BAT *bid, not to the output BAT;
+ * confirm this is intended. */
+str
+CRKparallelscan_@1 (int *res, int *bid, int *bid_out, @1 *mid, int nthreads){
+        BAT *b, *ob;
+        str msg = MAL_SUCCEED;
+
+        if ((b = BATdescriptor(*bid)) == NULL)
+                throw(MAL, "crackers.parallelscan", "Cannot access descriptor");
+
+        if ((ob = BATdescriptor(*bid_out)) == NULL) {
+                /* don't leak the fix already taken on b */
+                BBPunfix(b->batCacheid);
+                throw(MAL, "crackers.parallelscan", "Cannot access descriptor");
+        }
+
+        /* BATcount(b)-1 would wrap around for an empty BAT */
+        if (BATcount(b) == 0)
+                BATsetcount(ob, 0);
+        else
+                msg = CRKparallelscan_LE_@1( b, ob, *mid, (BUN) 0, BATcount(b)-1, nthreads);
+
+        /* release the fix BATdescriptor took on the output BAT */
+        BBPunfix(ob->batCacheid);
+
+        if (msg != MAL_SUCCEED) {
+                BBPunfix(b->batCacheid);
+                return msg;
+        }
+        BBPkeepref(b->batCacheid);
+        *res = *bid;
+        return msg;
+}
 @
  * @- Functions shared within the crackers module/library
 @= crackInTwoUnorderedPieces_impl
@@ -1639,6 +1667,140 @@ CRKcrackUnorderedZero_@2_@1( BAT *b, @1 
        return msg;
 }
 
+/* Scan tail positions [first, last] (inclusive, counted from BUNfirst) of
+ * buffer and store every value v with (v @7 pivot) into outputbuffer,
+ * starting at its first position and keeping input order; the count of
+ * outputbuffer is then set to the number of values stored.
+ *
+ * The loop is predicated instead of branching: every value is copied to the
+ * current write slot and the write cursor only advances when the comparison
+ * holds.  outputbuffer must therefore have room for last - first + 1 values.
+ * Always returns MAL_SUCCEED. */
+static str CRKscan_x_@2_@1 (
+        /* input */
+        const BAT* buffer,      /* attribute (array) */
+        BAT* outputbuffer,      /* output (array) */
+        const @1 pivot,         /* pivot value for attribute */
+        BUN first,              /* first position of to-be-scanned piece */
+        BUN last                /* last position of to-be-scanned piece */
+) {
+        BUN ReadCursor;
+        BUN WriteCursor = 0;
+        @1  *src_t, *src_t_output;
+
+        /* check the BAT pointers before Tloc() dereferences them */
+        assert(buffer && outputbuffer);
+        src_t = (@1 *) Tloc(buffer, BUNfirst(buffer));
+        src_t_output = (@1 *) Tloc(outputbuffer, BUNfirst(outputbuffer));
+        assert(src_t && src_t_output);
+
+        for (ReadCursor = first; ReadCursor <= last; ReadCursor++) {
+                src_t_output[WriteCursor] = src_t[ReadCursor];
+                WriteCursor += (src_t[ReadCursor] @7 pivot);
+        }
+
+        BATsetcount(outputbuffer, WriteCursor);
+
+        BATseqbase(outputbuffer, 0);
+        outputbuffer->tsorted = FALSE;
+        outputbuffer->trevsorted = FALSE;
+        outputbuffer->tdense = FALSE;
+        /* NOTE(review): nonil is claimed although nils in the input may pass
+         * the comparison; confirm */
+        outputbuffer->T->nonil = TRUE;
+        outputbuffer->T->nil = FALSE;
+
+        return MAL_SUCCEED;
+}
+/* Thread start routine of the parallel scan: scans the slice described by
+ * its s_Thread_t argument and leaves the outcome in that argument's msg
+ * field, where the spawning thread reads it after pthread_join(). */
+static void*
+scan_MT_crackThread_@2_@1 ( void *arg_p )
+{
+        s_Thread_t *slice = arg_p;
+        const @1 *pivot_p = slice->mval;
+
+        slice->msg = CRKscan_x_@2_@1(slice->b, slice->ob, *pivot_p, slice->first, slice->last);
+
+        /* returning from the start routine is an implicit pthread_exit(NULL) */
+        return NULL;
+}
+/* Release what CRKscan_MT_@2_@1 allocated: the per-slice output BATs of
+ * slices 1 .. nslices-1 (slice 0 writes into the caller's BAT and is never
+ * released here) and the two bookkeeping arrays.  nslices must not exceed
+ * the number of s_Thread_arg entries whose ob field has been set. */
+static void
+CRKscan_MT_cleanup_@2_@1 (pthread_t *s_Thread, s_Thread_t *s_Thread_arg, int nslices)
+{
+        int i;
+
+        if (s_Thread_arg) {
+                for (i = 1; i < nslices; i++)
+                        if (s_Thread_arg[i].ob)
+                                BBPunfix(s_Thread_arg[i].ob->batCacheid);
+                GDKfree(s_Thread_arg);
+        }
+        if (s_Thread)
+                GDKfree(s_Thread);
+}
+
+/* Multi-threaded scan of tail positions [first, last] of b: the range is cut
+ * into nthreads consecutive slices, each scanned by its own thread into its
+ * own output BAT (slice 0 directly into ob), and slices 1.. are then appended
+ * to ob in slice order.
+ *
+ * nthreads == 0 selects GDKnr_threads; the count is reduced for small inputs.
+ * Returns MAL_SUCCEED or an exception.  Every started thread is joined and
+ * every temporary BAT/array is released on all paths. */
+static str CRKscan_MT_@2_@1 (const BAT *b, BAT *ob, BUN first, BUN last, const @1 pivot, int nthreads)
+{
+        BUN n = last - first + 1; /* total # tuples / values */
+        BUN mm;                   /* # tuples / values per slice */
+        BUN f, l;                 /* first / last BUN per slice */
+        pthread_t *s_Thread;      /* threads array */
+        s_Thread_t *s_Thread_arg; /* thread arguments array */
+        int i, started;
+        int join_failed = -1;     /* first thread that could not be joined */
+        const char *err;
+
+        /* adjust nthreads */
+        if (nthreads == 0) {
+                /* automatic setting */
+                nthreads = GDKnr_threads;
+        }
+        if ((BUN) nthreads > n / 10) {
+                /* more threads / smaller slices does not make sense */
+                /* NOTE(review): the test uses n/10 but the reset uses n/1000; confirm intent */
+                nthreads = (int) (n / 1000) + 1;
+        }
+        if (nthreads < 1)
+                nthreads = 1;     /* protects the division below */
+
+        mm = (n / nthreads);
+
+        s_Thread = GDKmalloc(nthreads * sizeof(pthread_t));
+        s_Thread_arg = GDKmalloc(nthreads * sizeof(s_Thread_t));
+        if (!s_Thread || !s_Thread_arg) {
+                CRKscan_MT_cleanup_@2_@1(s_Thread, s_Thread_arg, 0);
+                throw (MAL, "crackers.parallelscan", "CRKscan_MT_@2_@1(): GDKmalloc() failed.");
+        }
+
+        /* initialize crackThread arguments */
+        /* Alternative 1: each thread scans one consecutive slice */
+        for (i = 0, f = first, l = f + mm - 1; i < nthreads; i++, f += mm, l += mm) {
+                s_Thread_arg[i].b     = b;
+                s_Thread_arg[i].mval  = &pivot;
+                s_Thread_arg[i].first = f;
+                s_Thread_arg[i].last  = (i < nthreads - 1) ? l : last;
+                s_Thread_arg[i].msg   = MAL_SUCCEED;
+                if (i == 0) {
+                        s_Thread_arg[i].ob = ob;
+                } else {
+                        s_Thread_arg[i].ob = BATnew(TYPE_void, b->ttype, s_Thread_arg[i].last - s_Thread_arg[i].first + 1);
+                        if (s_Thread_arg[i].ob == NULL) {
+                                CRKscan_MT_cleanup_@2_@1(s_Thread, s_Thread_arg, i);
+                                throw (MAL, "crackers.parallelscan", "CRKscan_MT_@2_@1(): BATnew() failed.");
+                        }
+                }
+        }
+
+        /* spawn crackThreads */
+        for (started = 0; started < nthreads; started++) {
+                if (pthread_create(&s_Thread[started], NULL, scan_MT_crackThread_@2_@1, &s_Thread_arg[started]))
+                        break;
+        }
+        /* join every thread that was started -- also after a spawn failure,
+         * because running threads still use s_Thread_arg and the pivot that
+         * lives on this stack frame */
+        for (i = 0; i < started; i++) {
+                if (pthread_join(s_Thread[i], NULL) && join_failed < 0)
+                        join_failed = i;
+        }
+        if (started < nthreads) {
+                CRKscan_MT_cleanup_@2_@1(s_Thread, s_Thread_arg, nthreads);
+                throw (MAL, "crackers.parallelscan", "CRKscan_MT_@2_@1(): Failed spawning crackThread %d.", started);
+        }
+        if (join_failed >= 0) {
+                CRKscan_MT_cleanup_@2_@1(s_Thread, s_Thread_arg, nthreads);
+                throw (MAL, "crackers.parallelscan", "CRKscan_MT_@2_@1(): Failed joining crackThread %d.", join_failed);
+        }
+
+        /* check for & report failing crackThreads; copy the message pointer
+         * out before the argument array is freed */
+        for (i = 0; i < nthreads; i++) {
+                if (s_Thread_arg[i].msg != MAL_SUCCEED) {
+                        err = s_Thread_arg[i].msg;
+                        CRKscan_MT_cleanup_@2_@1(s_Thread, s_Thread_arg, nthreads);
+                        throw (MAL, "crackers.parallelscan", "CRKscan_MT_@2_@1(): crackThread %d failed with '%s'.", i, err);
+                }
+        }
+
+        /* merge phase: append slices 1 .. nthreads-1 to slice 0 (= ob) in order */
+        for (i = 1; i < nthreads; i++) {
+                if (BATappend(ob, s_Thread_arg[i].ob, FALSE) == NULL) {
+                        CRKscan_MT_cleanup_@2_@1(s_Thread, s_Thread_arg, nthreads);
+                        throw (MAL, "crackers.parallelscan", "CRKscan_MT_@2_@1(): BATappend() failed.");
+                }
+        }
+
+        CRKscan_MT_cleanup_@2_@1(s_Thread, s_Thread_arg, nthreads);
+        return MAL_SUCCEED;
+}
+/* parallel scan implementation dispatcher:
+ * scans tail positions [first, last] of b into ob with nthreads threads
+ * (0 = automatic) via CRKscan_MT_@2_@1.  Positions count from BUNfirst(b);
+ * the range must be non-empty and lie within BATcount(b). */
+str
+CRKparallelscan_@2_@1( BAT *b, BAT *ob, @1 mval, BUN first, BUN last, int nthreads){
+        assert(b && ob);
+
+        /* Validate at run time rather than only by assert: callers derive
+         * last as BATcount(b)-1, which wraps around for an empty BAT, and an
+         * out-of-range slice would make the scan read past the tail heap. */
+        if (last < first || last >= BATcount(b))
+                throw(MAL, "crackers.parallelscan", "CRKparallelscan_@2_@1(): invalid scan range.");
+
+        return CRKscan_MT_@2_@1(b, ob, first, last, mval, nthreads);
+}
+
 @
 @= crackInThreeUnorderedPieces_impl
 str
diff --git a/monetdb5/extras/crackers/crackers_select_ops.mx b/monetdb5/extras/crackers/crackers_select_ops.mx
--- a/monetdb5/extras/crackers/crackers_select_ops.mx
+++ b/monetdb5/extras/crackers/crackers_select_ops.mx
@@ -72,6 +72,7 @@ crackers_export str CRKselect_@1_MT(int 
 crackers_export str CRKuselect_@1_MT(int *vid, int *bid, @1 *low, @1 *hgh, 
const int *nthreads, const int *vector_elements);
 crackers_export str CRKthetaselect_@1_MT(int *vid, int *bid, @1 *val, str *op, 
const int *nthreads, const int *vector_elements);
 crackers_export str CRKthetauselect_@1_MT(int *vid, int *bid, @1 *val, str 
*op, const int *nthreads, const int *vector_elements);
+/* MAL pselect: parallel-scan selection; only low == nil is supported. */
+crackers_export str CRKparallelscanselect_@1_MT(int *vid, int *bid, @1 *low, 
@1 *hgh, const int *nthreads);
 @
 @c
 /*
@@ -216,6 +217,15 @@ CRKthetauselect_@1_MT(int *vid, int *bid
 
        return CRKuselectBounds_@1_MT(vid, bid, low, high, &lin, &rin, 
nthreads, vector_elements);
 }
+/* Forward declaration: the static helper is defined further down among the
+ * local support functions but is called here first. */
+static str CRKRangeParallelScan_@1(int *vid, int *bid, @1 *hgh, bit *inclusiveHgh, int nthreads);
+
+/* MAL pselect: returns in *vid a new BAT with the tail values of BAT *bid
+ * selected against *hgh (passed as inclusive bound, LE scan variant) by a
+ * parallel scan with *nthreads threads (0 = automatic).
+ * Only low == nil is supported so far. */
+str
+CRKparallelscanselect_@1_MT(int *vid, int *bid, @1 *low, @1 *hgh, const int *nthreads){
+       bit inclusiveHgh = TRUE;
+
+       if (!@2_EQ(low,ATOMnilptr(TYPE_@1),@3@1))
+               throw(MAL, "crackers.ParallelScanSelect", "Not implemented");
+
+       return CRKRangeParallelScan_@1(vid, bid, hgh, &inclusiveHgh, *nthreads);
+}
 
 @
  * @- Local support functions and macros
@@ -1124,4 +1134,28 @@ CRKRangeRightNilTree_@1(int *vid, int *b
        @:CreateResult()@
        return MAL_SUCCEED;
 }
+/* Parallel-scan range selection backing CRKparallelscanselect_@1_MT:
+ * creates a new BAT with the tail values of BAT *bid selected against *hgh
+ * (LE scan variant when *inclusiveHgh is TRUE, RE variant otherwise) using
+ * nthreads threads (0 = automatic), and returns its id in *vid. */
+static str
+CRKRangeParallelScan_@1(int *vid, int *bid, @1 *hgh, bit *inclusiveHgh, int nthreads){
+        BAT *b, *ob;
+        str msg = MAL_SUCCEED;
+
+        if ((b = BATdescriptor(*bid)) == NULL)
+                throw(MAL, "crackers.ParallelScanRange", "Cannot access descriptor");
+
+        ob = BATnew(TYPE_void, b->ttype, BATcount(b));
+        if (ob == NULL) {
+                BBPunfix(b->batCacheid);
+                throw(MAL, "crackers.ParallelScanRange", "BATnew() failed");
+        }
+
+        if (BATcount(b) == 0) {
+                /* nothing to scan; BATcount(b)-1 would wrap around */
+                BATseqbase(ob, 0);
+        } else if (*inclusiveHgh == TRUE) {
+                msg = CRKparallelscan_LE_@1(b, ob, *hgh, (BUN) 0, BATcount(b)-1, nthreads);
+        } else {
+                msg = CRKparallelscan_RE_@1(b, ob, *hgh, (BUN) 0, BATcount(b)-1, nthreads);
+        }
+
+        BBPunfix(b->batCacheid);
+
+        if (msg != MAL_SUCCEED) {
+                /* discard the partial result */
+                BBPunfix(ob->batCacheid);
+                return msg;
+        }
+
+        *vid = ob->batCacheid;
+        BBPkeepref(*vid);
+
+        return MAL_SUCCEED;
+}
 @
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to