Changeset: fc354657de5d for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=fc354657de5d Modified Files: monetdb5/extras/crackers/crackers.mx monetdb5/extras/crackers/crackers_core_unordered.mx monetdb5/extras/crackers/crackers_select_ops.mx Branch: holindex Log Message:
Add parallel scan operator in crackers module (used for experimental analysis). diffs (293 lines): diff --git a/monetdb5/extras/crackers/crackers.mx b/monetdb5/extras/crackers/crackers.mx --- a/monetdb5/extras/crackers/crackers.mx +++ b/monetdb5/extras/crackers/crackers.mx @@ -124,7 +124,6 @@ All Rights Reserved. @:@1(@2,flt)@ @:@1(@2,dbl)@ @:@1(@2,date)@ -@:@1(@2,lng)@ @ @= TypeSwitch_2 @:TypeSwitch_temp2(@1,int)@ @@ -308,6 +307,13 @@ address CRKparalleluselectBounds_@2 comment "Retrieve the subset using a cracker index producing preferably a BATview."; +#parallel scan operators + +command pselect(b:bat[:oid,:@2],l:@2,h:@2, nthreads:int):bat[:oid,:@2] +address CRKparallelscanselect_@2_MT +comment "Retrieve qualifying tuples with parallel scan."; + +# stochastic operators command selectst(b:bat[:oid,:@2],l:@2,h:@2):bat[:oid,:@2] address CRKselectst_@2 @@ -922,6 +928,11 @@ address CRKcrackUnorderedThree_@2 comment "Break a BAT into three pieces with tail<=low, low<tail<=hgh, tail>hgh, respectively."; + +command parallelscan (b:bat[:oid,:@2], mid:@2, nthreads:int) :bat[:oid,:@2] +address CRKparallelscan_@2 +comment "Scan a BAT using multiple threads"; + @ @= Validate @:crack_validate(@2,Ordered,; maintaining the head-oid order within each piece)@ diff --git a/monetdb5/extras/crackers/crackers_core_unordered.mx b/monetdb5/extras/crackers/crackers_core_unordered.mx --- a/monetdb5/extras/crackers/crackers_core_unordered.mx +++ b/monetdb5/extras/crackers/crackers_core_unordered.mx @@ -98,6 +98,7 @@ All Rights Reserved. @= CoreUnorderedFunctions_decl crackers_export str CRKcrackUnorderedZero_@1 (int *res, int *bid, @1 *mid, int nthreads, int vector_elements); crackers_export str CRKcrackUnorderedThree_@1 (int *res, int *bid, @1 *low, @1 *hgh); +crackers_export str CRKparallelscan_@1 (int *res, int *bid, int *bid_out, @1 *mid, int nthreads); @ * @- Signatures shared within the crackers module/library @= operations @@ -110,6 +111,7 @@ crackers_export str CRKcrackUnorderedThr @ @= crackInTwoUnorderedPieces_decl str CRKcrackUnorderedZero_@2_@1( BAT *b, @1 mval, oid first, oid last, oid *pos, int nthreads, int vector_elements); +str CRKparallelscan_@2_@1( BAT *b, BAT *ob, @1 mval, oid first, oid last, int nthreads); @ @= crackInThreeUnorderedPieces_decl str CRKcrackUnorderedThree_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid first, oid last, oid *posl, oid *posh); @@ -174,6 +176,15 @@ typedef struct { } c_Thread_t; typedef struct { + const BAT *b; /* BAT to be scanned */ + BAT *ob; /* outputBAT */ + const void *mval; /* pivot value */ + BUN first; /* offset of first value in slice */ + BUN last; /* offset of last value in slice */ + const char *msg; /* error message */ +} s_Thread_t; + +typedef struct { BUN left, right; } cursorDeltas; @@ -229,6 +240,23 @@ CRKcrackUnorderedThree_@1 (int *res, int *res = *bid; return msg; } +str +CRKparallelscan_@1 (int *res, int *bid, int *bid_out, @1 *mid, int nthreads){ + BAT *b, *ob; + str msg; + + if ((b = BATdescriptor(*bid)) == NULL) + throw(MAL, "crackers.parallelscan", "Cannot access descriptor"); + + if ((ob = BATdescriptor(*bid_out)) == NULL) + throw(MAL, "crackers.parallelscan", "Cannot access descriptor"); + + msg = CRKparallelscan_LE_@1( b, ob, *mid,(BUN) 0, BATcount(b)-1, nthreads); + + BBPkeepref(b->batCacheid); + *res = *bid; + return msg; +} @ * @- Functions shared within the crackers module/library @= crackInTwoUnorderedPieces_impl @@ -1639,6 +1667,140 @@ CRKcrackUnorderedZero_@2_@1( BAT *b, @1 return msg; } +static str CRKscan_x_@2_@1 ( + /* input */ + const BAT* buffer, /* attribute (array) */ + BAT* outputbuffer, /*output (array)*/ + const @1 pivot, /* pivot value for attribute*/ + BUN first, /* first position of to-be-cracked piece */ + BUN last /* last position of to-be-cracked piece */ +) { + BUN ReadCursor = first; + BUN WriteCursor = 0; + BUN isLessThan; + @1 *src_t = (@1 *) Tloc(buffer, BUNfirst(buffer)); + @1 *src_t_output = (@1 *) Tloc(outputbuffer, BUNfirst(outputbuffer)); + + assert(buffer && outputbuffer && src_t && src_t_output); + + for (ReadCursor = first; ReadCursor <= last; ReadCursor++){ + src_t_output[WriteCursor] = src_t[ReadCursor]; + isLessThan = (src_t[ReadCursor] @7 pivot); + WriteCursor += isLessThan; + } + + BATsetcount(outputbuffer,WriteCursor); + + BATseqbase(outputbuffer,0); + outputbuffer->tsorted = FALSE; + outputbuffer->trevsorted = FALSE; + outputbuffer->tdense = FALSE; + outputbuffer->T->nonil = TRUE; + outputbuffer->T->nil = FALSE; + + return MAL_SUCCEED; +} +/* crackThread for new multi-threaded crack code */ +static void* +scan_MT_crackThread_@2_@1 ( void *arg_p ) +{ + s_Thread_t *arg = (s_Thread_t*) arg_p; + @1 mval = * (@1*) arg->mval; + + /* call actual cracking routine for this slice */ + arg->msg = CRKscan_x_@2_@1 ( arg->b, arg->ob, mval, arg->first, arg->last); + + pthread_exit(NULL); + return NULL; +} +static str CRKscan_MT_@2_@1 (const BAT *b, BAT *ob, BUN first, BUN last, const @1 pivot, int nthreads) +{ + BUN n = last - first + 1; /* total # tuples / values */ + BUN mm; /* # tuples / values in tmp arrays */ + BUN f, l; /* first / last BUN per slice */ + pthread_t *s_Thread; /* threads array */ + s_Thread_t *s_Thread_arg; /* thread arguments array */ + int i; + + /* adjust nthreads */ + if (nthreads == 0) { + /* automatic setting */ + nthreads = GDKnr_threads; + } + if ((BUN) nthreads > n / 10) { + /* more threads / smaller slices does not make sense */ + nthreads = (int) (n / 1000) + 1; + } + + + mm = (n / nthreads); + + s_Thread = GDKmalloc (nthreads * sizeof(pthread_t)); + s_Thread_arg = GDKmalloc(nthreads * sizeof(s_Thread_t)); + if (!s_Thread || !s_Thread_arg) { + if (s_Thread) + GDKfree(s_Thread); + if (s_Thread_arg) + GDKfree(s_Thread_arg); + throw (MAL, "crackers.parallelscan", "CRKscan_MT_@2_@1(): GDKmalloc() failed."); + } + + /* initialize crackThread arguments */ + /* Alternative 1: each thread cracks one consecutive slice */ + for (i = 0, f = first, l = f + mm - 1; i < nthreads; i++, f += mm, l += mm) { + s_Thread_arg[i].b = b; + s_Thread_arg[i].mval = &pivot; + s_Thread_arg[i].first = f; + s_Thread_arg[i].last = (i < nthreads - 1) ? l : last; + s_Thread_arg[i].ob = (i == 0) ? ob : BATnew(TYPE_void, b->ttype,s_Thread_arg[i].last-s_Thread_arg[i].first+1); + } + + /* spawn crackThreads */ + for (i = 0; i < nthreads; i++) { + if (pthread_create(&s_Thread[i], NULL, scan_MT_crackThread_@2_@1, &s_Thread_arg[i])) { + GDKfree(s_Thread); + GDKfree(s_Thread_arg); + throw (MAL, "crackers.select", "CRKscan_MT_@2_@1(): Failed spaning crackThread %d.", i); + } + } + /* join crackThreads */ + for (i = 0; i < nthreads; i++) { + if (pthread_join(s_Thread[i], NULL)) { + GDKfree(s_Thread); + GDKfree(s_Thread_arg); + throw (MAL, "crackers.select", "CRKscan_MT_@2_@1(): Failed joining crackThread %d.", i); + } + } + /* check for & report failing crackThreads */ + for (i = 0; i < nthreads; i++) { + if (s_Thread_arg[i].msg != MAL_SUCCEED) { + GDKfree(s_Thread); + GDKfree(s_Thread_arg); + throw (MAL, "crackers.select", "CRKscan_MT_@2_@1(): crackThread %d failed with '%s'.", i, s_Thread_arg[i].msg); + } + } + + /*merge phase*/ + i=1; + while (i < nthreads) { + ob = BATappend(ob, s_Thread_arg[i].ob, FALSE); + i++; + } + + return MAL_SUCCEED; +} +/* parallel scan implementation dispatcher */ +str +CRKparallelscan_@2_@1( BAT *b, BAT *ob, @1 mval, BUN first, BUN last, int nthreads){ + str msg = MAL_SUCCEED; + assert(b && ob); + assert(last >= first); + + msg = CRKscan_MT_@2_@1(b, ob, first, last, mval, nthreads); + + return msg; +} + @ @= crackInThreeUnorderedPieces_impl str diff --git a/monetdb5/extras/crackers/crackers_select_ops.mx b/monetdb5/extras/crackers/crackers_select_ops.mx --- a/monetdb5/extras/crackers/crackers_select_ops.mx +++ b/monetdb5/extras/crackers/crackers_select_ops.mx @@ -72,6 +72,7 @@ crackers_export str CRKselect_@1_MT(int crackers_export str CRKuselect_@1_MT(int *vid, int *bid, @1 *low, @1 *hgh, const int *nthreads, const int *vector_elements); crackers_export str CRKthetaselect_@1_MT(int *vid, int *bid, @1 *val, str *op, const int *nthreads, const int *vector_elements); crackers_export str CRKthetauselect_@1_MT(int *vid, int *bid, @1 *val, str *op, const int *nthreads, const int *vector_elements); +crackers_export str CRKparallelscanselect_@1_MT(int *vid, int *bid, @1 *low, @1 *hgh, const int *nthreads); @ @c /* @@ -216,6 +217,15 @@ CRKthetauselect_@1_MT(int *vid, int *bid return CRKuselectBounds_@1_MT(vid, bid, low, high, &lin, &rin, nthreads, vector_elements); } +str +CRKparallelscanselect_@1_MT(int *vid, int *bid, @1 *low, @1 *hgh, const int *nthreads){ + bit inclusiveHgh = TRUE; + + if (!@2_EQ(low,ATOMnilptr(TYPE_@1),@3@1)) + throw(MAL, "crackers.ParallelScanSelect", "Not implemented"); + + return CRKRangeParallelScan_@1(vid, bid, hgh, &inclusiveHgh, *nthreads); +} @ * @- Local support functions and macros @@ -1124,4 +1134,28 @@ CRKRangeRightNilTree_@1(int *vid, int *b @:CreateResult()@ return MAL_SUCCEED; } +static str +CRKRangeParallelScan_@1(int *vid, int *bid, @1 *hgh, bit *inclusiveHgh, int nthreads){ + BAT *b, *ob; + (void)*vid; + + if ((b = BATdescriptor(*bid)) == NULL) + throw(MAL, "crackers.ParallelScanRange", "Cannot access descriptor"); + + ob = BATnew(TYPE_void, b->ttype,BATcount(b)); + assert(ob); + + if (*inclusiveHgh == TRUE) + CRKparallelscan_LE_@1(b, ob, *hgh, (BUN) 0, BATcount(b)-1, nthreads); + else + CRKparallelscan_RE_@1(b, ob, *hgh, (BUN) 0, BATcount(b)-1, nthreads); + + + BBPunfix(b->batCacheid); + + *vid = ob->batCacheid; + BBPkeepref(*vid); + + return MAL_SUCCEED; +} @ _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list