Changeset: 552aeee10375 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=552aeee10375
Modified Files:
	monetdb5/extras/crackers/crackers_multicore_unordered.mx
	monetdb5/extras/crackers/crackers_parallelselect_ops.mx
Branch: holindex
Log Message:
Implementation of multicore crackTwo algorithm. diffs (287 lines): diff --git a/monetdb5/extras/crackers/crackers_multicore_unordered.mx b/monetdb5/extras/crackers/crackers_multicore_unordered.mx --- a/monetdb5/extras/crackers/crackers_multicore_unordered.mx +++ b/monetdb5/extras/crackers/crackers_multicore_unordered.mx @@ -48,8 +48,8 @@ typedef struct { int *temp_array; int *data_less; int *data_greater; - oid* left; - oid* right; + oid left; + oid right; int tid; int *pivot; } thread_data_t; @@ -61,8 +61,8 @@ typedef struct { int *data_greater; int nthreads; /*number of threads*/ int n; /*number of array elements*/ - oid* left; - oid* right; + oid left; + oid right; int tid; int *pivot; } thread_data_new_t; @@ -95,7 +95,9 @@ crackers_export str CRKcrackUnorderedThr @= crackInTwoUnorderedPieces_decl str CRKcrackUnorderedZeroParallel_@2_@1( BAT *b, @1 mval, oid first, oid last, oid *pos); str CRKscanUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, @1 *mval, oid first, oid last, int nthreads, int *data_less, int *data_greater); +str CRKreorganizeUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, @1 *mval, oid first, oid last, int nthreads, int *data_less, int *data_greater); void *threadFuncScan_@2_@1(void *arg); +void *threadFuncReorganize_@2_@1(void *arg); @ @= crackInThreeUnorderedPieces_decl str CRKcrackUnorderedThreeParallel_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid first, oid last, oid *posl, oid *posh); @@ -222,7 +224,6 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT * int i; oid partition_elements; - oid *lh; thread_data_t *data; pthread_t **thread = (pthread_t **) malloc (sizeof(pthread_t *) * nthreads); @@ -231,7 +232,6 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT * data = malloc (nthreads * sizeof(thread_data_t)); - lh = (oid*)Hloc(b, BUNfirst(b) + last); partition_elements=(last-first+1)/nthreads; for (i = 0; i < nthreads; i++) { @@ -239,12 +239,12 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT * data[i].temp_array = temp_array; data[i].data_less = data_less; 
data[i].data_greater = data_greater; - data[i].left = (oid*)Hloc(b, BUNfirst(b) + i*partition_elements); + data[i].left = first + i*partition_elements; data[i].pivot = mval; if(i==(nthreads-1)) - data[i].right = lh; + data[i].right = last; else - data[i].right = (oid*)Hloc(b, BUNfirst(b) + (i*partition_elements)+partition_elements-1); + data[i].right = first + (i*partition_elements)+partition_elements-1; data[i].tid = i; @@ -270,20 +270,123 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT * return MAL_SUCCEED; } +str +CRKreorganizeUnorderedZeroParallel_@2_@1(BAT *b, @1 *temp_array, @1 *mval, oid first, oid last, int nthreads,int *data_less, int* data_greater) { + + int i; + oid partition_elements; + thread_data_new_t *data; + + pthread_t **thread = (pthread_t **) malloc (sizeof(pthread_t *) * nthreads); + + /* Allocate memory for the pthread_ts. */ + for (i = 0; i < nthreads; i++) + thread[i] = (pthread_t *) malloc (sizeof(pthread_t)); + + /* Allocate memory for thread data. */ + data = malloc (nthreads * sizeof(thread_data_new_t)); + + /* Now create the threads. */ + partition_elements=((last-first)+1)/nthreads; + //printf("Each partition consists of %d elements (almost).\n",partition_elements); + for (i = 0; i < nthreads; i++) { + + data[i].array = b; + data[i].temp_array = temp_array; + data[i].data_less = data_less; + data[i].data_greater = data_greater; + data[i].n = last-first+1; + data[i].nthreads = nthreads; + data[i].left = first + i*partition_elements; + data[i].pivot = mval; + if(i==(nthreads-1)) + data[i].right = last; + else + data[i].right = first + (i*partition_elements)+partition_elements-1; + + data[i].tid = i; + + if (pthread_create (thread[i], NULL, threadFuncReorganize_@2_@1, &data[i])) + { + printf("Failed to create thread %d.\n",i); + exit (-1); + } + } + // Wait for threads + for (i = 0; i < nthreads; i++) { + if (pthread_join(*thread[i], NULL)) { + printf ("Error joining thread %d.\n", i); + exit (-1); + } + } + + /* Clean up. 
*/ + for (i = 0; i < nthreads; i++) + free (thread[i]); + + free (thread); + return MAL_SUCCEED; +} + void *threadFuncScan_@2_@1(void *arg) { - oid *i; - thread_data_t *my_data = (thread_data_t *) arg; - for(i=my_data->left; i <= my_data->right; i++) - { - if(((@1 *)Tloc(my_data->array, BUNfirst(my_data->array))) > my_data->pivot) - my_data->data_greater[my_data->tid]++; - else - my_data->data_less[my_data->tid]++; + + thread_data_t *my_data = (thread_data_t *) arg; + @1 *ft; + oid *fh, *lh; + + /* set bounds for the iterator */ + ft = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + my_data->left); + fh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + my_data->left); + lh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + my_data->right); + + while(fh<lh) { + if (@5_@3(ft, my_data->pivot,@6@1)){ + my_data->data_greater[my_data->tid]++; + } + else { + my_data->data_less[my_data->tid]++; + } + ft++; fh++; } + pthread_exit(NULL); } + +void *threadFuncReorganize_@2_@1(void *arg) { + int i; + int positionLess=0; + thread_data_new_t *my_data = (thread_data_new_t *) arg; + int positionGreater=my_data->n; + @1 *ft; + oid *fh, *lh; + + /* set bounds for the iterator */ + ft = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + my_data->left); + fh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + my_data->left); + lh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + my_data->right); + + + for(i=0; i < my_data->nthreads && i < my_data->tid; i++) + positionLess=positionLess+my_data->data_less[i]; + for(i=0; i < my_data->nthreads && i <= my_data->tid; i++) + positionGreater=positionGreater - my_data->data_greater[i]; + + while(fh<lh) { + if (@5_@3(ft, my_data->pivot,@6@1)){ + //my_data->temp_array[positionGreater]=ft; + positionGreater++; + } + else { + //my_data->temp_array[positionLess]=ft; + positionLess++; + } + ft++; fh++; + } + + pthread_exit(NULL); +} @ @= crackInThreeUnorderedPieces_impl diff --git 
a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx --- a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx +++ b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx @@ -132,10 +132,16 @@ CRKparalleluselectBounds_@1(int *vid, in @= crkTwoLTree /*CRACK in two pieces cl1-ch1 using >incLow bound*/ if (*inclusiveLow == TRUE) + { CRKcrackUnorderedZeroParallel@2_RE_@1(b,*low, cl1, ch1,&vl); + CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, low, cl1, ch1, nthreads, p_data_less, p_data_greater); + } else + { CRKcrackUnorderedZeroParallel@2_LE_@1(b,*low, cl1, ch1,&vl); - + CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, low, cl1, ch1, nthreads, p_data_less, p_data_greater); + } + if (vl < cl1){ /*then the left piece is empty*/ gapL = -1; @@ -152,9 +158,16 @@ CRKparalleluselectBounds_@1(int *vid, in @= crkTwoRTree /*CRACK in two pieces cl2-ch2 using <incHgh bound*/ if (*inclusiveHgh == TRUE) + { CRKcrackUnorderedZeroParallel@2_LE_@1(b,*hgh, cl2, ch2,&vh); + CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, hgh, cl2, ch2, nthreads, p_data_less, p_data_greater); + } else + { CRKcrackUnorderedZeroParallel@2_RE_@1(b,*hgh, cl2, ch2,&vh); + CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, hgh, cl2, ch2, nthreads, p_data_less, p_data_greater); + } + /*check for gaps*/ if (vh < cl2) /*then the left piece is empty*/ @@ -203,7 +216,18 @@ createView: BUN idxFirst; bit copy=TRUE; + int *p_new = NULL; + int *p_data_less = NULL; + int *p_data_greater = NULL; + int nthreads=2; + + int pieces=0; + + p_data_less=(int *)malloc(nthreads * sizeof(int)); + p_data_greater=(int *)malloc(nthreads * sizeof(int)); + + /*FILE *ofp; char outputFilename1[] = "/export/scratch2/petraki/experiments_1st_paper/experiments/stochastic/idle_time_2/pieces_cracking.txt"; ofp = fopen(outputFilename1,"a"); @@ -820,6 +844,14 @@ CRKRangeLeftNilTree_@1(int *vid, int *bi oid cl2=0, ch2=0; bit HBound,foundHgh=0; int gapH = 1; + + int *p_new = NULL; + int 
*p_data_less = NULL; + int *p_data_greater = NULL; + int nthreads=2; + + p_data_less=(int *)malloc(nthreads * sizeof(int)); + p_data_greater=(int *)malloc(nthreads * sizeof(int)); if (*inclusiveHgh == TRUE) HBound = FALSE; @@ -921,6 +953,15 @@ CRKRangeRightNilTree_@1(int *vid, int *b int gapL = 1; bit LBound=FALSE; + int *p_new = NULL; + int *p_data_less = NULL; + int *p_data_greater = NULL; + int nthreads=2; + + p_data_less=(int *)malloc(nthreads * sizeof(int)); + p_data_greater=(int *)malloc(nthreads * sizeof(int)); + + m = existsCrackerIndex(*bid); /* if this is the first time we parallelselect something from this bat, _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list