Changeset: 8d34e1cf03b0 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8d34e1cf03b0
Modified Files:
        monetdb5/extras/crackers/crackers_selectholpl_ops.mx
Branch: holindex
Log Message:

Make CRKRangeLeftNilTree_@1 and CRKRangeRightNilTree_@1 thread-safe in the
holistic operators.


diffs (truncated from 305 to 300 lines):

diff --git a/monetdb5/extras/crackers/crackers_selectholpl_ops.mx 
b/monetdb5/extras/crackers/crackers_selectholpl_ops.mx
--- a/monetdb5/extras/crackers/crackers_selectholpl_ops.mx
+++ b/monetdb5/extras/crackers/crackers_selectholpl_ops.mx
@@ -1363,12 +1363,26 @@ CRKRangeLeftNilTree_@1(int *vid, int *bi
        bit HBound,foundHgh=0;
        int gapH = 1;
        int createIndex=0;      
+       struct Node *lowNode=NULL;
+       pthread_rwlock_t *plock=0;
+       BUN idxFirst = BUN_NONE;
+       int firstRetry=0;
+       int L1=0;
+       int countBatElements=0;
+       FrequencyNode* FN;
+       FrequencyNode *FrequencyStructA = getFrequencyStruct('A');
+
+       if ((bo = BATdescriptor(*bid)) != NULL)
+               L1=32000/ATOMsize(bo->ttype);
+
+        MT_lock_set(&CRKIndexLock,"Cracker Index Lock");
+
        if (*inclusiveHgh == TRUE) HBound = FALSE;
        else    HBound = TRUE;                          
+
        m = existsCrackerIndex(*bid);
-       assert(0);
 
-       /* if this is the first time we selectholpl something from this bat,
+       /* if this is the first time we selectpl something from this bat,
                we have to create the crack indx for it and
           if necessary materialize the head of the bat */
        if (m == -1){
@@ -1396,12 +1410,21 @@ CRKRangeLeftNilTree_@1(int *vid, int *bi
                if (gapH>0) addCrackerIndex_@1(m,hgh,HBound,vh,c);
                vl = BUNfirst(b);
                
+               createIndex=1;
+               
+               countBatElements=BATcount(b);   
+               FN=searchBAT(FrequencyStructA,*bid);                            
        
+               FN->f1 = FN->f1 + 1;
+               FN->c = FN->c + 1;
+               FN->weight = changeWeight_1(FN,countBatElements,L1);    
+
                BBPincref(b->batCacheid,TRUE);
                BBPunfix(bo->batCacheid);
-               goto createView;
+               goto tempcreateViewL;
        }
        
        if (CrackerIndex[m].cid == -1){
+               assert(0);
                reCreateMap_@1(m);
        
                /* Take the index of the bat */
@@ -1418,7 +1441,7 @@ CRKRangeLeftNilTree_@1(int *vid, int *bi
                if (gapH>0) addCrackerIndex_@1(m,hgh,HBound,vh,c);
                vl = BUNfirst(b);
 
-               goto createView;
+               goto tempcreateViewL;
        }
 
        /* Take the index of the bat */
@@ -1429,23 +1452,83 @@ CRKRangeLeftNilTree_@1(int *vid, int *bi
        if ((b = BATdescriptor(CrackerIndex[m].cbid)) == NULL)
                throw(MAL, "crackers.crackRange", "Cannot access crack index");
 
+       idxFirst = BUNfirst(c);
+
+       MT_lock_unset(&CRKIndexLock,"Cracker Index Lock");
+
        vl = BUNfirst(b);
 
+       retryL:
+
+       pthread_rwlock_rdlock(&CrackerIndex[m].columnRWLock);
+
+       FN=searchBAT(FrequencyStructA,*bid);
+       
+       if (firstRetry==0)
+       {
+               FN->f1 = FN->f1 + 1;
+       }
+
        /* find out where in the index the high falls */
        foundHgh = GetHgh_@1(*hgh, HBound, CrackerIndex[m].Tree, c, 
BUNfirst(c), &cl2, &ch2, 0, BUNlast(b)-(oid)1);
 
+       if((foundHgh!=0)&&(firstRetry==0)) FN->f2 = FN->f2 + 1;
+
+       pthread_rwlock_unlock(&CrackerIndex[m].columnRWLock);
+
        /*need to increase one position for the low bound only since we always 
store the previous position in the index*/
        if (cl2 != 0) cl2++;
 
        if (foundHgh == 0){
+               pthread_rwlock_rdlock(&CrackerIndex[m].columnRWLock);
+                lowNode = findNodeL_@1(*hgh, TRUE, CrackerIndex[m].Tree, c, 
idxFirst, NULL);
+                plock = (lowNode == NULL)? &CRKFirstPieceRWLock : 
&lowNode->pieceLock;
+                if (pthread_rwlock_trywrlock(plock)){
+                       pthread_rwlock_unlock(&CrackerIndex[m].columnRWLock);
+                        pthread_rwlock_wrlock(plock);
+                        pthread_rwlock_unlock(plock);
+                       firstRetry=1;
+                        goto retryL;     // some other thread might already 
crack on low, try to find it again
+                }
+                pthread_rwlock_unlock(&CrackerIndex[m].columnRWLock);
                @:crkTwoRTree(@1)@
+               pthread_rwlock_wrlock(&CrackerIndex[m].columnRWLock);
                if (IndexSize <IndexStop)
-                        if(gapH>0) addCrackerIndex_@1(m,hgh,HBound,vh,c);
+               {
+                       if(gapH>0)
+                       {
+                               addCrackerIndex_@1(m,hgh,HBound,vh,c);
+                               FN->c = FN->c + 1;
+                               countBatElements=BATcount(b);
+                               if(FN->weight > 0)
+                                       FN->weight = 
changeWeight_1(FN,countBatElements,L1);
+                       }
+               }
+               pthread_rwlock_unlock(&CrackerIndex[m].columnRWLock);
+                pthread_rwlock_unlock(plock);
+
        }       
        else
                vh = ch2;
-               
-       @:CreateResult()@
+
+       tempcreateViewL:
+
+       if (!tail)
+                view = BATslice(VIEWhead_(b, BAT_READ), vl, vh+1);
+        else
+                view = BATslice(b, vl, vh+1);
+
+        *vid = view->batCacheid;
+        BBPkeepref(*vid);
+        BBPunfix(b->batCacheid);
+        BBPunfix(c->batCacheid);
+
+        if (createIndex==1)
+        {
+                MT_lock_unset(&CRKIndexLock,"Cracker Index Lock");
+                createIndex=0;
+        }
+
        return MAL_SUCCEED;
 }
 
@@ -1460,10 +1543,23 @@ CRKRangeRightNilTree_@1(int *vid, int *b
        int gapL = 1;
        bit LBound=FALSE;
        int createIndex=0;
+        struct Node *lowNode=NULL;
+       pthread_rwlock_t *plock=0;
+        BUN idxFirst = BUN_NONE;
+       int firstRetry=0;
+       int L1=0;
+       int countBatElements;
+       FrequencyNode* FN;
+       FrequencyNode *FrequencyStructA = getFrequencyStruct('A');
+
+       if ((bo = BATdescriptor(*bid)) != NULL)
+               L1=32000/ATOMsize(bo->ttype);
+
+        MT_lock_set(&CRKIndexLock,"Cracker Index Lock");
+
        m = existsCrackerIndex(*bid);
-       assert(0);
 
-       /* if this is the first time we selectholpl something from this bat,
+       /* if this is the first time we selectpl something from this bat,
                we have to create the crack indx for it and
           if necessary materialize the head of the bat */
        if (m == -1){
@@ -1491,14 +1587,21 @@ CRKRangeRightNilTree_@1(int *vid, int *b
                 if (gapL>0) addCrackerIndex_@1(m,low,*inclusiveLow,vl,c);
                vh = BUNlast(b)-(oid)1;
                
+               createIndex=1;
+               countBatElements=BATcount(b);   
+               FN=searchBAT(FrequencyStructA,*bid);                            
        
+               FN->f1 = FN->f1 + 1;
+               FN->c = FN->c + 1;
+               FN->weight = changeWeight_1(FN,countBatElements,L1);
+
                BBPincref(b->batCacheid,TRUE);
                BBPunfix(bo->batCacheid);
-               goto createView;
+               goto tempcreateViewR;
        }
        
        if (CrackerIndex[m].cid == -1){
                reCreateMap_@1(m);
-       
+               assert(0);      
                /* Take the index of the bat */
                if ((c = BATdescriptor(CrackerIndex[m].cid)) == NULL)
                        throw(MAL, "crackers.crackRange", "Cannot access crack 
index");
@@ -1513,7 +1616,7 @@ CRKRangeRightNilTree_@1(int *vid, int *b
                if(gapL>0) addCrackerIndex_@1(m,low,*inclusiveLow,vl,c);
                vh = BUNlast(b)-(oid)1;
 
-               goto createView;
+               goto tempcreateViewR;
        }
 
 
@@ -1525,24 +1628,91 @@ CRKRangeRightNilTree_@1(int *vid, int *b
        if ((b = BATdescriptor(CrackerIndex[m].cbid)) == NULL)
                throw(MAL, "crackers.crackRange", "Cannot access crack index");
 
+       idxFirst = BUNfirst(c);
+
+       MT_lock_unset(&CRKIndexLock,"Cracker Index Lock");
+
        vh = BUNlast(b)-(oid)1;
 
+       retryR:
+
+       pthread_rwlock_rdlock(&CrackerIndex[m].columnRWLock);
+
+       FN=searchBAT(FrequencyStructA,*bid);
+       
+       if (firstRetry==0)
+       {
+               FN->f1 = FN->f1 + 1;
+       }
 
        /* find out where in the index the low falls */
        foundLow = GetLow_@1(*low,*inclusiveLow, CrackerIndex[m].Tree, c, 
BUNfirst(c), &cl1, &ch1, 0, BUNlast(b)-(oid)1,&LBound);
 
+       if((foundLow!=0)&&(firstRetry==0)) FN->f2 = FN->f2 + 1;
+       
+       pthread_rwlock_unlock(&CrackerIndex[m].columnRWLock);
+
        /*need to increase one position for the low bound only since we always 
store the previous position in the index*/
        if (cl1 != 0) cl1++;
 
        if (foundLow == 0){
+               pthread_rwlock_rdlock(&CrackerIndex[m].columnRWLock);
+                lowNode = findNodeL_@1(*low, TRUE, CrackerIndex[m].Tree, c, 
idxFirst, NULL);
+                plock = (lowNode == NULL)? &CRKFirstPieceRWLock : 
&lowNode->pieceLock;
+                if (pthread_rwlock_trywrlock(plock)){
+                       pthread_rwlock_unlock(&CrackerIndex[m].columnRWLock);
+                       pthread_rwlock_wrlock(plock);
+                        pthread_rwlock_unlock(plock);
+                       firstRetry=1;
+                        goto retryR;     // some other thread might already 
crack on low, try to find it again
+                }
+                pthread_rwlock_unlock(&CrackerIndex[m].columnRWLock);
                @:crkTwoLTree(@1)@
+               pthread_rwlock_wrlock(&CrackerIndex[m].columnRWLock);
                if (IndexSize <IndexStop)
-                       if(gapL>0) addCrackerIndex_@1(m,low,*inclusiveLow,vl,c);
+               {
+                       if(gapL>0)
+                       {
+                               addCrackerIndex_@1(m,low,*inclusiveLow,vl,c);
+                               FN->c = FN->c + 1;
+                               countBatElements=BATcount(b);
+                               if(FN->weight > 0)
+                                       FN->weight = 
changeWeight_1(FN,countBatElements,L1);
+                       }
+               }
+               pthread_rwlock_unlock(&CrackerIndex[m].columnRWLock);
+                pthread_rwlock_unlock(plock);
        }       
        else
                vl = cl1;
-               
-       @:CreateResult()@
+       
+       tempcreateViewR:
+
+       pthread_rwlock_rdlock(&CrackerIndex[m].columnRWLock);
+        lowNode = findNodeL_@1(*low, *inclusiveLow, CrackerIndex[m].Tree, c, 
idxFirst, NULL);
+        if (lowNode == NULL){
+                vl = 0;
+        } else {
+                vl = lowNode->indexPosition;
+        }
+        pthread_rwlock_unlock(&CrackerIndex[m].columnRWLock);
+
+       if (!tail)
+                view = BATslice(VIEWhead_(b, BAT_READ), vl, vh+1);
+        else
+                view = BATslice(b, vl, vh+1);
+
+        *vid = view->batCacheid;
+        BBPkeepref(*vid);
+        BBPunfix(b->batCacheid);
+        BBPunfix(c->batCacheid);
+
+        if (createIndex==1)
+        {
+                MT_lock_unset(&CRKIndexLock,"Cracker Index Lock");
+                createIndex=0;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to