Changeset: d5294af6e7cb for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d5294af6e7cb
Modified Files:
        monetdb5/extras/crackers/crackers_multicore_unordered.mx
        monetdb5/extras/crackers/crackers_parallelselect_ops.mx
Branch: holindex
Log Message:

Add restore function for multicore cracking.


diffs (truncated from 353 to 300 lines):

diff --git a/monetdb5/extras/crackers/crackers_multicore_unordered.mx 
b/monetdb5/extras/crackers/crackers_multicore_unordered.mx
--- a/monetdb5/extras/crackers/crackers_multicore_unordered.mx
+++ b/monetdb5/extras/crackers/crackers_multicore_unordered.mx
@@ -69,6 +69,17 @@ typedef struct {
         int pivot;
 } thread_data_new_t;
 
+typedef struct { /* Per-thread argument record handed to threadFuncRestore_@2_@1 through pthread_create. */
+        BAT *array; /* BAT whose tail and head values the thread overwrites */
+        int *temp_array; /* source buffer for tail values; NOTE(review): declared int* although the caller assigns a @1* -- confirm only int is instantiated */
+        oid *temp_array_oid; /* source buffer for head (oid) values */
+        int nthreads; /* number of threads; stored but not read by the visible thread function */
+        oid first; /* first BUN offset (added to BUNfirst) this thread writes */
+        oid last; /* last BUN offset (inclusive) this thread writes */
+       int tempfirst; /* index into the temp buffers where this thread starts reading */
+       int templast; /* intended last temp-buffer index; not read by the visible thread function */
+        int tid; /* thread index as assigned by the launcher loop */
+} thread_data_restore_t;
 
 /* Signatures shared within the crackers module/library */
 @:TypeSwitch(operations,_decl)@
@@ -97,9 +108,11 @@ crackers_export str CRKcrackUnorderedThr
 @= crackInTwoUnorderedPieces_decl
 str CRKcrackUnorderedZeroParallel_@2_@1( BAT *b, @1 mval, oid first, oid last, 
oid *pos);
 str CRKscanUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid 
*temp_array_oid, @1 mval, oid first, oid last, int nthreads, int *data_less, 
int *data_greater);
-str CRKreorganizeUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid 
*temp_arra_oid, @1 mval, oid first, oid last, int nthreads, int *data_less, int 
*data_greater);
+str CRKreorganizeUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid 
*temp_array_oid, @1 mval, oid first, oid last, int nthreads, int *data_less, 
int *data_greater);
+str CRKrestoreUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid 
*temp_array_oid, @1 mval, oid first, oid last, int nthreads, int *data_less, 
int *data_greater, oid *pos);
 void *threadFuncScan_@2_@1(void *arg);
 void *threadFuncReorganize_@2_@1(void *arg);
+void *threadFuncRestore_@2_@1(void *arg);
 @
 @= crackInThreeUnorderedPieces_decl
 str CRKcrackUnorderedThreeParallel_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid 
first, oid last, oid *posl, oid *posh);
@@ -332,6 +345,90 @@ CRKreorganizeUnorderedZeroParallel_@2_@1
        return MAL_SUCCEED;
 }
 
