Copilot commented on code in PR #2637:
URL: https://github.com/apache/sedona/pull/2637#discussion_r2786831975


##########
docs/api/sql/Function.md:
##########
@@ -22,11 +22,14 @@
 Introduction: Returns an array of expanded forms of the input address string. This is backed by the [libpostal](https://github.com/openvenues/libpostal) library's address expanding functionality.
 
 !!!Note
-    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the function is called for the first time.
+    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the function is called for the first time, unless the data directory is configured to point to a pre-populated remote or local path.
 
 !!!Note
     The version of jpostal installed with this package only supports Linux and MacOS. If you are using Windows, you will need to install libjpostal and libpostal manually and ensure that they are available in your `java.library.path`.
 
+!!!Note
+    The data directory can be configured via `spark.sedona.libpostal.dataDir`. You can point it to a remote filesystem path (HDFS, S3, GCS, ABFS, etc.) such as `hdfs:///data/libpostal/` or `s3a://my-bucket/libpostal/`. When a remote path is used, Sedona will automatically copy the data to a local cache directory on each executor before initializing jpostal. The automatic internet download is disabled in this mode, so the remote directory must already contain the libpostal model files.

Review Comment:
   The updated notes say downloads are skipped when `dataDir` points to a pre-populated *local* path, but the code only disables `downloadDataIfNeeded` for *remote* paths (`downloadDataIfNeeded(!isRemote)`). Please adjust the docs to match the behavior (i.e., remote URI mode disables internet download; local paths still allow jpostal's download-if-needed unless explicitly disabled).
   ```suggestion
       Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. By default, the data files are downloaded automatically when the function is called for the first time.
   
   !!!Note
       The version of jpostal installed with this package only supports Linux and MacOS. If you are using Windows, you will need to install libjpostal and libpostal manually and ensure that they are available in your `java.library.path`.
   
   !!!Note
       The data directory can be configured via `spark.sedona.libpostal.dataDir`. You can point it to a remote filesystem path (HDFS, S3, GCS, ABFS, etc.) such as `hdfs:///data/libpostal/` or `s3a://my-bucket/libpostal/`. When a remote path is used, Sedona will automatically copy the data to a local cache directory on each executor before initializing jpostal. In this remote-URI mode, the automatic internet download performed by jpostal is disabled, so the remote directory must already contain the libpostal model files. For local filesystem paths, jpostal's download-if-needed behavior remains enabled unless you disable it separately, even if the directory is already populated with data files.
   ```



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Resolves libpostal data directory paths. When the configured data directory points to a remote
+ * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache directory so that
+ * jpostal's native code can access it.
+ *
+ * The local cache uses a hash of the remote URI to avoid re-downloading on subsequent
+ * invocations. A marker file `.sedona_libpostal_complete` is written after a successful copy so
+ * that partially-copied directories are detected and re-copied.
+ */
+object LibPostalDataLoader {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val MARKER_FILE = ".sedona_libpostal_complete"
+
+  /** Per-cache-key lock objects to prevent concurrent downloads to the same directory. */
+  private val locks = new ConcurrentHashMap[String, AnyRef]()
+
+  /**
+   * Resolve the data directory to a local filesystem path. If the configured path already points
+   * to the local filesystem, it is returned as-is. If it points to a remote filesystem, the
+   * directory is recursively copied to a local cache under `java.io.tmpdir`.
+   *
+   * @param configuredDir
+   *   the data directory path from Sedona configuration (may be local or remote)
+   * @return
+   *   a local filesystem path suitable for jpostal
+   */
+  def resolveDataDir(configuredDir: String): String = {
+    if (isRemotePath(configuredDir)) {
+      copyRemoteToLocalCache(configuredDir)
+    } else {
+      configuredDir
+    }
+  }
+
+  /**
+   * Determine whether a path string refers to a remote (non-local) filesystem.
+   */
+  def isRemotePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      val scheme = uri.getScheme
+      scheme != null && scheme != "file" && scheme.length > 1
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+   * Copy a remote directory to a local cache and return the local path. Uses a hash-based cache
+   * key so that different remote paths get different local directories. A marker file is used to
+   * ensure completeness — if the marker is missing (e.g. from a previous interrupted copy), the
+   * directory is re-copied.
+   */
+  private def copyRemoteToLocalCache(remotePath: String): String = {
+    val cacheKey = hashPath(remotePath)
+    val localCacheDir =
+      new File(System.getProperty("java.io.tmpdir"), s"sedona-libpostal-cache/$cacheKey")
+    val markerFile = new File(localCacheDir, MARKER_FILE)
+
+    if (markerFile.exists()) {
+      logger.info(
+        "Libpostal data already cached at {}, skipping download from {}",
+        localCacheDir.getAbsolutePath: Any,
+        remotePath: Any)
+      return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+    }
+
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {
+      // Double-check after acquiring lock
+      if (markerFile.exists()) {
+        return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+      }
+
+      logger.info(
+        "Copying libpostal data from {} to local cache at {}",
+        remotePath: Any,
+        localCacheDir.getAbsolutePath: Any)
+
+      localCacheDir.mkdirs()

Review Comment:
   `mkdirs()` can fail and return `false` without throwing, which can lead to confusing downstream failures when copying into a non-existent/unwritable directory. Consider validating directory creation (and/or that `localCacheDir.isDirectory` is true) and throwing a clear exception if it can't be created.
   ```suggestion
         if (localCacheDir.exists()) {
           if (!localCacheDir.isDirectory) {
             throw new IllegalStateException(
               s"Libpostal local cache path exists but is not a directory: ${localCacheDir.getAbsolutePath}"
             )
           }
         } else {
           // Attempt to create the directory; mkdirs() may return false without throwing.
           if (!localCacheDir.mkdirs() && !localCacheDir.isDirectory) {
             throw new IllegalStateException(
               s"Failed to create libpostal local cache directory at ${localCacheDir.getAbsolutePath}"
             )
           }
         }
   ```



##########
docs/api/sql/Function.md:
##########
@@ -48,11 +51,14 @@ Output:
 Introduction: Returns an array of the components (e.g. street, postal code) of the input address string. This is backed by the [libpostal](https://github.com/openvenues/libpostal) library's address parsing functionality.
 
 !!!Note
-    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the library is initialized.
+    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the library is initialized, unless the data directory is configured to point to a pre-populated remote or local path.

Review Comment:
   The updated notes say downloads are skipped when `dataDir` points to a pre-populated *local* path, but the code only disables `downloadDataIfNeeded` for *remote* paths (`downloadDataIfNeeded(!isRemote)`). Please adjust the docs to match the behavior (i.e., remote URI mode disables internet download; local paths still allow jpostal's download-if-needed unless explicitly disabled).
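   For context on how the remote/local split that drives this toggle is decided: the `isRemotePath` check in the diff treats any path whose URI scheme is non-null, not `file`, and longer than one character as remote. A standalone sketch of that check (written in Java here for easy compilation; the Scala in the PR is line-for-line equivalent):

   ```java
   import java.net.URI;

   public class RemoteSchemeCheck {
       // Mirrors the isRemotePath logic quoted in the diff: remote means the path
       // parses as a URI whose scheme is non-null, not "file", and longer than one
       // character (so Windows drive letters like C: stay local).
       public static boolean isRemotePath(String path) {
           try {
               String scheme = new URI(path).getScheme();
               return scheme != null && !scheme.equals("file") && scheme.length() > 1;
           } catch (Exception e) {
               return false; // unparseable strings (e.g. backslash paths) are treated as local
           }
       }

       public static void main(String[] args) {
           System.out.println(isRemotePath("s3a://my-bucket/libpostal/")); // true
           System.out.println(isRemotePath("hdfs:///data/libpostal/"));    // true
           System.out.println(isRemotePath("file:///tmp/libpostal/"));     // false
           System.out.println(isRemotePath("/tmp/libpostal/"));            // false
           System.out.println(isRemotePath("C:/libpostal/data/"));         // false (single-letter scheme)
       }
   }
   ```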



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Resolves libpostal data directory paths. When the configured data directory points to a remote
+ * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache directory so that
+ * jpostal's native code can access it.
+ *
+ * The local cache uses a hash of the remote URI to avoid re-downloading on subsequent
+ * invocations. A marker file `.sedona_libpostal_complete` is written after a successful copy so
+ * that partially-copied directories are detected and re-copied.
+ */
+object LibPostalDataLoader {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val MARKER_FILE = ".sedona_libpostal_complete"
+
+  /** Per-cache-key lock objects to prevent concurrent downloads to the same directory. */
+  private val locks = new ConcurrentHashMap[String, AnyRef]()
+
+  /**
+   * Resolve the data directory to a local filesystem path. If the configured path already points
+   * to the local filesystem, it is returned as-is. If it points to a remote filesystem, the
+   * directory is recursively copied to a local cache under `java.io.tmpdir`.
+   *
+   * @param configuredDir
+   *   the data directory path from Sedona configuration (may be local or remote)
+   * @return
+   *   a local filesystem path suitable for jpostal
+   */
+  def resolveDataDir(configuredDir: String): String = {
+    if (isRemotePath(configuredDir)) {
+      copyRemoteToLocalCache(configuredDir)
+    } else {
+      configuredDir
+    }
+  }
+
+  /**
+   * Determine whether a path string refers to a remote (non-local) filesystem.
+   */
+  def isRemotePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      val scheme = uri.getScheme
+      scheme != null && scheme != "file" && scheme.length > 1
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+   * Copy a remote directory to a local cache and return the local path. Uses a hash-based cache
+   * key so that different remote paths get different local directories. A marker file is used to
+   * ensure completeness — if the marker is missing (e.g. from a previous interrupted copy), the
+   * directory is re-copied.
+   */
+  private def copyRemoteToLocalCache(remotePath: String): String = {
+    val cacheKey = hashPath(remotePath)
+    val localCacheDir =
+      new File(System.getProperty("java.io.tmpdir"), s"sedona-libpostal-cache/$cacheKey")
+    val markerFile = new File(localCacheDir, MARKER_FILE)
+
+    if (markerFile.exists()) {
+      logger.info(
+        "Libpostal data already cached at {}, skipping download from {}",
+        localCacheDir.getAbsolutePath: Any,
+        remotePath: Any)
+      return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+    }
+
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {
+      // Double-check after acquiring lock
+      if (markerFile.exists()) {
+        return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+      }
+
+      logger.info(
+        "Copying libpostal data from {} to local cache at {}",
+        remotePath: Any,
+        localCacheDir.getAbsolutePath: Any)
+
+      localCacheDir.mkdirs()
+
+      val hadoopConf =
+        try {
+          SparkHadoopUtil.get.conf
+        } catch {
+          case _: Exception => new Configuration()
+        }
+      val remoteHadoopPath = new Path(remotePath)
+
+      HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir)
+
+      // Write marker file to indicate successful completion
+      markerFile.createNewFile()
+
+      // Remove the lock entry now that the cache is populated — future callers
+      // will return early via the markerFile.exists() fast path.
+      locks.remove(cacheKey)
+
+      logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath)

Review Comment:
   Lock cleanup only happens on the success path. If `copyDirectoryToLocal` throws, the lock entry remains in `locks` indefinitely (and per-key entries could accumulate across distinct remote URIs). Consider restructuring with a `try`/`finally` to perform appropriate cleanup, and/or using `locks.remove(cacheKey, lock)` conditionally to avoid races while still preventing unbounded growth.
   ```suggestion
        try {
          HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir)

          // Write marker file to indicate successful completion
          markerFile.createNewFile()

          logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath)
        } finally {
          // Ensure the lock entry is removed even if an exception occurs during copy
          // or marker creation, while avoiding races with other potential lock objects.
          locks.remove(cacheKey, lock)
        }
   ```



##########
spark/common/src/test/scala/org/apache/sedona/sql/LibPostalDataLoaderTest.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.sql.sedona_sql.expressions.LibPostalDataLoader
+import org.scalatest.matchers.should.Matchers
+
+import java.io.File
+import java.nio.file.Files
+import java.util.concurrent.{CyclicBarrier, Executors, TimeUnit}
+import scala.collection.mutable.ListBuffer
+
+class LibPostalDataLoaderTest extends TestBaseScala with Matchers {
+
+  describe("LibPostalDataLoader") {
+
+    describe("isRemotePath") {
+      it("should return false for local paths") {
+        LibPostalDataLoader.isRemotePath("/tmp/libpostal/") shouldBe false
+      }
+
+      it("should return false for relative paths") {
+        LibPostalDataLoader.isRemotePath("data/libpostal/") shouldBe false
+      }
+
+      it("should return false for file:// URIs") {
+        LibPostalDataLoader.isRemotePath("file:///tmp/libpostal/") shouldBe false
+      }
+
+      it("should return true for hdfs:// URIs") {
+        LibPostalDataLoader.isRemotePath("hdfs:///data/libpostal/") shouldBe true
+      }
+
+      it("should return true for hdfs:// URIs with host") {
+        LibPostalDataLoader.isRemotePath("hdfs://namenode:9000/data/libpostal/") shouldBe true
+      }
+
+      it("should return true for s3a:// URIs") {
+        LibPostalDataLoader.isRemotePath("s3a://my-bucket/libpostal/") shouldBe true
+      }
+
+      it("should return true for s3:// URIs") {
+        LibPostalDataLoader.isRemotePath("s3://my-bucket/libpostal/") shouldBe true
+      }
+
+      it("should return true for gs:// URIs") {
+        LibPostalDataLoader.isRemotePath("gs://my-bucket/libpostal/") shouldBe true
+      }
+
+      it("should return true for abfs:// URIs") {
+        LibPostalDataLoader.isRemotePath(
+          "abfs://[email protected]/libpostal/") shouldBe true
+      }
+
+      it("should return true for wasb:// URIs") {
+        LibPostalDataLoader.isRemotePath(
+          "wasb://[email protected]/libpostal/") shouldBe true
+      }
+
+      it("should return false for empty string") {
+        LibPostalDataLoader.isRemotePath("") shouldBe false
+      }
+
+      it("should return false for Windows-like paths") {
+        // Single-letter scheme like C: should not be treated as remote
+        LibPostalDataLoader.isRemotePath("C:\\libpostal\\data\\") shouldBe false
+      }
+    }
+
+    describe("resolveDataDir") {
+      it("should return local path unchanged") {
+        val tempDir = Files.createTempDirectory("sedona-libpostal-test").toFile
+        try {
+          val result = LibPostalDataLoader.resolveDataDir(tempDir.getAbsolutePath)
+          result shouldBe tempDir.getAbsolutePath
+        } finally {
+          tempDir.delete()
+        }
+      }
+
+      it("should copy remote directory to local cache and return local path") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a mock libpostal data directory on HDFS with expected subdirs
+          val remotePath = hdfsUri + "libpostal-data/"
+          val basePath = new Path(remotePath)
+          fs.mkdirs(basePath)
+
+          // Create the subdirectories that libpostal expects
+          val subdirs =
+            Seq(
+              "transliteration",
+              "numex",
+              "address_parser",
+              "address_expansions",
+              "language_classifier")
+          for (subdir <- subdirs) {
+            val subdirPath = new Path(basePath, subdir)
+            fs.mkdirs(subdirPath)
+            // Write a dummy file
+            val out = fs.create(new Path(subdirPath, "model.dat"))
+            out.writeBytes(s"data for $subdir")
+            out.close()
+          }
+
+          // Resolve the remote path
+          val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
+
+          // Verify the result is a local path
+          localPath should not startWith "hdfs://"
+          new File(localPath).exists() shouldBe true
+
+          // Verify all subdirs were copied
+          for (subdir <- subdirs) {
+            val localSubdir = new File(localPath, subdir)
+            localSubdir.exists() shouldBe true
+            localSubdir.isDirectory shouldBe true
+            new File(localSubdir, "model.dat").exists() shouldBe true
+          }
+
+          // Verify the marker file exists
+          new File(localPath, ".sedona_libpostal_complete").exists() shouldBe true
+
+          // Verify trailing separator
+          localPath should endWith(File.separator)
+
+          // Call again — should use cache (no re-copy)
+          val localPath2 = LibPostalDataLoader.resolveDataDir(remotePath)
+          localPath2 shouldBe localPath
+
+          // Clean up local cache
+          deleteDirectory(new File(localPath))
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+
+      it("should handle concurrent access from multiple threads safely") {
+        val (hdfsCluster, hdfsUri) = creatMiniHdfs()
+        try {
+          val hdfsConf = hdfsCluster.getConfiguration(0)
+          val fs = FileSystem.get(hdfsConf)
+
+          // Create a mock libpostal data directory on HDFS
+          val remotePath = hdfsUri + "libpostal-concurrent/"
+          val basePath = new Path(remotePath)
+          fs.mkdirs(basePath)
+
+          val subdirs =
+            Seq(
+              "transliteration",
+              "numex",
+              "address_parser",
+              "address_expansions",
+              "language_classifier")
+          for (subdir <- subdirs) {
+            val subdirPath = new Path(basePath, subdir)
+            fs.mkdirs(subdirPath)
+            val out = fs.create(new Path(subdirPath, "model.dat"))
+            out.writeBytes(s"data for $subdir")
+            out.close()
+          }
+
+          val numThreads = 8
+          val barrier = new CyclicBarrier(numThreads)
+          val executor = Executors.newFixedThreadPool(numThreads)
+          val results = new ListBuffer[String]()
+          val errors = new ListBuffer[Throwable]()
+          val resultsLock = new AnyRef
+
+          val futures = (1 to numThreads).map { _ =>
+            executor.submit(new Runnable {
+              override def run(): Unit = {
+                try {
+                  // All threads wait here until all are ready, then start simultaneously
+                  barrier.await(30, TimeUnit.SECONDS)
+                  val localPath = LibPostalDataLoader.resolveDataDir(remotePath)
+                  resultsLock.synchronized {
+                    results += localPath
+                  }
+                } catch {
+                  case e: Throwable =>
+                    resultsLock.synchronized {
+                      errors += e
+                    }
+                }
+              }
+            })
+          }
+
+          // Wait for all threads to complete
+          futures.foreach(_.get(60, TimeUnit.SECONDS))
+          executor.shutdown()
+
+          // No errors should have occurred
+          errors shouldBe empty
+
+          // All threads should have resolved to the same local path
+          results.size shouldBe numThreads
+          results.distinct.size shouldBe 1
+
+          val localPath = results.head
+
+          // Verify the data is intact
+          for (subdir <- subdirs) {
+            val localSubdir = new File(localPath, subdir)
+            localSubdir.exists() shouldBe true
+            new File(localSubdir, "model.dat").exists() shouldBe true
+          }
+
+          // Exactly one marker file should exist
+          new File(localPath, ".sedona_libpostal_complete").exists() shouldBe true
+
+          // Clean up
+          deleteDirectory(new File(localPath))
+        } finally {
+          hdfsCluster.shutdown()
+        }
+      }
+    }
+  }
+
+  private def deleteDirectory(dir: File): Unit = {
+    if (dir.isDirectory) {
+      dir.listFiles().foreach(deleteDirectory)
+    }
+    dir.delete()

Review Comment:
   Test cleanup helper can throw NPE because `dir.listFiles()` may return `null` (I/O error, permissions) and `dir.delete()` return value is ignored (silent cleanup failures can make tests flaky, especially on Windows). Consider guarding `listFiles()` with `Option(...)` and asserting/handling delete failures to keep test runs deterministic.
   ```suggestion
        Option(dir.listFiles()).getOrElse(Array.empty[File]).foreach(deleteDirectory)
      }
      val deleted = dir.delete()
      assert(deleted || !dir.exists(), s"Failed to delete directory: ${dir.getAbsolutePath}")
   ```
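   As a side note, NIO's walk-and-delete pattern sidesteps both pitfalls (the `null` return and unchecked delete results) without a hand-rolled recursion. A standalone sketch (plain Java, not tied to this test class; class and method names here are illustrative):

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.Comparator;
   import java.util.stream.Stream;

   public class DeleteTreeDemo {
       // Deepest-first deletion (reverse order of Files.walk) so directories are
       // always empty by the time they are deleted; failures surface as exceptions
       // instead of being silently ignored.
       public static void deleteRecursively(Path dir) throws IOException {
           if (!Files.exists(dir)) return;
           try (Stream<Path> walk = Files.walk(dir)) {
               walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                   try {
                       Files.delete(p);
                   } catch (IOException e) {
                       throw new UncheckedIOException(e);
                   }
               });
           }
       }

       public static void main(String[] args) throws IOException {
           Path root = Files.createTempDirectory("delete-demo");
           Files.createDirectories(root.resolve("a/b"));
           Files.write(root.resolve("a/b/model.dat"), "data".getBytes());
           deleteRecursively(root);
           System.out.println(Files.exists(root)); // false
       }
   }
   ```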



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Resolves libpostal data directory paths. When the configured data directory points to a remote
+ * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache directory so that
+ * jpostal's native code can access it.
+ *
+ * The local cache uses a hash of the remote URI to avoid re-downloading on subsequent
+ * invocations. A marker file `.sedona_libpostal_complete` is written after a successful copy so
+ * that partially-copied directories are detected and re-copied.
+ */
+object LibPostalDataLoader {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val MARKER_FILE = ".sedona_libpostal_complete"
+
+  /** Per-cache-key lock objects to prevent concurrent downloads to the same directory. */
+  private val locks = new ConcurrentHashMap[String, AnyRef]()
+
+  /**
+   * Resolve the data directory to a local filesystem path. If the configured path already points
+   * to the local filesystem, it is returned as-is. If it points to a remote filesystem, the
+   * directory is recursively copied to a local cache under `java.io.tmpdir`.
+   *
+   * @param configuredDir
+   *   the data directory path from Sedona configuration (may be local or remote)
+   * @return
+   *   a local filesystem path suitable for jpostal
+   */
+  def resolveDataDir(configuredDir: String): String = {
+    if (isRemotePath(configuredDir)) {
+      copyRemoteToLocalCache(configuredDir)
+    } else {
+      configuredDir
+    }
+  }
+
+  /**
+   * Determine whether a path string refers to a remote (non-local) filesystem.
+   */
+  def isRemotePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      val scheme = uri.getScheme
+      scheme != null && scheme != "file" && scheme.length > 1
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+   * Copy a remote directory to a local cache and return the local path. Uses a hash-based cache
+   * key so that different remote paths get different local directories. A marker file is used to
+   * ensure completeness — if the marker is missing (e.g. from a previous interrupted copy), the
+   * directory is re-copied.
+   */
+  private def copyRemoteToLocalCache(remotePath: String): String = {
+    val cacheKey = hashPath(remotePath)
+    val localCacheDir =
+      new File(System.getProperty("java.io.tmpdir"), s"sedona-libpostal-cache/$cacheKey")
+    val markerFile = new File(localCacheDir, MARKER_FILE)
+
+    if (markerFile.exists()) {
+      logger.info(
+        "Libpostal data already cached at {}, skipping download from {}",
+        localCacheDir.getAbsolutePath: Any,
+        remotePath: Any)
+      return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+    }
+
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {

Review Comment:
   Lock cleanup only happens on the success path. If `copyDirectoryToLocal` throws, the lock entry remains in `locks` indefinitely (and per-key entries could accumulate across distinct remote URIs). Consider restructuring with a `try`/`finally` to perform appropriate cleanup, and/or using `locks.remove(cacheKey, lock)` conditionally to avoid races while still preventing unbounded growth.
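   For reference, the two-argument `ConcurrentHashMap.remove(key, value)` mentioned above removes the entry only while it is still mapped to that exact value, which is what makes the conditional cleanup race-free: a thread holding a stale lock object cannot evict a newer one installed by `computeIfAbsent`. A minimal standalone illustration:

   ```java
   import java.util.concurrent.ConcurrentHashMap;

   public class ConditionalRemoveDemo {
       public static void main(String[] args) {
           ConcurrentHashMap<String, Object> locks = new ConcurrentHashMap<>();
           Object lockA = locks.computeIfAbsent("cacheKey", k -> new Object());
           // Pretend another thread has since replaced the entry with its own lock.
           Object lockB = new Object();
           locks.put("cacheKey", lockB);

           // Two-arg remove is a no-op here: the map no longer holds lockA.
           System.out.println(locks.remove("cacheKey", lockA)); // false
           // It succeeds only while the mapping still matches.
           System.out.println(locks.remove("cacheKey", lockB)); // true
           System.out.println(locks.isEmpty());                 // true
       }
   }
   ```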



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalDataLoader.scala:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.sedona.sql.utils.HadoopFileSystemUtils
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+
+import java.io.File
+import java.net.URI
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * Resolves libpostal data directory paths. When the configured data directory points to a remote
+ * filesystem (HDFS, S3, GCS, ABFS, etc.), the data is copied to a local cache directory so that
+ * jpostal's native code can access it.
+ *
+ * The local cache uses a hash of the remote URI to avoid re-downloading on subsequent
+ * invocations. A marker file `.sedona_libpostal_complete` is written after a successful copy so
+ * that partially-copied directories are detected and re-copied.
+ */
+object LibPostalDataLoader {
+
+  private val logger = LoggerFactory.getLogger(getClass)
+
+  private val MARKER_FILE = ".sedona_libpostal_complete"
+
+  /** Per-cache-key lock objects to prevent concurrent downloads to the same directory. */
+  private val locks = new ConcurrentHashMap[String, AnyRef]()
+
+  /**
+   * Resolve the data directory to a local filesystem path. If the configured path already points
+   * to the local filesystem, it is returned as-is. If it points to a remote filesystem, the
+   * directory is recursively copied to a local cache under `java.io.tmpdir`.
+   *
+   * @param configuredDir
+   *   the data directory path from Sedona configuration (may be local or remote)
+   * @return
+   *   a local filesystem path suitable for jpostal
+   */
+  def resolveDataDir(configuredDir: String): String = {
+    if (isRemotePath(configuredDir)) {
+      copyRemoteToLocalCache(configuredDir)
+    } else {
+      configuredDir
+    }
+  }
+
+  /**
+   * Determine whether a path string refers to a remote (non-local) filesystem.
+   */
+  def isRemotePath(path: String): Boolean = {
+    try {
+      val uri = new URI(path)
+      val scheme = uri.getScheme
+      scheme != null && scheme != "file" && scheme.length > 1
+    } catch {
+      case _: Exception => false
+    }
+  }
+
+  /**
+   * Copy a remote directory to a local cache and return the local path. Uses a hash-based cache
+   * key so that different remote paths get different local directories. A marker file is used to
+   * ensure completeness — if the marker is missing (e.g. from a previous interrupted copy), the
+   * directory is re-copied.
+   */
+  private def copyRemoteToLocalCache(remotePath: String): String = {
+    val cacheKey = hashPath(remotePath)
+    val localCacheDir =
+      new File(System.getProperty("java.io.tmpdir"), s"sedona-libpostal-cache/$cacheKey")
+    val markerFile = new File(localCacheDir, MARKER_FILE)
+
+    if (markerFile.exists()) {
+      logger.info(
+        "Libpostal data already cached at {}, skipping download from {}",
+        localCacheDir.getAbsolutePath: Any,
+        remotePath: Any)
+      return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+    }
+
+    // Synchronize on a canonical lock object per cache key so that concurrent
+    // threads on the same JVM don't race to download the same data.
+    val lock = locks.computeIfAbsent(cacheKey, _ => new AnyRef)
+    lock.synchronized {
+      // Double-check after acquiring lock
+      if (markerFile.exists()) {
+        return ensureTrailingSlash(localCacheDir.getAbsolutePath)
+      }
+
+      logger.info(
+        "Copying libpostal data from {} to local cache at {}",
+        remotePath: Any,
+        localCacheDir.getAbsolutePath: Any)
+
+      localCacheDir.mkdirs()
+
+      val hadoopConf =
+        try {
+          SparkHadoopUtil.get.conf
+        } catch {
+          case _: Exception => new Configuration()
+        }
+      val remoteHadoopPath = new Path(remotePath)
+
+      HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir)
+
+      // Write marker file to indicate successful completion
+      markerFile.createNewFile()
+
+      // Remove the lock entry now that the cache is populated — future callers
+      // will return early via the markerFile.exists() fast path.
+      locks.remove(cacheKey)
+
+      logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath)

Review Comment:
   Lock cleanup only happens on the success path. If `copyDirectoryToLocal` 
throws, the lock entry remains in `locks` indefinitely (and per-key entries 
could accumulate across distinct remote URIs). Consider restructuring with a 
`try`/`finally` to perform appropriate cleanup, and/or using 
`locks.remove(cacheKey, lock)` conditionally to avoid races while still 
preventing unbounded growth.
   ```suggestion
         try {
           // Double-check after acquiring lock
           if (markerFile.exists()) {
             return ensureTrailingSlash(localCacheDir.getAbsolutePath)
           }
   
           logger.info(
             "Copying libpostal data from {} to local cache at {}",
             remotePath: Any,
             localCacheDir.getAbsolutePath: Any)
   
           localCacheDir.mkdirs()
   
           val hadoopConf =
             try {
               SparkHadoopUtil.get.conf
             } catch {
               case _: Exception => new Configuration()
             }
           val remoteHadoopPath = new Path(remotePath)
   
          HadoopFileSystemUtils.copyDirectoryToLocal(hadoopConf, remoteHadoopPath, localCacheDir)
   
           // Write marker file to indicate successful completion
           markerFile.createNewFile()
   
          logger.info("Successfully cached libpostal data at {}", localCacheDir.getAbsolutePath)
         } finally {
           // Always attempt to remove the lock entry to avoid unbounded growth.
          // Use the value-based remove to avoid interfering with any updated mapping.
           locks.remove(cacheKey, lock)
         }
   ```
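
For context on the `isRemotePath` heuristic in the diff above: the `scheme.length > 1` guard appears intended to keep Windows drive-letter paths (whose single-letter prefix parses as a URI scheme) from being classified as remote. A quick stand-alone re-implementation in plain Java, for illustration only (`java.net.URI` parsing behaves the same on the JVM as in the Scala original):

```java
import java.net.URI;

// Illustrative re-implementation of the patch's isRemotePath heuristic;
// this class is not part of the Sedona codebase.
public class RemotePathCheck {
    public static boolean isRemotePath(String path) {
        try {
            String scheme = new URI(path).getScheme();
            // Single-letter schemes (e.g. "C" parsed from "C:/data") are
            // treated as local Windows drive letters, not remote filesystems.
            return scheme != null && !scheme.equals("file") && scheme.length() > 1;
        } catch (Exception e) {
            // Unparseable strings fall back to being treated as local paths.
            return false;
        }
    }
}
```

Under this heuristic, `s3a://...` and `hdfs://...` URIs count as remote, while scheme-less paths, `file:` URIs, and drive-letter paths stay local.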



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

