This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 04d313e9c5 [SEDONA-738] Add moran i autocorrelation. (#1975)
04d313e9c5 is described below

commit 04d313e9c5de55cb3416f5b852a2350d6e73f597
Author: PaweÅ‚ Tokaj <[email protected]>
AuthorDate: Thu Jul 17 19:57:57 2025 +0200

    [SEDONA-738] Add moran i autocorrelation. (#1975)
    
    * SEDONA-738 Add moran i autocorrelation.
    
    * SEDONA-738 Fix unit tests.
    
    * SEDONA-738 Fix unit tests.
    
    * SEDONA-738 Fix unit tests.
    
    * SEDONA-738 Fix unit tests.
    
    * SEDONA-738 Fix unit tests.
    
    * SEDONA-738 Fix scala 2.13 issue
    
    * Update 
spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala
    
    Co-authored-by: Copilot <[email protected]>
    
    * Fix typos
    
    * Update doc
    
    ---------
    
    Co-authored-by: Jia Yu <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 docs/api/stats/sql.md                              | 114 ++++++++++++++++++
 docs/image/moranI.png                              | Bin 0 -> 5422 bytes
 python/sedona/spark/register/java_libs.py          |   1 +
 .../sedona/spark/stats/autocorrelation/__init__.py |  16 +++
 python/sedona/spark/stats/autocorrelation/moran.py |  52 +++++++++
 .../spark/stats/hotspot_detection/getis_ord.py     |   4 +-
 python/sedona/spark/stats/weighting.py             |  77 +++++++-----
 python/tests/stats/test_moran.py                   |  95 +++++++++++++++
 .../sedona/stats/autocorrelation/MoranResult.java  |  43 +++++++
 .../scala/org/apache/sedona/stats/Weighting.scala  |  31 +++++
 .../sedona/stats/autocorrelation/Moran.scala       | 130 +++++++++++++++++++++
 .../autocorrelation/AutoCorrelationFixtures.scala  | 110 +++++++++++++++++
 .../sedona/stats/autocorrelation/MoranTest.scala   | 100 ++++++++++++++++
 13 files changed, 739 insertions(+), 34 deletions(-)

diff --git a/docs/api/stats/sql.md b/docs/api/stats/sql.md
index f39722d87f..005e647a9f 100644
--- a/docs/api/stats/sql.md
+++ b/docs/api/stats/sql.md
@@ -135,3 +135,117 @@ names in parentheses are python variable names
 - useSpheroid (use_spheroid) - whether to use a cartesian or spheroidal 
distance calculation. Default is false
 
 In both cases the output is the input DataFrame with the weights column added 
to each row.
+
+## Moran I
+
+Moran I is the spatial autocorrelation algorithm, which is using spatial
+location and non-spatial attribute. When the value is close to the 1 it
+means that there is spatial correlation, when it is close to 0 then the
+correlation does not exist and data is randomly distributed. When the
+MoranI autocorrelation value is close to -1 it means that there is negative
+correlation. Negative correlation means that close values has dissimilar 
values.
+
+You can see spatial correlation values on the figure below
+
+- on the left there is negative correlation (-1)
+- in the middle correlation is positive (1)
+- on the right the correlation is close to zero and data is random.
+
+![moranI.png](../../image/moranI.png)
+
+Moran statistics can be used as the Scala/Java and Python functions.
+As the input function requires weight DataFrame. You can create the
+weight DataFrame using Apache Sedona weighting functions. You need
+to keep in mind that your input has to have id column that uniquely identifies
+the feature and value field. The required minimal schema for the MoranI Apache 
Sedona
+function is:
+
+```
+ |-- id: integer (nullable = true)
+ |-- value: double (nullable = true)
+ |-- weights: array (nullable = false)
+ |    |-- element: struct (containsNull = false)
+ |    |    |-- neighbor: struct (nullable = false)
+ |    |    |    |-- id: integer (nullable = true)
+ |    |    |    |-- value: double (nullable = true)
+ |    |    |-- value: double (nullable = true)
+```
+
+You can manipulate the value column name and id using function parameters.
+
+To use the [Apache Sedona weight functions](#adddistancebandcolumn) you need 
to pass the id column and value column to kept parameters.
+
+=== "Scala"
+
+    ```scala
+    val weights = Weighting.addDistanceBandColumn(
+          positiveCorrelationFrame,
+          1.0,
+          savedAttributes = Seq("id", "value")
+    )
+
+    val moranResult = Moran.getGlobal(weights, idColumn = "id")
+
+    // result fields
+    moranResult.getPNorm
+    moranResult.getI
+    moranResult.getZNorm
+    ```
+
+=== "Python"
+
+    ```python
+    from sedona.spark.stats.autocorrelation.moran import Moran
+    from sedona.spark.stats.weighting import add_binary_distance_band_column
+
+    result = add_binary_distance_band_column(
+        df,
+        1.0,
+        saved_attributes=["id", "value"]
+    )
+
+    moran_i_result = Moran.get_global(result)
+
+    ## result fields
+    moran_i_result.p_norm
+    moran_i_result.i
+    moran_i_result.z_norm
+    ```
+
+In the result you get the Z norm, P norm and Moran I value.
+
+The full signatures of the functions
+
+=== "Scala"
+
+    ```scala
+    def getGlobal(
+      dataframe: DataFrame,
+      twoTailed: Boolean = true,
+      idColumn: String = ID_COLUMN,
+      valueColumnName: String = VALUE_COLUMN): MoranResult
+
+    // java interface
+    public interface MoranResult {
+        public double getI();
+        public double getPNorm();
+        public double getZNorm();
+    }
+    ```
+
+=== "Python"
+
+    ```python
+    def get_global(
+        df: DataFrame,
+        two_tailed: bool = True,
+        id_column: str = "id",
+        value_column: str = "value",
+    ) -> MoranResult
+
+    @dataclass
+    class MoranResult:
+        i: float
+        p_norm: float
+        z_norm: float
+    ```
diff --git a/docs/image/moranI.png b/docs/image/moranI.png
new file mode 100644
index 0000000000..157c2e36e6
Binary files /dev/null and b/docs/image/moranI.png differ
diff --git a/python/sedona/spark/register/java_libs.py 
b/python/sedona/spark/register/java_libs.py
index 675d788855..8ba82f2c3f 100644
--- a/python/sedona/spark/register/java_libs.py
+++ b/python/sedona/spark/register/java_libs.py
@@ -65,6 +65,7 @@ class SedonaJvmLib(Enum):
     st_predicates = "org.apache.spark.sql.sedona_sql.expressions.st_predicates"
     st_aggregates = "org.apache.spark.sql.sedona_sql.expressions.st_aggregates"
     SedonaContext = "org.apache.sedona.spark.SedonaContext"
+    Moran = "org.apache.sedona.stats.autocorrelation.Moran"
 
     @classmethod
     def from_str(cls, geo_lib: str) -> "SedonaJvmLib":
diff --git a/python/sedona/spark/stats/autocorrelation/__init__.py 
b/python/sedona/spark/stats/autocorrelation/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/python/sedona/spark/stats/autocorrelation/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/python/sedona/spark/stats/autocorrelation/moran.py 
b/python/sedona/spark/stats/autocorrelation/moran.py
new file mode 100644
index 0000000000..506aed12b9
--- /dev/null
+++ b/python/sedona/spark/stats/autocorrelation/moran.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from dataclasses import dataclass
+
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+
+
+@dataclass
+class MoranResult:
+    i: float
+    p_norm: float
+    z_norm: float
+
+
+class Moran:
+
+    @staticmethod
+    def get_global(
+        df: DataFrame,
+        two_tailed: bool = True,
+        id_column: str = "id",
+        value_column: str = "value",
+    ) -> MoranResult:
+        sedona = SparkSession.getActiveSession()
+
+        _jvm = sedona._jvm
+        moran_result = (
+            
sedona._jvm.org.apache.sedona.stats.autocorrelation.Moran.getGlobal(
+                df._jdf, two_tailed, id_column, value_column
+            )
+        )
+
+        return MoranResult(
+            i=moran_result.getI(),
+            p_norm=moran_result.getPNorm(),
+            z_norm=moran_result.getZNorm(),
+        )
diff --git a/python/sedona/spark/stats/hotspot_detection/getis_ord.py 
b/python/sedona/spark/stats/hotspot_detection/getis_ord.py
index 7e6a039950..b2b4d3ccc8 100644
--- a/python/sedona/spark/stats/hotspot_detection/getis_ord.py
+++ b/python/sedona/spark/stats/hotspot_detection/getis_ord.py
@@ -21,7 +21,7 @@ Getis, A., & Ord, J. K. (1992). The analysis of spatial 
association by use of di
 Geographical Analysis, 24(3), 189-206. 
https://doi.org/10.1111/j.1538-4632.1992.tb00261.x
 """
 
-from pyspark.sql import Column, DataFrame, SparkSession
+from pyspark.sql import DataFrame, SparkSession
 
 # todo change weights and x type to string
 
@@ -59,7 +59,7 @@ def g_local(
     sedona = SparkSession.getActiveSession()
 
     result_df = 
sedona._jvm.org.apache.sedona.stats.hotspotDetection.GetisOrd.gLocal(
-        dataframe, x, weights, permutations, star, island_weight
+        dataframe._jdf, x, weights, permutations, star, island_weight
     )
 
     return DataFrame(result_df, sedona)
diff --git a/python/sedona/spark/stats/weighting.py 
b/python/sedona/spark/stats/weighting.py
index 05cd08db4f..d12639255e 100644
--- a/python/sedona/spark/stats/weighting.py
+++ b/python/sedona/spark/stats/weighting.py
@@ -60,18 +60,21 @@ def add_distance_band_column(
 
     """
     sedona = SparkSession.getActiveSession()
-    return sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumn(
-        dataframe._jdf,
-        float(threshold),
-        binary,
-        float(alpha),
-        include_zero_distance_neighbors,
-        include_self,
-        float(self_weight),
-        geometry,
-        use_spheroid,
-        saved_attributes,
-        result_name,
+    return DataFrame(
+        
sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython(
+            dataframe._jdf,
+            float(threshold),
+            binary,
+            float(alpha),
+            include_zero_distance_neighbors,
+            include_self,
+            float(self_weight),
+            geometry,
+            use_spheroid,
+            saved_attributes,
+            result_name,
+        ),
+        sedona,
     )
 
 
@@ -110,15 +113,21 @@ def add_binary_distance_band_column(
     """
     sedona = SparkSession.getActiveSession()
 
-    return 
sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn(
-        dataframe._jdf,
-        float(threshold),
-        include_zero_distance_neighbors,
-        include_self,
-        geometry,
-        use_spheroid,
-        saved_attributes,
-        result_name,
+    return DataFrame(
+        
sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython(
+            dataframe._jdf,
+            float(threshold),
+            True,
+            float(-1.0),
+            include_zero_distance_neighbors,
+            include_self,
+            float(1.0),
+            geometry,
+            use_spheroid,
+            saved_attributes,
+            result_name,
+        ),
+        sedona,
     )
 
 
@@ -161,15 +170,19 @@ def add_weighted_distance_band_column(
     """
     sedona = SparkSession.getActiveSession()
 
-    return 
sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn(
-        dataframe._jdf,
-        float(threshold),
-        float(alpha),
-        include_zero_distance_neighbors,
-        include_self,
-        float(self_weight),
-        geometry,
-        use_spheroid,
-        saved_attributes,
-        result_name,
+    return DataFrame(
+        
sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython(
+            dataframe._jdf,
+            float(threshold),
+            False,
+            alpha,
+            include_zero_distance_neighbors,
+            include_self,
+            self_weight,
+            geometry,
+            use_spheroid,
+            saved_attributes,
+            result_name,
+        ),
+        sedona,
     )
diff --git a/python/tests/stats/test_moran.py b/python/tests/stats/test_moran.py
new file mode 100644
index 0000000000..d910d79f1c
--- /dev/null
+++ b/python/tests/stats/test_moran.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from sedona.spark.stats.autocorrelation.moran import Moran
+from sedona.spark.stats.weighting import add_binary_distance_band_column
+from tests.test_base import TestBase
+
+
+class TestMoran(TestBase):
+
+    data = [
+        (1, 1.0, 1.0, 8.5),
+        (2, 1.5, 1.2, 8.2),
+        (3, 1.3, 1.8, 8.7),
+        (4, 1.7, 1.6, 7.9),
+        (5, 4.0, 1.5, 6.2),
+        (6, 4.2, 1.7, 6.5),
+        (7, 4.5, 1.3, 5.9),
+        (8, 4.7, 1.8, 6.0),
+        (9, 1.8, 4.3, 3.1),
+        (10, 1.5, 4.5, 3.4),
+        (11, 1.2, 4.7, 3.0),
+        (12, 1.6, 4.2, 3.3),
+        (13, 1.9, 4.8, 2.8),
+        (14, 4.3, 4.2, 1.2),
+        (15, 4.5, 4.5, 1.5),
+        (16, 4.7, 4.8, 1.0),
+        (17, 4.1, 4.6, 1.3),
+        (18, 4.8, 4.3, 1.1),
+        (19, 4.2, 4.9, 1.4),
+        (20, 4.6, 4.1, 1.6),
+    ]
+
+    def test_moran_integration(self):
+        df = (
+            self.spark.createDataFrame(self.data)
+            .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value")
+            .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value")
+        )
+
+        result = add_binary_distance_band_column(
+            df, 1.0, saved_attributes=["id", "value"]
+        )
+
+        moran_i_result = Moran.get_global(result)
+
+        assert 0 < moran_i_result.p_norm < 0.00001
+        assert 0.9614 < moran_i_result.i < 0.9615
+        assert 7.7523 < moran_i_result.z_norm < 7.7524
+
+        two_tailed_result = Moran.get_global(result, False)
+        assert 0 < two_tailed_result.p_norm < 0.00001
+        assert 0.9614 < two_tailed_result.i < 0.9615
+        assert 7.7523 < two_tailed_result.z_norm < 7.7524
+
+    def test_moran_with_different_column_names(self):
+        df = (
+            self.spark.createDataFrame(self.data)
+            .selectExpr("_1 as index", "_2 AS x", "_3 AS y", "_4 AS 
feature_value")
+            .selectExpr("index", "ST_MakePoint(x, y) AS geometry", 
"feature_value")
+        )
+
+        result = add_binary_distance_band_column(
+            df, threshold=1.0, saved_attributes=["index", "feature_value"]
+        )
+
+        moran_i_result = Moran.get_global(
+            result, id_column="index", value_column="feature_value"
+        )
+
+        assert 0 < moran_i_result.p_norm < 0.00001
+        assert 0.9614 < moran_i_result.i < 0.9615
+        assert 7.7523 < moran_i_result.z_norm < 7.7524
+
+        two_tailed_result = Moran.get_global(
+            result, id_column="index", value_column="feature_value", 
two_tailed=False
+        )
+
+        assert 0 < two_tailed_result.p_norm < 0.00001
+        assert 0.9614 < two_tailed_result.i < 0.9615
+        assert 7.7523 < two_tailed_result.z_norm < 7.7524
diff --git 
a/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java
 
b/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java
new file mode 100644
index 0000000000..1d158c7bf5
--- /dev/null
+++ 
b/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.stats.autocorrelation;
+
+public class MoranResult {
+  private final double i;
+  private final double pNorm;
+  private final double zNorm;
+
+  public MoranResult(double i, double pNorm, double zNorm) {
+    this.i = i;
+    this.pNorm = pNorm;
+    this.zNorm = zNorm;
+  }
+
+  public double getI() {
+    return i;
+  }
+
+  public double getPNorm() {
+    return pNorm;
+  }
+
+  public double getZNorm() {
+    return zNorm;
+  }
+}
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala 
b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala
index 7713674261..d404f2c2db 100644
--- a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala
@@ -22,6 +22,7 @@ import org.apache.sedona.util.DfUtils.getGeometryColumnName
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.sedona_sql.expressions.st_functions.{ST_Distance, 
ST_DistanceSpheroid}
 import org.apache.spark.sql.{Column, DataFrame}
+import scala.collection.JavaConverters._
 
 object Weighting {
 
@@ -255,4 +256,34 @@ object Weighting {
     savedAttributes = savedAttributes,
     resultName = resultName)
 
+  def addDistanceBandColumnPython(
+      dataframe: DataFrame,
+      threshold: Double,
+      binary: Boolean = true,
+      alpha: Double = -1.0,
+      includeZeroDistanceNeighbors: Boolean = false,
+      includeSelf: Boolean = false,
+      selfWeight: Double = 1.0,
+      geometry: String = null,
+      useSpheroid: Boolean = false,
+      savedAttributes: java.util.ArrayList[String] = null,
+      resultName: String = "weights"): DataFrame = {
+
+    val savedAttributesScala =
+      if (savedAttributes != null) savedAttributes.asScala.toSeq
+      else null
+
+    addDistanceBandColumn(
+      dataframe,
+      threshold,
+      binary,
+      alpha,
+      includeZeroDistanceNeighbors,
+      includeSelf,
+      selfWeight,
+      geometry,
+      useSpheroid,
+      savedAttributesScala,
+      resultName)
+  }
 }
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala
 
b/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala
new file mode 100644
index 0000000000..8e38cfdd0b
--- /dev/null
+++ 
b/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.stats.autocorrelation
+
+import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.sedona.stats.autocorrelation.MoranResult
+import org.apache.spark.sql.{DataFrame, functions}
+import org.apache.spark.sql.functions.{col, explode, pow}
+
+object Moran {
+  private val ID_COLUMN = "id"
+  private val VALUE_COLUMN = "value"
+
+  private def normSf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): 
Double = {
+    val normalDist = new NormalDistribution(mean, stdDev)
+    1.0 - normalDist.cumulativeProbability(x)
+  }
+
+  private def normCdf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): 
Double = {
+    val normalDist = new NormalDistribution(mean, stdDev)
+    normalDist.cumulativeProbability(x)
+  }
+
+  def getGlobal(
+      dataframe: DataFrame,
+      twoTailed: Boolean = true,
+      idColumn: String = ID_COLUMN,
+      valueColumnName: String = VALUE_COLUMN): MoranResult = {
+    val spark = dataframe.sparkSession
+    import spark.implicits._
+
+    val data = dataframe
+      .selectExpr(s"avg($valueColumnName)", "count(*)")
+      .as[(Double, Long)]
+      .head()
+
+    val yMean = data._1
+
+    val n = data._2
+
+    val explodedWeights = dataframe
+      .select(col(idColumn), explode(col("weights")).alias("col"))
+      .select(
+        $"$idColumn".alias("id"),
+        $"col.neighbor.$idColumn".alias("n_id"),
+        $"col.value".alias("weight_value"))
+
+    val s1Data = explodedWeights
+      .alias("left")
+      .join(
+        explodedWeights.alias("right"),
+        $"left.n_id" === $"right.id" && $"right.n_id" === $"left.id")
+      .select(
+        $"left.id",
+        $"right.weight_value".alias("b_weight_value"),
+        pow(($"right.weight_value" + $"left.weight_value"), 
2).alias("s1_comp"),
+        $"left.weight_value".alias("a_weight"))
+
+    val sStats = s1Data
+      .selectExpr(
+        "CAST(sum(s1_comp)/2 AS DOUBLE) AS s1_comp_sum",
+        "CAST(sum(a_weight) AS DOUBLE) AS a_weight_sum",
+        "CAST(sum(b_weight_value) AS DOUBLE) AS b_weight_value_sum")
+      .as[(Double, Double, Double)]
+      .head()
+
+    val s1 = sStats._1
+
+    val inumData = dataframe
+      .selectExpr(
+        s"$idColumn AS id",
+        s"$valueColumnName AS value",
+        f"$valueColumnName - ${yMean} AS z",
+        f"transform(weights, w -> struct(w.neighbor.$idColumn AS id, w.value 
AS w, w.neighbor.$valueColumnName, w.neighbor.$valueColumnName - ${yMean} AS 
z)) AS weight")
+      .selectExpr(
+        "z",
+        "AGGREGATE(transform(weight, x-> x.z*x.w), CAST(0.0 AS DOUBLE), (acc, 
x) -> acc + x) AS ZL",
+        "AGGREGATE(transform(weight, x-> x.w), CAST(0.0 AS DOUBLE), (acc, x) 
-> acc + x) AS w_sum",
+        "AGGREGATE(transform(weight, x-> x.w), CAST(0.0 AS DOUBLE), (acc, x) 
-> acc + x) AS w_sq_sum",
+        "z * z AS z2ss_comp")
+      .selectExpr("*", "(z * zl) AS inum_comp")
+      .selectExpr("sum(inum_comp)", "sum(w_sum)", "sum(z2ss_comp)")
+
+    val s2Data = s1Data
+      .groupBy("id")
+      .agg(
+        functions.sum("b_weight_value").alias("s_b_weight_value"),
+        functions.sum("a_weight").alias("s_a_weight"))
+      .selectExpr("pow(s_b_weight_value + s_a_weight, 2) AS summed")
+
+    val s2 = s2Data.selectExpr("sum(summed)").as[Double].head()
+    val inumResult = inumData.as[(Double, Double, Double)].head()
+    val inum = inumResult._1
+
+    val s0 = inumResult._2
+
+    val z2ss = inumResult._3
+
+    val i = n / s0 * inum / z2ss
+    val ei = -1.0 / (n - 1)
+    val n2 = n * n
+    val s02 = s0 * s0
+    val vNum = n2 * s1 - n * s2 + 3 * s02
+    val vDen = (n - 1) * (n + 1) * s02
+    val viNorm = (vNum / vDen) - math.pow(1.0 / (n - 1), 2)
+    val seINorm = math.pow(viNorm, 0.5)
+    val zNorm = (i - ei) / seINorm
+
+    val pNorm = if (zNorm > 0) normSf(zNorm) else normCdf(zNorm)
+    val pNormFinal = if (twoTailed) pNorm * 2.0 else pNorm
+
+    new MoranResult(i, pNormFinal, zNorm)
+  }
+}
diff --git 
a/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala
 