+str /* Copy the temp buffers back into b over offsets first..last using nthreads threads, then report the crack position in *pos. */
+CRKrestoreUnorderedZeroParallel_@2_@1(BAT *b, @1 *temp_array, oid 
*temp_array_oid, @1 mval, oid first, oid last, int nthreads,int *data_less, int 
*data_greater,oid *pos) {
+        /* data_less[0..nthreads-1] is summed to locate the boundary; data_greater is unused. Returns MAL_SUCCEED; thread create/join failures call exit(-1). */
+        int i;
+       @1 *lt, *t0;
+        oid partition_elements, position=0;
+        thread_data_restore_t *data;
+
+        pthread_t **thread = (pthread_t **) malloc (sizeof(pthread_t *) * 
nthreads);
+
+       (void) data_greater; /* parameter intentionally unused in this function */
+
+        /* Allocate memory for the pthread_ts. */
+        for (i = 0; i < nthreads; i++)
+                thread[i] = (pthread_t *) malloc (sizeof(pthread_t));
+
+        /* Allocate memory for thread data. */
+        data = malloc (nthreads * sizeof(thread_data_restore_t));
+        /* NOTE(review): none of the malloc results are checked, and no free(data) appears in this function. */
+        /* Now create the threads. */
+        partition_elements=(last-first+1)/nthreads; /* integer division; the last thread also covers the remainder */
+        //printf("Each partition consists of %d elements 
(almost).\n",partition_elements);
+        for (i = 0; i < nthreads; i++) {
+                /* thread i writes BAT offsets starting at first + i*partition_elements and reads the temp buffers starting at index i*partition_elements */
+                data[i].array = b;
+                data[i].temp_array = temp_array;
+                data[i].temp_array_oid = temp_array_oid;
+                data[i].nthreads = nthreads;
+                data[i].first = first + i*partition_elements;
+               data[i].tempfirst = i*partition_elements;
+                if(i==(nthreads-1))
+               {
+                        data[i].last = last; /* last thread runs to the end of the range */
+                       data[i].templast = partition_elements-1; /* NOTE(review): not offset by i*partition_elements like the else branch; templast is not read by threadFuncRestore */
+               }
+               else
+               {
+                        data[i].last = first + 
(i*partition_elements)+partition_elements-1;
+                       data[i].templast = 
(i*partition_elements)+partition_elements-1;
+               }
+                data[i].tid = i;
+               if (pthread_create (thread[i], NULL, threadFuncRestore_@2_@1, 
&data[i]))
+                {
+                        printf("Failed to create thread %d.\n",i);
+                        exit (-1);
+                }
+        }
+        // Wait for threads
+        for (i = 0; i < nthreads; i++) {
+                if (pthread_join(*thread[i], NULL)) {
+                        printf ("Error joining thread %d.\n", i);
+                        exit (-1);
+                 }
+        }
+
+        /* Clean up. */
+        for (i = 0; i < nthreads; i++)
+                free (thread[i]);
+
+        free (thread);
+       /* position becomes the sum of data_less[0..nthreads-1] */
+       for (i = 0; i < nthreads; i++)
+                position=position+data_less[i];
+       /* t0 is the first tail slot of b; lt is the tail slot at offset first+position-1. NOTE(review): if first+position is 0 this index underflows when oid is unsigned -- confirm */
+       t0 = (@1 *)Tloc(b, BUNfirst(b));
+       lt = (@1 *)Tloc(b, BUNfirst(b) + first + position-1);
+//     *pos=(oid) (lt-t0);
+       /* @5_@4 and @6 are Mx macro arguments selecting the comparison of *lt against mval; their expansion is not visible here */
+       if (@5_@4(lt, &mval,@6@1)){
+               if (lt==t0)
+                       *pos = (oid) BUNfirst(b);
+                else
+                       *pos = (oid) (lt - t0) - 1; /*works for empty left 
piece also*/
+        }
+        else{
+                *pos = (oid) (lt - t0);
+                if (*pos==last) /*empty right piece*/
+                      *pos = *pos + 1;
+       }
+
+
+        return MAL_SUCCEED;
+}
+
 
 void
 *threadFuncScan_@2_@1(void *arg) {
@@ -393,6 +490,27 @@ void *threadFuncReorganize_@2_@1(void *a
 
        pthread_exit(NULL);
 }
+
+void *threadFuncRestore_@2_@1(void *arg) { /* pthread entry point: copies tail and head values from the temp buffers into the BAT range described by arg (a thread_data_restore_t *). */
+        thread_data_restore_t *my_data = (thread_data_restore_t *) arg;
+        @1  *ft;
+        oid *fh, *lh;
+
+        /* set bounds for the iterator: ft/fh point at offset first, lh at offset last (inclusive) */
+        ft = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + 
my_data->first);
+        fh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + 
my_data->first);
+        lh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + 
my_data->last);
+        /* the loop is bounded by the head pointers only; the tail pointer ft advances in lockstep */
+        while(fh<=lh) {
+                *ft=my_data->temp_array[my_data->tempfirst];
+                *fh=my_data->temp_array_oid[my_data->tempfirst];
+                my_data->tempfirst++; /* advances the read index stored in this thread's own descriptor */
+                ft++; fh++;
+        }
+
+       pthread_exit(NULL);
+}
+
 @
 
 @= crackInThreeUnorderedPieces_impl
