This is an automated email from the ASF dual-hosted git repository. imbruced pushed a commit to branch SEDONA-738-add-autocorrelation in repository https://gitbox.apache.org/repos/asf/sedona.git
commit e9e20639ac0cbdf16e3c9996eedb5f7abd830ff7 Author: pawelkocinski <[email protected]> AuthorDate: Wed Jun 4 23:44:41 2025 +0200 SEDONA-738 Add moran i autocorrelation. --- python/sedona/spark/register/java_libs.py | 1 + .../sedona/spark/stats/autocorrelation/__init__.py | 16 +++ python/sedona/spark/stats/autocorrelation/moran.py | 49 ++++++++ python/sedona/spark/stats/weighting.py | 73 ++++++------ python/tests/stats/test_moran.py | 63 ++++++++++ .../sedona/stats/autocorrelation/MoranResult.java | 43 +++++++ .../apache/sedona/stats/autocorelation/Moran.scala | 128 +++++++++++++++++++++ .../autocorellation/AutoCorrelationFixtures.scala | 110 ++++++++++++++++++ .../sedona/stats/autocorellation/MoranTest.scala | 80 +++++++++++++ 9 files changed, 531 insertions(+), 32 deletions(-) diff --git a/python/sedona/spark/register/java_libs.py b/python/sedona/spark/register/java_libs.py index 675d788855..d9f76831b4 100644 --- a/python/sedona/spark/register/java_libs.py +++ b/python/sedona/spark/register/java_libs.py @@ -65,6 +65,7 @@ class SedonaJvmLib(Enum): st_predicates = "org.apache.spark.sql.sedona_sql.expressions.st_predicates" st_aggregates = "org.apache.spark.sql.sedona_sql.expressions.st_aggregates" SedonaContext = "org.apache.sedona.spark.SedonaContext" + Moran = "org.apache.sedona.stats.autocorelation.Moran" @classmethod def from_str(cls, geo_lib: str) -> "SedonaJvmLib": diff --git a/python/sedona/spark/stats/autocorrelation/__init__.py b/python/sedona/spark/stats/autocorrelation/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/python/sedona/spark/stats/autocorrelation/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/python/sedona/spark/stats/autocorrelation/moran.py b/python/sedona/spark/stats/autocorrelation/moran.py new file mode 100644 index 0000000000..9b7717a0f7 --- /dev/null +++ b/python/sedona/spark/stats/autocorrelation/moran.py @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from dataclasses import dataclass + +from pyspark.sql import DataFrame +from pyspark.sql import SparkSession + + +@dataclass +class MoranResult: + i: float + p_norm: float + z_norm: float + + +class Moran: + + @staticmethod + def get_global( + df: DataFrame, two_tailed: bool = True, id_column: str = "id" + ) -> MoranResult: + sedona = SparkSession.getActiveSession() + + _jvm = sedona._jvm + moran_result = ( + sedona._jvm.org.apache.sedona.stats.autocorelation.Moran.getGlobal( + df._jdf, two_tailed, id_column + ) + ) + + return MoranResult( + i=moran_result.getI(), + p_norm=moran_result.getPNorm(), + z_norm=moran_result.getZNorm(), + ) diff --git a/python/sedona/spark/stats/weighting.py b/python/sedona/spark/stats/weighting.py index 05cd08db4f..68394d75f0 100644 --- a/python/sedona/spark/stats/weighting.py +++ b/python/sedona/spark/stats/weighting.py @@ -60,18 +60,21 @@ def add_distance_band_column( """ sedona = SparkSession.getActiveSession() - return sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumn( - dataframe._jdf, - float(threshold), - binary, - float(alpha), - include_zero_distance_neighbors, - include_self, - float(self_weight), - geometry, - use_spheroid, - saved_attributes, - result_name, + return DataFrame( + sedona._jvm.org.apache.sedona.stats.Weighting.addDistanceBandColumn( + dataframe._jdf, + float(threshold), + binary, + float(alpha), + include_zero_distance_neighbors, + include_self, + float(self_weight), + geometry, + use_spheroid, + saved_attributes, + result_name, + ), + sedona._jsparkSession, ) @@ -110,15 +113,18 @@ def add_binary_distance_band_column( """ sedona = SparkSession.getActiveSession() - return sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( - dataframe._jdf, - float(threshold), - include_zero_distance_neighbors, - include_self, - geometry, - use_spheroid, - saved_attributes, - result_name, + return DataFrame( + sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( + dataframe._jdf, + float(threshold), + include_zero_distance_neighbors, + include_self, + geometry, + use_spheroid, + saved_attributes, + result_name, + ), + sedona._jsparkSession, ) @@ -161,15 +167,18 @@ def add_weighted_distance_band_column( """ sedona = SparkSession.getActiveSession() - return sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( - dataframe._jdf, - float(threshold), - float(alpha), - include_zero_distance_neighbors, - include_self, - float(self_weight), - geometry, - use_spheroid, - saved_attributes, - result_name, + return DataFrame( + sedona._jvm.org.apache.sedona.stats.Weighting.addBinaryDistanceBandColumn( + dataframe._jdf, + float(threshold), + float(alpha), + include_zero_distance_neighbors, + include_self, + float(self_weight), + geometry, + use_spheroid, + saved_attributes, + result_name, + ), + sedona._jsparkSession, ) diff --git a/python/tests/stats/test_moran.py b/python/tests/stats/test_moran.py new file mode 100644 index 0000000000..84b13ee379 --- /dev/null +++ b/python/tests/stats/test_moran.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from sedona.spark.stats.autocorrelation.moran import Moran, MoranResult +from sedona.spark.stats.weighting import add_binary_distance_band_column +from tests.test_base import TestBase + + +class TestMoran(TestBase): + + def test_moran_integration(self): + data = [ + (1, 1.0, 1.0, 8.5), + (2, 1.5, 1.2, 8.2), + (3, 1.3, 1.8, 8.7), + (4, 1.7, 1.6, 7.9), + (5, 4.0, 1.5, 6.2), + (6, 4.2, 1.7, 6.5), + (7, 4.5, 1.3, 5.9), + (8, 4.7, 1.8, 6.0), + (9, 1.8, 4.3, 3.1), + (10, 1.5, 4.5, 3.4), + (11, 1.2, 4.7, 3.0), + (12, 1.6, 4.2, 3.3), + (13, 1.9, 4.8, 2.8), + (14, 4.3, 4.2, 1.2), + (15, 4.5, 4.5, 1.5), + (16, 4.7, 4.8, 1.0), + (17, 4.1, 4.6, 1.3), + (18, 4.8, 4.3, 1.1), + (19, 4.2, 4.9, 1.4), + (20, 4.6, 4.1, 1.6), + ] + + df = ( + self.spark.createDataFrame(data) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") + ) + + result = add_binary_distance_band_column(df, 1.0) + + assert Moran.get_global(result) == MoranResult( + i=0.9614304631460562, p_norm=9.103828801926284e-15, z_norm=7.752321966127421 + ) + + assert Moran.get_global(result, False) == MoranResult( + i=0.9614304631460562, p_norm=4.551914400963142e-15, z_norm=7.752321966127421 + ) diff --git a/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java b/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java new file mode 100644 index 0000000000..1d158c7bf5 --- /dev/null +++ b/spark/common/src/main/java/org/apache/sedona/stats/autocorrelation/MoranResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorrelation; + +public class MoranResult { + private final double i; + private final double pNorm; + private final double zNorm; + + public MoranResult(double i, double pNorm, double zNorm) { + this.i = i; + this.pNorm = pNorm; + this.zNorm = zNorm; + } + + public double getI() { + return i; + } + + public double getPNorm() { + return pNorm; + } + + public double getZNorm() { + return zNorm; + } +} diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala b/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala new file mode 100644 index 0000000000..7e06a069dd --- /dev/null +++ b/spark/common/src/main/scala/org/apache/sedona/stats/autocorelation/Moran.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorelation + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.sedona.stats.autocorrelation.MoranResult +import org.apache.spark.sql.{DataFrame, functions} +import org.apache.spark.sql.functions.{col, explode, pow} + +object Moran { + private val ID_COLUMN = "id" + + private def normSf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): Double = { + val normalDist = new NormalDistribution(mean, stdDev) + 1.0 - normalDist.cumulativeProbability(x) + } + + private def normCdf(x: Double, mean: Double = 0.0, stdDev: Double = 1.0): Double = { + val normalDist = new NormalDistribution(mean, stdDev) + normalDist.cumulativeProbability(x) + } + + def getGlobal( + dataframe: DataFrame, + twoTailed: Boolean = true, + idColumn: String = ID_COLUMN): MoranResult = { + val spark = dataframe.sparkSession + import spark.implicits._ + + val data = dataframe + .selectExpr("avg(value)", "count(*)") + .as[(Double, Long)] + .head() + + val yMean = data._1 + + val n = data._2 + + val explodedWeights = dataframe + .select(col(idColumn), explode(col("weights")).alias("col")) + .select( + $"$idColumn".alias("id"), + $"col.neighbor.id".alias("n_id"), + $"col.value".alias("weight_value")) + + val s1Data = explodedWeights + .alias("left") + .join( + explodedWeights.alias("right"), + $"left.n_id" === $"right.id" && $"right.n_id" === $"left.id") + .select( + $"left.id", + $"right.weight_value".alias("b_weight_value"), + pow(($"right.weight_value" + $"left.weight_value"), 2).alias("s1_comp"), + $"left.weight_value".alias("a_weight")) + + val sStats = s1Data + .selectExpr( + "CAST(sum(s1_comp)/2 AS DOUBLE) AS s1_comp_sum", + "CAST(sum(a_weight) AS DOUBLE) AS a_weight_sum", + "CAST(sum(b_weight_value) AS DOUBLE) AS b_weight_value_sum") + .as[(Double, Double, Double)] + .head() + + val s1 = sStats._1 + + val inumData = dataframe + .selectExpr( + s"$idColumn AS id", + "value", + f"value - ${yMean} AS z", + f"transform(weights, w -> struct(w.neighbor.id AS id, w.value AS w, w.neighbor.value, w.neighbor.value - ${yMean} AS z)) AS weight") + .selectExpr( + "z", + "AGGREGATE(transform(weight, x-> x.z*x.w), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) AS ZL", + "AGGREGATE(transform(weight, x-> x.w), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) AS w_sum", + "AGGREGATE(transform(weight, x-> x.w), CAST(0.0 AS DOUBLE), (acc, x) -> acc + x) AS w_sq_sum", + "z * z AS z2ss_comp") + .selectExpr("*", "(z * zl) AS inum_comp") + .selectExpr("sum(inum_comp)", "sum(w_sum)", "sum(z2ss_comp)") + + val s2Data = s1Data + .groupBy("id") + .agg( + functions.sum("b_weight_value").alias("s_b_weight_value"), + functions.sum("a_weight").alias("s_a_weight")) + .selectExpr("pow(s_b_weight_value + s_a_weight, 2) AS summed") + + val s2 = s2Data.selectExpr("sum(summed)").as[Double].head() + val inumResult = inumData.as[(Double, Double, Double)].head() + val inum = inumResult._1 + + val s0 = inumResult._2 + + val z2ss = inumResult._3 + + val i = n / s0 * inum / z2ss + val ei = -1.0 / (n - 1) + val n2 = n * n + val s02 = s0 * s0 + val vNum = n2 * s1 - n * s2 + 3 * s02 + val vDen = (n - 1) * (n + 1) * s02 + val viNorm = (vNum / vDen) - math.pow(1.0 / (n - 1), 2) + val seINorm = math.pow(viNorm, 0.5) + val zNorm = (i - ei) / seINorm + + val pNorm = if (zNorm > 0) normSf(zNorm) else normCdf(zNorm) + val pNormFinal = if (twoTailed) pNorm * 2.0 else pNorm + + new MoranResult(i, pNormFinal, zNorm) + } +} diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala new file mode 100644 index 0000000000..c87ae430f1 --- /dev/null +++ b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/AutoCorrelationFixtures.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorellation + +import org.apache.sedona.spark.SedonaContext +import org.apache.sedona.sql.TestBaseScala + +import java.nio.file.Files + +trait AutoCorrelationFixtures { + this: TestBaseScala => + SedonaContext.create(sparkSession) + + val positiveAutoCorrelation = List( + (1, 1.0, 1.0, 8.5), + (2, 1.5, 1.2, 8.2), + (3, 1.3, 1.8, 8.7), + (4, 1.7, 1.6, 7.9), + (5, 4.0, 1.5, 6.2), + (6, 4.2, 1.7, 6.5), + (7, 4.5, 1.3, 5.9), + (8, 4.7, 1.8, 6.0), + (9, 1.8, 4.3, 3.1), + (10, 1.5, 4.5, 3.4), + (11, 1.2, 4.7, 3.0), + (12, 1.6, 4.2, 3.3), + (13, 1.9, 4.8, 2.8), + (14, 4.3, 4.2, 1.2), + (15, 4.5, 4.5, 1.5), + (16, 4.7, 4.8, 1.0), + (17, 4.1, 4.6, 1.3), + (18, 4.8, 4.3, 1.1), + (19, 4.2, 4.9, 1.4), + (20, 4.6, 4.1, 1.6)) + + val positiveCorrelationFrame = sparkSession + .createDataFrame(positiveAutoCorrelation) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") + + val negativeCorrelationPoints = List( + (1, 1.0, 1.0, 8.5), + (2, 2.0, 1.0, 2.1), + (3, 3.0, 1.0, 8.2), + (4, 4.0, 1.0, 2.3), + (5, 5.0, 1.0, 8.7), + (6, 1.0, 2.0, 2.5), + (7, 2.0, 2.0, 8.1), + (8, 3.0, 2.0, 2.7), + (9, 4.0, 2.0, 8.3), + (10, 5.0, 2.0, 2.0), + (11, 1.0, 3.0, 8.6), + (12, 2.0, 3.0, 2.2), + (13, 3.0, 3.0, 8.4), + (14, 4.0, 3.0, 2.4), + (15, 5.0, 3.0, 8.0), + (16, 1.0, 4.0, 2.6), + (17, 2.0, 4.0, 8.8), + (18, 3.0, 4.0, 2.8), + (19, 4.0, 4.0, 8.9), + (20, 5.0, 4.0, 2.9)) + + val zeroCorrelationPoints = List( + (1, 3.75, 7.89, 2.58), + (2, 9.31, 4.25, 7.43), + (3, 5.12, 0.48, 5.96), + (4, 6.25, 1.74, 3.12), + (5, 1.47, 6.33, 8.26), + (6, 8.18, 9.57, 1.97), + (7, 2.64, 3.05, -6.42), + (8, 4.33, 5.88, 4.74), + (9, 7.91, 2.41, -10.13), + (10, 0.82, 8.76, 3.89), + (11, 9.70, 1.19, 100.0), + (12, 2.18, 7.54, 7.35), + (13, 5.47, 3.94, 2.16), + (14, 8.59, 6.78, -12.63), + (15, 3.07, 2.88, 4.27), + (16, 6.71, 9.12, 6.84), + (17, 1.34, 4.51, -25.0), + (18, 7.26, 5.29, -45.0), + (19, 4.89, 0.67, 1.59), + (20, 0.53, 8.12, 5.21)) + + val zeroCorrelationFrame = sparkSession + .createDataFrame(zeroCorrelationPoints) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") + + val negativeCorrelationFrame = sparkSession + .createDataFrame(negativeCorrelationPoints) + .selectExpr("_1 as id", "_2 AS x", "_3 AS y", "_4 AS value") + .selectExpr("id", "ST_MakePoint(x, y) AS geometry", "value") +} diff --git a/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala new file mode 100644 index 0000000000..c350959f95 --- /dev/null +++ b/spark/common/src/test/scala/org/apache/sedona/stats/autocorellation/MoranTest.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats.autocorellation + +import org.apache.sedona.sql.TestBaseScala +import org.apache.sedona.stats.Weighting +import org.apache.sedona.stats.autocorelation.Moran +import org.apache.spark.sql.functions.expr + +class MoranTest extends TestBaseScala with AutoCorrelationFixtures { + describe("Moran's I") { + it("correlation exists") { + val weights = Weighting + .addDistanceBandColumn( + positiveCorrelationFrame, + 1.0, + savedAttributes = Seq("id", "value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = Moran.getGlobal(weights, idColumn = "id") + + assert(moranResult.getPNorm < 0.0001) + assert(moranResult.getI > 0.99) + } + + it("correlation is negative") { + val weights = Weighting + .addDistanceBandColumn( + negativeCorrelationFrame, + 1.0, + savedAttributes = Seq("id", "value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = Moran.getGlobal(weights) + + assert(moranResult.getPNorm < 0.0001) + assert(moranResult.getI < -0.99) + assert(moranResult.getI > -1) + } + + it("zero correlation exists") { + val weights = Weighting + .addDistanceBandColumn(zeroCorrelationFrame, 2.0, savedAttributes = Seq("id", "value")) + .withColumn( + "weights", + expr("transform(weights, w -> struct(w.neighbor, w.value/size(weights) AS value))")) + + weights.cache().count() + + val moranResult = Moran.getGlobal(weights) + + assert(moranResult.getPNorm < 0.44 && moranResult.getPNorm > 0.43) + assert(moranResult.getI < 0.16 && moranResult.getI > 0.15) + } + } +}