b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala
new file mode 100644
index 0000000000..5209bc7d3f
--- /dev/null
+++ 
b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.stats.autocorrelation
+
+import org.apache.sedona.spark.SedonaContext
+import org.apache.sedona.sql.TestBaseScala
+
+import java.nio.file.Files
+
+trait AutoCorrelationFixtures {
+  this: TestBaseScala =>
+  SedonaContext.create(sparkSession)
+
+  val positiveAutoCorrelation = List(
+    (1, 1.0, 1.0, 8.5),
+    (2, 1.5, 1.2, 8.2),
+    (3, 1.3, 1.8, 8.7),
+    (4, 1.7, 1.6, 7.9),
+    (5, 4.0, 1.5, 6.2),
+    (6, 4.2, 1.7, 6.5),
+    (7, 4.5, 1.3, 5.9),
+    (8, 4.7, 1.8, 6.0),
+    (9, 1.8, 4.3, 3.1),
+    (10, 1.5, 4.5, 3.4),
+    (11, 1.2, 4.7, 3.0),
+    (12, 1.6, 4.2, 3.3),
+    (13, 1.9, 4.8, 2.8),
+    (14, 4.3, 4.2, 1.2),
+    (15, 4.5, 4.5, 1.5),
+    (16, 4.7, 4.8, 1.0),
+    (17, 4.1, 4.6, 1.3),
+    (18, 4.8, 4.3, 1.1),
+    (19, 4.2, 4.9, 1.4),
+    (20, 4.6, 4.1, 1.6))
+
+  val positiveCorrelationFrame = sparkSession
+    .createDataFrame(positiveAutoCorrelation)
+    .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value")
+    .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value")
+
+  val negativeCorrelationPoints = List(
+    (1, 1.0, 1.0, 8.5),
+    (2, 2.0, 1.0, 2.1),
+    (3, 3.0, 1.0, 8.2),
+    (4, 4.0, 1.0, 2.3),
+    (5, 5.0, 1.0, 8.7),
+    (6, 1.0, 2.0, 2.5),
+    (7, 2.0, 2.0, 8.1),
+    (8, 3.0, 2.0, 2.7),
+    (9, 4.0, 2.0, 8.3),
+    (10, 5.0, 2.0, 2.0),
+    (11, 1.0, 3.0, 8.6),
+    (12, 2.0, 3.0, 2.2),
+    (13, 3.0, 3.0, 8.4),
+    (14, 4.0, 3.0, 2.4),
+    (15, 5.0, 3.0, 8.0),
+    (16, 1.0, 4.0, 2.6),
+    (17, 2.0, 4.0, 8.8),
+    (18, 3.0, 4.0, 2.8),
+    (19, 4.0, 4.0, 8.9),
+    (20, 5.0, 4.0, 2.9))
+
+  val zeroCorrelationPoints = List(
+    (1, 3.75, 7.89, 2.58),
+    (2, 9.31, 4.25, 7.43),
+    (3, 5.12, 0.48, 5.96),
+    (4, 6.25, 1.74, 3.12),
+    (5, 1.47, 6.33, 8.26),
+    (6, 8.18, 9.57, 1.97),
+    (7, 2.64, 3.05, -6.42),
+    (8, 4.33, 5.88, 4.74),
+    (9, 7.91, 2.41, -10.13),
+    (10, 0.82, 8.76, 3.89),
+    (11, 9.70, 1.19, 100.0),
+    (12, 2.18, 7.54, 7.35),
+    (13, 5.47, 3.94, 2.16),
+    (14, 8.59, 6.78, -12.63),
+    (15, 3.07, 2.88, 4.27),
+    (16, 6.71, 9.12, 6.84),
+    (17, 1.34, 4.51, -25.0),
+    (18, 7.26, 5.29, -45.0),
+    (19, 4.89, 0.67, 1.59),
+    (20, 0.53, 8.12, 5.21))
+
+  val zeroCorrelationFrame = sparkSession
+    .createDataFrame(zeroCorrelationPoints)
+    .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value")
+    .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value")
+
+  val negativeCorrelationFrame = sparkSession
+    .createDataFrame(negativeCorrelationPoints)
+    .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value")
+    .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value")
+}
diff --git 
a/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala
 
