Changeset: d5294af6e7cb for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d5294af6e7cb Modified Files: monetdb5/extras/crackers/crackers_multicore_unordered.mx monetdb5/extras/crackers/crackers_parallelselect_ops.mx Branch: holindex Log Message:
Add restore function for multicore cracking. diffs (truncated from 353 to 300 lines): diff --git a/monetdb5/extras/crackers/crackers_multicore_unordered.mx b/monetdb5/extras/crackers/crackers_multicore_unordered.mx --- a/monetdb5/extras/crackers/crackers_multicore_unordered.mx +++ b/monetdb5/extras/crackers/crackers_multicore_unordered.mx @@ -69,6 +69,17 @@ typedef struct { int pivot; } thread_data_new_t; +typedef struct { + BAT *array; + int *temp_array; + oid *temp_array_oid; + int nthreads; /*number of threads*/ + oid first; + oid last; + int tempfirst; + int templast; + int tid; +} thread_data_restore_t; /* Signatures shared within the crackers module/library */ @:TypeSwitch(operations,_decl)@ @@ -97,9 +108,11 @@ crackers_export str CRKcrackUnorderedThr @= crackInTwoUnorderedPieces_decl str CRKcrackUnorderedZeroParallel_@2_@1( BAT *b, @1 mval, oid first, oid last, oid *pos); str CRKscanUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid *temp_array_oid, @1 mval, oid first, oid last, int nthreads, int *data_less, int *data_greater); -str CRKreorganizeUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid *temp_arra_oid, @1 mval, oid first, oid last, int nthreads, int *data_less, int *data_greater); +str CRKreorganizeUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid *temp_array_oid, @1 mval, oid first, oid last, int nthreads, int *data_less, int *data_greater); +str CRKrestoreUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid *temp_array_oid, @1 mval, oid first, oid last, int nthreads, int *data_less, int *data_greater, oid *pos); void *threadFuncScan_@2_@1(void *arg); void *threadFuncReorganize_@2_@1(void *arg); +void *threadFuncRestore_@2_@1(void *arg); @ @= crackInThreeUnorderedPieces_decl str CRKcrackUnorderedThreeParallel_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid first, oid last, oid *posl, oid *posh); @@ -332,6 +345,90 @@ CRKreorganizeUnorderedZeroParallel_@2_@1 return MAL_SUCCEED; } +str +CRKrestoreUnorderedZeroParallel_@2_@1(BAT *b, @1 
*temp_array, oid *temp_array_oid, @1 mval, oid first, oid last, int nthreads,int *data_less, int *data_greater,oid *pos) { + + int i; + @1 *lt, *t0; + oid partition_elements, position=0; + thread_data_restore_t *data; + + pthread_t **thread = (pthread_t **) malloc (sizeof(pthread_t *) * nthreads); + + (void) data_greater; + + /* Allocate memory for the pthread_ts. */ + for (i = 0; i < nthreads; i++) + thread[i] = (pthread_t *) malloc (sizeof(pthread_t)); + + /* Allocate memory for thread data. */ + data = malloc (nthreads * sizeof(thread_data_restore_t)); + + /* Now create the threads. */ + partition_elements=(last-first+1)/nthreads; + //printf("Each partition consists of %d elements (almost).\n",partition_elements); + for (i = 0; i < nthreads; i++) { + + data[i].array = b; + data[i].temp_array = temp_array; + data[i].temp_array_oid = temp_array_oid; + data[i].nthreads = nthreads; + data[i].first = first + i*partition_elements; + data[i].tempfirst = i*partition_elements; + if(i==(nthreads-1)) + { + data[i].last = last; + data[i].templast = partition_elements-1; + } + else + { + data[i].last = first + (i*partition_elements)+partition_elements-1; + data[i].templast = (i*partition_elements)+partition_elements-1; + } + data[i].tid = i; + if (pthread_create (thread[i], NULL, threadFuncRestore_@2_@1, &data[i])) + { + printf("Failed to create thread %d.\n",i); + exit (-1); + } + } + // Wait for threads + for (i = 0; i < nthreads; i++) { + if (pthread_join(*thread[i], NULL)) { + printf ("Error joining thread %d.\n", i); + exit (-1); + } + } + + /* Clean up. 
*/ + for (i = 0; i < nthreads; i++) + free (thread[i]); + + free (thread); + + for (i = 0; i < nthreads; i++) + position=position+data_less[i]; + + t0 = (@1 *)Tloc(b, BUNfirst(b)); + lt = (@1 *)Tloc(b, BUNfirst(b) + first + position-1); +// *pos=(oid) (lt-t0); + + if (@5_@4(lt, &mval,@6@1)){ + if (lt==t0) + *pos = (oid) BUNfirst(b); + else + *pos = (oid) (lt - t0) - 1; /*works for empty left piece also*/ + } + else{ + *pos = (oid) (lt - t0); + if (*pos==last) /*empty right piece*/ + *pos = *pos + 1; + } + + + return MAL_SUCCEED; +} + void *threadFuncScan_@2_@1(void *arg) { @@ -393,6 +490,27 @@ void *threadFuncReorganize_@2_@1(void *a pthread_exit(NULL); } + +void *threadFuncRestore_@2_@1(void *arg) { + thread_data_restore_t *my_data = (thread_data_restore_t *) arg; + @1 *ft; + oid *fh, *lh; + + /* set bounds for the iterator */ + ft = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + my_data->first); + fh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + my_data->first); + lh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + my_data->last); + + while(fh<=lh) { + *ft=my_data->temp_array[my_data->tempfirst]; + *fh=my_data->temp_array_oid[my_data->tempfirst]; + my_data->tempfirst++; + ft++; fh++; + } + + pthread_exit(NULL); +} + @ @= crackInThreeUnorderedPieces_impl diff --git a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx --- a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx +++ b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx @@ -133,19 +133,21 @@ CRKparalleluselectBounds_@1(int *vid, in /*CRACK in two pieces cl1-ch1 using >incLow bound*/ if (*inclusiveLow == TRUE) { - CRKcrackUnorderedZeroParallel@2_RE_@1(b,*low, cl1, ch1,&vl); - CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater); - p_new=(int *)malloc((ch1-cl1) * sizeof(int)); - p_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid)); - 
CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater); + //CRKcrackUnorderedZeroParallel@2_RE_@1(b,*low, cl1, ch1,&vl); + CRKscanUnorderedZeroParallel@2_RE_@1(b, pl_new, pl_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater); + pl_new=(int *)malloc((ch1-cl1) * sizeof(int)); + pl_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid)); + CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, pl_new, pl_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater); + CRKrestoreUnorderedZeroParallel@2_RE_@1(b, pl_new, pl_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater, &vl); } else { - CRKcrackUnorderedZeroParallel@2_LE_@1(b,*low, cl1, ch1,&vl); - CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater); - p_new=(int *)malloc((ch1-cl1) * sizeof(int)); - p_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid)); - CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater); + //CRKcrackUnorderedZeroParallel@2_LE_@1(b,*low, cl1, ch1,&vl); + CRKscanUnorderedZeroParallel@2_LE_@1(b, pl_new, pl_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater); + pl_new=(int *)malloc((ch1-cl1) * sizeof(int)); + pl_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid)); + CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, pl_new, pl_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater); + CRKrestoreUnorderedZeroParallel@2_LE_@1(b, pl_new, pl_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater, &vl); } if (vl < cl1){ @@ -165,19 +167,21 @@ CRKparalleluselectBounds_@1(int *vid, in /*CRACK in two pieces cl2-ch2 using <incHgh bound*/ if (*inclusiveHgh == TRUE) { - CRKcrackUnorderedZeroParallel@2_LE_@1(b,*hgh, cl2, ch2,&vh); - CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater); - p_new=(int *)malloc((ch2-cl2) * sizeof(int)); - 
p_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid)); - CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater); - } + //CRKcrackUnorderedZeroParallel@2_LE_@1(b,*hgh, cl2, ch2,&vh); + CRKscanUnorderedZeroParallel@2_LE_@1(b, pr_new, pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater); + pr_new=(int *)malloc((ch2-cl2) * sizeof(int)); + pr_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid)); + CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, pr_new, pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater); + CRKrestoreUnorderedZeroParallel@2_LE_@1(b, pr_new, pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater, &vh); + } else { - CRKcrackUnorderedZeroParallel@2_RE_@1(b,*hgh, cl2, ch2,&vh); - CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater); - p_new=(int *)malloc((ch2-cl2) * sizeof(int)); - p_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid)); - CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater); + //CRKcrackUnorderedZeroParallel@2_RE_@1(b,*hgh, cl2, ch2,&vh); + CRKscanUnorderedZeroParallel@2_RE_@1(b, pr_new, pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater); + pr_new=(int *)malloc((ch2-cl2) * sizeof(int)); + pr_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid)); + CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, pr_new, pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater); + CRKrestoreUnorderedZeroParallel@2_RE_@1(b, pr_new, pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater, &vh); } /*check for gaps*/ @@ -189,7 +193,6 @@ CRKparalleluselectBounds_@1(int *vid, in gapH = -1; /*vh--;*/ } - @ @= CreateResult createView: @@ -227,9 +230,12 @@ createView: struct Node *lowNode=NULL, *hghNode=NULL, *lowNodeNext=NULL, *temp; BUN idxFirst; bit copy=TRUE; + int i; - int *p_new = NULL; - oid *p_new_oid = NULL; + int *pl_new = 
NULL; + oid *pl_new_oid = NULL; + int *pr_new = NULL; + oid *pr_new_oid = NULL; int *p_data_less = NULL; int *p_data_greater = NULL; int nthreads=2; @@ -486,34 +492,44 @@ createView: index then we have to crack */ if (foundLow == 0 || foundHgh == 0) { if (foundLow == 0 && foundHgh == 0) { - /* We have to do two cracks separatelly */ + /* We have to do two cracks separatelly */ - /* For the cl bound and the next one in the index*/ - @:crkTwoLTree@5(@1,@5)@ - t = (int *) Tloc(b, BUNfirst(b)); + /* For the cl bound and the next one in the index*/ + @:crkTwoLTree@5(@1,@5)@ + t = (int *) Tloc(b, BUNfirst(b)); - /* For the ch bound and the previous one in the index*/ - @:crkTwoRTree@5(@1,@5)@ + foundHgh = GetHgh_@1(*hgh, *inclusiveHgh, CrackerIndex[m].Tree, c, BUNfirst(c), &cl2, &ch2, 0, BUNlast(b)-(oid)1); + if (cl2 != 0) cl2++; + for(i=0;i<nthreads;i++) + { + p_data_less[i]=0; + p_data_greater[i]=0; + } - if (IndexSize < IndexStop) { - if (vl > 0) - _vl = vl - 1; - else - _vl = vl; - if (gapL>0) - { - addCrackerIndex_@1(m, low, *inclusiveLow, _vl, c); - pieces = pieces + 1; - } - if (gapH>0) - { - addCrackerIndex_@1(m, hgh, HBound, vh, c); - pieces = pieces + 1; - } - if ((vl == 1) && (*t == *low) && (*inclusiveLow == TRUE)) - vl = vl - 1; - } - } else if (foundLow == 0) { + + /* For the ch bound and the previous one in the index*/ + @:crkTwoRTree@5(@1,@5)@ + + if (IndexSize < IndexStop) { + if (vl > 0) + _vl = vl - 1; + else + _vl = vl; + if (gapL>0) + { + addCrackerIndex_@1(m, low, *inclusiveLow, _vl, c); + pieces = pieces + 1; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list