Changeset: 827721149484 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=827721149484 Modified Files: geom/monetdb5/geom.c Branch: sfcgal Log Message:
GEOSIntersects and GEOSDistance are not thread safe; therefore, when we use OpenMP we run into memory corruption issues. To avoid that we have to clone the lGeometry for each thread. Despite the data redundancy it is still worth it to improve execution times. The issue needs to be taken care of once we have more control of the GEOSgeom data structure. Improve debug messages and error messages. Fix a compilation issue. diffs (286 lines): diff --git a/geom/monetdb5/geom.c b/geom/monetdb5/geom.c --- a/geom/monetdb5/geom.c +++ b/geom/monetdb5/geom.c @@ -4513,6 +4513,7 @@ wkbBasicBoolean(bit *out, wkb **geom, ch { int ret; GEOSGeom geosGeometry; + str msg = MAL_SUCCEED; if (wkb_isnil(*geom)) { *out = bit_nil; @@ -4526,8 +4527,17 @@ wkbBasicBoolean(bit *out, wkb **geom, ch ret = (*func) (geosGeometry); //it is supposed to return char but treating it as such gives wrong results GEOSGeom_destroy(geosGeometry); + //if there was an error returned by geos + if (GDKerrbuf && GDKerrbuf[0]) { + //create an exception with this name + msg = createException(MAL, name, "%s", GDKerrbuf); + + //clear the error buffer + GDKerrbuf[0] = '\0'; + } + if (ret == 2) - throw(MAL, name, "GEOSis%s failed", name + 7); + throw(MAL, name, "GEOSis%s failed: %s", name + 7, msg); *out = ret; @@ -7706,7 +7716,7 @@ pnpolyWithHoles(bat *out, int nvert, dbl #ifdef GEOMBULK_DEBUG gettimeofday(&stop, NULL); t = 1000 * (stop.tv_sec - start.tv_sec) + (stop.tv_usec - start.tv_usec) / 1000; - fprintf(stdout, "%s %llu ms\n", name, t); + fprintf(stdout, "pnpolyWithHoles %llu ms\n", t); #endif @@ -9089,9 +9099,10 @@ WKBWKBtoBITsubjoin_intern_light(bat *lre BUN pr = 0, pl = 0, qr = 0, ql = 0; GEOSGeom *rGeometries = NULL; int *rSRIDs = NULL; - int numIters = 1; str msg = MAL_SUCCEED; bit* outs = NULL; + int numThreads = 1, i = 0; + GEOSGeom *lGeometries = NULL; #ifdef GEOMBULK_DEBUG static struct timeval start, stop; unsigned long long t; @@ -9128,7 +9139,7 @@ WKBWKBtoBITsubjoin_intern_light(bat *lre return MAL_SUCCEED; } 
#ifdef OPENMP - numIters = OPENCL_THREADS; + numThreads = OPENCL_THREADS; #endif #ifdef GEOMBULK_DEBUG @@ -9136,14 +9147,14 @@ WKBWKBtoBITsubjoin_intern_light(bat *lre #endif /*iterator over the BATs*/ lBAT_iter = bat_iterator(bl); - if ( (rBAT_iters = (BATiter*) GDKmalloc(sizeof(BATiter)*numIters)) == NULL) { + if ( (rBAT_iters = (BATiter*) GDKmalloc(sizeof(BATiter)*numThreads)) == NULL) { BBPunfix(*lid); BBPunfix(*rid); BBPunfix(xl->batCacheid); BBPunfix(xr->batCacheid); throw(MAL, name, MAL_MALLOC_FAIL); } - for (j = 0; j < numIters; j++) { + for (j = 0; j < numThreads; j++) { rBAT_iters[j] = bat_iterator(br); } @@ -9227,6 +9238,19 @@ WKBWKBtoBITsubjoin_intern_light(bat *lre BBPunfix(xr->batCacheid); return msg; } + if ( (lGeometries = GDKmalloc(sizeof(GEOSGeom) * numThreads)) == NULL) { + for (j = 0; j < pr;j++) { + if (rGeometries[j]) + GEOSGeom_destroy(rGeometries[j]); + } + GDKfree(rSRIDs); + GDKfree(rGeometries); + BBPunfix(*lid); + BBPunfix(*rid); + BBPunfix(xl->batCacheid); + BBPunfix(xr->batCacheid); + throw(MAL, name, MAL_MALLOC_FAIL); + } #ifdef GEOMBULK_DEBUG gettimeofday(&start, NULL); @@ -9235,17 +9259,23 @@ WKBWKBtoBITsubjoin_intern_light(bat *lre BATloop(bl, pl, ql) { str err = NULL; wkb *lWKB = NULL; - GEOSGeom lGeometry = NULL; ro = br->hseqbase; int lSRID = 0; lWKB = (wkb *) BUNtail(lBAT_iter, pl); - lGeometry = wkb2geos(lWKB); - if ( !lGeometry ) { - msg = createException(MAL, name, "wkb2geos failed"); + for (j = 0; j < numThreads; j++) { + lGeometries[j] = wkb2geos(lWKB); + if ( !lGeometries[j] ) { + for ( i = 0; i < j; i++ ){ + GEOSGeom_destroy(lGeometries[i]); + } + msg = createException(MAL, name, "wkb2geos failed"); + break; + } + } + if (msg != MAL_SUCCEED) break; - } - lSRID = GEOSGetSRID(lGeometry); + lSRID = GEOSGetSRID(lGeometries[0]); #ifdef OPENMP omp_set_dynamic(OPENCL_DYNAMIC); // Explicitly disable dynamic teams @@ -9255,6 +9285,10 @@ WKBWKBtoBITsubjoin_intern_light(bat *lre for (j = 0; j < BATcount(br); j++) { GEOSGeom 
rGeometry = rGeometries[j]; outs[j] = 0; + int tNum = 0; +#ifdef OPENMP + tNum = omp_get_thread_num(); +#endif if (msg != MAL_SUCCEED) continue; @@ -9268,7 +9302,7 @@ WKBWKBtoBITsubjoin_intern_light(bat *lre #endif } - if ((outs[j] = (*func)(lGeometry, rGeometry)) == 2){ + if ((outs[j] = (*func)(lGeometries[tNum], rGeometry)) == 2){ msg = createException(MAL, name, "%s failed", name); #ifdef OPENMP #pragma omp cancelregion @@ -9278,8 +9312,10 @@ WKBWKBtoBITsubjoin_intern_light(bat *lre } } - if (lGeometry) - GEOSGeom_destroy(lGeometry); + for (j = 0; j < numThreads; j++) { + GEOSGeom_destroy(lGeometries[j]); + lGeometries[j] = NULL; + } if (msg != MAL_SUCCEED) break; @@ -9617,7 +9653,7 @@ Intersectssubjoin(bat *lres, bat *rres, if (*estimate != lng_nil) throw(MAL, "Intersectssubjoin", "It has estimate"); - return WKBWKBtoBITsubjoin_intern(lres, rres, lid, rid, GEOSIntersects, "geom.Intersectssubjoin"); + return WKBWKBtoBITsubjoin_intern_light(lres, rres, lid, rid, GEOSIntersects, "geom.Intersectssubjoin"); } str @@ -10254,6 +10290,8 @@ DWithinXYZsubjoin_intern(bat *lres, bat BUN px = 0, py = 0, pz =0, pl = 0, qx = 0, qy = 0, qz = 0, ql = 0; GEOSGeom *rGeometries = NULL; bit *outs = NULL; + int numThreads = 1, i = 0; + GEOSGeom *lGeometries = NULL; #ifdef GEOMBULK_DEBUG static struct timeval start, stop; unsigned long long t; @@ -10396,6 +10434,18 @@ DWithinXYZsubjoin_intern(bat *lres, bat BBPunfix(xr->batCacheid); return msg; } +#ifdef OPENMP + numThreads = OPENCL_THREADS; +#endif + if ( (lGeometries = GDKmalloc(sizeof(GEOSGeom) * numThreads)) == NULL) { + GDKfree(rGeometries); + BBPunfix(*lid); + BBPunfix(*xid); + BBPunfix(*yid); + BBPunfix(*zid); + BBPunfix(xl->batCacheid); + BBPunfix(xr->batCacheid); + } lo = bl->hseqbase; #ifdef GEOMBULK_DEBUG @@ -10404,40 +10454,52 @@ DWithinXYZsubjoin_intern(bat *lres, bat BATloop(bl, pl, ql) { str err = NULL; wkb *lWKB = NULL; - GEOSGeom lGeometry = NULL; ro = bx->hseqbase; int lSRID = 0; lWKB = (wkb *) BUNtail(lBAT_iter, 
pl); - lGeometry = wkb2geos(lWKB); - if ( !lGeometry ) { - msg = createException(MAL, name, "wkb2geos failed"); + for (j = 0; j < numThreads; j++) { + lGeometries[j] = wkb2geos(lWKB); + if ( !lGeometries[j] ) { + for ( i = 0; i < j; i++ ){ + GEOSGeom_destroy(lGeometries[i]); + } + msg = createException(MAL, name, "wkb2geos failed"); + break; + } + } + if (msg != MAL_SUCCEED) break; - } - lSRID = GEOSGetSRID(lGeometry); + lSRID = GEOSGetSRID(lGeometries[0]); if (lSRID != *srid) { - GEOSGeom_destroy(lGeometry); + for (j = 0; j < numThreads; j++) { + GEOSGeom_destroy(lGeometries[j]); + } msg = createException(MAL, name, "Geometries of different SRID"); break; } //for (j = 0; j < BATcount(bx); j++, ro++) { -#ifdef OPENMP +#ifdef OPENMP omp_set_dynamic(OPENCL_DYNAMIC); // Explicitly disable dynamic teams omp_set_num_threads(OPENCL_THREADS); - #pragma omp parallel for + #pragma omp parallel for #endif for (j = 0; j < BATcount(bx); j++) { double distance = 0.0; int res = 0; + int tNum = 0; +#ifdef OPENMP + tNum = omp_get_thread_num(); +#endif GEOSGeom rGeometry = rGeometries[j]; outs[j] = 0; if (msg != MAL_SUCCEED) continue; - if ( (res = GEOSDistance(lGeometry, rGeometry, &distance)) == 0) { + if ( (res = GEOSDistance(lGeometries[tNum], rGeometry, &distance)) == 0) { msg = createException(MAL, name, "GEOSDistance failed"); #ifdef OPENMP #pragma omp cancelregion @@ -10447,8 +10509,11 @@ DWithinXYZsubjoin_intern(bat *lres, bat } outs[j] = (distance <= *dist); } - if (lGeometry) - GEOSGeom_destroy(lGeometry); + + for (j = 0; j < numThreads; j++) { + GEOSGeom_destroy(lGeometries[j]); + lGeometries[j] = NULL; + } if (msg != MAL_SUCCEED) break; @@ -10468,6 +10533,11 @@ DWithinXYZsubjoin_intern(bat *lres, bat t = 1000 * (stop.tv_sec - start.tv_sec) + (stop.tv_usec - start.tv_usec) / 1000; fprintf(stdout, "%s second BATloop %llu ms\n", name, t); #endif + +#ifdef OPENMP + GDKfree(lGeometries); +#endif + if (outs) GDKfree(outs); if (rGeometries) { 
_______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list