Changeset: 552aeee10375 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=552aeee10375
Modified Files:
        monetdb5/extras/crackers/crackers_multicore_unordered.mx
        monetdb5/extras/crackers/crackers_parallelselect_ops.mx
Branch: holindex
Log Message:

Implementation of multicore crackTwo algorithm.


diffs (287 lines):

diff --git a/monetdb5/extras/crackers/crackers_multicore_unordered.mx b/monetdb5/extras/crackers/crackers_multicore_unordered.mx
--- a/monetdb5/extras/crackers/crackers_multicore_unordered.mx
+++ b/monetdb5/extras/crackers/crackers_multicore_unordered.mx
@@ -48,8 +48,8 @@ typedef struct {
         int *temp_array;
         int *data_less;
         int *data_greater;
-        oid* left;
-        oid* right;
+        oid left;
+        oid right;
         int tid;
         int *pivot;
 } thread_data_t;
@@ -61,8 +61,8 @@ typedef struct {
         int *data_greater;
         int nthreads; /*number of threads*/
         int n; /*number of array elements*/
-        oid* left;
-        oid* right;
+        oid left;
+        oid right;
         int tid;
         int *pivot;
 } thread_data_new_t;
@@ -95,7 +95,9 @@ crackers_export str CRKcrackUnorderedThr
 @= crackInTwoUnorderedPieces_decl
 str CRKcrackUnorderedZeroParallel_@2_@1( BAT *b, @1 mval, oid first, oid last, 
oid *pos);
 str CRKscanUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, @1 *mval, oid 
first, oid last, int nthreads, int *data_less, int *data_greater);
+str CRKreorganizeUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, @1 
*mval, oid first, oid last, int nthreads, int *data_less, int *data_greater);
 void *threadFuncScan_@2_@1(void *arg);
+void *threadFuncReorganize_@2_@1(void *arg);
 @
 @= crackInThreeUnorderedPieces_decl
 str CRKcrackUnorderedThreeParallel_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid 
first, oid last, oid *posl, oid *posh);
@@ -222,7 +224,6 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT *
 
        int i;
        oid partition_elements;
-       oid *lh;
         thread_data_t *data;
 
         pthread_t **thread = (pthread_t **) malloc (sizeof(pthread_t *) * 
nthreads);
@@ -231,7 +232,6 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT *
 
         data = malloc (nthreads * sizeof(thread_data_t));
 
-        lh = (oid*)Hloc(b, BUNfirst(b) + last);
 
         partition_elements=(last-first+1)/nthreads;
         for (i = 0; i < nthreads; i++) {
@@ -239,12 +239,12 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT *
                 data[i].temp_array = temp_array;
                 data[i].data_less = data_less;
                 data[i].data_greater = data_greater;
-                data[i].left = (oid*)Hloc(b, BUNfirst(b) + 
i*partition_elements);
+                data[i].left = first + i*partition_elements;
                 data[i].pivot = mval;
                 if(i==(nthreads-1))
-                        data[i].right = lh;
+                        data[i].right = last;
                 else
-                        data[i].right = (oid*)Hloc(b, BUNfirst(b) + 
(i*partition_elements)+partition_elements-1);
+                        data[i].right = first + 
(i*partition_elements)+partition_elements-1;
 
                 data[i].tid = i;
 
@@ -270,20 +270,123 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT *
 
        return MAL_SUCCEED;
 }
+str 
+CRKreorganizeUnorderedZeroParallel_@2_@1(BAT *b, @1 *temp_array, @1 *mval, oid 
first, oid last, int nthreads,int *data_less, int* data_greater) {
+
+       int i;
+       oid partition_elements;
+       thread_data_new_t *data;
+
+       pthread_t **thread = (pthread_t **) malloc (sizeof(pthread_t *) * 
nthreads);
+
+       /* Allocate memory for the pthread_ts. */
+       for (i = 0; i < nthreads; i++) 
+               thread[i] = (pthread_t *) malloc (sizeof(pthread_t));
+       
+       /* Allocate memory for thread data. */
+       data = malloc (nthreads * sizeof(thread_data_new_t));
+
+       /* Now create the threads. */
+       partition_elements=((last-first)+1)/nthreads;
+        //printf("Each partition consists of %d elements 
(almost).\n",partition_elements);     
+       for (i = 0; i < nthreads; i++) {
+
+               data[i].array = b;
+               data[i].temp_array = temp_array; 
+               data[i].data_less = data_less;
+               data[i].data_greater = data_greater;    
+               data[i].n = last-first+1;
+               data[i].nthreads = nthreads; 
+               data[i].left = first + i*partition_elements;
+                data[i].pivot = mval;
+                if(i==(nthreads-1))
+                        data[i].right = last;
+                else
+                        data[i].right = first + 
(i*partition_elements)+partition_elements-1;
+
+                data[i].tid = i;
+
+               if (pthread_create (thread[i], NULL, 
threadFuncReorganize_@2_@1, &data[i]))
+               {
+                       printf("Failed to create thread %d.\n",i);
+                       exit (-1);
+               }
+       }
+       // Wait for threads
+       for (i = 0; i < nthreads; i++) {
+               if (pthread_join(*thread[i], NULL)) {
+                       printf ("Error joining thread %d.\n", i);
+                       exit (-1);
+                }
+       }
+
+       /* Clean up. */
+       for (i = 0; i < nthreads; i++)
+               free (thread[i]);
+       
+       free (thread);
+       return MAL_SUCCEED;
+}
+
 
 void
 *threadFuncScan_@2_@1(void *arg) {
-        oid *i;
-        thread_data_t *my_data = (thread_data_t *) arg;
-        for(i=my_data->left; i <= my_data->right; i++)
-        {
-                if(((@1 *)Tloc(my_data->array, BUNfirst(my_data->array))) > 
my_data->pivot)
-                        my_data->data_greater[my_data->tid]++;
-                else
-                        my_data->data_less[my_data->tid]++;
+        
+       thread_data_t *my_data = (thread_data_t *) arg;
+       @1  *ft;
+        oid *fh, *lh;
+
+        /* set bounds for the iterator */
+        ft = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + 
my_data->left);
+        fh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + 
my_data->left);
+        lh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + 
my_data->right);
+
+        while(fh<lh) {
+                if (@5_@3(ft, my_data->pivot,@6@1)){
+                       my_data->data_greater[my_data->tid]++;
+                }
+                else {
+                       my_data->data_less[my_data->tid]++;
+                }
+               ft++; fh++;
         }
