Changeset: 9a81b9b3eeab for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9a81b9b3eeab Modified Files: monetdb5/extras/rdf/rdfschema.c monetdb5/extras/rdf/rdfschema.h Branch: rdf Log Message:
Change how the work of finding csrel is partitioned across threads, for the multi-threaded implementation: instead of splitting the subject BAT into contiguous ranges, each thread now scans all triples and handles only the frequent CSs whose index modulo the number of threads equals its thread id. However, performance is worse than that of the single-threaded version. diffs (182 lines): diff --git a/monetdb5/extras/rdf/rdfschema.c b/monetdb5/extras/rdf/rdfschema.c --- a/monetdb5/extras/rdf/rdfschema.c +++ b/monetdb5/extras/rdf/rdfschema.c @@ -6171,13 +6171,15 @@ str RDFgetRefCounts(int *ret, BAT *sbat, pthread_mutex_t lock; +//This function insert the relationships for freqCS modularly by the thread id +// #if NEEDSUBCS static -str addRels_from_a_partition(int first, int last, BAT *sbat, BAT *pbat, BAT *obat, +str addRels_from_a_partition(int tid, int nthreads, int first, int last, BAT *sbat, BAT *pbat, BAT *obat, oid *subjCSMap, oid *subjSubCSMap, SubCSSet *csSubCSSet, CSrel *csrelSet, BUN maxSoid, int maxNumPwithDup,int *csIdFreqIdxMap){ #else static -str addRels_from_a_partition(int first, int last, BAT *sbat, BAT *pbat, BAT *obat, +str addRels_from_a_partition(int tid, int nthreads, int first, int last, BAT *sbat, BAT *pbat, BAT *obat, oid *subjCSMap, CSrel *csrelSet, BUN maxSoid, int maxNumPwithDup,int *csIdFreqIdxMap){ #endif @@ -6196,7 +6198,9 @@ str addRels_from_a_partition(int first, oid *sbatCursor = NULL, *pbatCursor = NULL, *obatCursor = NULL; int p; - + + (void) tid; + (void ) nthreads; buffTypes = (char *) malloc(sizeof(char) * (maxNumPwithDup + 1)); @@ -6215,6 +6219,8 @@ str addRels_from_a_partition(int first, #if GETSUBCS_FORALL == 0 if ( from == -1) continue; /* Do not consider infrequentCS */ #endif + if (from % nthreads != tid) continue; + if (sbt != curS){ #if NEEDSUBCS if (p != 0){ /* Not the first S */ @@ -6247,9 +6253,9 @@ str addRels_from_a_partition(int first, to = csIdFreqIdxMap[subjCSMap[realObjOid]]; if (objType == BLANKNODE) isBlankNode = 1; - pthread_mutex_lock(&lock); + //pthread_mutex_lock(&lock); addReltoCSRel(from, to, pbt, &csrelSet[from], isBlankNode); - pthread_mutex_unlock(&lock); + //pthread_mutex_unlock(&lock); } } @@ 
-6288,9 +6294,9 @@ addRels_Thread(void *arg_p){ printf("Start thread %d \n", arg->tid); #if NEEDSUBCS - addRels_from_a_partition(arg->first, arg->last, arg->sbat, arg->pbat, arg->obat, arg->subjCSMap, arg->subjSubCSMap, arg->csSubCSSet, arg->csrelSet, arg->maxSoid, arg->maxNumPwithDup, arg->csIdFreqIdxMap); + addRels_from_a_partition(arg->tid, arg->nthreads, arg->first, arg->last, arg->sbat, arg->pbat, arg->obat, arg->subjCSMap, arg->subjSubCSMap, arg->csSubCSSet, arg->csrelSet, arg->maxSoid, arg->maxNumPwithDup, arg->csIdFreqIdxMap); #else - addRels_from_a_partition(arg->first, arg->last, arg->sbat, arg->pbat, arg->obat, arg->subjCSMap, arg->csrelSet, arg->maxSoid, arg->maxNumPwithDup, arg->csIdFreqIdxMap); + addRels_from_a_partition(arg->tid, arg->nthreads, arg->first, arg->last, arg->sbat, arg->pbat, arg->obat, arg->subjCSMap, arg->csrelSet, arg->maxSoid, arg->maxNumPwithDup, arg->csIdFreqIdxMap); #endif pthread_exit(NULL); @@ -6299,26 +6305,29 @@ addRels_Thread(void *arg_p){ #if NEEDSUBCS static str RDFrelationships(int *ret, BAT *sbat, BAT *pbat, BAT *obat, - oid *subjCSMap, oid *subjSubCSMap, SubCSSet *csSubCSSet, CSrel *csrelSet, BUN maxSoid, int maxNumPwithDup,int *csIdFreqIdxMap){ + oid *subjCSMap, oid *subjSubCSMap, SubCSSet *csSubCSSet, CSrel *csrelSet, BUN maxSoid, int maxNumPwithDup,int *csIdFreqIdxMap, int numFreqCS){ #else static str RDFrelationships(int *ret, BAT *sbat, BAT *pbat, BAT *obat, - oid *subjCSMap, CSrel *csrelSet, BUN maxSoid, int maxNumPwithDup,int *csIdFreqIdxMap){ + oid *subjCSMap, CSrel *csrelSet, BUN maxSoid, int maxNumPwithDup,int *csIdFreqIdxMap, int numFreqCS){ #endif int i, first, last; - oid *sbatCursor = NULL; + //oid *sbatCursor = NULL; csRelThreadArg *threadArgs = NULL; pthread_t *threads = NULL; int nthreads = NUMTHEAD_CSREL; int ntp = 0; //Number of triples per partition - int tmplast =0; - - + //int tmplast =0; + + + + /* if (pthread_mutex_init(&lock, NULL) != 0) { throw (MAL, "rdf.RDFrelationships", "Failed to create 
threads mutex"); } + */ if (BATcount(sbat) == 0) { throw(RDF, "rdf.RDFrelationships", "sbat must not be empty"); @@ -6326,18 +6335,24 @@ str RDFrelationships(int *ret, BAT *sbat * cannot be dereferenced after the BATloop below */ } + if (numFreqCS < 100){ //Don't use multi thread with small number of cs rels + nthreads = 1; + } + first = 0; last = BATcount(sbat) -1; ntp = (last + 1)/nthreads; printf("Number of triples per partition %d\n", ntp); - sbatCursor = (oid *) Tloc(sbat, BUNfirst(sbat)); + //sbatCursor = (oid *) Tloc(sbat, BUNfirst(sbat)); threadArgs = (csRelThreadArg *) GDKmalloc(sizeof(csRelThreadArg) * nthreads); threads = (pthread_t *) GDKmalloc(sizeof(pthread_t) * nthreads); for (i = 0; i < nthreads; i++){ threadArgs[i].tid = i; + + /* Old code for partitioning the BAT of subjects threadArgs[i].first = (i == 0) ? first:(threadArgs[i-1].last + 1); tmplast = (i == (nthreads - 1))?last: (ntp * (i + 1)); //Go to all the triples of the current subjects @@ -6346,6 +6361,10 @@ str RDFrelationships(int *ret, BAT *sbat tmplast++; } threadArgs[i].last = tmplast; + */ + threadArgs[i].first = first; + threadArgs[i].last = last; + threadArgs[i].nthreads = nthreads; threadArgs[i].sbat = sbat; threadArgs[i].pbat = pbat; @@ -6362,8 +6381,9 @@ str RDFrelationships(int *ret, BAT *sbat } - //addRels_from_a_partition(first, last, sbat, pbat, obat, subjCSMap, csrelSet, maxSoid, maxNumPwithDup, csIdFreqIdxMap); - + + //addRels_from_a_partition(0, nthreads, first, last, sbat, pbat, obat, subjCSMap, csrelSet, maxSoid, maxNumPwithDup, csIdFreqIdxMap); + for (i = 0; i < nthreads; i++) { if (pthread_create(&threads[i], NULL, addRels_Thread, &threadArgs[i])) { GDKfree(threadArgs); @@ -6380,7 +6400,7 @@ str RDFrelationships(int *ret, BAT *sbat } } - pthread_mutex_destroy(&lock); + //pthread_mutex_destroy(&lock); GDKfree(threadArgs); GDKfree(threads); @@ -9290,9 +9310,9 @@ RDFextractCSwithTypes(int *ret, bat *sba csSubCSSet = initCS_SubCSSets(*maxCSoid +1); - RDFrelationships(ret, 
sbat, pbat, obat, *subjCSMap, subjSubCSMap, csSubCSSet, csrelSet, *maxSoid, *maxNumPwithDup, csIdFreqIdxMap); + RDFrelationships(ret, sbat, pbat, obat, *subjCSMap, subjSubCSMap, csSubCSSet, csrelSet, *maxSoid, *maxNumPwithDup, csIdFreqIdxMap, freqCSset->numCSadded); #else - RDFrelationships(ret, sbat, pbat, obat, *subjCSMap, csrelSet, *maxSoid, *maxNumPwithDup, csIdFreqIdxMap); + RDFrelationships(ret, sbat, pbat, obat, *subjCSMap, csrelSet, *maxSoid, *maxNumPwithDup, csIdFreqIdxMap, freqCSset->numCSadded); #endif curT = clock(); diff --git a/monetdb5/extras/rdf/rdfschema.h b/monetdb5/extras/rdf/rdfschema.h --- a/monetdb5/extras/rdf/rdfschema.h +++ b/monetdb5/extras/rdf/rdfschema.h @@ -332,6 +332,7 @@ typedef struct CSrel{ #define NUMTHEAD_CSREL 4 typedef struct csRelThreadArg { int tid; + int nthreads; int first; int last; BAT *sbat; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list