Changeset: 9a81b9b3eeab for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9a81b9b3eeab
Modified Files:
        monetdb5/extras/rdf/rdfschema.c
        monetdb5/extras/rdf/rdfschema.h
Branch: rdf
Log Message:

Change the way of partitioning in finding csrel.

This is for multi-threaded implementation. However, the performance result is 
not as good as without multi-thread version


diffs (182 lines):

diff --git a/monetdb5/extras/rdf/rdfschema.c b/monetdb5/extras/rdf/rdfschema.c
--- a/monetdb5/extras/rdf/rdfschema.c
+++ b/monetdb5/extras/rdf/rdfschema.c
@@ -6171,13 +6171,15 @@ str RDFgetRefCounts(int *ret, BAT *sbat,
 
 pthread_mutex_t lock;
 
+//This function insert the relationships for freqCS modularly by the thread id
+//
 #if NEEDSUBCS
 static 
-str addRels_from_a_partition(int first, int last, BAT *sbat, BAT *pbat, BAT 
*obat, 
+str addRels_from_a_partition(int tid, int nthreads, int first, int last, BAT 
*sbat, BAT *pbat, BAT *obat, 
                oid *subjCSMap, oid *subjSubCSMap, SubCSSet *csSubCSSet, CSrel 
*csrelSet, BUN maxSoid, int maxNumPwithDup,int *csIdFreqIdxMap){
 #else
 static
-str addRels_from_a_partition(int first, int last, BAT *sbat, BAT *pbat, BAT 
*obat,
+str addRels_from_a_partition(int tid, int nthreads, int first, int last, BAT 
*sbat, BAT *pbat, BAT *obat,
                oid *subjCSMap, CSrel *csrelSet, BUN maxSoid, int 
maxNumPwithDup,int *csIdFreqIdxMap){
 #endif 
        
@@ -6196,7 +6198,9 @@ str addRels_from_a_partition(int first, 
        
        oid             *sbatCursor = NULL, *pbatCursor = NULL, *obatCursor = 
NULL; 
        int             p;
-
+       
+       (void) tid;
+       (void ) nthreads;
 
        buffTypes = (char *) malloc(sizeof(char) * (maxNumPwithDup + 1)); 
 
@@ -6215,6 +6219,8 @@ str addRels_from_a_partition(int first, 
                #if GETSUBCS_FORALL == 0
                if ( from == -1) continue; /* Do not consider infrequentCS */
                #endif
+               if (from % nthreads != tid) continue; 
+
                if (sbt != curS){
                        #if NEEDSUBCS
                        if (p != 0){    /* Not the first S */
@@ -6247,9 +6253,9 @@ str addRels_from_a_partition(int first, 
                                to = csIdFreqIdxMap[subjCSMap[realObjOid]];
                                if (objType == BLANKNODE) isBlankNode = 1;
 
-                               pthread_mutex_lock(&lock);
+                               //pthread_mutex_lock(&lock);
                                addReltoCSRel(from, to, pbt, &csrelSet[from], 
isBlankNode);
-                               pthread_mutex_unlock(&lock);
+                               //pthread_mutex_unlock(&lock);
                        }
                }
 
@@ -6288,9 +6294,9 @@ addRels_Thread(void *arg_p){
        printf("Start thread %d \n", arg->tid);
 
        #if NEEDSUBCS
-       addRels_from_a_partition(arg->first, arg->last, arg->sbat, arg->pbat, 
arg->obat, arg->subjCSMap, arg->subjSubCSMap, arg->csSubCSSet, arg->csrelSet, 
arg->maxSoid, arg->maxNumPwithDup, arg->csIdFreqIdxMap);
+       addRels_from_a_partition(arg->tid, arg->nthreads, arg->first, 
arg->last, arg->sbat, arg->pbat, arg->obat, arg->subjCSMap, arg->subjSubCSMap, 
arg->csSubCSSet, arg->csrelSet, arg->maxSoid, arg->maxNumPwithDup, 
arg->csIdFreqIdxMap);
        #else
-       addRels_from_a_partition(arg->first, arg->last, arg->sbat, arg->pbat, 
arg->obat, arg->subjCSMap, arg->csrelSet, arg->maxSoid, arg->maxNumPwithDup, 
arg->csIdFreqIdxMap);
+       addRels_from_a_partition(arg->tid, arg->nthreads, arg->first, 
arg->last, arg->sbat, arg->pbat, arg->obat, arg->subjCSMap, arg->csrelSet, 
arg->maxSoid, arg->maxNumPwithDup, arg->csIdFreqIdxMap);
        #endif
 
        pthread_exit(NULL);
@@ -6299,26 +6305,29 @@ addRels_Thread(void *arg_p){
 #if NEEDSUBCS
 static 
 str RDFrelationships(int *ret, BAT *sbat, BAT *pbat, BAT *obat, 
-               oid *subjCSMap, oid *subjSubCSMap, SubCSSet *csSubCSSet, CSrel 
*csrelSet, BUN maxSoid, int maxNumPwithDup,int *csIdFreqIdxMap){
+               oid *subjCSMap, oid *subjSubCSMap, SubCSSet *csSubCSSet, CSrel 
*csrelSet, BUN maxSoid, int maxNumPwithDup,int *csIdFreqIdxMap, int numFreqCS){
 #else
 static
 str RDFrelationships(int *ret, BAT *sbat, BAT *pbat, BAT *obat,
-               oid *subjCSMap, CSrel *csrelSet, BUN maxSoid, int 
maxNumPwithDup,int *csIdFreqIdxMap){
+               oid *subjCSMap, CSrel *csrelSet, BUN maxSoid, int 
maxNumPwithDup,int *csIdFreqIdxMap, int numFreqCS){
 #endif 
        
        int             i, first, last; 
-       oid             *sbatCursor = NULL; 
+       //oid           *sbatCursor = NULL; 
        csRelThreadArg  *threadArgs = NULL;
        pthread_t       *threads = NULL; 
        int             nthreads = NUMTHEAD_CSREL;
        int             ntp = 0;        //Number of triples per partition
-       int             tmplast =0; 
-
-       
+       //int           tmplast =0; 
+       
+       
+
+       /*
        if (pthread_mutex_init(&lock, NULL) != 0)
        {
                throw (MAL, "rdf.RDFrelationships", "Failed to create threads 
mutex");
        }
+       */
 
        if (BATcount(sbat) == 0) {
                throw(RDF, "rdf.RDFrelationships", "sbat must not be empty");
@@ -6326,18 +6335,24 @@ str RDFrelationships(int *ret, BAT *sbat
                 * cannot be dereferenced after the BATloop below */
        }
 
+       if (numFreqCS < 100){           //Don't use multi thread with small 
number of cs rels
+               nthreads = 1;
+       }
+
        first = 0; 
        last = BATcount(sbat) -1; 
        ntp = (last + 1)/nthreads;
        
        printf("Number of triples per partition %d\n", ntp);
 
-       sbatCursor = (oid *) Tloc(sbat, BUNfirst(sbat)); 
+       //sbatCursor = (oid *) Tloc(sbat, BUNfirst(sbat)); 
        threadArgs = (csRelThreadArg *) GDKmalloc(sizeof(csRelThreadArg) * 
nthreads);
        threads = (pthread_t *) GDKmalloc(sizeof(pthread_t) * nthreads);
        
        for (i = 0; i < nthreads; i++){
                threadArgs[i].tid = i; 
+               
+               /* Old code for partitioning the BAT of subjects
                threadArgs[i].first = (i == 0) ? first:(threadArgs[i-1].last + 
1);
                tmplast = (i == (nthreads - 1))?last: (ntp * (i + 1));
                //Go to all the triples of the current subjects
@@ -6346,6 +6361,10 @@ str RDFrelationships(int *ret, BAT *sbat
                        tmplast++;
                }
                threadArgs[i].last = tmplast;
+               */
+               threadArgs[i].first = first; 
+               threadArgs[i].last = last;
+               threadArgs[i].nthreads = nthreads;
 
                threadArgs[i].sbat = sbat;
                threadArgs[i].pbat = pbat;
@@ -6362,8 +6381,9 @@ str RDFrelationships(int *ret, BAT *sbat
                        
        }
 
-       //addRels_from_a_partition(first, last, sbat, pbat, obat, subjCSMap, 
csrelSet, maxSoid, maxNumPwithDup, csIdFreqIdxMap);
-       
+       
+       //addRels_from_a_partition(0, nthreads, first, last, sbat, pbat, obat, 
subjCSMap, csrelSet, maxSoid, maxNumPwithDup, csIdFreqIdxMap);
+
        for (i = 0; i < nthreads; i++) {
                 if (pthread_create(&threads[i], NULL, addRels_Thread, 
&threadArgs[i])) {
                        GDKfree(threadArgs);
@@ -6380,7 +6400,7 @@ str RDFrelationships(int *ret, BAT *sbat
                }
        }
        
-       pthread_mutex_destroy(&lock);
+       //pthread_mutex_destroy(&lock);
        GDKfree(threadArgs);
        GDKfree(threads);
 
@@ -9290,9 +9310,9 @@ RDFextractCSwithTypes(int *ret, bat *sba
 
        csSubCSSet = initCS_SubCSSets(*maxCSoid +1); 
 
-       RDFrelationships(ret, sbat, pbat, obat, *subjCSMap, subjSubCSMap, 
csSubCSSet, csrelSet, *maxSoid, *maxNumPwithDup, csIdFreqIdxMap);
+       RDFrelationships(ret, sbat, pbat, obat, *subjCSMap, subjSubCSMap, 
csSubCSSet, csrelSet, *maxSoid, *maxNumPwithDup, csIdFreqIdxMap, 
freqCSset->numCSadded);
        #else
-       RDFrelationships(ret, sbat, pbat, obat, *subjCSMap, csrelSet, *maxSoid, 
*maxNumPwithDup, csIdFreqIdxMap);
+       RDFrelationships(ret, sbat, pbat, obat, *subjCSMap, csrelSet, *maxSoid, 
*maxNumPwithDup, csIdFreqIdxMap, freqCSset->numCSadded);
        #endif
 
        curT = clock(); 
diff --git a/monetdb5/extras/rdf/rdfschema.h b/monetdb5/extras/rdf/rdfschema.h
--- a/monetdb5/extras/rdf/rdfschema.h
+++ b/monetdb5/extras/rdf/rdfschema.h
@@ -332,6 +332,7 @@ typedef struct CSrel{
 #define NUMTHEAD_CSREL 4
 typedef struct csRelThreadArg {
        int     tid; 
+       int     nthreads; 
        int     first; 
        int     last; 
        BAT     *sbat;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to