diff --git a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx 
b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
--- a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
+++ b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
@@ -133,19 +133,21 @@ CRKparalleluselectBounds_@1(int *vid, in
        /*CRACK in two pieces cl1-ch1 using >incLow bound*/
        if (*inclusiveLow == TRUE)
        {
-               CRKcrackUnorderedZeroParallel@2_RE_@1(b,*low, cl1, ch1,&vl);
-               CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid, *low, 
cl1, ch1, nthreads, p_data_less, p_data_greater);
-               p_new=(int *)malloc((ch1-cl1) * sizeof(int));
-               p_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid));
-               CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid, 
*low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+               //CRKcrackUnorderedZeroParallel@2_RE_@1(b,*low, cl1, ch1,&vl);
+               CRKscanUnorderedZeroParallel@2_RE_@1(b, pl_new, pl_new_oid, 
*low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+               pl_new=(int *)malloc((ch1-cl1) * sizeof(int));
+               pl_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid));
+               CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, pl_new, 
pl_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+               CRKrestoreUnorderedZeroParallel@2_RE_@1(b, pl_new, pl_new_oid, 
*low, cl1, ch1, nthreads, p_data_less, p_data_greater, &vl);
        }
        else
        {
-               CRKcrackUnorderedZeroParallel@2_LE_@1(b,*low, cl1, ch1,&vl);
-               CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid, *low, 
cl1, ch1, nthreads, p_data_less, p_data_greater);
-               p_new=(int *)malloc((ch1-cl1) * sizeof(int));
-               p_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid));
-               CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid, 
*low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+               //CRKcrackUnorderedZeroParallel@2_LE_@1(b,*low, cl1, ch1,&vl);
+               CRKscanUnorderedZeroParallel@2_LE_@1(b, pl_new, pl_new_oid, 
*low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+               pl_new=(int *)malloc((ch1-cl1) * sizeof(int));
+               pl_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid));
+               CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, pl_new, 
pl_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+               CRKrestoreUnorderedZeroParallel@2_LE_@1(b, pl_new, pl_new_oid, 
*low, cl1, ch1, nthreads, p_data_less, p_data_greater, &vl);
        }
 
        if (vl < cl1){
@@ -165,19 +167,21 @@ CRKparalleluselectBounds_@1(int *vid, in
        /*CRACK in two pieces cl2-ch2 using <incHgh bound*/
        if (*inclusiveHgh == TRUE)
        {
-               CRKcrackUnorderedZeroParallel@2_LE_@1(b,*hgh, cl2, ch2,&vh);
-               CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid, *hgh, 
cl2, ch2, nthreads, p_data_less, p_data_greater);
-               p_new=(int *)malloc((ch2-cl2) * sizeof(int));
-               p_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid));
-               CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid, 
*hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
-        }
+               //CRKcrackUnorderedZeroParallel@2_LE_@1(b,*hgh, cl2, ch2,&vh);
+               CRKscanUnorderedZeroParallel@2_LE_@1(b, pr_new, pr_new_oid, 
*hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
+               pr_new=(int *)malloc((ch2-cl2) * sizeof(int));
+               pr_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid));
+               CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, pr_new, 
pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
+                       CRKrestoreUnorderedZeroParallel@2_LE_@1(b, pr_new, 
pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater, &vh);
+        }
        else
        {
-               CRKcrackUnorderedZeroParallel@2_RE_@1(b,*hgh, cl2, ch2,&vh);
-               CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid, *hgh, 
cl2, ch2, nthreads, p_data_less, p_data_greater);
-               p_new=(int *)malloc((ch2-cl2) * sizeof(int));
-               p_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid));
-               CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid, 
*hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
+               //CRKcrackUnorderedZeroParallel@2_RE_@1(b,*hgh, cl2, ch2,&vh);
+               CRKscanUnorderedZeroParallel@2_RE_@1(b, pr_new, pr_new_oid, 
*hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
+               pr_new=(int *)malloc((ch2-cl2) * sizeof(int));
+               pr_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid));
+               CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, pr_new, 
pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
+               CRKrestoreUnorderedZeroParallel@2_RE_@1(b, pr_new, pr_new_oid, 
*hgh, cl2, ch2, nthreads, p_data_less, p_data_greater, &vh);
        }
 
        /*check for gaps*/
