This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 178d0f4d46 [GH-2074] Add libpostal integration (#2077)
178d0f4d46 is described below

commit 178d0f4d46758f074518724359598afed70fd692
Author: James Willis <[email protected]>
AuthorDate: Mon Jul 14 21:46:21 2025 -0700

    [GH-2074] Add libpostal integration (#2077)
    
    * ExpandAddress and ParseAddress support via libpostal
    
    * PR comments; edge case in SedonaConf changes
    
    * Update spark/common/src/test/scala/org/apache/sedona/sql/AddressProcessingFunctionsTest.scala
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update python/sedona/spark/sql/st_functions.py
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update spark/common/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update spark/common/src/test/scala/org/apache/sedona/sql/AddressProcessingFunctionsTest.scala
    
    Co-authored-by: Copilot <[email protected]>
    
    * align null case logic between eval and codegen cases
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .github/workflows/java.yml                         |  12 +-
 docs/api/sql/Function.md                           |  52 +++++
 python/sedona/spark/sql/st_functions.py            |  20 ++
 python/tests/sql/test_dataframe_api.py             |  37 +++
 spark/common/pom.xml                               |  11 +
 .../org/apache/sedona/core/utils/SedonaConf.java   | 118 ++++++++--
 .../scala/org/apache/sedona/sql/UDF/Catalog.scala  |   2 +
 .../sql/sedona_sql/expressions/Functions.scala     | 147 +++++++++++-
 .../sedona_sql/expressions/LibPostalUtils.scala    |  39 ++++
 .../sql/sedona_sql/expressions/st_functions.scala  |   4 +
 .../apache/sedona/core/utils/SedonaConfTest.java   |   2 +
 .../sql/AddressProcessingFunctionsTest.scala       | 249 +++++++++++++++++++++
 .../org/apache/sedona/sql/GeoStatsSuite.scala      |   4 +
 13 files changed, 669 insertions(+), 28 deletions(-)

diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index d07e4c527f..54a4122c7d 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -65,6 +65,9 @@ jobs:
           - spark: 4.0.0
             scala: 2.13.8
             jdk: '17'
+          - spark: 3.5.4
+            scala: 2.12.18
+            jdk: '17'
           - spark: 3.5.0
             scala: 2.13.8
             jdk: '11'
@@ -103,13 +106,8 @@ jobs:
         run: |
           SPARK_COMPAT_VERSION=${SPARK_VERSION:0:3}
 
-          if [ "${SPARK_VERSION}" == "3.5.0" ]; then
-              pip install pyspark==3.5.0 pandas shapely apache-sedona pyarrow
-              export SPARK_HOME=$(python -c "import pyspark; print(pyspark.__path__[0])")
-          fi
-
-          if [ "${SPARK_VERSION}" == "4.0.0" ]; then
-              pip install pyspark==4.0.0 pandas shapely apache-sedona pyarrow
+          if [[ "${SPARK_VERSION}" == "3.5"* ]] || [[ "${SPARK_VERSION}" == 
"4."* ]]; then
+              pip install pyspark==$SPARK_VERSION pandas shapely apache-sedona 
pyarrow
               export SPARK_HOME=$(python -c "import pyspark; 
print(pyspark.__path__[0])")
           fi
 
diff --git a/docs/api/sql/Function.md b/docs/api/sql/Function.md
index e35f5d104d..8d3b769b11 100644
--- a/docs/api/sql/Function.md
+++ b/docs/api/sql/Function.md
@@ -17,6 +17,58 @@
  under the License.
  -->
 
+## ExpandAddress
+
+Introduction: Returns an array of expanded forms of the input address string. This is backed by the [libpostal](https://github.com/openvenues/libpostal) library's address expanding functionality.
+
+!!!Note
+    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the function is called for the first time.
+
+!!!Note
+    The version of jpostal installed with this package only supports Linux and macOS. If you are using Windows, you will need to install libjpostal and libpostal manually and ensure that they are available in your `java.library.path`.
+
+Format: `ExpandAddress (address: String)`
+
+Since: `v1.8.0`
+
+SQL Example
+
+```sql
+SELECT ExpandAddress("100 W 1st St, Los Angeles, CA 90012");
+```
+
+Output:
+
+```
+[100 w 1st saint, 100 w 1st street, 100 west 1st saint, 100 west 1st street, 100 w 1 saint, 100 w 1 street, 100 west 1 saint, 100 west 1 street]
+```
+
+## ParseAddress
+
+Introduction: Returns an array of the components (e.g. street, postal code) of the input address string. This is backed by the [libpostal](https://github.com/openvenues/libpostal) library's address parsing functionality.
+
+!!!Note
+    Jpostal requires at least 2 GB of free disk space to store the data files used for address parsing and expanding. The data files are downloaded automatically when the library is initialized.
+
+!!!Note
+    The version of jpostal installed with this package only supports Linux and macOS. If you are using Windows, you will need to install libjpostal and libpostal manually and ensure that they are available in your `java.library.path`.
+
+Format: `ParseAddress (address: String)`
+
+Since: `v1.8.0`
+
+SQL Example
+
+```sql
+SELECT ParseAddress("100 W 1st St, Los Angeles, CA 90012");
+```
+
+Output:
+
+```
+[{house_number, 100}, {road, w 1st st}, {city, los angeles}, {state, ca}, {postcode, 90012}]
+```
+
 ## GeometryType
 
 Introduction: Returns the type of the geometry as a string. Eg: 'LINESTRING', 'POLYGON', 'MULTIPOINT', etc. This function also indicates if the geometry is measured, by returning a string of the form 'POINTM'.
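
A minimal PySpark sketch of putting the documented functions to work, assuming the `spark.sedona.libpostal.dataDir` and `spark.sedona.libpostal.useSenzing` keys introduced in the SedonaConf changes further down, a Sedona-registered session, and a hypothetical data path. Since the expressions read these keys via `SedonaConf.fromSparkEnv`, they are best set before the session starts:

```python
from pyspark.sql import SparkSession

# Hedged sketch: set the libpostal keys at session build time, since the
# expressions read them from SparkEnv's SparkConf rather than runtime conf.
# "/mnt/data/libpostal/" is a hypothetical directory with >= 2 GB free.
spark = (
    SparkSession.builder
    .config("spark.sedona.libpostal.dataDir", "/mnt/data/libpostal/")
    .config("spark.sedona.libpostal.useSenzing", "false")
    .getOrCreate()
)

# Assumes Sedona's SQL functions are already registered on this session.
spark.sql("SELECT ExpandAddress('100 W 1st St, Los Angeles, CA 90012')").show(truncate=False)
```
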
diff --git a/python/sedona/spark/sql/st_functions.py b/python/sedona/spark/sql/st_functions.py
index d1ea70d649..83a16811a3 100644
--- a/python/sedona/spark/sql/st_functions.py
+++ b/python/sedona/spark/sql/st_functions.py
@@ -32,6 +32,26 @@ from sedona.spark.sql.dataframe_api import (
 _call_st_function = partial(call_sedona_function, "st_functions")
 
 
+@validate_argument_types
+def ExpandAddress(address: ColumnOrName):  # noqa: N802
+    """Normalize an address string into its canonical forms.
+
+    :param address: The address string or column to normalize.
+    :return: An array of normalized address strings.
+    """
+    return _call_st_function("ExpandAddress", address)
+
+
+@validate_argument_types
+def ParseAddress(address: ColumnOrName):  # noqa: N802
+    """Parse an address string into its components (label/value pairs).
+
+    :param address: The address string or column to parse.
+    :return: An array of structs with label/value fields for address components.
+    """
+    return _call_st_function("ParseAddress", address)
+
+
 @validate_argument_types
 def GeometryType(geometry: ColumnOrName):
     """Return the type of the geometry as a string.
diff --git a/python/tests/sql/test_dataframe_api.py b/python/tests/sql/test_dataframe_api.py
index c515cd4a3e..2ca5ef9edb 100644
--- a/python/tests/sql/test_dataframe_api.py
+++ b/python/tests/sql/test_dataframe_api.py
@@ -1783,3 +1783,40 @@ class TestDataFrameAPI(TestBase):
         df.withColumn(
             "localOutlierFactor", ST_LocalOutlierFactor("geometry", 2, False)
         ).collect()
+
+    def test_expand_address_df_api(self):
+        input_df = (
+            self.spark.range(1)
+            .selectExpr(
+                "'781 Franklin Ave Crown Heights Brooklyn NY 11216 USA' as 
address"
+            )
+            .cache()
+        )  # cache to avoid Constant Folding Optimization
+
+        # Actually running downloads the model and is very expensive, so we just check the plan
+        # Checking the plan should allow us to verify that the function is correctly registered
+        assert input_df.select(
+            ExpandAddress("address").alias("normalized")
+        ).sameSemantics(
+            input_df.select(f.expr("ExpandAddress(address)").alias("normalized"))
+        )
+
+        input_df.unpersist()
+
+    def test_parse_address_df_api(self):
+        input_df = (
+            self.spark.range(1)
+            .selectExpr(
+                "'781 Franklin Ave Crown Heights Brooklyn NY 11216 USA' as 
address"
+            )
+            .cache()
+        )  # cache to avoid Constant Folding Optimization
+
+        # Actually running downloads the model and is very expensive, so we just check the plan
+        # Checking the plan should allow us to verify that the function is correctly registered
+        assert input_df.select(
+            ParseAddress(f.col("address")).alias("parsed")
+        ).sameSemantics(
+            input_df.select(f.expr("ParseAddress(address)").alias("parsed"))
+        )
+        input_df.unpersist()
diff --git a/spark/common/pom.xml b/spark/common/pom.xml
index fab7c9fd09..32a2b5eec7 100644
--- a/spark/common/pom.xml
+++ b/spark/common/pom.xml
@@ -242,6 +242,17 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>com.wherobots</groupId>
+            <artifactId>jpostal</artifactId>
+            <version>1.2.2</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>minio</artifactId>
diff --git a/spark/common/src/main/java/org/apache/sedona/core/utils/SedonaConf.java b/spark/common/src/main/java/org/apache/sedona/core/utils/SedonaConf.java
index d02e96df93..38c4f061d0 100644
--- a/spark/common/src/main/java/org/apache/sedona/core/utils/SedonaConf.java
+++ b/spark/common/src/main/java/org/apache/sedona/core/utils/SedonaConf.java
@@ -20,11 +20,14 @@ package org.apache.sedona.core.utils;
 
 import java.io.Serializable;
 import java.lang.reflect.Field;
+import java.nio.file.Paths;
 import org.apache.sedona.core.enums.GridType;
 import org.apache.sedona.core.enums.IndexType;
 import org.apache.sedona.core.enums.JoinBuildSide;
 import org.apache.sedona.core.enums.JoinSpartitionDominantSide;
 import org.apache.sedona.core.enums.SpatialJoinOptimizationMode;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
 import org.apache.spark.sql.RuntimeConfig;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.util.Utils;
@@ -62,17 +65,71 @@ public class SedonaConf implements Serializable {
   // Parameters for geostats
   private Boolean DBSCANIncludeOutliers = true;
 
+  // Parameters for libpostal integration
+  private String libPostalDataDir;
+  private Boolean libPostalUseSenzing = false;
+
   public static SedonaConf fromActiveSession() {
     return new SedonaConf(SparkSession.active().conf());
   }
 
+  public static SedonaConf fromSparkEnv() {
+    return new SedonaConf(SparkEnv.get().conf());
+  }
+
+  public SedonaConf(SparkConf sparkConf) {
+    this(
+        new ConfGetter() {
+          @Override
+          public String get(String key, String defaultValue) {
+            return sparkConf.get(key, defaultValue);
+          }
+
+          @Override
+          public String get(String key) {
+            return sparkConf.get(key, null);
+          }
+
+          public boolean contains(String key) {
+            return sparkConf.contains(key);
+          }
+        });
+  }
+
   public SedonaConf(RuntimeConfig runtimeConfig) {
-    this.useIndex = Boolean.parseBoolean(getConfigValue(runtimeConfig, "global.index", "true"));
+    this(
+        new ConfGetter() {
+          @Override
+          public String get(String key, String defaultValue) {
+            return runtimeConfig.get(key, defaultValue);
+          }
+
+          @Override
+          public String get(String key) {
+            return runtimeConfig.get(key);
+          }
+
+          public boolean contains(String key) {
+            return runtimeConfig.contains(key);
+          }
+        });
+  }
+
+  private interface ConfGetter {
+    String get(String key, String defaultValue);
+
+    String get(String key);
+
+    boolean contains(String key);
+  }
+
+  private SedonaConf(ConfGetter confGetter) {
+    this.useIndex = Boolean.parseBoolean(getConfigValue(confGetter, "global.index", "true"));
     this.indexType =
-        IndexType.getIndexType(getConfigValue(runtimeConfig, "global.indextype", "rtree"));
+        IndexType.getIndexType(getConfigValue(confGetter, "global.indextype", "rtree"));
     this.joinApproximateTotalCount =
-        Long.parseLong(getConfigValue(runtimeConfig, "join.approxcount", "-1"));
-    String[] boundaryString = getConfigValue(runtimeConfig, "join.boundary", "0,0,0,0").split(",");
+        Long.parseLong(getConfigValue(confGetter, "join.approxcount", "-1"));
+    String[] boundaryString = getConfigValue(confGetter, "join.boundary", "0,0,0,0").split(",");
     this.datasetBoundary =
         new Envelope(
             Double.parseDouble(boundaryString[0]),
@@ -80,43 +137,57 @@ public class SedonaConf implements Serializable {
             Double.parseDouble(boundaryString[2]),
             Double.parseDouble(boundaryString[3]));
     this.joinGridType =
-        GridType.getGridType(getConfigValue(runtimeConfig, "join.gridtype", "kdbtree"));
+        GridType.getGridType(getConfigValue(confGetter, "join.gridtype", "kdbtree"));
     this.joinBuildSide =
-        JoinBuildSide.getBuildSide(getConfigValue(runtimeConfig, "join.indexbuildside", "left"));
+        JoinBuildSide.getBuildSide(getConfigValue(confGetter, "join.indexbuildside", "left"));
     this.joinSpartitionDominantSide =
         JoinSpartitionDominantSide.getJoinSpartitionDominantSide(
-            getConfigValue(runtimeConfig, "join.spatitionside", "left"));
+            getConfigValue(confGetter, "join.spatitionside", "left"));
     this.fallbackPartitionNum =
-        Integer.parseInt(getConfigValue(runtimeConfig, "join.numpartition", "-1"));
+        Integer.parseInt(getConfigValue(confGetter, "join.numpartition", "-1"));
     this.autoBroadcastJoinThreshold =
         bytesFromString(
             getConfigValue(
-                runtimeConfig,
+                confGetter,
                 "join.autoBroadcastJoinThreshold",
-                runtimeConfig.get("spark.sql.autoBroadcastJoinThreshold")));
+                confGetter.get("spark.sql.autoBroadcastJoinThreshold")));
     this.spatialJoinOptimizationMode =
         SpatialJoinOptimizationMode.getSpatialJoinOptimizationMode(
-            getConfigValue(runtimeConfig, "join.optimizationmode", "nonequi"));
+            getConfigValue(confGetter, "join.optimizationmode", "nonequi"));
 
     // Parameters for knn joins
     this.includeTieBreakersInKNNJoins =
-        Boolean.parseBoolean(getConfigValue(runtimeConfig, "join.knn.includeTieBreakers", "false"));
+        Boolean.parseBoolean(getConfigValue(confGetter, "join.knn.includeTieBreakers", "false"));
 
     // Parameters for geostats
     this.DBSCANIncludeOutliers =
-        Boolean.parseBoolean(runtimeConfig.get("spark.sedona.dbscan.includeOutliers", "true"));
+        Boolean.parseBoolean(confGetter.get("spark.sedona.dbscan.includeOutliers", "true"));
+
+    // Parameters for libpostal integration
+    String libPostalDataDir =
+        confGetter.get(
+            "spark.sedona.libpostal.dataDir",
+            Paths.get(System.getProperty("java.io.tmpdir"))
+                .resolve(Paths.get("libpostal"))
+                .toString());
+    if (!libPostalDataDir.isEmpty() && !libPostalDataDir.endsWith("/")) {
+      libPostalDataDir = libPostalDataDir + "/";
+    }
+    this.libPostalDataDir = libPostalDataDir;
+
+    this.libPostalUseSenzing =
+        Boolean.parseBoolean(confGetter.get("spark.sedona.libpostal.useSenzing", "true"));
   }
 
   // Helper method to prioritize `sedona.*` over `spark.sedona.*`
-  private String getConfigValue(
-      RuntimeConfig runtimeConfig, String keySuffix, String defaultValue) {
+  private String getConfigValue(ConfGetter confGetter, String keySuffix, String defaultValue) {
     String sedonaKey = "sedona." + keySuffix;
     String sparkSedonaKey = "spark.sedona." + keySuffix;
 
-    if (runtimeConfig.contains(sedonaKey)) {
-      return runtimeConfig.get(sedonaKey, defaultValue);
+    if (confGetter.contains(sedonaKey)) {
+      return confGetter.get(sedonaKey, defaultValue);
     } else {
-      return runtimeConfig.get(sparkSedonaKey, defaultValue);
+      return confGetter.get(sparkSedonaKey, defaultValue);
     }
   }
 
@@ -179,6 +250,9 @@ public class SedonaConf implements Serializable {
   }
 
   static long bytesFromString(String str) {
+    if (str == null || str.isEmpty()) {
+      return 0;
+    }
     if (str.startsWith("-")) {
       return -1 * Utils.byteStringAsBytes(str.substring(1));
     } else {
@@ -193,4 +267,12 @@ public class SedonaConf implements Serializable {
   public Boolean getDBSCANIncludeOutliers() {
     return DBSCANIncludeOutliers;
   }
+
+  public String getLibPostalDataDir() {
+    return libPostalDataDir;
+  }
+
+  public Boolean getLibPostalUseSenzing() {
+    return libPostalUseSenzing;
+  }
 }
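
The `getConfigValue` helper above checks the bare `sedona.*` key before its `spark.sedona.*` twin; a small illustration of that precedence, with hypothetical values on an active session `spark`:

```python
# Hedged sketch of the precedence implemented by getConfigValue: when both
# spellings of a key are set, the bare "sedona.*" form wins.
spark.conf.set("spark.sedona.global.indextype", "rtree")
spark.conf.set("sedona.global.indextype", "quadtree")
# A SedonaConf built from this session's RuntimeConfig now resolves
# global.indextype to "quadtree", not "rtree".
```
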
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index e0e3503012..031c65f97f 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -29,6 +29,8 @@ object Catalog extends AbstractCatalog {
 
   override val expressions: Seq[FunctionDescription] = Seq(
     // Expression for vectors
+    function[ExpandAddress](),
+    function[ParseAddress](),
     function[GeometryType](),
     function[ST_LabelPoint](),
     function[ST_PointFromText](),
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index b594d8a43d..a9e6e5cd2a 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -22,19 +22,23 @@ import org.apache.sedona.common.geometryObjects.Geography
 import org.apache.sedona.common.{Functions, FunctionsGeoTools}
 import org.apache.sedona.common.sphere.{Haversine, Spheroid}
 import org.apache.sedona.common.utils.{InscribedCircle, ValidDetail}
+import org.apache.sedona.core.utils.SedonaConf
 import org.apache.sedona.sql.utils.GeometrySerializer
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, Generator, Nondeterministic}
-import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, Generator, ImplicitCastInputTypes, Nondeterministic, UnaryExpression}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.sedona_sql.expressions.implicits._
 import org.apache.spark.sql.types._
 import org.locationtech.jts.algorithm.MinimumBoundingCircle
 import org.locationtech.jts.geom._
 import org.apache.spark.sql.sedona_sql.expressions.InferrableFunctionConverter._
+import org.apache.spark.sql.sedona_sql.expressions.LibPostalUtils.{getExpanderFromConf, getParserFromConf}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
+import com.mapzen.jpostal.{AddressExpander, AddressParser}
+import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
 
 case class ST_LabelPoint(inputExpressions: Seq[Expression])
     extends InferredExpression(
@@ -1831,3 +1835,140 @@ case class ST_InterpolatePoint(inputExpressions: Seq[Expression])
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) =
     copy(inputExpressions = newChildren)
 }
+
+case class ExpandAddress(address: Expression)
+    extends UnaryExpression
+    with ImplicitCastInputTypes
+    with CodegenFallback
+    with FoldableExpression
+    with Serializable {
+
+  def this(children: Seq[Expression]) = this(children.head)
+
+  lazy private final val expander = {
+    val conf = SedonaConf.fromSparkEnv
+    getExpanderFromConf(conf.getLibPostalDataDir, conf.getLibPostalUseSenzing)
+  }
+
+  override def nullable: Boolean = true
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
+
+  override def dataType: DataType = ArrayType(StringType)
+
+  override def eval(input: InternalRow): Any = {
+    val addressVal = address.eval(input)
+    if (addressVal == null) {
+      null
+    } else {
+      new GenericArrayData(
+        expander
+          .expandAddress(addressVal.asInstanceOf[UTF8String].toString)
+          .map(UTF8String.fromString))
+    }
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val expanderRef = ctx.addReferenceObj("expander", expander, classOf[AddressExpander].getName)
+    val utf8StringClass = "org.apache.spark.unsafe.types.UTF8String"
+    val arrayDataClass = "org.apache.spark.sql.catalyst.util.GenericArrayData"
+    val addressRef = child.genCode(ctx)
+    val address = addressRef.value
+    ev.copy(code = code"""
+          ${addressRef.code}
+          boolean ${ev.isNull} = ${addressRef.isNull};
+          $arrayDataClass ${ev.value} = null;
+
+          if (!${ev.isNull}) {
+            String[] expandedAddressArray = $expanderRef.expandAddress($address.toString());
+            $utf8StringClass[] utf8Strings = new $utf8StringClass[expandedAddressArray.length];
+            for (int j = 0; j < expandedAddressArray.length; j++) {
+              utf8Strings[j] = $utf8StringClass.fromString(expandedAddressArray[j]);
+            }
+            ${ev.value} = new $arrayDataClass(utf8Strings);
+          } else {
+            ${ev.value} = null;
+          }""".stripMargin)
+  }
+
+  override def toString: String = s"ExpandAddress($address)"
+
+  override def child: Expression = address
+
+  override protected def withNewChildInternal(newChild: Expression): Expression =
+    ExpandAddress(newChild)
+}
+
+case class ParseAddress(address: Expression)
+    extends UnaryExpression
+    with ImplicitCastInputTypes
+    with CodegenFallback
+    with FoldableExpression
+    with Serializable {
+
+  def this(children: Seq[Expression]) = this(children.head)
+
+  lazy private final val parser = {
+    val conf = SedonaConf.fromSparkEnv
+    getParserFromConf(conf.getLibPostalDataDir, conf.getLibPostalUseSenzing)
+  }
+
+  override def nullable: Boolean = true
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
+
+  override def dataType: DataType = ArrayType(
+    StructType(Seq(StructField("label", StringType), StructField("value", StringType))))
+
+  override def eval(input: InternalRow): Any = {
+    val addressVal = address.eval(input)
+    if (addressVal == null) {
+      null
+    } else {
+      new GenericArrayData(
+        parser
+          .parseAddress(addressVal.asInstanceOf[UTF8String].toString)
+          .map(component =>
+            InternalRow(
+              UTF8String
+                .fromString(component.getLabel),
+              UTF8String.fromString(component.getValue))))
+    }
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val parserRef = ctx.addReferenceObj("parser", parser, classOf[AddressParser].getName)
+    val arrayDataClass = "org.apache.spark.sql.catalyst.util.GenericArrayData"
+    val internalRowClass = "org.apache.spark.sql.catalyst.expressions.GenericInternalRow"
+
+    val addressRef = child.genCode(ctx)
+    val address = addressRef.value
+    val code = code"""
+        ${addressRef.code}
+        boolean ${ev.isNull} = ${addressRef.isNull};
+        $arrayDataClass ${ev.value};
+
+        if (!${ev.isNull}) {
+          com.mapzen.jpostal.ParsedComponent[] components = $parserRef.parseAddress($address.toString());
+          $internalRowClass[] rows = new $internalRowClass[components.length];
+          for (int j = 0; j < components.length; j++) {
+            Object[] fields = new Object[2];
+            fields[0] = UTF8String.fromString(components[j].getLabel());
+            fields[1] = UTF8String.fromString(components[j].getValue());
+            $internalRowClass row = new GenericInternalRow(fields);
+            rows[j] = row;
+          }
+          ${ev.value} = new $arrayDataClass(rows);
+        } else {
+          ${ev.value} = null;
+        }""".stripMargin
+    ev.copy(code = code)
+  }
+
+  override def toString: String = s"ParseAddress($address)"
+
+  override def child: Expression = address
+
+  override protected def withNewChildInternal(newChild: Expression): Expression = ParseAddress(
+    newChild)
+}
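
Both expressions return null for null input on the interpreted (`eval`) and generated-code paths, which is what the "align null case logic" item in the commit message refers to; a quick way to check, assuming a Sedona-enabled session `spark`:

```python
# Hedged sketch: NULL in yields NULL out from both functions, whether Spark
# takes the eval path or the codegen path.
spark.sql(
    "SELECT ExpandAddress(NULL) AS expanded, ParseAddress(NULL) AS parsed"
).show()  # both columns come back null
```
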
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalUtils.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalUtils.scala
new file mode 100644
index 0000000000..fe23cab722
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/LibPostalUtils.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions
+
+import com.mapzen.jpostal.{AddressExpander, AddressParser, Config}
+
+object LibPostalUtils {
+
+  private def getConfig(dataDir: String, useSenzing: Boolean): Config = {
+    Config
+      .builder()
+      .dataDir(dataDir)
+      .downloadDataIfNeeded(true)
+      .senzing(useSenzing)
+      .build()
+  }
+
+  def getParserFromConf(dataDir: String, useSenzing: Boolean): AddressParser =
+    AddressParser.getInstanceConfig(getConfig(dataDir, useSenzing))
+
+  def getExpanderFromConf(dataDir: String, useSenzing: Boolean): AddressExpander =
+    AddressExpander.getInstanceConfig(getConfig(dataDir, useSenzing))
+}
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
index 56e0168e59..972746c693 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
@@ -25,6 +25,10 @@ import org.locationtech.jts.operation.buffer.BufferParameters
 import org.apache.spark.sql.sedona_sql.DataFrameShims._
 
 object st_functions {
+  def ExpandAddress(address: Column): Column = wrapExpression[ExpandAddress](address)
+  def ExpandAddress(address: String): Column = wrapExpression[ExpandAddress](address)
+  def ParseAddress(address: Column): Column = wrapExpression[ParseAddress](address)
+  def ParseAddress(address: String): Column = wrapExpression[ParseAddress](address)
   def GeometryType(geometry: Column): Column = wrapExpression[GeometryType](geometry)
   def GeometryType(geometry: String): Column = wrapExpression[GeometryType](geometry)
 
diff --git a/spark/common/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java b/spark/common/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java
index 4c0d5b81df..f90641f76a 100644
--- a/spark/common/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java
+++ b/spark/common/src/test/java/org/apache/sedona/core/utils/SedonaConfTest.java
@@ -57,5 +57,7 @@ public class SedonaConfTest {
     assertEquals(-1, SedonaConf.bytesFromString("-1"));
     assertEquals(1024, SedonaConf.bytesFromString("1k"));
     assertEquals(2097152, SedonaConf.bytesFromString("2MB"));
+    // fromSparkEnv means we don't have access to default values so sometimes we get null as input
+    assertEquals(0, SedonaConf.bytesFromString(null));
   }
 }
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/AddressProcessingFunctionsTest.scala b/spark/common/src/test/scala/org/apache/sedona/sql/AddressProcessingFunctionsTest.scala
new file mode 100644
index 0000000000..6b13d4b909
--- /dev/null
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/AddressProcessingFunctionsTest.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.apache.sedona.core.utils.SedonaConf
+import org.apache.spark.sql.sedona_sql.expressions.st_functions.{ExpandAddress, ParseAddress}
+import org.apache.spark.sql.{Row, functions => f}
+import org.scalatest.BeforeAndAfterEach
+import org.slf4j.LoggerFactory
+
+import java.nio.file.{Files, Paths}
+import scala.collection.mutable
+
+class AddressProcessingFunctionsTest extends TestBaseScala with BeforeAndAfterEach {
+  private val logger = LoggerFactory.getLogger(getClass)
+  var clearedLibPostal = false
+
+  def clearLibpostalDataDir(): String = {
+    val dir = SedonaConf.fromActiveSession().getLibPostalDataDir
+    if (dir != null && dir.nonEmpty) {
+      try {
+        Files
+          .walk(Paths.get(dir))
+          .sorted(java.util.Comparator.reverseOrder())
+          .forEach(path => Files.deleteIfExists(path))
+      } catch {
+        case e: Exception =>
+          logger.warn(s"Failed to clear libpostal data directory: ${e.getMessage}")
+      }
+    }
+    dir
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    if (!clearedLibPostal) {
+      clearedLibPostal = true
+      clearLibpostalDataDir()
+    }
+  }
+
+  describe("ExpandAddress") {
+    it("should return expected normalized forms") {
+      val resultDf = sparkSession
+        .sql(
+          "SELECT ExpandAddress('781 Franklin Ave Crown Heights Brooklyn NY 
11216 USA') as normalized")
+      resultDf.write
+        .format("noop")
+        .mode("overwrite")
+        .save() // Tests a deserialization step that collect does not
+      val result = resultDf.collect
+        .take(1)(0)(0)
+        .asInstanceOf[mutable.Seq[String]]
+      assert(result.contains("781 franklin avenue crown heights brooklyn new 
york 11216 usa"))
+    }
+
+    it("should return null for null input") {
+      val result = sparkSession.sql("SELECT ExpandAddress(NULL) as 
normalized").collect()
+      assert(result.head.get(0) == null)
+    }
+
+    it("should work when chained with explode") {
+      val result = sparkSession
+        .sql("SELECT Explode(ExpandAddress('781 Franklin Ave Crown Heights 
Brooklyn NY 11216 USA')) as normalized")
+        .collect()
+        .map(_.getString(0))
+      assert(result.contains("781 franklin avenue crown heights brooklyn new 
york 11216 usa"))
+    }
+  }
+
+  describe("ParseAddress") {
+    it("should return expected label/value pairs") {
+      val resultDf = sparkSession
+        .sql(
+          "SELECT ParseAddress('781 Franklin Ave Crown Heights Brooklyn NY 
11216 USA') as parsed")
+      resultDf.write
+        .format("noop")
+        .mode("overwrite")
+        .save() // Tests a deserialization step that collect does not
+      val result = resultDf
+        .take(1)(0)(0)
+        .asInstanceOf[mutable.Seq[Row]]
+        .map(row => row.getAs[String]("label") -> row.getAs[String]("value"))
+        .toMap
+
+      assert(result.contains("road"))
+      assert(result("road") == "franklin ave")
+      assert(result.contains("city_district"))
+      assert(result("city_district") == "brooklyn")
+      assert(result.contains("house_number"))
+      assert(result("house_number") == "781")
+      assert(result.contains("postcode"))
+      assert(result("postcode") == "11216")
+      assert(result.contains("country"))
+      assert(result("country").toLowerCase.contains("usa"))
+      assert(result.contains("suburb"))
+      assert(result("suburb").toLowerCase.contains("crown heights"))
+    }
+
+    it("should return null for null input") {
+      val result = sparkSession.sql("SELECT ParseAddress(NULL) as 
parsed").collect()
+      assert(result.head.get(0) == null)
+    }
+
+    it("should work when chained with explode") {
+      val result = sparkSession
+        .sql("SELECT Explode(ParseAddress('781 Franklin Ave Crown Heights 
Brooklyn NY 11216 USA')) as parsed")
+        .collect()
+        .map(row => row.getAs[Row]("parsed"))
+        .map(row => row.getAs[String]("label") -> row.getAs[String]("value"))
+        .toMap
+
+      assert(result.contains("road"))
+      assert(result("road") == "franklin ave")
+      assert(result.contains("city_district"))
+      assert(result("city_district") == "brooklyn")
+      assert(result.contains("house_number"))
+      assert(result("house_number") == "781")
+      assert(result.contains("postcode"))
+      assert(result("postcode") == "11216")
+      assert(result.contains("country"))
+      assert(result("country").toLowerCase.contains("usa"))
+      assert(result.contains("suburb"))
+      assert(result("suburb").toLowerCase.contains("crown heights"))
+    }
+  }
+
+  describe("DataFrame API") {
+    it("should return expected normalized forms using ExpandAddress") {
+      val result = sparkSession
+        .range(1)
+        .select(ExpandAddress(f.lit("781 Franklin Ave Crown Heights Brooklyn 
NY 11216 USA"))
+          .alias("address"))
+        .take(1)(0)(0)
+        .asInstanceOf[mutable.Seq[String]]
+      assert(result.contains("781 franklin avenue crown heights brooklyn new 
york 11216 usa"))
+    }
+
+    it("should return expected label/value pairs using ParseAddress") {
+      val result = sparkSession
+        .range(1)
+        .select(ParseAddress(f.lit("781 Franklin Ave Crown Heights Brooklyn NY 
11216 USA")))
+        .take(1)(0)(0)
+        .asInstanceOf[mutable.Seq[Row]]
+        .map(row => row.getAs[String]("label") -> row.getAs[String]("value"))
+        .toMap
+
+      assert(result.contains("road"))
+      assert(result("road") == "franklin ave")
+      assert(result.contains("city_district"))
+      assert(result("city_district") == "brooklyn")
+      assert(result.contains("house_number"))
+      assert(result("house_number") == "781")
+      assert(result.contains("postcode"))
+      assert(result("postcode") == "11216")
+      assert(result.contains("country"))
+      assert(result("country").toLowerCase.contains("usa"))
+      assert(result.contains("suburb"))
+      assert(result("suburb").toLowerCase.contains("crown heights"))
+    }
+  }
+
+  describe("Codegen") {
+    it("should return the same values as non-codegen for ExpandAddress and 
ParseAddress") {
+      val expected = sparkSession
+        .range(1)
+        .select(
+          ExpandAddress(f.lit("781 Franklin Ave Crown Heights Brooklyn NY 
11216 USA")).alias(
+            "expanded"),
+          ParseAddress(f.lit("781 Franklin Ave Crown Heights Brooklyn NY 11216 
USA")).alias(
+            "parsed"))
+        .collect()
+      sparkSession.conf.set("spark.sql.codegen.factoryMode", "CODEGEN_ONLY")
+      val actual = sparkSession
+        .range(1)
+        .withColumn("address", f.lit("781 Franklin Ave Crown Heights Brooklyn 
NY 11216 USA"))
+        .cache() // Cache to ensure codegen is triggered, no constant folding
+        .select(
+          ExpandAddress(f.col("address")).alias("expanded"),
+          ParseAddress(f.col("address")).alias("parsed"))
+        .collect()
+
+      sparkSession.catalog.clearCache()
+      assert(expected sameElements actual)
+    }
+
+    it(
+      "should return the same values as non-codegen for ExpandAddress and 
ParseAddress for null input") {
+      val expected = sparkSession
+        .range(1)
+        .select(
+          ExpandAddress(f.lit(null)).alias("expanded"),
+          ParseAddress(f.lit(null)).alias("parsed"))
+        .collect()
+      sparkSession.conf.set("spark.sql.codegen.factoryMode", "CODEGEN_ONLY")
+      val actual = sparkSession
+        .range(1)
+        .withColumn("address", f.lit(null))
+        .cache() // Cache to ensure codegen is triggered, no constant folding
+        .select(
+          ExpandAddress(f.col("address")).alias("expanded"),
+          ParseAddress(f.col("address")).alias("parsed"))
+        .collect()
+
+      sparkSession.catalog.clearCache()
+      assert(expected sameElements actual)
+    }
+
+    it("should return the same values as non-code-gen case with explode") {
+      // debugging reveals this goes through codegen, but not sure why
+      val codeGenResult = sparkSession
+        .range(1)
+        .select(
+          f.explode(ExpandAddress(f.lit("781 Franklin Ave Crown Heights 
Brooklyn NY 11216 USA")))
+            .alias("expanded"))
+        .withColumn(
+          "parsed",
+          f.explode(ParseAddress(f.lit("781 Franklin Ave Crown Heights 
Brooklyn NY 11216 USA"))))
+        .collect()
+      val nonCodeGenResult = sparkSession
+        .range(1)
+        .withColumn("address", f.lit("781 Franklin Ave Crown Heights Brooklyn 
NY 11216 USA"))
+        .cache() // For some reason with the cache and explode we don't get code gen
+        .withColumn("expanded", f.explode(ExpandAddress(f.col("address"))))
+        .withColumn("parsed", f.explode(ParseAddress(f.col("address"))))
+        .drop("id", "address")
+        .collect()
+
+      sparkSession.catalog.clearCache()
+      assert(codeGenResult sameElements nonCodeGenResult)
+    }
+  }
+}
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/GeoStatsSuite.scala b/spark/common/src/test/scala/org/apache/sedona/sql/GeoStatsSuite.scala
index 498da39821..abde7dbfaa 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/GeoStatsSuite.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/GeoStatsSuite.scala
@@ -29,6 +29,10 @@ import org.apache.spark.sql.sedona_sql.expressions.st_functions.{ST_DBSCAN, ST_L
 class GeoStatsSuite extends TestBaseScala {
   private val spark = sparkSession
 
+  override def sparkConfig = {
+    super.sparkConfig ++ Map("spark.sql.adaptive.enabled" -> "false")
+  }
+
   case class Record(id: Int, x: Double, y: Double)
 
   def getData: DataFrame = {