b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala
new file mode 100644
index 0000000000..f26f79197b
--- /dev/null
+++ 
b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.stats.autocorrelation
+
+import org.apache.sedona.sql.TestBaseScala
+import org.apache.sedona.stats.Weighting
+import org.apache.sedona.stats.autocorrelation.Moran
+import org.apache.spark.sql.functions.expr
+
+class MoranTest extends TestBaseScala with AutoCorrelationFixtures {
+  describe("Moran's I") {
+    it("correlation exists") {
+      val weights = Weighting
+        .addDistanceBandColumn(
+          positiveCorrelationFrame,
+          1.0,
+          savedAttributes = Seq("id", "value"))
+        .withColumn(
+          "weights",
+          expr("transform(weights, w -> struct(w.neighbor, 
w.value/size(weights) AS value))"))
+
+      weights.cache().count()
+
+      val moranResult = Moran.getGlobal(weights, idColumn = "id")
+
+      assert(moranResult.getPNorm < 0.0001)
+      assert(moranResult.getI > 0.99)
+    }
+
+    it("different id and value column names") {
+      val weights = Weighting
+        .addDistanceBandColumn(
+          positiveCorrelationFrame
+            .selectExpr("id AS index", "value as feature_value", "geometry"),
+          1.0,
+          savedAttributes = Seq("index", "feature_value"))
+        .withColumn(
+          "weights",
+          expr("transform(weights, w -> struct(w.neighbor, 
w.value/size(weights) AS value))"))
+
+      weights.cache().count()
+
+      val moranResult =
+        Moran.getGlobal(weights, idColumn = "index", valueColumnName = 
"feature_value")
+
+      assert(moranResult.getPNorm < 0.0001)
+      assert(moranResult.getI > 0.99)
+    }
+
+    it("correlation is negative") {
+      val weights = Weighting
+        .addDistanceBandColumn(
+          negativeCorrelationFrame,
+          1.0,
+          savedAttributes = Seq("id", "value"))
+        .withColumn(
+          "weights",
+          expr("transform(weights, w -> struct(w.neighbor, 
w.value/size(weights) AS value))"))
+
+      weights.cache().count()
+
+      val moranResult = Moran.getGlobal(weights)
+
+      assert(moranResult.getPNorm < 0.0001)
+      assert(moranResult.getI < -0.99)
+      assert(moranResult.getI > -1)
+    }
+
+    it("zero correlation exists") {
+      val weights = Weighting
+        .addDistanceBandColumn(zeroCorrelationFrame, 2.0, savedAttributes = 
Seq("id", "value"))
+        .withColumn(
+          "weights",
+          expr("transform(weights, w -> struct(w.neighbor, 
w.value/size(weights) AS value))"))
+
+      weights.cache().count()
+
+      val moranResult = Moran.getGlobal(weights)
+
+      assert(moranResult.getPNorm < 0.44 && moranResult.getPNorm > 0.43)
+      assert(moranResult.getI < 0.16 && moranResult.getI > 0.15)
+    }
+  }
+}

Reply via email to