+        
         pthread_exit(NULL);
 }
+
+void *threadFuncReorganize_@2_@1(void *arg) {
+       int i;
+       int positionLess=0;
+       thread_data_new_t *my_data = (thread_data_new_t *) arg;
+        int positionGreater=my_data->n;
+       @1  *ft;
+        oid *fh, *lh;
+
+        /* set bounds for the iterator */
+        ft = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + 
my_data->left);
+        fh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + 
my_data->left);
+        lh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + 
my_data->right);
+
+
+       for(i=0; i < my_data->nthreads && i < my_data->tid; i++)
+               positionLess=positionLess+my_data->data_less[i];
+       for(i=0; i < my_data->nthreads && i <= my_data->tid; i++)
+               positionGreater=positionGreater - my_data->data_greater[i];
+
+       while(fh<lh) {
+                if (@5_@3(ft, my_data->pivot,@6@1)){
+                       //my_data->temp_array[positionGreater]=ft;
+                       positionGreater++;
+               }
+                else {
+                       //my_data->temp_array[positionLess]=ft;
+                        positionLess++;
+                }
+                ft++; fh++;
+        }
+
+       pthread_exit(NULL);
+}
 @
 
 @= crackInThreeUnorderedPieces_impl
diff --git a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
--- a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
+++ b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
@@ -132,10 +132,16 @@ CRKparalleluselectBounds_@1(int *vid, in
 @= crkTwoLTree
        /*CRACK in two pieces cl1-ch1 using >incLow bound*/
        if (*inclusiveLow == TRUE)
+       {
                CRKcrackUnorderedZeroParallel@2_RE_@1(b,*low, cl1, ch1,&vl);
+               CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, low, cl1, ch1, 
nthreads, p_data_less, p_data_greater);
+       }
        else
+       {
                CRKcrackUnorderedZeroParallel@2_LE_@1(b,*low, cl1, ch1,&vl);
-       
+               CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, low, cl1, ch1, 
nthreads, p_data_less, p_data_greater);
+        }
+
        if (vl < cl1){
                /*then the left piece is empty*/
                gapL = -1;
@@ -152,9 +158,16 @@ CRKparalleluselectBounds_@1(int *vid, in
 @= crkTwoRTree
        /*CRACK in two pieces cl2-ch2 using <incHgh bound*/
        if (*inclusiveHgh == TRUE)
+       {
                CRKcrackUnorderedZeroParallel@2_LE_@1(b,*hgh, cl2, ch2,&vh);
+               CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, hgh, cl2, ch2, 
nthreads, p_data_less, p_data_greater);
+        }
        else
+       {
                CRKcrackUnorderedZeroParallel@2_RE_@1(b,*hgh, cl2, ch2,&vh);
+               CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, hgh, cl2, ch2, 
nthreads, p_data_less, p_data_greater);
+        }
+
        /*check for gaps*/
        if (vh < cl2)
                /*then the left piece is empty*/
@@ -203,7 +216,18 @@ createView:
        BUN idxFirst;
        bit copy=TRUE;
 
+       int *p_new = NULL;
+       int *p_data_less = NULL;
+       int *p_data_greater = NULL;
+       int nthreads=2;
+
+
        int pieces=0;
+       
+       p_data_less=(int *)malloc(nthreads * sizeof(int));
+       p_data_greater=(int *)malloc(nthreads * sizeof(int));
+
+
        /*FILE *ofp;
        char outputFilename1[] = 
"/export/scratch2/petraki/experiments_1st_paper/experiments/stochastic/idle_time_2/pieces_cracking.txt";
        ofp = fopen(outputFilename1,"a");
@@ -820,6 +844,14 @@ CRKRangeLeftNilTree_@1(int *vid, int *bi
        oid cl2=0, ch2=0;
        bit HBound,foundHgh=0;
        int gapH = 1;
+
+       int *p_new = NULL;
+        int *p_data_less = NULL;
+        int *p_data_greater = NULL;
+        int nthreads=2;
+
+        p_data_less=(int *)malloc(nthreads * sizeof(int));
+        p_data_greater=(int *)malloc(nthreads * sizeof(int));
        
        
        if (*inclusiveHgh == TRUE) HBound = FALSE;
@@ -921,6 +953,15 @@ CRKRangeRightNilTree_@1(int *vid, int *b
        int gapL = 1;
        bit LBound=FALSE;
 
+       int *p_new = NULL;
+        int *p_data_less = NULL;
+        int *p_data_greater = NULL;
+        int nthreads=2;
+
+        p_data_less=(int *)malloc(nthreads * sizeof(int));
+        p_data_greater=(int *)malloc(nthreads * sizeof(int));
+
+
        m = existsCrackerIndex(*bid);
 
        /* if this is the first time we parallelselect something from this bat,
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to