Copilot commented on code in PR #2637:
URL: https://github.com/apache/sedona/pull/2637#discussion_r2786831975

##########
docs/api/sql/Function.md:
##########
@@ -22,11 +22,14 @@ Introduction: Returns an array of expanded forms of the input address string. This is backed by the [libpostal](https://github.com/openvenues/libpostal) library's address expanding functionality.
 
 !!!Note
-    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the function is called for the first time.
+    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the function is called for the first time, unless the data directory is configured to point to a pre-populated remote or local path.
 
 !!!Note
     The version of jpostal installed with this package only supports Linux and MacOS. If you are using Windows, you will need to install libjpostal and libpostal manually and ensure that they are available in your `java.library.path`.
 
+!!!Note
+    The data directory can be configured via `spark.sedona.libpostal.dataDir`. You can point it to a remote filesystem path (HDFS, S3, GCS, ABFS, etc.) such as `hdfs:///data/libpostal/` or `s3a://my-bucket/libpostal/`. When a remote path is used, Sedona will automatically copy the data to a local cache directory on each executor before initializing jpostal. The automatic internet download is disabled in this mode, so the remote directory must already contain the libpostal model files.

Review Comment:
   The updated notes say downloads are skipped when `dataDir` points to a pre-populated *local* path, but the code only disables `downloadDataIfNeeded` for *remote* paths (`downloadDataIfNeeded(!isRemote)`). Please adjust the docs to match the behavior (i.e., remote URI mode disables internet download; local paths still allow jpostal's download-if-needed unless explicitly disabled).

```suggestion
    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. By default, the data files are downloaded automatically when the function is called for the first time.

!!!Note
    The version of jpostal installed with this package only supports Linux and MacOS. If you are using Windows, you will need to install libjpostal and libpostal manually and ensure that they are available in your `java.library.path`.

!!!Note
    The data directory can be configured via `spark.sedona.libpostal.dataDir`. You can point it to a remote filesystem path (HDFS, S3, GCS, ABFS, etc.) such as `hdfs:///data/libpostal/` or `s3a://my-bucket/libpostal/`. When a remote path is used, Sedona will automatically copy the data to a local cache directory on each executor before initializing jpostal. In this remote-URI mode, the automatic internet download performed by jpostal is disabled, so the remote directory must already contain the libpostal model files. For local filesystem paths, jpostal's download-if-needed behavior remains enabled unless you disable it separately, even if the directory is already populated with data files.
```


##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Resolves libpostal data directory paths. When the configured data directory points to a remote
+ * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache directory so that
+ * jpostal's native code can access it.
+ *
+ * The local cache uses a hash of the remote URI to avoid re-downloading on subsequent
+ * invocations. A marker file `.sedona_libpostal_complete` is written after a successful copy so
+ * that partially-copied directories are detected and re-copied.
+ */
+object LibPostalDataLoader {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val MARKER_FILE = ".sedona_libpostal_complete"
+
+  /** Per-cache-key lock objects to prevent concurrent downloads to the same directory. */
+  private val locks = new ConcurrentHashMap[String, AnyRef]()
+
+  /**
+   * Resolve the data directory to a local filesystem path. If the configured path already points
+   * to the local filesystem, it is returned as-is. If it points to a remote filesystem, the
+   * directory is recursively copied to a local cache under `java.io.tmpdir`.
+   *
+   * @param configuredDir
+   *   the data directory path from Sedona configuration (may be local or remote)
+   * @return
+   *   a local filesystem path suitable for jpostal
+   */
+  def resolveDataDir(configuredDir: String): String = {
+    if (isRemotePath(configuredDir)) {
+      copyRemoteToLocalCache(configuredDir)
+    } else {
+      configuredDir
+    }
+  }
+
+  /**
+   * Determine whether a path string refers to a remote (non-local) filesystem.
+   */
+  def isRemotePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      val scheme = uri.getScheme
+      scheme != null && scheme != "file" && scheme.length > 1
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+   * Copy a remote directory to a local cache and return the local path. Uses a hash-based cache
+   * key so that different remote paths get different local directories. A marker file is used to
+   * ensure completeness — if the marker is missing (e.g. from a previous interrupted copy), the
+   * directory is re-copied.
+   */
+  private def copyRemoteToLocalCache(remotePath: String): String = {
+    val cacheKey = hashPath(remotePath)
+    val localCacheDir =
+      new File(System.getProperty("java.io.tmpdir"), s"sedona-libpostal-cache/$cacheKey")
+    val markerFile = new File(localCacheDir, MARKER_FILE)
+
+    if (markerFile.exists()) {
+      logger.info(
+        "Libpostal data already cached at {}, skipping download from {}",
+        localCacheDir.getAbsolutePath: Any,
+        remotePath: Any)
+      return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+    }
+
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {
+      // Double-check after acquiring lock
+      if (markerFile.exists()) {
+        return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+      }
+
+      logger.info(
+        "Copying libpostal data from {} to local cache at {}",
+        remotePath: Any,
+        localCacheDir.getAbsolutePath: Any)
+
+      localCacheDir.mkdirs()

Review Comment:
   `mkdirs()` can fail and return `false` without throwing, which can lead to confusing downstream failures when copying into a non-existent/unwritable directory. Consider validating directory creation (and/or that `localCacheDir.isDirectory` is true) and throwing a clear exception if it can't be created.

```suggestion
      if (localCacheDir.exists()) {
        if (!localCacheDir.isDirectory) {
          throw new IllegalStateException(
            s"Libpostal local cache path exists but is not a directory: ${localCacheDir.getAbsolutePath}")
        }
      } else {
        // Attempt to create the directory; mkdirs() may return false without throwing.
        if (!localCacheDir.mkdirs() && !localCacheDir.isDirectory) {
          throw new IllegalStateException(
            s"Failed to create libpostal local cache directory at ${localCacheDir.getAbsolutePath}")
        }
      }
```


##########
docs/api/sql/Function.md:
##########
@@ -48,11 +51,14 @@ Introduction: Returns an array of the components (e.g. street, postal code) of the input address string. This is backed by the [libpostal](https://github.com/openvenues/libpostal) library's address parsing functionality.
 
 !!!Note
-    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the library is initialized.
+    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the library is initialized, unless the data directory is configured to point to a pre-populated remote or local path.
Review Comment:
   The updated notes say downloads are skipped when `dataDir` points to a pre-populated *local* path, but the code only disables `downloadDataIfNeeded` for *remote* paths (`downloadDataIfNeeded(!isRemote)`). Please adjust the docs to match the behavior (i.e., remote URI mode disables internet download; local paths still allow jpostal's download-if-needed unless explicitly disabled).


##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,151 @@
+  private def copyRemoteToLocalCache(remotePath: String): String = {
+    val cacheKey = hashPath(remotePath)
+    val localCacheDir =
+      new File(System.getProperty("java.io.tmpdir"), s"sedona-libpostal-cache/$cacheKey")
+    val markerFile = new File(localCacheDir, MARKER_FILE)
+
+    if (markerFile.exists()) {
+      logger.info(
+        "Libpostal data already cached at {}, skipping download from {}",
+        localCacheDir.getAbsolutePath: Any,
+        remotePath: Any)
+      return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+    }
+
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {
+      // Double-check after acquiring lock
+      if (markerFile.exists()) {
+        return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+      }
+
+      logger.info(
+        "Copying libpostal data from {} to local cache at {}",
+        remotePath: Any,
+        localCacheDir.getAbsolutePath: Any)
+
+      localCacheDir.mkdirs()
+
+      val hadoopConf =
+        try {
+          SparkHadoopUtil.get.conf
+        } catch {
+          case _: Exception => new Configuration()
+        }
+      val remoteHadoopPath = new Path(remotePath)
+
+      HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir)
+
+      // Write marker file to indicate successful completion
+      markerFile.createNewFile()
+
+      // Remove the lock entry now that the cache is populated — future callers
+      // will return early via the markerFile.exists() fast path.
+      locks.remove(cacheKey)
+
+      logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath)

Review Comment:
   Lock cleanup only happens on the success path. If `copyDirectoryToLocal` throws, the lock entry remains in `locks` indefinitely (and per-key entries could accumulate across distinct remote URIs). Consider restructuring with a `try`/`finally` to perform appropriate cleanup, and/or using `locks.remove(cacheKey, lock)` conditionally to avoid races while still preventing unbounded growth.

```suggestion
      try {
        HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir)

        // Write marker file to indicate successful completion
        markerFile.createNewFile()

        logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath)
      } finally {
        // Ensure the lock entry is removed even if an exception occurs during copy
        // or marker creation, while avoiding races with other potential lock objects.
        locks.remove(cacheKey, lock)
      }
```


##########
spark/common/src/test/scala/org/apache/sedona/sql/LibPostalDataLoaderTest.scala:
##########
@@ -0,0 +1,249 @@
+package org.apache.sedona.sql
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.sql.sedona_sql.expressions.LibPostalDataLoader
+import org.scalatest.matchers.should.Matchers
+
+import java.io.File
+import java.nio.file.Files
+import java.util.concurrent.{CyclicBarrier, Executors, TimeUnit}
+import scala.collection.mutable.ListBuffer
+
+class LibPostalDataLoaderTest extends TestBaseScala with Matchers {
+
+  describe("LibPostalDataLoader") {
+
+    describe("isRemotePath") {
+      it("should return false for local paths") {
+        LibPostalDataLoader.isRemotePath("/tmp/libpostal/") shouldBe false
+      }
+
+      it("should return false for relative paths") {
+        LibPostalDataLoader.isRemotePath("data/libpostal/") shouldBe false
+      }
+
+      it("should return false for file:// URIs") {
+        LibPostalDataLoader.isRemotePath("file:///tmp/libpostal/") shouldBe false
+      }
+
+      it("should return true for hdfs:// URIs") {
+        LibPostalDataLoader.isRemotePath("hdfs:///data/libpostal/") shouldBe true
+      }
+
+      it("should return true for hdfs:// URIs with host") {
+        LibPostalDataLoader.isRemotePath("hdfs://namenode:9000/data/libpostal/") shouldBe true
+      }
+
+      it("should return true for s3a:// URIs") {
+        LibPostalDataLoader.isRemotePath("s3a://my-bucket/libpostal/") shouldBe true
+      }
+
+      it("should return true for s3:// URIs") {
+        LibPostalDataLoader.isRemotePath("s3://my-bucket/libpostal/") shouldBe true
+      }
+
+      it("should return true for gs:// URIs") {
+        LibPostalDataLoader.isRemotePath("gs://my-bucket/libpostal/") shouldBe true
+      }
+
+      it("should return true for abfs:// URIs") {
+        LibPostalDataLoader.isRemotePath(
+          "abfs://[email protected]/libpostal/") shouldBe true
+      }
+
+      it("should return true for wasb:// URIs") {
+        LibPostalDataLoader.isRemotePath(
+          "wasb://[email protected]/libpostal/") shouldBe true
+      }
+
+      it("should return false for empty string") {
+        LibPostalDataLoader.isRemotePath("") shouldBe false
+      }
+
+      it("should return false for Windows-like paths") {
+        // Single-letter scheme like C: should not be treated as remote
+        LibPostalDataLoader.isRemotePath("C:\\libpostal\\data\\") shouldBe false
+      }
+    }
+
+    describe("resolveDataDir") {
+      it("should return local path unchanged") {
+        val tempDir = Files.createTempDirectory("sedona-libpostal-test").toFile
+        try {
+          val result = LibPostalDataLoader.resolveDataDir(tempDir.getAbsolutePath)
+          result shouldBe tempDir.getAbsolutePath
+        } finally {
+          tempDir.delete()
+        }
+      }
+
+      it("should copy remote directory to local cache and return local path") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a mock libpostal data directory on HDFS with expected subdirs
+          val remotePath = hdfsUri + "libpostal-data/"
+          val basePath = new Path(remotePath)
+          fs.mkdirs(basePath)
+
+          // Create the subdirectories that libpostal expects
+          val subdirs =
+            Seq(
+              "transliteration",
+              "numex",
+              "address_parser",
+              "address_expansions",
+              "language_classifier")
+          for (subdir <- subdirs) {
+            val subdirPath = new Path(basePath, subdir)
+            fs.mkdirs(subdirPath)
+            // Write a dummy file
+            val out = fs.create(new Path(subdirPath, "model.dat"))
+            out.writeBytes(s"data for $subdir")
+            out.close()
+          }
+
+          // Resolve the remote path
+          val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
+
+          // Verify the result is a local path
+          localPath should not startWith "hdfs://"
+          new File(localPath).exists() shouldBe true
+
+          // Verify all subdirs were copied
+          for (subdir <- subdirs) {
+            val localSubdir = new File(localPath, subdir)
+            localSubdir.exists() shouldBe true
+            localSubdir.isDirectory shouldBe true
+            new File(localSubdir, "model.dat").exists() shouldBe true
+          }
+
+          // Verify the marker file exists
+          new File(localPath, ".sedona_libpostal_complete").exists() shouldBe true
+
+          // Verify trailing separator
+          localPath should endWith(File.separator)
+
+          // Call again — should use cache (no re-copy)
+          val localPath2 = LibPostalDataLoader.resolveDataDir(remotePath)
+          localPath2 shouldBe localPath
+
+          // Clean up local cache
+          deleteDirectory(new File(localPath))
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+
+      it("should handle concurrent access from multiple threads safely") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a mock libpostal data directory on HDFS
+          val remotePath = hdfsUri + "libpostal-concurrent/"
+          val basePath = new Path(remotePath)
+          fs.mkdirs(basePath)
+
+          val subdirs =
+            Seq(
+              "transliteration",
+              "numex",
+              "address_parser",
+              "address_expansions",
+              "language_classifier")
+          for (subdir <- subdirs) {
+            val subdirPath = new Path(basePath, subdir)
+            fs.mkdirs(subdirPath)
+            val out = fs.create(new Path(subdirPath, "model.dat"))
+            out.writeBytes(s"data for $subdir")
+            out.close()
+          }
+
+          val numThreads = 8
+          val barrier = new CyclicBarrier(numThreads)
+          val executor = Executors.newFixedThreadPool(numThreads)
+          val results = new ListBuffer[String]()
+          val errors = new ListBuffer[Throwable]()
+          val resultsLock = new AnyRef
+
+          val futures = (1 to numThreads).map { _ =>
+            executor.submit(new Runnable {
+              override def run(): Unit = {
+                try {
+                  // All threads wait here until all are ready, then start simultaneously
+                  barrier.await(30, TimeUnit.SECONDS)
+                  val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
+                  resultsLock.synchronized {
+                    results += localPath
+                  }
+                } catch {
+                  case e: Throwable =>
+                    resultsLock.synchronized {
+                      errors += e
+                    }
+                }
+              }
+            })
+          }
+
+          // Wait for all threads to complete
+          futures.foreach(_.get(60, TimeUnit.SECONDS))
+          executor.shutdown()
+
+          // No errors should have occurred
+          errors shouldBe empty
+
+          // All threads should have resolved to the same local path
+          results.size shouldBe numThreads
+          results.distinct.size shouldBe 1
+
+          val localPath = results.head
+
+          // Verify the data is intact
+          for (subdir <- subdirs) {
+            val localSubdir = new File(localPath, subdir)
+            localSubdir.exists() shouldBe true
+            new File(localSubdir, "model.dat").exists() shouldBe true
+          }
+
+          // Exactly one marker file should exist
+          new File(localPath, ".sedona_libpostal_complete").exists() shouldBe true
+
+          // Clean up
+          deleteDirectory(new File(localPath))
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+    }
+  }
+
+  private def deleteDirectory(dir: File): Unit = {
+    if (dir.isDirectory) {
+      dir.listFiles().foreach(deleteDirectory)
+    }
+    dir.delete()

Review Comment:
   Test cleanup helper can throw NPE because `dir.listFiles()` may return `null` (I/O error, permissions) and `dir.delete()` return value is ignored (silent cleanup failures can make tests flaky, especially on Windows). Consider guarding `listFiles()` with `Option(...)` and asserting/handling delete failures to keep test runs deterministic.

```suggestion
      Option(dir.listFiles()).getOrElse(Array.empty[File]).foreach(deleteDirectory)
    }
    val deleted = dir.delete()
    assert(deleted || !dir.exists(), s"Failed to delete directory: ${dir.getAbsolutePath}")
```


##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,151 @@
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {

Review Comment:
   Lock cleanup only happens on the success path. If `copyDirectoryToLocal` throws, the lock entry remains in `locks` indefinitely (and per-key entries could accumulate across distinct remote URIs). Consider restructuring with a `try`/`finally` to perform appropriate cleanup, and/or using `locks.remove(cacheKey, lock)` conditionally to avoid races while still preventing unbounded growth.


##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,151 @@
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {
+      // Double-check after acquiring lock
+      if (markerFile.exists()) {
+        return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+      }
+
+      logger.info(
+        "Copying libpostal data from {} to local cache at {}",
+        remotePath: Any,
+        localCacheDir.getAbsolutePath: Any)
+
+      localCacheDir.mkdirs()
+
+      val hadoopConf =
+        try {
+          SparkHadoopUtil.get.conf
+        } catch {
+          case _: Exception => new Configuration()
+        }
+      val remoteHadoopPath = new Path(remotePath)
+
+      HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir)
+
+      // Write marker file to indicate successful completion
+      markerFile.createNewFile()
+
+      // Remove the lock entry now that the cache is populated — future callers
+      // will return early via the markerFile.exists() fast path.
+      locks.remove(cacheKey)
+
+      logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath)

Review Comment:
   Lock cleanup only happens on the success path. If `copyDirectoryToLocal` throws, the lock entry remains in `locks` indefinitely (and per-key entries could accumulate across distinct remote URIs). Consider restructuring with a `try`/`finally` to perform appropriate cleanup, and/or using `locks.remove(cacheKey, lock)` conditionally to avoid races while still preventing unbounded growth.

```suggestion
      try {
        // Double-check after acquiring lock
        if (markerFile.exists()) {
          return ensureTrailingSlash(localCacheDir.getAbsolutePath)
        }

        logger.info(
          "Copying libpostal data from {} to local cache at {}",
          remotePath: Any,
          localCacheDir.getAbsolutePath: Any)

        localCacheDir.mkdirs()

        val hadoopConf =
          try {
            SparkHadoopUtil.get.conf
          } catch {
            case _: Exception => new Configuration()
          }
        val remoteHadoopPath = new Path(remotePath)

        HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir)

        // Write marker file to indicate successful completion
        markerFile.createNewFile()

        logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath)
      } finally {
        // Always attempt to remove the lock entry to avoid unbounded growth.
        // Use the value-based remove to avoid interfering with any updated mapping.
        locks.remove(cacheKey, lock)
      }
```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
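The value-conditional cleanup the review suggests (`locks.remove(cacheKey, lock)`) relies on `ConcurrentHashMap.remove(Object, Object)`, which atomically removes an entry only while it is still mapped to that exact value, so a lock object installed later by another thread is never clobbered. A minimal standalone Java sketch of that behavior (the map contents here are illustrative, not Sedona code):

```java
import java.util.concurrent.ConcurrentHashMap;

public class ConditionalRemoveDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Object> locks = new ConcurrentHashMap<>();
        // Canonical lock object for this key, shared by all callers.
        Object lock = locks.computeIfAbsent("cacheKey", k -> new Object());

        // Two-arg remove is a no-op when the current mapping is a different value.
        boolean removedWrongValue = locks.remove("cacheKey", new Object());
        // It succeeds only for the exact value currently mapped.
        boolean removedOurLock = locks.remove("cacheKey", lock);

        System.out.println(removedWrongValue); // false
        System.out.println(removedOurLock);    // true
        System.out.println(locks.isEmpty());   // true
    }
}
```

This is why calling it from a `finally` block is safe: if the copy fails and a retry thread has already re-installed a fresh lock object for the same key, the stale cleanup call simply does nothing.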
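The `listFiles()` null-guard recommended for the test cleanup helper follows the documented contract of `java.io.File.listFiles()`, which returns `null` (not an empty array) for non-directories or on I/O error, while `delete()` signals failure through its return value rather than an exception. A standalone Java sketch of a guarded recursive delete (class and method names here are hypothetical, not part of the Sedona test suite):

```java
import java.io.File;
import java.nio.file.Files;

public class SafeDelete {
    // Recursively delete a directory tree, tolerating a null listFiles()
    // result (non-directory, I/O error, or permission problem).
    static boolean deleteRecursively(File dir) {
        File[] children = dir.listFiles();
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        // delete() reports failure via its boolean return value, so callers
        // should check it instead of assuming the path is gone.
        return dir.delete();
    }

    public static void main(String[] args) throws Exception {
        File tmp = Files.createTempDirectory("safe-delete-demo").toFile();
        File sub = new File(tmp, "sub");
        sub.mkdirs();
        new File(sub, "model.dat").createNewFile();

        boolean ok = deleteRecursively(tmp);
        System.out.println(ok && !tmp.exists()); // true
    }
}
```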