@@ -189,7 +193,6 @@ CRKparalleluselectBounds_@1(int *vid, in
                gapH = -1;
                /*vh--;*/
        }
-
 @
 @= CreateResult
 createView:
@@ -227,9 +230,12 @@ createView:
        struct Node *lowNode=NULL, *hghNode=NULL, *lowNodeNext=NULL, *temp;
        BUN idxFirst;
        bit copy=TRUE;
+       int i;
 
-       int *p_new = NULL;
-       oid *p_new_oid = NULL;
+       int *pl_new = NULL;
+       oid *pl_new_oid = NULL;
+       int *pr_new = NULL;
+        oid *pr_new_oid = NULL;
        int *p_data_less = NULL;
        int *p_data_greater = NULL;
        int nthreads=2;
@@ -486,34 +492,44 @@ createView:
        index then we have to crack */
        if (foundLow == 0 || foundHgh == 0) {
                if (foundLow == 0 && foundHgh == 0) {
-                       /* We have to do two cracks separatelly */
+                        /* We have to do two cracks separatelly */
 
-                       /* For the cl bound and the next one in the index*/
-                       @:crkTwoLTree@5(@1,@5)@
-                       t = (int *) Tloc(b, BUNfirst(b));
+                        /* For the cl bound and the next one in the index*/
+                        @:crkTwoLTree@5(@1,@5)@
+                        t = (int *) Tloc(b, BUNfirst(b));
 
-                       /* For the ch bound and the previous one in the index*/
-                       @:crkTwoRTree@5(@1,@5)@
+                       foundHgh = GetHgh_@1(*hgh, *inclusiveHgh, 
CrackerIndex[m].Tree, c, BUNfirst(c), &cl2, &ch2, 0, BUNlast(b)-(oid)1);
+                        if (cl2 != 0) cl2++;
+                        for(i=0;i<nthreads;i++)
+                        {
+                               p_data_less[i]=0;
+                               p_data_greater[i]=0;
+                        }
 
-                       if (IndexSize < IndexStop) {
-                               if (vl > 0)
-                                       _vl = vl - 1;
-                               else
-                                       _vl = vl;
-                               if (gapL>0)
-                               {
-                                       addCrackerIndex_@1(m, low, 
*inclusiveLow, _vl, c);
-                                       pieces = pieces + 1;
-                               }
-                               if (gapH>0)
-                               {
-                                       addCrackerIndex_@1(m, hgh, HBound, vh, 
c);
-                                       pieces = pieces + 1;
-                               }       
-                               if ((vl == 1) && (*t == *low) && (*inclusiveLow 
== TRUE))
-                                       vl = vl - 1;
-                       }
-               } else if (foundLow == 0) {
+
+                        /* For the ch bound and the previous one in the index*/
+                        @:crkTwoRTree@5(@1,@5)@
+
+                        if (IndexSize < IndexStop) {
+                                if (vl > 0)
+                                        _vl = vl - 1;
+                                else
+                                        _vl = vl;
+                                if (gapL>0)
+                                {
+                                        addCrackerIndex_@1(m, low, 
*inclusiveLow, _vl, c);
+                                        pieces = pieces + 1;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to