This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 3d1ebdea77 [GH-2509] Refactor the example projects to include better examples (#2510)
3d1ebdea77 is described below
commit 3d1ebdea77f7caafd13b73138ce92316bea132dd
Author: Jia Yu <[email protected]>
AuthorDate: Sun Nov 16 01:28:39 2025 -0600
[GH-2509] Refactor the example projects to include better examples (#2510)
---
.github/workflows/example.yml | 43 +++---
examples/flink-sql/pom.xml | 39 ++++-
examples/flink-sql/src/main/java/FlinkExample.java | 112 ++++++++------
examples/flink-sql/src/main/java/Utils.java | 172 +++++++++++----------
.../src/test/java/FlinkFunctionsTest.java | 39 +++++
examples/java-spark-sql/pom.xml | 102 ++++++------
.../src/main/java/spark/GeoParquetAccessor.java | 87 +++++------
.../src/main/java/spark/SedonaGeoParquetMain.java | 59 ++++---
.../src/main/java/spark/SedonaSparkSession.java | 49 +++---
.../src/test/java/spark/SedonaParquetTest.java | 128 +++++++--------
examples/spark-sql/pom.xml | 56 ++++++-
examples/spark-sql/src/main/scala/Main.scala | 22 ++-
examples/spark-sql/src/main/scala/RddExample.scala | 54 +++++--
examples/spark-sql/src/main/scala/SqlExample.scala | 135 ++++++++++++----
examples/spark-sql/src/main/scala/VizExample.scala | 59 +++++--
.../spark-sql/src/test/scala/testFunctions.scala | 121 +++++++++++++++
16 files changed, 838 insertions(+), 439 deletions(-)
diff --git a/.github/workflows/example.yml b/.github/workflows/example.yml
index bd9cb62cc7..2fa3f61bf6 100644
--- a/.github/workflows/example.yml
+++ b/.github/workflows/example.yml
@@ -39,6 +39,7 @@ concurrency:
jobs:
build:
+    name: 'Spark ${{ matrix.spark }}, Hadoop ${{ matrix.hadoop }}, Sedona ${{ matrix.sedona }}'
runs-on: ubuntu-22.04
strategy:
fail-fast: false
@@ -56,23 +57,6 @@ jobs:
spark-compat: '3.4'
sedona: 1.8.0
hadoop: 3.3.4
- env:
- JAVA_TOOL_OPTIONS: >-
- -XX:+IgnoreUnrecognizedVMOptions
- --add-opens=java.base/java.lang=ALL-UNNAMED
- --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
- --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
- --add-opens=java.base/java.io=ALL-UNNAMED
- --add-opens=java.base/java.net=ALL-UNNAMED
- --add-opens=java.base/java.nio=ALL-UNNAMED
- --add-opens=java.base/java.util=ALL-UNNAMED
- --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
- --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
- --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
- --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
- --add-opens=java.base/sun.security.action=ALL-UNNAMED
- --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
- -Djdk.reflect.useDirectMethodHandle=false
steps:
- uses: actions/checkout@v5
- uses: actions/setup-java@v5
@@ -100,7 +84,8 @@ jobs:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
- - env:
+ - name: Test Scala Spark SQL Example
+ env:
SPARK_VERSION: ${{ matrix.spark }}
SPARK_LOCAL_IP: 127.0.0.1
SPARK_COMPAT_VERSION: ${{ matrix.spark-compat }}
@@ -109,16 +94,28 @@ jobs:
run: |
cd examples/spark-sql
mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
- mvn clean install \
+ mvn clean test \
-Dspark.version=${SPARK_VERSION} \
-Dspark.compat.version=${SPARK_COMPAT_VERSION} \
-Dsedona.version=${SEDONA_VERSION} \
-Dhadoop.version=${HADOOP_VERSION}
- java -jar target/sedona-spark-example-${SEDONA_VERSION}.jar
- - env:
+ - name: Test Java Spark SQL Example
+ env:
+ SPARK_VERSION: ${{ matrix.spark }}
+ SPARK_LOCAL_IP: 127.0.0.1
+ SPARK_COMPAT_VERSION: ${{ matrix.spark-compat }}
+ SEDONA_VERSION: ${{ matrix.sedona }}
+ HADOOP_VERSION: ${{ matrix.hadoop }}
+ run: |
+ cd examples/java-spark-sql
+        mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
+ mvn clean test \
+ -Dspark.version=${SPARK_VERSION} \
+ -Dspark.compat.version=${SPARK_COMPAT_VERSION}
+ - name: Test Flink SQL Example
+ env:
SEDONA_VERSION: ${{ matrix.sedona }}
run: |
cd examples/flink-sql
mvn versions:set -DnewVersion=${SEDONA_VERSION} -DgenerateBackupPoms=false
- mvn clean install
- java -jar target/sedona-flink-example-${SEDONA_VERSION}.jar
+ mvn clean test
diff --git a/examples/flink-sql/pom.xml b/examples/flink-sql/pom.xml
index d6f7e97b68..6c72b6acf4 100644
--- a/examples/flink-sql/pom.xml
+++ b/examples/flink-sql/pom.xml
@@ -31,7 +31,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<geotools.scope>compile</geotools.scope>
<flink.version>1.19.0</flink.version>
- <flink.scope>compile</flink.scope>
+ <flink.scope>provided</flink.scope>
<scala.compat.version>2.12</scala.compat.version>
<geotools.version>33.1</geotools.version>
<log4j.version>2.17.2</log4j.version>
@@ -247,6 +247,20 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.22.2</version>
+ <configuration>
+ <argLine>
+ --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+ --add-opens=java.base/java.nio=ALL-UNNAMED
+ --add-opens=java.base/java.lang=ALL-UNNAMED
+ --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+ --add-opens=java.base/java.util=ALL-UNNAMED
+ </argLine>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
@@ -266,6 +280,29 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>2.35.0</version>
+ <configuration>
+ <java>
+ <googleJavaFormat>
+ <version>1.15.0</version>
+ </googleJavaFormat>
+ <licenseHeader>
+ <file>../../tools/maven/license-header.txt</file>
+ </licenseHeader>
+ </java>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<resources>
<resource>
diff --git a/examples/flink-sql/src/main/java/FlinkExample.java b/examples/flink-sql/src/main/java/FlinkExample.java
index c59eb8125e..7ded36e0d6 100644
--- a/examples/flink-sql/src/main/java/FlinkExample.java
+++ b/examples/flink-sql/src/main/java/FlinkExample.java
@@ -16,70 +16,84 @@
* specific language governing permissions and limitations
* under the License.
*/
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
import org.apache.sedona.flink.SedonaFlinkRegistrator;
import org.apache.sedona.flink.expressions.Constructors;
-import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
-
-public class FlinkExample
-{
-    static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"};
-
-    static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"};
+public class FlinkExample {
+  static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"};
- public static void main(String[] args) {
- int testDataSize = 10;
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
-        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
- SedonaFlinkRegistrator.registerType(env);
- SedonaFlinkRegistrator.registerFunc(tableEnv);
+  static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"};
- // Create a fake WKT string table source
-        Table pointWktTable = Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames);
+ public static void main(String[] args) {
+ testS2SpatialJoin(10);
+ }
- // Create a geometry column
- Table pointTable = pointWktTable.select(
- call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]),
- $(pointColNames[1]));
+ public static void testS2SpatialJoin(int testDataSize) {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ SedonaFlinkRegistrator.registerType(env);
+ SedonaFlinkRegistrator.registerFunc(tableEnv);
- // Create S2CellID
-        pointTable = pointTable.select($(pointColNames[0]), $(pointColNames[1]),
- call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
- // Explode s2id array
- tableEnv.createTemporaryView("pointTable", pointTable);
-        pointTable = tableEnv.sqlQuery("SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)");
+ // Create a fake WKT string table source
+ Table pointWktTable =
+        Utils.createTextTable(env, tableEnv, Utils.createPointWKT(testDataSize), pointColNames);
+ // Create a geometry column
+ Table pointTable =
+ pointWktTable.select(
+            call("ST_GeomFromWKT", $(pointColNames[0])).as(pointColNames[0]), $(pointColNames[1]));
- // Create a fake WKT string table source
-        Table polygonWktTable = Utils.createTextTable(env, tableEnv, Utils.createPolygonWKT(testDataSize), polygonColNames);
- // Create a geometry column
-        Table polygonTable = polygonWktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
- $(polygonColNames[0])).as(polygonColNames[0]),
- $(polygonColNames[1]));
- // Create S2CellID
-        polygonTable = polygonTable.select($(polygonColNames[0]), $(polygonColNames[1]),
-            call("ST_S2CellIDs", $(polygonColNames[0]), 6).as("s2id_array"));
- // Explode s2id array
- tableEnv.createTemporaryView("polygonTable", polygonTable);
-        polygonTable = tableEnv.sqlQuery("SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)");
+ // Create S2CellID
+ pointTable =
+ pointTable.select(
+ $(pointColNames[0]),
+ $(pointColNames[1]),
+ call("ST_S2CellIDs", $(pointColNames[0]), 6).as("s2id_array"));
+ // Explode s2id array
+ tableEnv.createTemporaryView("pointTable", pointTable);
+ pointTable =
+ tableEnv.sqlQuery(
+            "SELECT geom_point, name_point, s2id_point FROM pointTable CROSS JOIN UNNEST(pointTable.s2id_array) AS tmpTbl1(s2id_point)");
-        // TODO: TableImpl.print() occurs EOF Exception due to https://issues.apache.org/jira/browse/FLINK-35406
- // Use polygonTable.execute().print() when FLINK-35406 is fixed.
-        polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row));
+ // Create a fake WKT string table source
+ Table polygonWktTable =
+        Utils.createTextTable(env, tableEnv, Utils.createPolygonWKT(testDataSize), polygonColNames);
+ // Create a geometry column
+ Table polygonTable =
+ polygonWktTable.select(
+            call(Constructors.ST_GeomFromWKT.class.getSimpleName(), $(polygonColNames[0]))
+ .as(polygonColNames[0]),
+ $(polygonColNames[1]));
+ // Create S2CellID
+ polygonTable =
+ polygonTable.select(
+ $(polygonColNames[0]),
+ $(polygonColNames[1]),
+ call("ST_S2CellIDs", $(polygonColNames[0]), 6).as("s2id_array"));
+ // Explode s2id array
+ tableEnv.createTemporaryView("polygonTable", polygonTable);
+ polygonTable =
+ tableEnv.sqlQuery(
+            "SELECT geom_polygon, name_polygon, s2id_polygon FROM polygonTable CROSS JOIN UNNEST(polygonTable.s2id_array) AS tmpTbl2(s2id_polygon)");
- // Join two tables by their S2 ids
-        Table joinResult = pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
- // Optional: remove false positives
-        joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point")));
-        joinResult.execute().collect().forEachRemaining(row -> System.out.println(row));
- }
+ // TODO: TableImpl.print() occurs EOF Exception due to
+ // https://issues.apache.org/jira/browse/FLINK-35406
+ // Use polygonTable.execute().print() when FLINK-35406 is fixed.
+    polygonTable.execute().collect().forEachRemaining(row -> System.out.println(row));
+ // Join two tables by their S2 ids
+ Table joinResult =
+        pointTable.join(polygonTable).where($("s2id_point").isEqual($("s2id_polygon")));
+ // Optional: remove false positives
+    joinResult = joinResult.where(call("ST_Contains", $("geom_polygon"), $("geom_point")));
+    joinResult.execute().collect().forEachRemaining(row -> System.out.println(row));
+ }
}
diff --git a/examples/flink-sql/src/main/java/Utils.java b/examples/flink-sql/src/main/java/Utils.java
index 0a95ab1b7b..abe1c3d3a4 100644
--- a/examples/flink-sql/src/main/java/Utils.java
+++ b/examples/flink-sql/src/main/java/Utils.java
@@ -16,7 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+import static org.apache.flink.table.api.Expressions.$;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -27,91 +32,102 @@ import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.flink.table.api.Expressions.$;
-
-public class Utils
-{
-    static Long timestamp_base = new Timestamp(System.currentTimeMillis()).getTime();
-    static Long time_interval = 1L; // Generate a record per this interval. Unit is second
+public class Utils {
+  static Long timestamp_base = new Timestamp(System.currentTimeMillis()).getTime();
+  static Long time_interval = 1L; // Generate a record per this interval. Unit is second
- static List<Row> createPointText(int size){
- List<Row> data = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- // Create a number of points (1, 1) (2, 2) ...
-            data.add(Row.of(i + "," + i, "point" + i, timestamp_base + time_interval * 1000 * i));
- }
- return data;
+ static List<Row> createPointText(int size) {
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create a number of points (1, 1) (2, 2) ...
+      data.add(Row.of(i + "," + i, "point" + i, timestamp_base + time_interval * 1000 * i));
}
+ return data;
+ }
- static List<Row> createPolygonText(int size) {
- List<Row> data = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- // Create polygons each of which only has 1 match in points
- // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
- String minX = String.valueOf(i - 0.5);
- String minY = String.valueOf(i - 0.5);
- String maxX = String.valueOf(i + 0.5);
- String maxY = String.valueOf(i + 0.5);
- List<String> polygon = new ArrayList<>();
- polygon.add(minX);polygon.add(minY);
- polygon.add(minX);polygon.add(maxY);
- polygon.add(maxX);polygon.add(maxY);
- polygon.add(maxX);polygon.add(minY);
- polygon.add(minX);polygon.add(minY);
-            data.add(Row.of(String.join(",", polygon), "polygon" + i, timestamp_base + time_interval * 1000 * i));
- }
- return data;
+ static List<Row> createPolygonText(int size) {
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create polygons each of which only has 1 match in points
+ // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+ String minX = String.valueOf(i - 0.5);
+ String minY = String.valueOf(i - 0.5);
+ String maxX = String.valueOf(i + 0.5);
+ String maxY = String.valueOf(i + 0.5);
+ List<String> polygon = new ArrayList<>();
+ polygon.add(minX);
+ polygon.add(minY);
+ polygon.add(minX);
+ polygon.add(maxY);
+ polygon.add(maxX);
+ polygon.add(maxY);
+ polygon.add(maxX);
+ polygon.add(minY);
+ polygon.add(minX);
+ polygon.add(minY);
+ data.add(
+ Row.of(
+              String.join(",", polygon), "polygon" + i, timestamp_base + time_interval * 1000 * i));
}
+ return data;
+ }
- static List<Row> createPointWKT(int size){
- List<Row> data = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- // Create a number of points (1, 1) (2, 2) ...
-            data.add(Row.of("POINT (" + i + " " + i +")", "point" + i, timestamp_base + time_interval * 1000 * i));
- }
- return data;
+ static List<Row> createPointWKT(int size) {
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create a number of points (1, 1) (2, 2) ...
+ data.add(
+ Row.of(
+ "POINT (" + i + " " + i + ")",
+ "point" + i,
+ timestamp_base + time_interval * 1000 * i));
}
+ return data;
+ }
- static List<Row> createPolygonWKT(int size) {
- List<Row> data = new ArrayList<>();
- for (int i = 0; i < size; i++) {
- // Create polygons each of which only has 1 match in points
- // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
- String minX = String.valueOf(i - 0.5);
- String minY = String.valueOf(i - 0.5);
- String maxX = String.valueOf(i + 0.5);
- String maxY = String.valueOf(i + 0.5);
- List<String> polygon = new ArrayList<>();
- polygon.add(minX + " " + minY);
- polygon.add(minX + " " + maxY);
- polygon.add(maxX + " " + maxY);
- polygon.add(maxX + " " + minY);
- polygon.add(minX + " " + minY);
-            data.add(Row.of("POLYGON ((" + String.join(", ", polygon) + "))", "polygon" + i, timestamp_base + time_interval * 1000 * i));
- }
- return data;
+ static List<Row> createPolygonWKT(int size) {
+ List<Row> data = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ // Create polygons each of which only has 1 match in points
+ // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+ String minX = String.valueOf(i - 0.5);
+ String minY = String.valueOf(i - 0.5);
+ String maxX = String.valueOf(i + 0.5);
+ String maxY = String.valueOf(i + 0.5);
+ List<String> polygon = new ArrayList<>();
+ polygon.add(minX + " " + minY);
+ polygon.add(minX + " " + maxY);
+ polygon.add(maxX + " " + maxY);
+ polygon.add(maxX + " " + minY);
+ polygon.add(minX + " " + minY);
+ data.add(
+ Row.of(
+ "POLYGON ((" + String.join(", ", polygon) + "))",
+ "polygon" + i,
+ timestamp_base + time_interval * 1000 * i));
}
+ return data;
+ }
-    static Table createTextTable(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, List<Row> data, String[] colNames){
- TypeInformation<?>[] colTypes = {
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.LONG_TYPE_INFO
- };
-        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, Arrays.copyOfRange(colNames, 0, 3));
- DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
- // Generate Time Attribute
- WatermarkStrategy<Row> wmStrategy =
- WatermarkStrategy
- .<Row>forMonotonousTimestamps()
-                .withTimestampAssigner((event, timestamp) -> event.getFieldAs(2));
-        return tableEnv.fromDataStream(ds.assignTimestampsAndWatermarks(wmStrategy), $(colNames[0]), $(colNames[1]), $(colNames[2]).rowtime(), $(colNames[3]).proctime());
- }
-
-
+ static Table createTextTable(
+ StreamExecutionEnvironment env,
+ StreamTableEnvironment tableEnv,
+ List<Row> data,
+ String[] colNames) {
+ TypeInformation<?>[] colTypes = {
+      BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO
+ };
+    RowTypeInfo typeInfo = new RowTypeInfo(colTypes, Arrays.copyOfRange(colNames, 0, 3));
+ DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+ // Generate Time Attribute
+ WatermarkStrategy<Row> wmStrategy =
+ WatermarkStrategy.<Row>forMonotonousTimestamps()
+ .withTimestampAssigner((event, timestamp) -> event.getFieldAs(2));
+ return tableEnv.fromDataStream(
+ ds.assignTimestampsAndWatermarks(wmStrategy),
+ $(colNames[0]),
+ $(colNames[1]),
+ $(colNames[2]).rowtime(),
+ $(colNames[3]).proctime());
+ }
}
diff --git a/examples/flink-sql/src/test/java/FlinkFunctionsTest.java b/examples/flink-sql/src/test/java/FlinkFunctionsTest.java
new file mode 100644
index 0000000000..64c63f8127
--- /dev/null
+++ b/examples/flink-sql/src/test/java/FlinkFunctionsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.junit.Test;
+
+public class FlinkFunctionsTest {
+ @Test
+ public void testS2SpatialJoinWithSmallDataset() {
+ // Test with small dataset
+ FlinkExample.testS2SpatialJoin(5);
+ }
+
+ @Test
+ public void testS2SpatialJoinWithMediumDataset() {
+ // Test with medium dataset
+ FlinkExample.testS2SpatialJoin(10);
+ }
+
+ @Test
+ public void testS2SpatialJoinWithLargeDataset() {
+ // Test with larger dataset
+ FlinkExample.testS2SpatialJoin(20);
+ }
+}
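For context, the new FlinkFunctionsTest simply drives FlinkExample.testS2SpatialJoin at a few sizes. A minimal sketch of how another test could reuse the same entry point (the class name and size value below are illustrative, not part of this patch):

import org.junit.Test;

public class FlinkFunctionsSketchTest {
  @Test
  public void runS2SpatialJoinWithCustomSize() {
    // Any positive dataset size works; 15 is an arbitrary illustrative choice.
    FlinkExample.testS2SpatialJoin(15);
  }
}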
diff --git a/examples/java-spark-sql/pom.xml b/examples/java-spark-sql/pom.xml
index 640cf400f2..787e8d911b 100644
--- a/examples/java-spark-sql/pom.xml
+++ b/examples/java-spark-sql/pom.xml
@@ -23,7 +23,7 @@
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-java-spark-example</artifactId>
- <version>1.6.1</version>
+ <version>1.8.0</version>
<name>Sedona : Examples : Java Spark SQL</name>
<description>Example project for Apache Sedona with Java and Spark.</description>
@@ -32,11 +32,31 @@
<spark.scope>provided</spark.scope>
<javax.scope>test</javax.scope>
- <sedona.version>1.6.1</sedona.version>
+ <sedona.version>1.8.0</sedona.version>
<geotools.version>1.8.0-33.1</geotools.version>
- <spark.version>3.5.7</spark.version>
+ <spark.version>4.0.1</spark.version>
+ <spark.compat.version>4.0</spark.compat.version>
+ <scala.compat.version>2.13</scala.compat.version>
<javax.servlet.version>4.0.1</javax.servlet.version>
- <spotless.version>3.0.0</spotless.version>
+
+ <!-- For JDK-17 and above -->
+ <extraJavaArgs>
+ -XX:+IgnoreUnrecognizedVMOptions
+ --add-opens=java.base/java.lang=ALL-UNNAMED
+ --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
+ --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+ --add-opens=java.base/java.io=ALL-UNNAMED
+ --add-opens=java.base/java.net=ALL-UNNAMED
+ --add-opens=java.base/java.nio=ALL-UNNAMED
+ --add-opens=java.base/java.util=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
+ --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
+ --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+ --add-opens=java.base/sun.nio.cs=ALL-UNNAMED
+ --add-opens=java.base/sun.security.action=ALL-UNNAMED
+ --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
+ -Djdk.reflect.useDirectMethodHandle=false
+ </extraJavaArgs>
</properties>
<dependencies>
@@ -47,17 +67,17 @@
</dependency>
<dependency>
<groupId>org.apache.sedona</groupId>
- <artifactId>sedona-spark-shaded-3.5_2.13</artifactId>
+            <artifactId>sedona-spark-shaded-${spark.compat.version}_${scala.compat.version}</artifactId>
<version>${sedona.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.13</artifactId>
+ <artifactId>spark-core_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.13</artifactId>
+ <artifactId>spark-sql_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
<scope>${spark.scope}</scope>
</dependency>
@@ -68,9 +88,10 @@
<scope>${javax.scope}</scope>
</dependency>
<dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <version>5.2.0-M1</version>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.1</version>
+ <scope>test</scope>
</dependency>
</dependencies>
@@ -79,21 +100,15 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>3.2.5</version>
+ <version>2.22.2</version>
<configuration>
- <argLine>
- --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
- --add-opens=java.base/java.nio=ALL-UNNAMED
- --add-opens=java.base/java.lang=ALL-UNNAMED
- --add-opens=java.base/java.lang.invoke=ALL-UNNAMED
- --add-opens=java.base/java.util=ALL-UNNAMED
- </argLine>
+ <argLine>${extraJavaArgs}</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>2.1</version>
+ <version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
@@ -136,12 +151,7 @@
</excludes>
</filter>
</filters>
- <argLine>
- --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
- --add-opens=java.base/java.nio=ALL-UNNAMED
- --add-opens=java.base/java.lang=ALL-UNNAMED
- --add-opens=java.base/java.util=ALL-UNNAMED
- </argLine>
+ <argLine>${extraJavaArgs}</argLine>
</configuration>
</execution>
</executions>
@@ -149,36 +159,26 @@
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
- <version>${spotless.version}</version>
+ <version>2.35.0</version>
<configuration>
- <formats>
-                        <!-- you can define as many formats as you want, each is independent -->
- <format>
- <!-- define the files to apply to -->
- <includes>
- <include>.gitattributes</include>
- <include>.gitignore</include>
- </includes>
- <!-- define the steps to apply to those files -->
- <trimTrailingWhitespace/>
- <endWithNewline/>
- <indent>
- <tabs>true</tabs>
- <spacesPerTab>4</spacesPerTab>
- </indent>
- </format>
- </formats>
- <!-- define a language-specific format -->
<java>
- <googleJavaFormat>
- <version>1.10</version>
- <style>AOSP</style>
- <reflowLongStrings>true</reflowLongStrings>
- <formatJavadoc>false</formatJavadoc>
- </googleJavaFormat>
+ <googleJavaFormat>
+ <version>1.15.0</version>
+ </googleJavaFormat>
+ <licenseHeader>
+ <file>../../tools/maven/license-header.txt</file>
+ </licenseHeader>
</java>
</configuration>
- </plugin>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java b/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java
index ba8e6a1f65..1745d823ab 100644
--- a/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java
+++ b/examples/java-spark-sql/src/main/java/spark/GeoParquetAccessor.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package spark;
+import java.util.List;
import org.apache.sedona.core.spatialOperator.RangeQuery;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.sedona.core.spatialRDD.SpatialRDD;
@@ -31,60 +31,57 @@ import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Polygon;
-import java.util.List;
-
-
public class GeoParquetAccessor {
- private final SparkSession session;
- private String parquetPath;
+ private final SparkSession session;
+ private String parquetPath;
- public GeoParquetAccessor() {
- this.session = new SedonaSparkSession().getSession();
- this.parquetPath = "";
- }
+ public GeoParquetAccessor() {
+ this.session = new SedonaSparkSession().getSession();
+ this.parquetPath = "";
+ }
- //Overload with constructor that has Spark session provided
- //Use to avoid error - can't have two SparkContext objects on one JVM
- public GeoParquetAccessor(SparkSession session, String parquetPath) {
- this.session = session;
- this.parquetPath = parquetPath;
- }
+ // Overload with constructor that has Spark session provided
+ // Use to avoid error - can't have two SparkContext objects on one JVM
+ public GeoParquetAccessor(SparkSession session, String parquetPath) {
+ this.session = session;
+ this.parquetPath = parquetPath;
+ }
- public List<Geometry> selectFeaturesByPolygon(double xmin, double ymax,
- double xmax, double ymin,
- String geometryColumn) {
+ public List<Geometry> selectFeaturesByPolygon(
+      double xmin, double ymax, double xmax, double ymin, String geometryColumn) {
- //Read the GeoParquet file into a DataFrame
-        Dataset<Row> insarDF = session.read().format("geoparquet").load(parquetPath);
+    // Read the GeoParquet file into a DataFrame
+    Dataset<Row> insarDF = session.read().format("geoparquet").load(parquetPath);
- //Convert the DataFrame to a SpatialRDD
-        //The second argument to toSpatialRdd is the name of the geometry column.
-        SpatialRDD<Geometry> insarRDD = Adapter.toSpatialRdd(insarDF, geometryColumn);
+ // Convert the DataFrame to a SpatialRDD
+ // The second argument to toSpatialRdd is the name of the geometry column.
+    SpatialRDD<Geometry> insarRDD = Adapter.toSpatialRdd(insarDF, geometryColumn);
- // Define the polygon for the spatial query
- GeometryFactory geometryFactory = new GeometryFactory();
- Coordinate[] coordinates = new Coordinate[] {
- new Coordinate(xmin, ymin),
- new Coordinate(xmax, ymin),
- new Coordinate(xmax, ymax),
- new Coordinate(xmin, ymax),
-            new Coordinate(xmin, ymin) // A closed polygon has the same start and end coordinate
+ // Define the polygon for the spatial query
+ GeometryFactory geometryFactory = new GeometryFactory();
+ Coordinate[] coordinates =
+ new Coordinate[] {
+ new Coordinate(xmin, ymin),
+ new Coordinate(xmax, ymin),
+ new Coordinate(xmax, ymax),
+ new Coordinate(xmin, ymax),
+          new Coordinate(xmin, ymin) // A closed polygon has the same start and end coordinate
};
- Polygon queryPolygon = geometryFactory.createPolygon(coordinates);
-
- // Perform the spatial range query
-        // This will return all geometries that intersect with the query polygon.
-        // Alternatives are SpatialPredicate.CONTAINS or SpatialPredicate.WITHIN
- SpatialRDD<Geometry> resultRDD = new SpatialRDD<>();
- try {
-            resultRDD.rawSpatialRDD = RangeQuery.SpatialRangeQuery(insarRDD, queryPolygon, SpatialPredicate.INTERSECTS, false);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ Polygon queryPolygon = geometryFactory.createPolygon(coordinates);
- // Collect the results back to the driver
- return resultRDD.getRawSpatialRDD().collect();
+ // Perform the spatial range query
+ // This will return all geometries that intersect with the query polygon.
+ // Alternatives are SpatialPredicate.CONTAINS or SpatialPredicate.WITHIN
+ SpatialRDD<Geometry> resultRDD = new SpatialRDD<>();
+ try {
+ resultRDD.rawSpatialRDD =
+          RangeQuery.SpatialRangeQuery(insarRDD, queryPolygon, SpatialPredicate.INTERSECTS, false);
+ } catch (Exception e) {
+ e.printStackTrace();
}
+ // Collect the results back to the driver
+ return resultRDD.getRawSpatialRDD().collect();
+ }
}
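For reference, a minimal sketch (not part of this patch) of how the refactored GeoParquetAccessor could be driven from a driver program; the class name and GeoParquet path below are hypothetical and the bounding-box values are placeholders in the data's CRS:

package spark;

import java.util.List;
import org.apache.spark.sql.SparkSession;
import org.locationtech.jts.geom.Geometry;

public class GeoParquetAccessorSketch {
  public static void main(String[] args) {
    // Reuse the shared Sedona-enabled session so only one SparkContext exists per JVM.
    SparkSession spark = new SedonaSparkSession().getSession();
    GeoParquetAccessor accessor = new GeoParquetAccessor(spark, "/tmp/example.parquet"); // hypothetical path
    // Range query: all features intersecting the given envelope.
    List<Geometry> geoms = accessor.selectFeaturesByPolygon(1155850, 4819840, 1252000, 4748100, "geometry");
    geoms.forEach(g -> System.out.println(g.getCentroid()));
    spark.stop();
  }
}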
diff --git a/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java b/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java
index 4a11437283..5dfe45a46e 100644
--- a/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java
+++ b/examples/java-spark-sql/src/main/java/spark/SedonaGeoParquetMain.java
@@ -16,46 +16,45 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package spark;
-import org.locationtech.jts.geom.Coordinate;
-import org.locationtech.jts.geom.Geometry;
-
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
public class SedonaGeoParquetMain {
- protected static Properties properties;
- protected static String parquetPath;
- protected static SedonaSparkSession session;
+ protected static Properties properties;
+ protected static String parquetPath;
+ protected static SedonaSparkSession session;
- public static void main(String args[]) {
+ public static void main(String args[]) {
- session = new SedonaSparkSession();
- //Get parquetPath and any other application.properties
- try {
-            ClassLoader loader = Thread.currentThread().getContextClassLoader();
-            Properties properties = new Properties();
-            InputStream is = loader.getResourceAsStream("application.properties");
- properties.load(is);
- parquetPath = properties.getProperty("parquet.path");
- } catch (IOException e) {
- e.printStackTrace();
- parquetPath = "";
- }
-        GeoParquetAccessor accessor = new GeoParquetAccessor(session.session, parquetPath);
-        //Test parquet happens to be in New Zealand Transverse Mercator (EPSG:2193) (meters)
-        List<Geometry> geoms = accessor.selectFeaturesByPolygon(1155850, 4819840, 1252000, 4748100, "geometry");
-        System.out.println("Coordinates of convex hull of points in boundary:");
-        for (Geometry geom : geoms) {
-            Coordinate[] convexHullCoordinates = geom.convexHull().getCoordinates();
- for (Coordinate coord : convexHullCoordinates) {
- System.out.println(String.format("\t%s", coord.toString()));
- }
- }
+ session = new SedonaSparkSession();
+ // Get parquetPath and any other application.properties
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Properties properties = new Properties();
+ InputStream is = loader.getResourceAsStream("application.properties");
+ properties.load(is);
+ parquetPath = properties.getProperty("parquet.path");
+ } catch (IOException e) {
+ e.printStackTrace();
+ parquetPath = "";
+ }
+    GeoParquetAccessor accessor = new GeoParquetAccessor(session.session, parquetPath);
+    // Test parquet happens to be in New Zealand Transverse Mercator (EPSG:2193) (meters)
+    List<Geometry> geoms =
+        accessor.selectFeaturesByPolygon(1155850, 4819840, 1252000, 4748100, "geometry");
+ System.out.println("Coordinates of convex hull of points in boundary:");
+ for (Geometry geom : geoms) {
+ Coordinate[] convexHullCoordinates = geom.convexHull().getCoordinates();
+ for (Coordinate coord : convexHullCoordinates) {
+ System.out.println(String.format("\t%s", coord.toString()));
+ }
}
+ }
}
diff --git a/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java b/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java
index 6be6c99585..aaf1c938fe 100644
--- a/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java
+++ b/examples/java-spark-sql/src/main/java/spark/SedonaSparkSession.java
@@ -16,36 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package spark;
import org.apache.sedona.spark.SedonaContext;
import org.apache.spark.sql.SparkSession;
-
public class SedonaSparkSession {
- public SparkSession session;
-
- public SedonaSparkSession() {
-
-        //Set configuration for localhost spark cluster. Intended to be run from IDE or similar.
-        //Use SedonaContext builder to create SparkSession with Sedona extensions
- SparkSession config = SedonaContext.builder()
- .appName(this.getClass().getSimpleName())
- .master("local[*]")
- .config("spark.ui.enabled", "false")
- .config("spark.driver.extraJavaOptions",
-                        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED")
- .getOrCreate();
-
- //Create Sedona-enabled SparkSession
- this.session = SedonaContext.create(config);
- }
-
- public SparkSession getSession() {
- // Access SparkSession object
- return this.session;
- }
-
+ public SparkSession session;
+
+ public SedonaSparkSession() {
+
+    // Set configuration for localhost spark cluster. Intended to be run from IDE or similar.
+ // Use SedonaContext builder to create SparkSession with Sedona extensions
+ SparkSession config =
+ SedonaContext.builder()
+ .appName(this.getClass().getSimpleName())
+ .master("local[*]")
+ .config("spark.ui.enabled", "false")
+ .config(
+ "spark.driver.extraJavaOptions",
+              "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED")
+ .getOrCreate();
+
+ // Create Sedona-enabled SparkSession
+ this.session = SedonaContext.create(config);
+ }
+
+ public SparkSession getSession() {
+ // Access SparkSession object
+ return this.session;
+ }
}
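SedonaSparkSession wraps SedonaContext.create, so callers get a session with the Sedona SQL functions already registered. A minimal sketch (not part of this patch; the class name is illustrative) of using it directly:

package spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SedonaSessionSketch {
  public static void main(String[] args) {
    SparkSession spark = new SedonaSparkSession().getSession();
    // ST_Point is available because SedonaContext.create registered the spatial functions.
    Dataset<Row> df = spark.sql("SELECT ST_Point(1.0, 2.0) AS geom");
    df.show();
    spark.stop();
  }
}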
diff --git a/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java b/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java
index 036cdda956..f965e5b510 100644
--- a/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java
+++ b/examples/java-spark-sql/src/test/java/spark/SedonaParquetTest.java
@@ -16,87 +16,71 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package spark;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
public class SedonaParquetTest {
-
- protected static Properties properties;
- protected static String parquetPath;
- protected static SedonaSparkSession session;
-
- public SedonaParquetTest() {
- }
-
- @BeforeAll
- public static void setUpClass() throws IOException {
-
- session = new SedonaSparkSession();
- //Get parquetPath and any other application.properties
- try {
-            ClassLoader loader = Thread.currentThread().getContextClassLoader();
-            Properties properties = new Properties();
-            InputStream is = loader.getResourceAsStream("application.properties");
- properties.load(is);
- parquetPath = properties.getProperty("parquet.path");
- } catch (IOException e) {
- e.printStackTrace();
- parquetPath = "";
- }
-
+ protected static Properties properties;
+ protected static String parquetPath;
+ protected static SedonaSparkSession session;
+
+ public SedonaParquetTest() {}
+
+ @BeforeClass
+ public static void setUpClass() throws IOException {
+
+ session = new SedonaSparkSession();
+ // Get parquetPath and any other application.properties
+ try {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Properties properties = new Properties();
+ InputStream is = loader.getResourceAsStream("application.properties");
+ properties.load(is);
+ parquetPath = properties.getProperty("parquet.path");
+ } catch (IOException e) {
+ e.printStackTrace();
+ parquetPath = "";
}
-
- @AfterAll
- public static void tearDownClass() {
- }
-
- @BeforeEach
- public void setUp() {
- }
-
- @AfterEach
- public void tearDown() {
- }
-
- @Test
- public void connects() {
-        assertNotNull(session, "SparkSedonaSession not initialized correctly.");
-        assertNotNull(session.session, "Spark session not initialized inside SparkSedonaSession.");
- }
-
- @Test
- public void parquetAccessible() {
- File file = new File(parquetPath);
- assertTrue(file.exists(), "Parquet file does not exist.");
- assertTrue(file.canRead(), "Can't read geoparquet file on record.");
- }
-
- @Test
- public void canLoadRDD() {
- assertNotNull(session, "Session is null.");
- Dataset<Row> insarDF = session.session.read()
- .format("geoparquet")
- .load(parquetPath);
- assertNotNull(insarDF, "Dataset was not created.");
- assertTrue(insarDF.count() > 0, "Dataset is empty.");
- }
-
+ }
+
+ @AfterClass
+ public static void tearDownClass() {}
+
+ @Before
+ public void setUp() {}
+
+ @Test
+ public void connects() {
+ assertNotNull("SparkSedonaSession not initialized correctly.", session);
+    assertNotNull("Spark session not initialized inside SparkSedonaSession.", session.session);
+ }
+
+ @Test
+ public void parquetAccessible() {
+ File file = new File(parquetPath);
+ assertTrue("Parquet file does not exist.", file.exists());
+ assertTrue("Can't read geoparquet file on record.", file.canRead());
+ }
+
+ @Test
+ public void canLoadRDD() {
+ assertNotNull("Session is null.", session);
+    Dataset<Row> insarDF = session.session.read().format("geoparquet").load(parquetPath);
+ assertNotNull("Dataset was not created.", insarDF);
+ assertTrue("Dataset is empty.", insarDF.count() > 0);
+ }
}
diff --git a/examples/spark-sql/pom.xml b/examples/spark-sql/pom.xml
index f2c647d653..28066a6460 100644
--- a/examples/spark-sql/pom.xml
+++ b/examples/spark-sql/pom.xml
@@ -30,7 +30,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <dependency.scope>compile</dependency.scope>
+ <spark.scope>provided</spark.scope>
<sedona.scope>compile</sedona.scope>
<geotools.scope>compile</geotools.scope>
@@ -66,13 +66,13 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
- <scope>${dependency.scope}</scope>
+ <scope>${spark.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
<version>${spark.version}</version>
- <scope>${dependency.scope}</scope>
+ <scope>${spark.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.sedona</groupId>
@@ -185,7 +185,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
- <scope>${dependency.scope}</scope>
+ <scope>${spark.scope}</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -193,6 +193,12 @@
<version>4.13.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.compat.version}</artifactId>
+ <version>3.2.15</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<repositories>
<repository>
@@ -355,6 +361,48 @@
<argLine>${extraJavaArgs}</argLine>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <version>2.2.0</version>
+ <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+ <junitxml>.</junitxml>
+ <filereports>TestSuite.txt</filereports>
+          <argLine>--add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.a [...]
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>2.35.0</version>
+ <configuration>
+ <java>
+ <googleJavaFormat>
+ <version>1.15.0</version>
+ </googleJavaFormat>
+ <licenseHeader>
+ <file>../../tools/maven/license-header.txt</file>
+ </licenseHeader>
+ </java>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<resources>
<resource>
diff --git a/examples/spark-sql/src/main/scala/Main.scala b/examples/spark-sql/src/main/scala/Main.scala
index cd3e4c67a8..45efd85165 100644
--- a/examples/spark-sql/src/main/scala/Main.scala
+++ b/examples/spark-sql/src/main/scala/Main.scala
@@ -26,6 +26,15 @@ import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
+/**
+ * Main entry point for running Sedona SQL, RDD, and Visualization examples.
+ * Demonstrates various spatial operations including:
+ * - SQL-based spatial queries and joins
+ * - GeoParquet I/O operations
+ * - Shapefile and raster data handling
+ * - RDD-based spatial analysis
+ * - Spatial visualization techniques
+ */
object Main extends App {
Logger.getRootLogger().setLevel(Level.WARN)
@@ -39,20 +48,31 @@ object Main extends App {
val resourceFolder = System.getProperty("user.dir")+"/src/test/resources/"
+ // SQL-based spatial operations
+ println("=== Running SQL Examples ===")
testPredicatePushdownAndRangeJonQuery(sedona)
testDistanceJoinQuery(sedona)
testAggregateFunction(sedona)
testShapefileConstructor(sedona)
testRasterIOAndMapAlgebra(sedona)
+ // GeoParquet operations
+ println("\n=== Running GeoParquet Examples ===")
+ testGeoParquetWriter(sedona)
+ testGeoParquetReader(sedona)
+
+ // RDD-based spatial analysis
+ println("\n=== Running RDD Examples ===")
visualizeSpatialColocation(sedona)
calculateSpatialColocation(sedona)
+ // Visualization examples
+ println("\n=== Running Visualization Examples ===")
buildScatterPlot(sedona)
buildHeatMap(sedona)
buildChoroplethMap(sedona)
parallelFilterRenderNoStitch(sedona)
sqlApiVisualization(sedona)
- System.out.println("All SedonaSQL DEMOs passed!")
+ println("\n✅ All Sedona examples completed successfully!")
}
diff --git a/examples/spark-sql/src/main/scala/RddExample.scala b/examples/spark-sql/src/main/scala/RddExample.scala
index 7dc54860cf..6f66305150 100644
--- a/examples/spark-sql/src/main/scala/RddExample.scala
+++ b/examples/spark-sql/src/main/scala/RddExample.scala
@@ -17,7 +17,6 @@
* under the License.
*/
-import Main.resourceFolder
import org.apache.sedona.core.enums.{GridType, IndexType}
import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader
import org.apache.sedona.core.spatialOperator.JoinQuery
@@ -34,6 +33,8 @@ import java.awt.Color
object RddExample {
+ val resourceFolder = System.getProperty("user.dir")+"/src/test/resources/"
+
// Data link (in shapefile): https://geo.nyu.edu/catalog/nyu_2451_34514
val nycArealandmarkShapefileLocation = resourceFolder+"nyc-area-landmark-shapefile"
@@ -42,16 +43,25 @@ object RddExample {
val colocationMapLocation = System.getProperty("user.dir")+"/colocationMap"
+ /**
+   * Visualizes spatial co-location between NYC landmarks and taxi pickup points.
+   * Creates an overlay visualization with landmarks (scatter plot) and taxi trips (heat map).
+   *
+   * Note: This function uses RDD API to demonstrate low-level spatial operations.
+ * For DataFrame-based approach, see SqlExample.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def visualizeSpatialColocation(sedona: SparkSession): Unit =
{
// Prepare NYC area landmarks which includes airports, museums, colleges, hospitals
-    var arealmRDD = ShapefileReader.readToPolygonRDD(sedona.sparkContext, nycArealandmarkShapefileLocation)
+    val arealmRDD = ShapefileReader.readToPolygonRDD(sedona.sparkContext, nycArealandmarkShapefileLocation)
// Prepare NYC taxi trips. Only use the taxi trips' pickup points
-    var tripDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
+    val tripDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
// Convert from DataFrame to RDD. This can also be done directly through Sedona RDD API.
tripDf.createOrReplaceTempView("tripdf")
-    var tripRDD = Adapter.toSpatialRdd(sedona.sql("select ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 14))) as point from tripdf")
+    val tripRDD = Adapter.toSpatialRdd(sedona.sql("select ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 14))) as point from tripdf")
, "point")
// Convert the Coordinate Reference System from degree-based to meter-based. This returns the accurate distance calculate.
@@ -79,6 +89,16 @@ object RddExample {
imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage, colocationMapLocation, ImageType.PNG)
}
+ /**
+ * Calculates spatial co-location using Ripley's K function.
+   * Analyzes whether taxi trips are clustered around NYC landmarks at various distance thresholds.
+   * Uses distance join queries to compute co-location statistics.
+   *
+   * The Ripley's K function tests for spatial clustering/dispersion by comparing
+   * observed vs expected point patterns at increasing distance bands.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def calculateSpatialColocation(sedona: SparkSession): Unit =
{
@@ -88,22 +108,22 @@ object RddExample {
// Use the center point of area landmarks to check co-location. This is required by Ripley's K function.
arealmRDD.rawSpatialRDD = arealmRDD.rawSpatialRDD.rdd.map[Geometry](f=>
{
- var geom = f.getCentroid
+ val geom = f.getCentroid
// Copy non-spatial attributes
geom.setUserData(f.getUserData)
geom
})
// The following two lines are optional. The purpose is to show the structure of the shapefile.
- var arealmDf = Adapter.toDf(arealmRDD, sedona)
+ val arealmDf = Adapter.toDf(arealmRDD, sedona)
arealmDf.show()
// Prepare NYC taxi trips. Only use the taxi trips' pickup points
-    var tripDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
+    val tripDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(nyctripCSVLocation)
tripDf.show() // Optional
// Convert from DataFrame to RDD. This can also be done directly through Sedona RDD API.
tripDf.createOrReplaceTempView("tripdf")
-    var tripRDD = Adapter.toSpatialRdd(sedona.sql("select ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 14))) as point, 'def' as trip_attr from tripdf")
+    val tripRDD = Adapter.toSpatialRdd(sedona.sql("select ST_Point(cast(tripdf._c0 as Decimal(24, 14)), cast(tripdf._c1 as Decimal(24, 14))) as point, 'def' as trip_attr from tripdf")
, "point")
// Convert the Coordinate Reference System from degree-based to meter-based. This returns the accurate distance calculate.
@@ -127,27 +147,27 @@ object RddExample {
val beginDistance = 0.0
var currentDistance = 0.0
- // Start the iteration
+ // Start the iteration - test multiple distance bands
println("distance(meter),observedL,difference,coLocationStatus")
for (i <- 1 to iterationTimes)
{
currentDistance = beginDistance + i*distanceIncrement
- var bufferedArealmRDD = new CircleRDD(arealmRDD,currentDistance)
+ val bufferedArealmRDD = new CircleRDD(arealmRDD,currentDistance)
bufferedArealmRDD.spatialPartitioning(tripRDD.getPartitioner)
// Run Sedona Distance Join Query
-      var adjacentMatrix = JoinQuery.DistanceJoinQueryFlat(tripRDD, bufferedArealmRDD,true,true)
+      val adjacentMatrix = JoinQuery.DistanceJoinQueryFlat(tripRDD, bufferedArealmRDD,true,true)
// Uncomment the following two lines if you want to see what the join result looks like in SparkSQL
// import scala.collection.JavaConversions._
-      // var adjacentMatrixDf = Adapter.toDf(adjacentMatrix, arealmRDD.fieldNames, tripRDD.fieldNames, sparkSession)
+      // val adjacentMatrixDf = Adapter.toDf(adjacentMatrix, arealmRDD.fieldNames, tripRDD.fieldNames, sparkSession)
// adjacentMatrixDf.show()
-      var observedK = adjacentMatrix.count()*area*1.0/(arealmRDD.approximateTotalCount*tripRDD.approximateTotalCount)
-      var observedL = Math.sqrt(observedK/Math.PI)
-      var expectedL = currentDistance
-      var colocationDifference = observedL - expectedL
-      var colocationStatus = {if (colocationDifference>0) "Co-located" else "Dispersed"}
+      val observedK = adjacentMatrix.count()*area*1.0/(arealmRDD.approximateTotalCount*tripRDD.approximateTotalCount)
+      val observedL = Math.sqrt(observedK/Math.PI)
+      val expectedL = currentDistance
+      val colocationDifference = observedL - expectedL
+      val colocationStatus = {if (colocationDifference>0) "Co-located" else "Dispersed"}
println(s"""$currentDistance,$observedL,$colocationDifference,$colocationStatus""")
}
diff --git a/examples/spark-sql/src/main/scala/SqlExample.scala b/examples/spark-sql/src/main/scala/SqlExample.scala
index 367f06160f..ed34c08d2f 100644
--- a/examples/spark-sql/src/main/scala/SqlExample.scala
+++ b/examples/spark-sql/src/main/scala/SqlExample.scala
@@ -17,42 +17,46 @@
* under the License.
*/
-import Main.resourceFolder
-import org.apache.sedona.core.formatMapper.shapefileParser.ShapefileReader
-import org.apache.sedona.core.spatialRDD.SpatialRDD
import org.apache.sedona.core.utils.SedonaConf
-import org.apache.sedona.sql.utils.Adapter
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SaveMode, SparkSession}
import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
object SqlExample {
+ val resourceFolder = System.getProperty("user.dir")+"/src/test/resources/"
+
val csvPolygonInputLocation = resourceFolder + "testenvelope.csv"
val csvPointInputLocation = resourceFolder + "testpoint.csv"
val shapefileInputLocation = resourceFolder + "shapefiles/dbf"
val rasterdatalocation = resourceFolder + "raster/"
+ /**
+   * Demonstrates predicate pushdown optimization and range join queries with spatial indexing.
+   * Tests ST_Contains predicate with polygon and point data, including spatial filter pushdown.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def testPredicatePushdownAndRangeJonQuery(sedona: SparkSession):Unit =
{
val sedonaConf = new SedonaConf(sedona.conf)
println(sedonaConf)
-    var polygonCsvDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPolygonInputLocation)
+    val polygonCsvDf = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPolygonInputLocation)
polygonCsvDf.createOrReplaceTempView("polygontable")
polygonCsvDf.show()
-    var polygonDf = sedona.sql("select ST_PolygonFromEnvelope(cast(polygontable._c0 as Decimal(24,20)),cast(polygontable._c1 as Decimal(24,20)), cast(polygontable._c2 as Decimal(24,20)), cast(polygontable._c3 as Decimal(24,20))) as polygonshape from polygontable")
+    val polygonDf = sedona.sql("select ST_PolygonFromEnvelope(cast(polygontable._c0 as Decimal(24,20)),cast(polygontable._c1 as Decimal(24,20)), cast(polygontable._c2 as Decimal(24,20)), cast(polygontable._c3 as Decimal(24,20))) as polygonshape from polygontable")
polygonDf.createOrReplaceTempView("polygondf")
polygonDf.show()
-    var pointCsvDF = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+    val pointCsvDF = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
pointCsvDF.createOrReplaceTempView("pointtable")
pointCsvDF.show()
-    var pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape from pointtable")
+    val pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape from pointtable")
pointDf.createOrReplaceTempView("pointdf")
pointDf.show()
-    var rangeJoinDf = sedona.sql("select * from polygondf, pointdf where ST_Contains(polygondf.polygonshape,pointdf.pointshape) " +
+    val rangeJoinDf = sedona.sql("select * from polygondf, pointdf where ST_Contains(polygondf.polygonshape,pointdf.pointshape) " +
"and ST_Contains(ST_PolygonFromEnvelope(1.0,101.0,501.0,601.0), polygondf.polygonshape)")
// Write result to GeoParquet file
@@ -62,41 +66,53 @@ object SqlExample {
assert (rangeJoinDf.count()==500)
}
+ /**
+   * Demonstrates distance join query that finds all point pairs within a specified distance.
+ * Uses ST_Distance predicate with distance-based join optimization.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def testDistanceJoinQuery(sedona: SparkSession): Unit =
{
val sedonaConf = new SedonaConf(sedona.conf)
println(sedonaConf)
-    var pointCsvDF1 = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+    val pointCsvDF1 = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
pointCsvDF1.createOrReplaceTempView("pointtable")
pointCsvDF1.show()
-    var pointDf1 = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape1 from pointtable")
+    val pointDf1 = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape1 from pointtable")
pointDf1.createOrReplaceTempView("pointdf1")
pointDf1.show()
-    var pointCsvDF2 = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+    val pointCsvDF2 = sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
pointCsvDF2.createOrReplaceTempView("pointtable")
pointCsvDF2.show()
-    var pointDf2 = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape2 from pointtable")
+    val pointDf2 = sedona.sql("select ST_Point(cast(pointtable._c0 as Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as pointshape2 from pointtable")
pointDf2.createOrReplaceTempView("pointdf2")
pointDf2.show()
- var distanceJoinDf = sedona.sql("select * from pointdf1, pointdf2 where
ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2")
+ val distanceJoinDf = sedona.sql("select * from pointdf1, pointdf2 where
ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2")
distanceJoinDf.explain()
distanceJoinDf.show(10)
assert (distanceJoinDf.count()==2998)
}
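The same ST_Distance join also works on small in-memory DataFrames, which makes it easy to experiment without the CSV inputs. A minimal sketch, with illustrative coordinates and column names:

    import org.apache.spark.sql.functions.expr

    // Two tiny point DataFrames built from literal coordinates
    val left  = sedona.range(0, 10).selectExpr("ST_Point(cast(id as double), 0.0) as g1")
    val right = sedona.range(0, 10).selectExpr("ST_Point(cast(id as double), 1.0) as g2")

    // Keep only point pairs closer than 2 units; Sedona executes this as a distance join
    val pairs = left.join(right, expr("ST_Distance(g1, g2) < 2"))
    pairs.explain()
    println(pairs.count())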
+ /**
+ * Demonstrates the spatial aggregate function ST_Envelope_Aggr, which computes the bounding box of a point set.
+ * Validates that the computed envelope matches the expected boundary.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def testAggregateFunction(sedona: SparkSession): Unit =
{
val sedonaConf = new SedonaConf(sedona.conf)
println(sedonaConf)
- var pointCsvDF =
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+ val pointCsvDF =
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
pointCsvDF.createOrReplaceTempView("pointtable")
- var pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as
Decimal(24,20)), cast(pointtable._c1 as Decimal(24,20))) as arealandmark from
pointtable")
+ val pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as
Decimal(24,20)), cast(pointtable._c1 as Decimal(24,20))) as arealandmark from
pointtable")
pointDf.createOrReplaceTempView("pointdf")
- var boundary = sedona.sql("select ST_Envelope_Aggr(pointdf.arealandmark)
from pointdf")
+ val boundary = sedona.sql("select ST_Envelope_Aggr(pointdf.arealandmark)
from pointdf")
val coordinates:Array[Coordinate] = new Array[Coordinate](5)
coordinates(0) = new Coordinate(1.1,101.1)
coordinates(1) = new Coordinate(1.1,1100.1)
@@ -108,25 +124,92 @@ object SqlExample {
assert(boundary.take(1)(0).get(0)==geometryFactory.createPolygon(coordinates))
}
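Besides ST_Envelope_Aggr, Sedona provides other spatial aggregates such as ST_Union_Aggr. A minimal sketch that merges all geometries in the pointdf view created above:

    // Union every point geometry into a single (multi)geometry
    val unioned = sedona.sql("SELECT ST_Union_Aggr(arealandmark) AS merged FROM pointdf")
    unioned.show(truncate = false)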
+ /**
+ * Demonstrates reading shapefiles using the modern DataFrame-based reader.
+ * Shows how to load shapefile data and query geometry and attribute fields.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def testShapefileConstructor(sedona: SparkSession): Unit =
{
- var spatialRDD = new SpatialRDD[Geometry]
- spatialRDD = ShapefileReader.readToGeometryRDD(sedona.sparkContext,
shapefileInputLocation)
- var rawSpatialDf = Adapter.toDf(spatialRDD,sedona)
- rawSpatialDf.createOrReplaceTempView("rawSpatialDf")
- var spatialDf = sedona.sql("""
+ // Read shapefile using the DataFrame-based reader
+ val spatialDf =
sedona.read.format("shapefile").load(shapefileInputLocation)
+ spatialDf.createOrReplaceTempView("rawSpatialDf")
+
+ // Select specific columns
+ val resultDf = sedona.sql("""
| SELECT geometry, STATEFP, COUNTYFP
| FROM rawSpatialDf
""".stripMargin)
- spatialDf.show()
- spatialDf.printSchema()
+ resultDf.show()
+ resultDf.printSchema()
}
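Once loaded, the shapefile DataFrame behaves like any other geometry DataFrame, so spatial predicates can be applied directly. A minimal sketch against the rawSpatialDf view above (the bounding-box values are illustrative):

    // Keep only rows whose geometry intersects an illustrative bounding box
    val filtered = sedona.sql("""
      | SELECT STATEFP, COUNTYFP
      | FROM rawSpatialDf
      | WHERE ST_Intersects(geometry, ST_PolygonFromEnvelope(-100.0, 30.0, -90.0, 40.0))
      """.stripMargin)
    filtered.show()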
+ /**
+ * Demonstrates raster data I/O and map algebra operations.
+ * Loads GeoTIFF raster data and performs various raster operations.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
def testRasterIOAndMapAlgebra(sedona: SparkSession): Unit = {
- var df = sedona.read.format("binaryFile").option("dropInvalid",
true).load(rasterdatalocation).selectExpr("RS_FromGeoTiff(content) as raster",
"path")
+ val df = sedona.read.format("binaryFile").option("dropInvalid",
true).load(rasterdatalocation).selectExpr("RS_FromGeoTiff(content) as raster",
"path")
df.printSchema()
df.show()
df.selectExpr("RS_Metadata(raster) as metadata", "RS_GeoReference(raster)
as georef", "RS_NumBands(raster) as numBands").show(false)
df.selectExpr("RS_AddBand(raster, raster, 1) as raster_extraband").show()
}
+
+ /**
+ * Demonstrates writing a spatial DataFrame to the GeoParquet format.
+ * GeoParquet is a cloud-native geospatial data format based on Apache Parquet.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def testGeoParquetWriter(sedona: SparkSession): Unit = {
+ // Create a sample DataFrame with geometries
+ val pointCsvDF =
sedona.read.format("csv").option("delimiter",",").option("header","false").load(csvPointInputLocation)
+ pointCsvDF.createOrReplaceTempView("pointtable")
+ val pointDf = sedona.sql("select ST_Point(cast(pointtable._c0 as
Decimal(24,20)),cast(pointtable._c1 as Decimal(24,20))) as geometry from
pointtable")
+
+ // Write to GeoParquet format
+ val geoParquetOutputPath = "target/test-classes/output/points.geoparquet"
+ pointDf.write
+ .format("geoparquet")
+ .mode(SaveMode.Overwrite)
+ .save(geoParquetOutputPath)
+
+ println(s"GeoParquet file written to: $geoParquetOutputPath")
+ pointDf.show(5)
+ }
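Because GeoParquet rides on the standard Parquet writer, the usual Spark write options apply as well. A minimal sketch of a partitioned write; the region column and output path are illustrative:

    import org.apache.spark.sql.functions.expr

    // Derive an illustrative partition column and write one GeoParquet folder per value
    pointDf
      .withColumn("region", expr("CASE WHEN ST_X(geometry) < 50 THEN 'west' ELSE 'east' END"))
      .write
      .format("geoparquet")
      .mode(SaveMode.Overwrite)
      .partitionBy("region")
      .save("target/test-classes/output/points_partitioned.geoparquet")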
+
+ /**
+ * Demonstrates reading GeoParquet files and performing spatial operations.
+ * Shows how to load GeoParquet data and apply spatial transformations.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def testGeoParquetReader(sedona: SparkSession): Unit = {
+ // First, ensure we have a GeoParquet file by writing one
+ testGeoParquetWriter(sedona)
+
+ // Read GeoParquet file
+ val geoParquetInputPath = "target/test-classes/output/points.geoparquet"
+ val geoParquetDf = sedona.read
+ .format("geoparquet")
+ .load(geoParquetInputPath)
+
+ println(s"GeoParquet file read from: $geoParquetInputPath")
+ geoParquetDf.printSchema()
+ geoParquetDf.show(5)
+
+ // Perform spatial operations on the loaded data
+ geoParquetDf.createOrReplaceTempView("geoparquet_points")
+ val bufferedDf = sedona.sql("""
+ | SELECT ST_Buffer(geometry, 0.1) as
buffered_geometry
+ | FROM geoparquet_points
+ """.stripMargin)
+
+ println("Applied spatial operations on GeoParquet data:")
+ bufferedDf.show(5)
+ }
}
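When a spatial predicate is applied to a GeoParquet read, Sedona can skip row groups using the bounding-box statistics stored in the file. A minimal sketch; the envelope values are illustrative:

    // The spatial filter lets the GeoParquet reader prune row groups via bbox metadata
    val filteredPoints = sedona.read
      .format("geoparquet")
      .load("target/test-classes/output/points.geoparquet")
      .where("ST_Within(geometry, ST_PolygonFromEnvelope(0.0, 100.0, 10.0, 110.0))")
    filteredPoints.explain()
    filteredPoints.show(5)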
diff --git a/examples/spark-sql/src/main/scala/VizExample.scala
b/examples/spark-sql/src/main/scala/VizExample.scala
index b7b333a156..1108f4a09d 100644
--- a/examples/spark-sql/src/main/scala/VizExample.scala
+++ b/examples/spark-sql/src/main/scala/VizExample.scala
@@ -17,7 +17,6 @@
* under the License.
*/
-import Main.resourceFolder
import org.apache.sedona.common.enums.FileDataSplitter
import org.apache.sedona.core.enums.{GridType, IndexType}
import org.apache.sedona.core.spatialOperator.JoinQuery
@@ -33,6 +32,8 @@ import java.awt.Color
object VizExample {
+ val resourceFolder = System.getProperty("user.dir")+"/src/test/resources/"
+
val demoOutputPath = "target/demo"
val scatterPlotOutputPath = System.getProperty("user.dir") + "/" +
demoOutputPath + "/scatterplot"
@@ -55,27 +56,42 @@ object VizExample {
val PolygonNumPartitions = 5
val USMainLandBoundary = new Envelope(-126.790180, -64.630926, 24.863836,
50.000)
- def buildScatterPlot(sedona: SparkSession): Boolean = {
+ /**
+ * Creates a scatter plot visualization of polygon data.
+ * Generates a PNG image showing the spatial distribution of polygons.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def buildScatterPlot(sedona: SparkSession): Unit = {
val spatialRDD = new PolygonRDD(sedona.sparkContext, PolygonInputLocation,
PolygonSplitter, false, PolygonNumPartitions)
- var visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary,
false)
+ val visualizationOperator = new ScatterPlot(1000, 600, USMainLandBoundary,
false)
visualizationOperator.CustomizeColor(255, 255, 255, 255, Color.GREEN, true)
visualizationOperator.Visualize(sedona.sparkContext, spatialRDD)
- var imageGenerator = new ImageGenerator
+ val imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage,
scatterPlotOutputPath, ImageType.PNG)
- true
}
- def buildHeatMap(sedona: SparkSession): Boolean = {
+ /**
+ * Creates a heat map visualization showing the density of rectangle geometries.
+ * Generates a PNG image with heat intensity based on spatial clustering.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def buildHeatMap(sedona: SparkSession): Unit = {
val spatialRDD = new RectangleRDD(sedona.sparkContext,
RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions)
val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary,
false, 2)
visualizationOperator.Visualize(sedona.sparkContext, spatialRDD)
val imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.rasterImage,
heatMapOutputPath, ImageType.PNG)
- true
}
-
- def buildChoroplethMap(sedona: SparkSession): Boolean = {
+ /**
+ * Creates a choropleth map by performing a spatial join and visualizing the join counts.
+ * Combines a heat map with a polygon overlay to show spatial relationships.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def buildChoroplethMap(sedona: SparkSession): Unit = {
val spatialRDD = new PointRDD(sedona.sparkContext, PointInputLocation,
PointOffset, PointSplitter, false, PointNumPartitions)
val queryRDD = new PolygonRDD(sedona.sparkContext, PolygonInputLocation,
PolygonSplitter, false, PolygonNumPartitions)
spatialRDD.spatialPartitioning(GridType.KDBTREE)
@@ -92,20 +108,30 @@ object VizExample {
overlayOperator.JoinImage(frontImage.rasterImage)
val imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(overlayOperator.backRasterImage,
choroplethMapOutputPath, ImageType.PNG)
- true
}
- def parallelFilterRenderNoStitch(sedona: SparkSession): Boolean = {
+ /**
+ * Demonstrates parallel rendering without image stitching.
+ * Creates tiled heat map images for distributed rendering.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def parallelFilterRenderNoStitch(sedona: SparkSession): Unit = {
val spatialRDD = new RectangleRDD(sedona.sparkContext,
RectangleInputLocation, RectangleSplitter, false, RectangleNumPartitions)
val visualizationOperator = new HeatMap(1000, 600, USMainLandBoundary,
false, 2, 4, 4, true, true)
visualizationOperator.Visualize(sedona.sparkContext, spatialRDD)
val imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(visualizationOperator.distributedRasterImage,
parallelFilterRenderOutputPath, ImageType.PNG)
- true
}
- def sqlApiVisualization(sedona: SparkSession): Boolean = {
- var pointDf = sedona.read.format("csv").option("delimiter",
",").option("header", "false").load(PointInputLocation)
+ /**
+ * Demonstrates visualization using the Sedona SQL API with pixelization and rendering.
+ * Creates a heat map using SQL functions for rasterization and colorization.
+ *
+ * @param sedona SparkSession with Sedona extensions enabled
+ */
+ def sqlApiVisualization(sedona: SparkSession): Unit = {
+ val pointDf = sedona.read.format("csv").option("delimiter",
",").option("header", "false").load(PointInputLocation)
pointDf.selectExpr("ST_Point(cast(_c0 as Decimal(24,20)),cast(_c1 as
Decimal(24,20))) as shape")
.filter("ST_Contains(ST_PolygonFromEnvelope(-126.790180,24.863836,-64.630926,50.000),shape)").createOrReplaceTempView("pointtable")
sedona.sql(
@@ -127,8 +153,8 @@ object VizExample {
|SELECT ST_Render(pixel, ST_Colorize(weight,
(SELECT max(weight) FROM pixelaggregates), 'red')) AS image
|FROM pixelaggregates
""".stripMargin)
- var image =
sedona.table("images").take(1)(0)(0).asInstanceOf[ImageSerializableWrapper].getImage
- var imageGenerator = new ImageGenerator
+ val image =
sedona.table("images").take(1)(0)(0).asInstanceOf[ImageSerializableWrapper].getImage
+ val imageGenerator = new ImageGenerator
imageGenerator.SaveRasterImageAsLocalFile(image, sqlApiOutputPath,
ImageType.PNG)
sedona.sql(
"""
@@ -137,7 +163,6 @@ object VizExample {
|FROM images
""".stripMargin)
sedona.table("imagestring").show()
- true
}
}
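The SQL visualization path boils down to three steps: pixelize geometries onto a canvas, aggregate per pixel, then colorize and render. A condensed sketch of that pipeline against the pointtable view above; the resolution and boundary are illustrative and the exact SQL used by sqlApiVisualization may differ:

    // 1. Rasterize each geometry into pixels on a 256x256 canvas bounded by the US mainland envelope
    sedona.sql("""
      |SELECT pixel
      |FROM pointtable
      |LATERAL VIEW explode(ST_Pixelize(shape, 256, 256, ST_PolygonFromEnvelope(-126.790180, 24.863836, -64.630926, 50.000))) exploded AS pixel
      """.stripMargin).createOrReplaceTempView("pixels")

    // 2. Count how many geometries land on each pixel
    sedona.sql("SELECT pixel, count(*) AS weight FROM pixels GROUP BY pixel")
      .createOrReplaceTempView("pixelaggregates")

    // 3. Colorize each pixel by weight and render all pixels into a single image
    sedona.sql("""
      |SELECT ST_Render(pixel, ST_Colorize(weight, (SELECT max(weight) FROM pixelaggregates), 'red')) AS image
      |FROM pixelaggregates
      """.stripMargin).createOrReplaceTempView("images")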
diff --git a/examples/spark-sql/src/test/scala/testFunctions.scala
b/examples/spark-sql/src/test/scala/testFunctions.scala
new file mode 100644
index 0000000000..ac6d19a3aa
--- /dev/null
+++ b/examples/spark-sql/src/test/scala/testFunctions.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.log4j.{Level, Logger}
+import org.apache.sedona.spark.SedonaContext
+import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
+import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.funsuite.AnyFunSuite
+import org.apache.spark.sql.SparkSession
+
+class testFunctions extends AnyFunSuite with BeforeAndAfterAll {
+
+ var sedona: SparkSession = _
+
+ override def beforeAll(): Unit = {
+ Logger.getRootLogger().setLevel(Level.WARN)
+
+ // Accessing Main.resourceFolder triggers the Main object's initialization
+ println(s"Resource folder: ${Main.resourceFolder}")
+
+ // Create Spark session with driver JVM options for Java module access
+ val config = SedonaContext.builder().appName("SedonaSQL-test")
+ .master("local[*]")
+ .config("spark.driver.extraJavaOptions",
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " +
+ "--add-opens=java.base/java.nio=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util=ALL-UNNAMED " +
+ "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED " +
+ "--add-opens=java.base/java.io=ALL-UNNAMED " +
+ "--add-opens=java.base/java.net=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util.concurrent=ALL-UNNAMED " +
+ "--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED " +
+ "--add-opens=java.base/sun.nio.cs=ALL-UNNAMED " +
+ "--add-opens=java.base/sun.security.action=ALL-UNNAMED " +
+ "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED")
+ .config("spark.kryo.registrator",
classOf[SedonaVizKryoRegistrator].getName)
+ .getOrCreate()
+ sedona = SedonaContext.create(config)
+
+ SedonaVizRegistrator.registerAll(sedona)
+ }
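One detail worth noting: spark.kryo.registrator only takes effect when Kryo is the active serializer, which the configuration above does not select. A minimal sketch of a builder that enables Kryo explicitly, assuming Kryo serialization is actually desired here:

    import org.apache.spark.serializer.KryoSerializer

    // Selecting Kryo makes the SedonaVizKryoRegistrator registration effective
    val kryoConfig = SedonaContext.builder()
      .appName("SedonaSQL-test")
      .master("local[*]")
      .config("spark.serializer", classOf[KryoSerializer].getName)
      .config("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName)
      .getOrCreate()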
+
+ override def afterAll(): Unit = {
+ if (sedona != null) {
+ sedona.stop()
+ }
+ }
+
+ test("SqlExample - testPredicatePushdownAndRangeJonQuery") {
+ SqlExample.testPredicatePushdownAndRangeJonQuery(sedona)
+ }
+
+ test("SqlExample - testDistanceJoinQuery") {
+ SqlExample.testDistanceJoinQuery(sedona)
+ }
+
+ test("SqlExample - testAggregateFunction") {
+ SqlExample.testAggregateFunction(sedona)
+ }
+
+ test("SqlExample - testShapefileConstructor") {
+ SqlExample.testShapefileConstructor(sedona)
+ }
+
+ test("SqlExample - testRasterIOAndMapAlgebra") {
+ SqlExample.testRasterIOAndMapAlgebra(sedona)
+ }
+
+ test("RddExample - visualizeSpatialColocation") {
+ RddExample.visualizeSpatialColocation(sedona)
+ }
+
+ test("RddExample - calculateSpatialColocation") {
+ RddExample.calculateSpatialColocation(sedona)
+ }
+
+ test("VizExample - buildScatterPlot") {
+ VizExample.buildScatterPlot(sedona)
+ succeed // Test passes if function completes without exception
+ }
+
+ test("VizExample - buildHeatMap") {
+ VizExample.buildHeatMap(sedona)
+ succeed // Test passes if function completes without exception
+ }
+
+ test("VizExample - buildChoroplethMap") {
+ VizExample.buildChoroplethMap(sedona)
+ succeed // Test passes if function completes without exception
+ }
+
+ test("VizExample - parallelFilterRenderNoStitch") {
+ VizExample.parallelFilterRenderNoStitch(sedona)
+ succeed // Test passes if function completes without exception
+ }
+
+ test("VizExample - sqlApiVisualization") {
+ VizExample.sqlApiVisualization(sedona)
+ succeed // Test passes if function completes without exception
+ }
+}