This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 04d313e9c5 [SEDONA-738] Add moran i autocorrelation. (#1975)
04d313e9c5 is described below
commit 04d313e9c5de55cb3416f5b852a2350d6e73f597
Author: Paweł Tokaj <[email protected]>
AuthorDate: Thu Jul 17 19:57:57 2025 +0200
[SEDONA-738] Add moran i autocorrelation. (#1975)
* SEDONA-738 Add moran i autocorrelation.
* SEDONA-738 Fix unit tests.
* SEDONA-738 Fix unit tests.
* SEDONA-738 Fix unit tests.
* SEDONA-738 Fix unit tests.
* SEDONA-738 Fix unit tests.
* SEDONA-738 Fix scala 2.13 issue
* Update
spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala
Co-authored-by: Copilot <[email protected]>
* Fix typos
* Update doc
---------
Co-authored-by: Jia Yu <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
docs/api/stats/sql.md | 114 ++++++++++++++++++
docs/image/moranI.png | Bin 0 -> 5422 bytes
python/sedona/spark/register/java_libs.py | 1 +
.../sedona/spark/stats/autocorrelation/__init__.py | 16 +++
python/sedona/spark/stats/autocorrelation/moran.py | 52 +++++++++
.../spark/stats/hotspot_detection/getis_ord.py | 4 +-
python/sedona/spark/stats/weighting.py | 77 +++++++-----
python/tests/stats/test_moran.py | 95 +++++++++++++++
.../sedona/stats/autocorrelation/MoranResult.java | 43 +++++++
.../scala/org/apache/sedona/stats/Weighting.scala | 31 +++++
.../sedona/stats/autocorrelation/Moran.scala | 130 +++++++++++++++++++++
.../autocorrelation/AutoCorrelationFixtures.scala | 110 +++++++++++++++++
.../sedona/stats/autocorrelation/MoranTest.scala | 100 ++++++++++++++++
13 files changed, 739 insertions(+), 34 deletions(-)
diff --git a/docs/api/stats/sql.md b/docs/api/stats/sql.md
index f39722d87f..005e647a9f 100644
--- a/docs/api/stats/sql.md
+++ b/docs/api/stats/sql.md
@@ -135,3 +135,117 @@ names in parentheses are python variable names
- useSpheroid (use_spheroid) - whether to use a cartesian or spheroidal
distance calculation. Default is false
In both cases the output is the input DataFrame with the weights column added
to each row.
+
+## Moran I
+
+Moran's I is a spatial autocorrelation statistic that combines the location of
+each feature with one of its non-spatial attributes. A value close to 1 means
+that there is positive spatial correlation, a value close to 0 means that there
+is no spatial correlation and the data is randomly distributed, and a value
+close to -1 means that there is negative spatial correlation. Negative
+correlation means that nearby features have dissimilar values.
+
+The figure below shows examples of the different kinds of spatial correlation:
+
+- on the left the correlation is negative (-1)
+- in the middle the correlation is positive (1)
+- on the right the correlation is close to zero and the data is random.
+
+![Examples of negative, positive and no spatial correlation](../../image/moranI.png)
+
+Moran's I is available as a Scala/Java and as a Python function. As input, the
+function requires a weights DataFrame, which you can create with the Apache
+Sedona weighting functions. Keep in mind that the input needs an id column that
+uniquely identifies each feature and a value column. The minimal schema required
+by the Apache Sedona Moran's I function is:
+
+```
+ |-- id: integer (nullable = true)
+ |-- value: double (nullable = true)
+ |-- weights: array (nullable = false)
+ | |-- element: struct (containsNull = false)
+ | | |-- neighbor: struct (nullable = false)
+ | | | |-- id: integer (nullable = true)
+ | | | |-- value: double (nullable = true)
+ | | |-- value: double (nullable = true)
+```
+
+You can change the names of the id and value columns using the function parameters.
+
+When you use the [Apache Sedona weighting functions](#adddistancebandcolumn), pass
+the id and value columns in the `savedAttributes` parameter (`saved_attributes` in
+Python) so that they are kept in each neighbor struct.
+
+=== "Scala"
+
+ ```scala
+ val weights = Weighting.addDistanceBandColumn(
+ positiveCorrelationFrame,
+ 1.0,
+ savedAttributes = Seq("id", "value")
+ )
+
+ val moranResult = Moran.getGlobal(weights, idColumn = "id")
+
+ // result fields
+ moranResult.getPNorm
+ moranResult.getI
+ moranResult.getZNorm
+ ```
+
+=== "Python"
+
+ ```python
+ from sedona.spark.stats.autocorrelation.moran import Moran
+ from sedona.spark.stats.weighting import add_binary_distance_band_column
+
+ result = add_binary_distance_band_column(
+ df,
+ 1.0,
+ saved_attributes=["id", "value"]
+ )
+
+ moran_i_result = Moran.get_global(result)
+
+ ## result fields
+ moran_i_result.p_norm
+ moran_i_result.i
+ moran_i_result.z_norm
+ ```
+
+The result contains the Moran's I value together with its z-score (Z norm) and p-value (P norm).
+
+The full signatures of the functions are:
+
+=== "Scala"
+
+ ```scala
+ def getGlobal(
+ dataframe: DataFrame,
+ twoTailed: Boolean = true,
+ idColumn: String = ID_COLUMN,
+ valueColumnName: String = VALUE_COLUMN): MoranResult
+
+ // java interface
+ public interface MoranResult {
+ public double getI();
+ public double getPNorm();
+ public double getZNorm();
+ }
+ ```
+
+=== "Python"
+
+ ```python
+ def get_global(
+ df: DataFrame,
+ two_tailed: bool = True,
+ id_column: str = "id",
+ value_column: str = "value",
+ ) -> MoranResult
+
+ @dataclass
+ class MoranResult:
+ i: float
+ p_norm: float
+ z_norm: float
+ ```
diff --git a/docs/image/moranI.png b/docs/image/moranI.png
new file mode 100644
index 0000000000..157c2e36e6
Binary files /dev/null and b/docs/image/moranI.png differ
diff --git a/python/sedona/spark/register/java_libs.py
b/python/sedona/spark/register/java_libs.py
index 675d788855..8ba82f2c3f 100644
--- a/python/sedona/spark/register/java_libs.py
+++ b/python/sedona/spark/register/java_libs.py
@@ -65,6 +65,7 @@ class SedonaJvmLib(Enum):
st_predicates = "org.apache.spark.sql.sedona_sql.expressions.st_predicates"
st_aggregates = "org.apache.spark.sql.sedona_sql.expressions.st_aggregates"
SedonaContext = "org.apache.sedona.spark.SedonaContext"
+ Moran = "org.apache.sedona.stats.autocorrelation.Moran"
@classmethod
def from_str(cls, geo_lib: str) -> "SedonaJvmLib":
diff --git a/python/sedona/spark/stats/autocorrelation/__init__.py
b/python/sedona/spark/stats/autocorrelation/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/python/sedona/spark/stats/autocorrelation/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/python/sedona/spark/stats/autocorrelation/moran.py
b/python/sedona/spark/stats/autocorrelation/moran.py
new file mode 100644
index 0000000000..506aed12b9
--- /dev/null
+++ b/python/sedona/spark/stats/autocorrelation/moran.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from dataclasses import dataclass
+
+from pyspark.sql import DataFrame
+from pyspark.sql import SparkSession
+
+
+@dataclass
+class MoranResult:
+ i: float
+ p_norm: float
+ z_norm: float
+
+
+class Moran:
+
+ @staticmethod
+ def get_global(
+ df: DataFrame,
+ two_tailed: bool = True,
+ id_column: str = "id",
+ value_column: str = "value",
+ ) -> MoranResult:
+ sedona = SparkSession.getActiveSession()
+
+ _jvm = sedona._jvm
+ moran_result = (
+
sedona._jvm.org.apache.sedona.stats.autocorrelation.Moran.getGlobal(
+ df._jdf, two_tailed, id_column, value_column
+ )
+ )
+
+ return MoranResult(
+ i=moran_result.getI(),
+ p_norm=moran_result.getPNorm(),
+ z_norm=moran_result.getZNorm(),
+ )
diff --git a/python/sedona/spark/stats/hotspot_detection/getis_ord.py
b/python/sedona/spark/stats/hotspot_detection/getis_ord.py
index 7e6a039950..b2b4d3ccc8 100644
--- a/python/sedona/spark/stats/hotspot_detection/getis_ord.py
+++ b/python/sedona/spark/stats/hotspot_detection/getis_ord.py
@@ -21,7 +21,7 @@ Getis, A., & Ord, J. K. (1992). The analysis of spatial
association by use of di
Geographical Analysis, 24(3), 189-206.
https://doi.org/10.1111/j.1538-4632.1992.tb00261.x
"""
-from pyspark.sql import Column, DataFrame, SparkSession
+from pyspark.sql import DataFrame, SparkSession
# todo change weights and x type to string
@@ -59,7 +59,7 @@ def g_local(
sedona = SparkSession.getActiveSession()
result_df =
sedona._jvm.org.apache.sedona.stats.hotspotDetection.GetisOrd.gLocal(
- dataframe, x, weights, permutations, star, island_weight
+ dataframe._jdf, x, weights, permutations, star, island_weight
)
return DataFrame(result_df, sedona)
diff --git a/python/sedona/spark/stats/weighting.py
b/python/sedona/spark/stats/weighting.py
index 05cd08db4f..d12639255e 100644
--- a/python/sedona/spark/stats/weighting.py
+++ b/python/sedona/spark/stats/weighting.py
@@ -60,18 +60,21 @@ def add_distance_band_column(
"""
sedona = SparkSession.getActiveSession()
- return sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumn(
- dataframe._jdf,
- float(threshold),
- binary,
- float(alpha),
- include_zero_distance_neighbors,
- include_self,
- float(self_weight),
- geometry,
- use_spheroid,
- saved_attributes,
- result_name,
+ return DataFrame(
+
sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython(
+ dataframe._jdf,
+ float(threshold),
+ binary,
+ float(alpha),
+ include_zero_distance_neighbors,
+ include_self,
+ float(self_weight),
+ geometry,
+ use_spheroid,
+ saved_attributes,
+ result_name,
+ ),
+ sedona,
)
@@ -110,15 +113,21 @@ def add_binary_distance_band_column(
"""
sedona = SparkSession.getActiveSession()
- return
sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn(
- dataframe._jdf,
- float(threshold),
- include_zero_distance_neighbors,
- include_self,
- geometry,
- use_spheroid,
- saved_attributes,
- result_name,
+ return DataFrame(
+
sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython(
+ dataframe._jdf,
+ float(threshold),
+ True,
+ float(-1.0),
+ include_zero_distance_neighbors,
+ include_self,
+ float(1.0),
+ geometry,
+ use_spheroid,
+ saved_attributes,
+ result_name,
+ ),
+ sedona,
)
@@ -161,15 +170,19 @@ def add_weighted_distance_band_column(
"""
sedona = SparkSession.getActiveSession()
- return
sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn(
- dataframe._jdf,
- float(threshold),
- float(alpha),
- include_zero_distance_neighbors,
- include_self,
- float(self_weight),
- geometry,
- use_spheroid,
- saved_attributes,
- result_name,
+ return DataFrame(
+
sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumnPython(
+ dataframe._jdf,
+ float(threshold),
+ False,
+ alpha,
+ include_zero_distance_neighbors,
+ include_self,
+ self_weight,
+ geometry,
+ use_spheroid,
+ saved_attributes,
+ result_name,
+ ),
+ sedona,
)
diff --git a/python/tests/stats/test_moran.py b/python/tests/stats/test_moran.py
new file mode 100644
index 0000000000..d910d79f1c
--- /dev/null
+++ b/python/tests/stats/test_moran.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from sedona.spark.stats.autocorrelation.moran import Moran
+from sedona.spark.stats.weighting import add_binary_distance_band_column
+from tests.test_base import TestBase
+
+
+class TestMoran(TestBase):
+
+ data = [
+ (1, 1.0, 1.0, 8.5),
+ (2, 1.5, 1.2, 8.2),
+ (3, 1.3, 1.8, 8.7),
+ (4, 1.7, 1.6, 7.9),
+ (5, 4.0, 1.5, 6.2),
+ (6, 4.2, 1.7, 6.5),
+ (7, 4.5, 1.3, 5.9),
+ (8, 4.7, 1.8, 6.0),
+ (9, 1.8, 4.3, 3.1),
+ (10, 1.5, 4.5, 3.4),
+ (11, 1.2, 4.7, 3.0),
+ (12, 1.6, 4.2, 3.3),
+ (13, 1.9, 4.8, 2.8),
+ (14, 4.3, 4.2, 1.2),
+ (15, 4.5, 4.5, 1.5),
+ (16, 4.7, 4.8, 1.0),
+ (17, 4.1, 4.6, 1.3),
+ (18, 4.8, 4.3, 1.1),
+ (19, 4.2, 4.9, 1.4),
+ (20, 4.6, 4.1, 1.6),
+ ]
+
+ def test_moran_integration(self):
+ df = (
+ self.spark.createDataFrame(self.data)
+ .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value")
+ .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value")
+ )
+
+ result = add_binary_distance_band_column(
+ df, 1.0, saved_attributes=["id", "value"]
+ )
+
+ moran_i_result = Moran.get_global(result)
+
+ assert 0 < moran_i_result.p_norm < 0.00001
+ assert 0.9614 < moran_i_result.i < 0.9615
+ assert 7.7523 < moran_i_result.z_norm < 7.7524
+
+ two_tailed_result = Moran.get_global(result, False)
+ assert 0 < two_tailed_result.p_norm < 0.00001
+ assert 0.9614 < two_tailed_result.i < 0.9615
+ assert 7.7523 < two_tailed_result.z_norm < 7.7524
+
+ def test_moran_with_different_column_names(self):
+ df = (
+ self.spark.createDataFrame(self.data)
+ .selectExpr("_1 as index", "_2 AS x", "_3 AS y", "_4 AS
feature_value")
+ .selectExpr("index", "ST_MakePoint(x, y) AS geometry",
"feature_value")
+ )
+
+ result = add_binary_distance_band_column(
+ df, threshold=1.0, saved_attributes=["index", "feature_value"]
+ )
+
+ moran_i_result = Moran.get_global(
+ result, id_column="index", value_column="feature_value"
+ )
+
+ assert 0 < moran_i_result.p_norm < 0.00001
+ assert 0.9614 < moran_i_result.i < 0.9615
+ assert 7.7523 < moran_i_result.z_norm < 7.7524
+
+ two_tailed_result = Moran.get_global(
+ result, id_column="index", value_column="feature_value",
two_tailed=False
+ )
+
+ assert 0 < two_tailed_result.p_norm < 0.00001
+ assert 0.9614 < two_tailed_result.i < 0.9615
+ assert 7.7523 < two_tailed_result.z_norm < 7.7524
diff --git
a/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java
b/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java
new file mode 100644
index 0000000000..1d158c7bf5
--- /dev/null
+++
b/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.stats.autocorrelation;
+
+/** Immutable holder for the outcome of a global Moran's I computation. */
+public class MoranResult {
+  /** Moran's I statistic. */
+  private final double i;
+
+  /** z-score of the statistic (normal approximation). */
+  private final double zNorm;
+
+  /** p-value of the statistic (normal approximation). */
+  private final double pNorm;
+
+  /**
+   * @param i Moran's I statistic
+   * @param pNorm p-value of the statistic
+   * @param zNorm z-score of the statistic
+   */
+  public MoranResult(final double i, final double pNorm, final double zNorm) {
+    this.zNorm = zNorm;
+    this.pNorm = pNorm;
+    this.i = i;
+  }
+
+  /** @return Moran's I statistic */
+  public double getI() {
+    return this.i;
+  }
+
+  /** @return p-value of the statistic */
+  public double getPNorm() {
+    return this.pNorm;
+  }
+
+  /** @return z-score of the statistic */
+  public double getZNorm() {
+    return this.zNorm;
+  }
+}
diff --git
a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala
b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala
index 7713674261..d404f2c2db 100644
--- a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala
@@ -22,6 +22,7 @@ import org.apache.sedona.util.DfUtils.getGeometryColumnName
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sedona_sql.expressions.st_functions.{ST_Distance,
ST_DistanceSpheroid}
import org.apache.spark.sql.{Column, DataFrame}
+import scala.collection.JavaConverters._
object Weighting {
@@ -255,4 +256,34 @@ object Weighting {
savedAttributes = savedAttributes,
resultName = resultName)
+  /**
+   * Py4J-friendly entry point for addDistanceBandColumn.
+   *
+   * Python lists reach the JVM as java.util collections, which cannot be passed where a Scala
+   * Seq is expected, so this method converts `savedAttributes` and then delegates to
+   * addDistanceBandColumn with all other arguments unchanged.
+   *
+   * @param savedAttributes
+   *   attribute columns to keep in each neighbor struct, or null to use the default of
+   *   addDistanceBandColumn
+   * @return
+   *   the input DataFrame with the weights column added
+   */
+  def addDistanceBandColumnPython(
+      dataframe: DataFrame,
+      threshold: Double,
+      binary: Boolean = true,
+      alpha: Double = -1.0,
+      includeZeroDistanceNeighbors: Boolean = false,
+      includeSelf: Boolean = false,
+      selfWeight: Double = 1.0,
+      geometry: String = null,
+      useSpheroid: Boolean = false,
+      savedAttributes: java.util.List[String] = null,
+      resultName: String = "weights"): DataFrame = {
+
+    // Take an immutable snapshot: on Scala 2.12 `asScala.toSeq` is a live view of the Java list,
+    // on 2.13 it is a copy; `toList` behaves the same on both.
+    val savedAttributesScala: Seq[String] =
+      Option(savedAttributes).map(_.asScala.toList).orNull
+
+    addDistanceBandColumn(
+      dataframe,
+      threshold,
+      binary,
+      alpha,
+      includeZeroDistanceNeighbors,
+      includeSelf,
+      selfWeight,
+      geometry,
+      useSpheroid,
+      savedAttributesScala,
+      resultName)
+  }
}
diff --git
a/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala
b/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala
new file mode 100644
index 0000000000..8e38cfdd0b
--- /dev/null
+++
b/spark/common/src/main/scala/org/apache/sedona/stats/autocorrelation/Moran.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.stats.autocorrelation
+
+import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.sedona.stats.autocorrelation.MoranResult
+import org.apache.spark.sql.{DataFrame, functions}
+import org.apache.spark.sql.functions.{col, explode, pow}
+
+object Moran {
+  private val ID_COLUMN = "id"
+  private val VALUE_COLUMN = "value"
+
+  // Column holding the spatial weights: an array of structs with a `neighbor` struct (carrying
+  // the neighbor's id and value columns) and a numeric `value` weight.
+  private val WEIGHTS_COLUMN = "weights"
+
+  /**
+   * Upper-tail probability P(X > x) of a normal distribution.
+   *
+   * Evaluated as the lower tail at x mirrored around the mean. The naive `1 - cdf(x)` loses
+   * all precision in the upper tail and rounds to exactly 0 once cdf(x) rounds to 1.0.
+   */
+  private def normSf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): Double =
+    new NormalDistribution(mean, stdDev).cumulativeProbability(2.0 * mean - x)
+
+  /** Lower-tail probability P(X <= x) of a normal distribution. */
+  private def normCdf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): Double =
+    new NormalDistribution(mean, stdDev).cumulativeProbability(x)
+
+  /**
+   * Computes the global Moran's I spatial autocorrelation statistic.
+   *
+   * The expected value, variance, z-score and p-value are derived under the normality
+   * assumption. Weights do not have to be symmetric: a weight that exists in only one direction
+   * is paired with a weight of 0 in the other direction.
+   *
+   * @param dataframe
+   *   rows with `idColumn`, `valueColumnName` and a `weights` column holding an array of structs
+   *   `(neighbor, value)`, where `neighbor` contains the neighbor's id and value columns (for
+   *   example Weighting.addDistanceBandColumn with `savedAttributes = Seq(idColumn,
+   *   valueColumnName)`)
+   * @param twoTailed
+   *   if true the returned p-value is for a two-sided test (the one-sided p-value doubled)
+   * @param idColumn
+   *   column uniquely identifying each row; also read from each `neighbor` struct
+   * @param valueColumnName
+   *   attribute column to test; also read from each `neighbor` struct
+   * @return
+   *   Moran's I together with its p-value and z-score
+   * @throws IllegalArgumentException
+   *   if the input has fewer than 2 rows or the value column contains only nulls
+   */
+  def getGlobal(
+      dataframe: DataFrame,
+      twoTailed: Boolean = true,
+      idColumn: String = ID_COLUMN,
+      valueColumnName: String = VALUE_COLUMN): MoranResult = {
+    import org.apache.spark.sql.Column
+    val spark = dataframe.sparkSession
+    import spark.implicits._
+
+    val (meanOption, n) = dataframe
+      .selectExpr(s"avg($valueColumnName)", "count(*)")
+      .as[(Option[Double], Long)]
+      .head()
+    require(n >= 2, s"Moran's I requires at least 2 rows, got $n")
+    val yMean = meanOption.getOrElse(
+      throw new IllegalArgumentException(s"Column $valueColumnName contains only null values"))
+
+    val zero = functions.lit(0.0)
+
+    // One row per directed edge i -> j: (id = i, n_id = j, weight_value = w_ij).
+    val edges = dataframe
+      .select(col(idColumn), explode(col(WEIGHTS_COLUMN)).alias("__weight"))
+      .select(
+        col(idColumn).alias("id"),
+        col(s"__weight.neighbor.$idColumn").alias("n_id"),
+        col("__weight.value").alias("weight_value"))
+
+    // S1 = 1/2 * sum_ij (w_ij + w_ji)^2. The full outer join pairs every edge with its reverse
+    // edge; a direction that does not exist contributes a weight of 0.
+    val s1Total = edges
+      .alias("a")
+      .join(edges.alias("b"), $"a.id" === $"b.n_id" && $"a.n_id" === $"b.id", "full_outer")
+      .select(
+        pow(
+          functions.coalesce($"a.weight_value", zero) +
+            functions.coalesce($"b.weight_value", zero),
+          2).alias("s1_comp"))
+      .agg(functions.coalesce(functions.sum("s1_comp"), zero).cast("double"))
+      .as[Double]
+      .head()
+    val s1 = s1Total / 2
+
+    // S2 = sum_i (w_i. + w_.i)^2 where w_i. / w_.i is the total weight leaving / entering i.
+    val outgoing = edges.groupBy("id").agg(functions.sum("weight_value").alias("w_out"))
+    val incoming = edges
+      .groupBy("n_id")
+      .agg(functions.sum("weight_value").alias("w_in"))
+      .withColumnRenamed("n_id", "id")
+    val s2 = outgoing
+      .join(incoming, Seq("id"), "full_outer")
+      .select(
+        pow(functions.coalesce(col("w_out"), zero) + functions.coalesce(col("w_in"), zero), 2)
+          .alias("s2_comp"))
+      .agg(functions.coalesce(functions.sum("s2_comp"), zero).cast("double"))
+      .as[Double]
+      .head()
+
+    // The mean is passed as a literal Column rather than spliced into SQL text.
+    val meanLiteral = functions.lit(yMean)
+    val weights = col(WEIGHTS_COLUMN)
+    // Adds up the elements of a numeric array column (0.0 for an empty array).
+    val arraySum = (array: Column) =>
+      functions.aggregate(array, zero, (acc: Column, x: Column) => acc + x)
+
+    val z = col(valueColumnName) - meanLiteral
+    // Spatial lag of the deviations: sum_j w_ij * (y_j - mean).
+    val spatialLag = arraySum(
+      functions.transform(
+        weights,
+        (w: Column) =>
+          w.getField("value") * (w.getField("neighbor").getField(valueColumnName) - meanLiteral)))
+    val rowWeightSum =
+      arraySum(functions.transform(weights, (w: Column) => w.getField("value")))
+
+    // inum = sum_i z_i * lag_i, s0 = sum of all weights, z2ss = sum_i z_i^2.
+    val (inum, s0, z2ss) = dataframe
+      .select(
+        (z * spatialLag).alias("inum_comp"),
+        rowWeightSum.alias("w_sum"),
+        (z * z).alias("z2ss_comp"))
+      .agg(
+        functions.sum("inum_comp").cast("double"),
+        functions.sum("w_sum").cast("double"),
+        functions.sum("z2ss_comp").cast("double"))
+      .as[(Double, Double, Double)]
+      .head()
+
+    // Double arithmetic: n * n overflows a Long for inputs above ~3e9 rows.
+    val nd = n.toDouble
+    val i = nd / s0 * inum / z2ss
+    val ei = -1.0 / (nd - 1)
+    val s02 = s0 * s0
+    val vNum = nd * nd * s1 - nd * s2 + 3 * s02
+    val vDen = (nd - 1) * (nd + 1) * s02
+    val viNorm = vNum / vDen - math.pow(1.0 / (nd - 1), 2)
+    val seINorm = math.sqrt(viNorm)
+    val zNorm = (i - ei) / seINorm
+
+    val oneTailedP = if (zNorm > 0) normSf(zNorm) else normCdf(zNorm)
+    val pNorm = if (twoTailed) oneTailedP * 2.0 else oneTailedP
+
+    new MoranResult(i, pNorm, zNorm)
+  }
+}
diff --git
a/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala
b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala
new file mode 100644
index 0000000000..5209bc7d3f
--- /dev/null
+++
b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/AutoCorrelationFixtures.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.stats.autocorrelation
+
+import org.apache.sedona.spark.SedonaContext
+import org.apache.sedona.sql.TestBaseScala
+
+import java.nio.file.Files
+
+trait AutoCorrelationFixtures {
+  this: TestBaseScala =>
+  // Presumably registers the Sedona SQL functions (ST_MakePoint) used below — confirm.
+  SedonaContext.create(sparkSession)
+
+  /** Turns (id, x, y, value) tuples into a DataFrame with id, point geometry and value. */
+  private def toPointFrame(
+      points: Seq[(Int, Double, Double, Double)]): org.apache.spark.sql.DataFrame =
+    sparkSession
+      .createDataFrame(points)
+      .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value")
+      .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value")
+
+  // Four spatial clusters whose members have similar values.
+  val positiveAutoCorrelation = List(
+    (1, 1.0, 1.0, 8.5),
+    (2, 1.5, 1.2, 8.2),
+    (3, 1.3, 1.8, 8.7),
+    (4, 1.7, 1.6, 7.9),
+    (5, 4.0, 1.5, 6.2),
+    (6, 4.2, 1.7, 6.5),
+    (7, 4.5, 1.3, 5.9),
+    (8, 4.7, 1.8, 6.0),
+    (9, 1.8, 4.3, 3.1),
+    (10, 1.5, 4.5, 3.4),
+    (11, 1.2, 4.7, 3.0),
+    (12, 1.6, 4.2, 3.3),
+    (13, 1.9, 4.8, 2.8),
+    (14, 4.3, 4.2, 1.2),
+    (15, 4.5, 4.5, 1.5),
+    (16, 4.7, 4.8, 1.0),
+    (17, 4.1, 4.6, 1.3),
+    (18, 4.8, 4.3, 1.1),
+    (19, 4.2, 4.9, 1.4),
+    (20, 4.6, 4.1, 1.6))
+
+  val positiveCorrelationFrame = toPointFrame(positiveAutoCorrelation)
+
+  // Unit grid whose values alternate between high and low along each row.
+  val negativeCorrelationPoints = List(
+    (1, 1.0, 1.0, 8.5),
+    (2, 2.0, 1.0, 2.1),
+    (3, 3.0, 1.0, 8.2),
+    (4, 4.0, 1.0, 2.3),
+    (5, 5.0, 1.0, 8.7),
+    (6, 1.0, 2.0, 2.5),
+    (7, 2.0, 2.0, 8.1),
+    (8, 3.0, 2.0, 2.7),
+    (9, 4.0, 2.0, 8.3),
+    (10, 5.0, 2.0, 2.0),
+    (11, 1.0, 3.0, 8.6),
+    (12, 2.0, 3.0, 2.2),
+    (13, 3.0, 3.0, 8.4),
+    (14, 4.0, 3.0, 2.4),
+    (15, 5.0, 3.0, 8.0),
+    (16, 1.0, 4.0, 2.6),
+    (17, 2.0, 4.0, 8.8),
+    (18, 3.0, 4.0, 2.8),
+    (19, 4.0, 4.0, 8.9),
+    (20, 5.0, 4.0, 2.9))
+
+  // Scattered locations with unrelated values.
+  val zeroCorrelationPoints = List(
+    (1, 3.75, 7.89, 2.58),
+    (2, 9.31, 4.25, 7.43),
+    (3, 5.12, 0.48, 5.96),
+    (4, 6.25, 1.74, 3.12),
+    (5, 1.47, 6.33, 8.26),
+    (6, 8.18, 9.57, 1.97),
+    (7, 2.64, 3.05, -6.42),
+    (8, 4.33, 5.88, 4.74),
+    (9, 7.91, 2.41, -10.13),
+    (10, 0.82, 8.76, 3.89),
+    (11, 9.70, 1.19, 100.0),
+    (12, 2.18, 7.54, 7.35),
+    (13, 5.47, 3.94, 2.16),
+    (14, 8.59, 6.78, -12.63),
+    (15, 3.07, 2.88, 4.27),
+    (16, 6.71, 9.12, 6.84),
+    (17, 1.34, 4.51, -25.0),
+    (18, 7.26, 5.29, -45.0),
+    (19, 4.89, 0.67, 1.59),
+    (20, 0.53, 8.12, 5.21))
+
+  val zeroCorrelationFrame = toPointFrame(zeroCorrelationPoints)
+
+  val negativeCorrelationFrame = toPointFrame(negativeCorrelationPoints)
+}
diff --git
a/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala
b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala
new file mode 100644
index 0000000000..f26f79197b
--- /dev/null
+++
b/spark/common/src/test/scala/org/apache/sedona/stats/autocorrelation/MoranTest.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.stats.autocorrelation
+
+import org.apache.sedona.sql.TestBaseScala
+import org.apache.sedona.stats.Weighting
+import org.apache.sedona.stats.autocorrelation.Moran
+import org.apache.spark.sql.functions.expr
+
+class MoranTest extends TestBaseScala with AutoCorrelationFixtures {
+
+  /**
+   * Builds distance-band weights for `frame`, row-standardizes them (each weight divided by the
+   * number of neighbors) and runs `check` on the cached result. The cache is released
+   * afterwards so that cached data does not leak between tests.
+   */
+  private def withRowStandardizedWeights[T](
+      frame: org.apache.spark.sql.DataFrame,
+      threshold: Double,
+      savedAttributes: Seq[String])(check: org.apache.spark.sql.DataFrame => T): T = {
+    val weights = Weighting
+      .addDistanceBandColumn(frame, threshold, savedAttributes = savedAttributes)
+      .withColumn(
+        "weights",
+        expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))"))
+
+    weights.cache().count()
+    try check(weights)
+    finally weights.unpersist()
+  }
+
+  describe("Moran's I") {
+    it("correlation exists") {
+      withRowStandardizedWeights(positiveCorrelationFrame, 1.0, Seq("id", "value")) { weights =>
+        val moranResult = Moran.getGlobal(weights, idColumn = "id")
+
+        assert(moranResult.getPNorm < 0.0001)
+        assert(moranResult.getI > 0.99)
+      }
+    }
+
+    it("different id and value column names") {
+      val renamed =
+        positiveCorrelationFrame.selectExpr("id AS index", "value as feature_value", "geometry")
+
+      withRowStandardizedWeights(renamed, 1.0, Seq("index", "feature_value")) { weights =>
+        val moranResult =
+          Moran.getGlobal(weights, idColumn = "index", valueColumnName = "feature_value")
+
+        assert(moranResult.getPNorm < 0.0001)
+        assert(moranResult.getI > 0.99)
+      }
+    }
+
+    it("correlation is negative") {
+      withRowStandardizedWeights(negativeCorrelationFrame, 1.0, Seq("id", "value")) { weights =>
+        val moranResult = Moran.getGlobal(weights)
+
+        assert(moranResult.getPNorm < 0.0001)
+        assert(moranResult.getI < -0.99)
+        assert(moranResult.getI > -1)
+        assert(moranResult.getZNorm < 0)
+      }
+    }
+
+    it("zero correlation exists") {
+      withRowStandardizedWeights(zeroCorrelationFrame, 2.0, Seq("id", "value")) { weights =>
+        val moranResult = Moran.getGlobal(weights)
+
+        assert(moranResult.getPNorm < 0.44 && moranResult.getPNorm > 0.43)
+        assert(moranResult.getI < 0.16 && moranResult.getI > 0.15)
+      }
+    }
+  }
+}