This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new afda11172d [SEDONA-725] Add pyflink to Sedona. (#1875)
afda11172d is described below
commit afda11172d0ba48240bc05e932614f65098eb1cb
Author: Paweł Tokaj <[email protected]>
AuthorDate: Fri May 23 23:58:21 2025 +0200
[SEDONA-725] Add pyflink to Sedona. (#1875)
* SEDONA-725 Add pyflink to Sedona.
SEDONA-725 rearrange the spark module
* SEDONA-725 add docs
* SEDONA-725 add docs
* SEDONA-725 add docs
* SEDONA-725 add docs
* SEDONA-725 add docs
* Update docs/setup/flink/install-python.md
Co-authored-by: Jia Yu <[email protected]>
* Update docs/setup/flink/install-python.md
Co-authored-by: Jia Yu <[email protected]>
* SEDONA-725 add docs
* SEDONA-725 add docs
* SEDONA-725 add docs
---------
Co-authored-by: Jia Yu <[email protected]>
---
.github/workflows/lint.yml | 2 +-
.github/workflows/pyflink.yml | 76 +++++++++++++++++++++++++++
docs/setup/flink/install-python.md | 34 ++++++++++++
docs/tutorial/flink/pyflink-sql.md | 76 +++++++++++++++++++++++++++
mkdocs.yml | 2 +
python/sedona/flink/__init__.py | 30 +++++++++++
python/sedona/flink/context.py | 52 ++++++++++++++++++
python/setup.py | 1 +
python/tests/flink/conftest.py | 60 +++++++++++++++++++++
python/tests/flink/test_flink_registration.py | 68 ++++++++++++++++++++++++
10 files changed, 400 insertions(+), 1 deletion(-)
diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
index 69066517e4..3dc8683951 100644
--- a/.github/workflows/lint.yml
+++ b/.github/workflows/lint.yml
@@ -35,7 +35,7 @@ jobs:
uses: actions/checkout@v4
- uses: actions/setup-python@v5 # https://www.python.org/
with:
- python-version: '3.x' # Version range or exact version of a Python
version to use, using SemVer's version range syntax
+ python-version: '3.10' # Version range or exact version of a Python
version to use, using SemVer's version range syntax
architecture: 'x64' # optional x64 or x86. Defaults to x64 if not
specified
- name: Install dependencies # https://pip.pypa.io/en/stable/
run: |
diff --git a/.github/workflows/pyflink.yml b/.github/workflows/pyflink.yml
new file mode 100644
index 0000000000..d93e431544
--- /dev/null
+++ b/.github/workflows/pyflink.yml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Sedona Pyflink Test
+
+on:
+ push:
+ branches:
+ - master
+ paths:
+ - 'common/**'
+ - 'flink/**'
+ - 'flink-shaded/**'
+ - 'pom.xml'
+ - 'python/**'
+ - '.github/workflows/pyflink.yml'
+ pull_request:
+ branches:
+ - '*'
+ paths:
+ - 'common/**'
+ - 'flink/**'
+ - 'flink-shaded/**'
+ - 'pom.xml'
+ - 'python/**'
+ - '.github/workflows/pyflink.yml'
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+
+jobs:
+ test:
+ runs-on: ubuntu-22.04
+ strategy:
+ matrix:
+ include:
+ - python: '3.10'
+ steps:
+ - uses: actions/checkout@v4
+ - uses: actions/setup-java@v4
+ with:
+ distribution: 'zulu'
+ java-version: '11'
+ - uses: actions/setup-python@v5
+ with:
+ python-version: ${{ matrix.python }}
+ - run: sudo apt-get -y install python3-pip python-dev-is-python3
+ - run: mvn package -pl "org.apache.sedona:sedona-flink-shaded_2.12" -am
-DskipTests
+ - run: sudo pip3 install -U setuptools
+ - run: sudo pip3 install -U wheel
+ - run: sudo pip3 install -U virtualenvwrapper
+ - run: python3 -m pip install uv
+ - run: cd python
+ - run: rm pyproject.toml
+ - run: uv init --no-workspace
+ - run: uv add apache-flink==1.20.1 shapely attr setuptools
+ - run: uv add pytest --dev
+ - run: |
+ wget
https://repo1.maven.org/maven2/org/datasyslab/geotools-wrapper/1.7.1-28.5/geotools-wrapper-1.7.1-28.5.jar
+ export SEDONA_PYFLINK_EXTRA_JARS=${PWD}/$(find flink-shaded/target
-name sedona-flink*.jar),${PWD}/geotools-wrapper-1.7.1-28.5.jar
+ (cd python; PYTHONPATH=$(pwd) uv run pytest -v -s ./tests/flink)
diff --git a/docs/setup/flink/install-python.md
b/docs/setup/flink/install-python.md
new file mode 100644
index 0000000000..b792a1cacc
--- /dev/null
+++ b/docs/setup/flink/install-python.md
@@ -0,0 +1,34 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+To install Apache Sedona Python, complete the following steps:
+
+Install the required Python packages.
+
+```
+pip install apache-sedona[flink] shapely attr
+```
+
+Download the required JAR files from Maven Central:
+
+* sedona-flink-shaded_2.12:jar:{{ sedona.current_version }}
+* geotools-wrapper-{{ sedona.current_geotools }}.jar
+
+Follow the official Flink documentation to install the JAR files in your Flink
cluster or PyFlink application.
+https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/dependency_management/
diff --git a/docs/tutorial/flink/pyflink-sql.md
b/docs/tutorial/flink/pyflink-sql.md
new file mode 100644
index 0000000000..20a58f59ba
--- /dev/null
+++ b/docs/tutorial/flink/pyflink-sql.md
@@ -0,0 +1,76 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+To set up PyFlink with Apache Sedona, please follow the [PyFlink installation guide](../../setup/flink/install-python.md).
+When you finish it, you can run the following code to test if everything works.
+
+```python
+from sedona.flink import SedonaContext
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+flink_settings = EnvironmentSettings.in_streaming_mode()
+table_env = SedonaContext.create(stream_env, flink_settings)
+
+table_env.\
+ sql_query("SELECT ST_Point(1.0, 2.0)").\
+ execute()
+```
+
+PyFlink does not expose the possibility of transforming Scala's own
user-defined types (UDT) to Python UDT.
+So, when you want to collect the result in Python, you need to use functions
+like `ST_AsText` or `ST_ASBinary` to convert the result to a string or binary.
+
+```python
+from shapely.wkb import loads
+
+result = table_env.\
+ sql_query("SELECT ST_ASBinary(ST_Point(1.0, 2.0))").\
+ execute().\
+ collect()
+
+[loads(bytes(el[0])) for el in result]
+```
+
+```
+[<POINT (1 2)>]
+```
+
+The same applies to user-defined scalar functions:
+
+```python
+from pyflink.table.udf import ScalarFunction, udf
+from shapely.wkb import loads
+
+class Buffer(ScalarFunction):
+ def eval(self, s):
+ geom = loads(s)
+ return geom.buffer(1).wkb
+
+table_env.create_temporary_function(
+ "ST_BufferPython", udf(Buffer(), result_type="Binary")
+)
+
+buffer_table = table_env.sql_query(
+ "SELECT ST_BufferPython(ST_ASBinary(ST_Point(1.0, 2.0))) AS buffer"
+)
+```
+
+For more SQL examples please follow the FlinkSQL section [FlinkSQL](sql.md).
diff --git a/mkdocs.yml b/mkdocs.yml
index 5aed88a737..913c7091c6 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -47,6 +47,7 @@ nav:
- Install on Azure Synapse Analytics:
setup/azure-synapse-analytics.md
- Install with Apache Flink:
- Install Sedona Scala/Java: setup/flink/install-scala.md
+ - Install Sedona Python: setup/flink/install-python.md
- Install with Snowflake:
- Install Sedona SQL: setup/snowflake/install.md
- Release notes: setup/release-notes.md
@@ -80,6 +81,7 @@ nav:
- Storing large raster geometries in Parquet files:
tutorial/storing-blobs-in-parquet.md
- Sedona with Apache Flink:
- Spatial SQL app (Flink): tutorial/flink/sql.md
+ - Spatial SQL app (PyFlink): tutorial/flink/pyflink-sql.md
- Sedona with Snowflake:
- Spatial SQL app (Snowflake): tutorial/snowflake/sql.md
- Examples:
diff --git a/python/sedona/flink/__init__.py b/python/sedona/flink/__init__.py
new file mode 100644
index 0000000000..4f64fb96e0
--- /dev/null
+++ b/python/sedona/flink/__init__.py
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+
+try:
+ import pyflink
+ from sedona.flink.context import SedonaContext
+except ImportError:
+ warnings.warn(
+ "Apache Sedona requires Pyflink. Please install PyFlink before using
Sedona flink.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+
+__all__ = ["SedonaContext"]
diff --git a/python/sedona/flink/context.py b/python/sedona/flink/context.py
new file mode 100644
index 0000000000..35d15f44c9
--- /dev/null
+++ b/python/sedona/flink/context.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+
+try:
+ from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+ from pyflink.datastream import StreamExecutionEnvironment
+ from pyflink.java_gateway import get_gateway
+except ImportError:
+ StreamTableEnvironment = None
+ StreamExecutionEnvironment = None
+ EnvironmentSettings = None
+ warnings.warn(
+ "Apache Sedona requires Pyflink. Please install PyFlink before using
Sedona flink.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+
+
+class SedonaContext:
+
+ @classmethod
+ def create(
+ cls, env: StreamExecutionEnvironment, settings: EnvironmentSettings
+ ) -> StreamTableEnvironment:
+ table_env = StreamTableEnvironment.create(env, settings)
+ gateway = get_gateway()
+
+ flink_sedona_context =
gateway.jvm.org.apache.sedona.flink.SedonaContext
+
+ table_env_j = flink_sedona_context.create(
+ env._j_stream_execution_environment, table_env._j_tenv
+ )
+
+ table_env._j_tenv = table_env_j
+
+ return table_env
diff --git a/python/setup.py b/python/setup.py
index 5176d9e9b7..ee92cec460 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -63,6 +63,7 @@ setup(
"spark": ["pyspark>=2.3.0"],
"pydeck-map": ["geopandas", "pydeck==0.8.0"],
"kepler-map": ["geopandas", "keplergl==0.3.2"],
+ "flink": ["apache-flink>=1.19.0"],
"all": [
"pyspark>=2.3.0",
"geopandas",
diff --git a/python/tests/flink/conftest.py b/python/tests/flink/conftest.py
new file mode 100644
index 0000000000..44be77b8f2
--- /dev/null
+++ b/python/tests/flink/conftest.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+
+EXTRA_JARS = os.getenv("SEDONA_PYFLINK_EXTRA_JARS")
+
+
+def has_pyflink():
+ try:
+ import pyflink
+ except ImportError:
+ return False
+ return True
+
+
+if has_pyflink():
+ from sedona.flink import SedonaContext
+
+ try:
+ from pyflink.datastream import StreamExecutionEnvironment
+ from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+ except ImportError:
+ pytest.skip("PyFlink is not installed. Skipping tests that require
PyFlink.")
+
+ @pytest.fixture(scope="module")
+ def flink_settings():
+ return EnvironmentSettings.in_streaming_mode()
+
+ @pytest.fixture(scope="module")
+ def stream_env() -> StreamExecutionEnvironment:
+ env = StreamExecutionEnvironment.get_execution_environment()
+ jars = EXTRA_JARS.split(",") if EXTRA_JARS else []
+ for jar in jars:
+ env.add_jars(f"file://{jar}")
+
+ return env
+
+ @pytest.fixture(scope="module")
+ def table_env(
+ stream_env: StreamExecutionEnvironment, flink_settings:
EnvironmentSettings
+ ) -> StreamTableEnvironment:
+ return SedonaContext.create(stream_env, flink_settings)
diff --git a/python/tests/flink/test_flink_registration.py
b/python/tests/flink/test_flink_registration.py
new file mode 100644
index 0000000000..847788bc00
--- /dev/null
+++ b/python/tests/flink/test_flink_registration.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from shapely.wkb import loads
+import pytest
+
+from tests.flink.conftest import has_pyflink
+
+if not has_pyflink():
+ pytest.skip(
+ "PyFlink is not installed. Skipping tests that require PyFlink.",
+ allow_module_level=True,
+ )
+
+
+def test_register(table_env):
+ result = (
+ table_env.sql_query("SELECT ST_ASBinary(ST_Point(1.0, 2.0))")
+ .execute()
+ .collect()
+ )
+
+ assert 1 == len([el for el in result])
+
+
+def test_register_udf(table_env):
+ from pyflink.table.udf import ScalarFunction, udf
+
+ class Buffer(ScalarFunction):
+ def eval(self, s):
+ geom = loads(s)
+ return geom.buffer(1).wkb
+
+ table_env.create_temporary_function(
+ "ST_BufferPython", udf(Buffer(), result_type="Binary")
+ )
+
+ buffer_table = table_env.sql_query(
+ "SELECT ST_BufferPython(ST_ASBinary(ST_Point(1.0, 2.0))) AS buffer"
+ )
+
+ table_env.create_temporary_view("buffer_table", buffer_table)
+
+ result = (
+ table_env.sql_query("SELECT ST_Area(ST_GeomFromWKB(buffer)) FROM
buffer_table")
+ .execute()
+ .collect()
+ )
+
+ items = [el for el in result]
+ area = items[0][0]
+
+ assert 3.12 < area < 3.14
+ assert 1 == len(items)