This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new afda11172d [SEDONA-725] Add pyflink to Sedona. (#1875)
afda11172d is described below

commit afda11172d0ba48240bc05e932614f65098eb1cb
Author: Paweł Tokaj <[email protected]>
AuthorDate: Fri May 23 23:58:21 2025 +0200

    [SEDONA-725] Add pyflink to Sedona. (#1875)
    
    * SEDONA-725 Add pyflink to Sedona.
    
    SEDONA-725 rearrange the spark module
    
    * SEDONA-725 add docs
    
    * SEDONA-725 add docs
    
    * SEDONA-725 add docs
    
    * SEDONA-725 add docs
    
    * SEDONA-725 add docs
    
    * Update docs/setup/flink/install-python.md
    
    Co-authored-by: Jia Yu <[email protected]>
    
    * Update docs/setup/flink/install-python.md
    
    Co-authored-by: Jia Yu <[email protected]>
    
    * SEDONA-725 add docs
    
    * SEDONA-725 add docs
    
    * SEDONA-725 add docs
    
    ---------
    
    Co-authored-by: Jia Yu <[email protected]>
---
 .github/workflows/lint.yml                    |  2 +-
 .github/workflows/pyflink.yml                 | 76 +++++++++++++++++++++++++++
 docs/setup/flink/install-python.md            | 34 ++++++++++++
 docs/tutorial/flink/pyflink-sql.md            | 76 +++++++++++++++++++++++++++
 mkdocs.yml                                    |  2 +
 python/sedona/flink/__init__.py               | 30 +++++++++++
 python/sedona/flink/context.py                | 52 ++++++++++++++++++
 python/setup.py                               |  1 +
 python/tests/flink/conftest.py                | 60 +++++++++++++++++++++
 python/tests/flink/test_flink_registration.py | 68 ++++++++++++++++++++++++
 10 files changed, 400 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
index 69066517e4..3dc8683951 100644
--- a/.github/workflows/lint.yml
+++ b/.github/workflows/lint.yml
@@ -35,7 +35,7 @@ jobs:
         uses: actions/checkout@v4
       - uses: actions/setup-python@v5 # https://www.python.org/
         with:
-          python-version: '3.x' # Version range or exact version of a Python 
version to use, using SemVer's version range syntax
+          python-version: '3.10' # Version range or exact version of a Python 
version to use, using SemVer's version range syntax
           architecture: 'x64' # optional x64 or x86. Defaults to x64 if not 
specified
       - name: Install dependencies # https://pip.pypa.io/en/stable/
         run: |
diff --git a/.github/workflows/pyflink.yml b/.github/workflows/pyflink.yml
new file mode 100644
index 0000000000..d93e431544
--- /dev/null
+++ b/.github/workflows/pyflink.yml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Sedona Pyflink Test
+
+on:
+  push:
+    branches:
+      - master
+    paths:
+      - 'common/**'
+      - 'flink/**'
+      - 'flink-shaded/**'
+      - 'pom.xml'
+      - 'python/**'
+      - '.github/workflows/pyflink.yml'
+  pull_request:
+    branches:
+      - '*'
+    paths:
+      - 'common/**'
+      - 'flink/**'
+      - 'flink-shaded/**'
+      - 'pom.xml'
+      - 'python/**'
+      - '.github/workflows/pyflink.yml'
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  test:
+    runs-on: ubuntu-22.04
+    strategy:
+      matrix:
+        include:
+          - python: '3.10'
+    steps:
+      - uses: actions/checkout@v4
+      - uses: actions/setup-java@v4
+        with:
+          distribution: 'zulu'
+          java-version: '11'
+      - uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python }}
+      - run: sudo apt-get -y install python3-pip python-dev-is-python3
+      - run: mvn package -pl "org.apache.sedona:sedona-flink-shaded_2.12" -am 
-DskipTests
+      - run: sudo pip3 install -U setuptools
+      - run: sudo pip3 install -U wheel
+      - run: sudo pip3 install -U virtualenvwrapper
+      - run: python3 -m pip install uv
+      - run: cd python
+      - run: rm pyproject.toml
+      - run: uv init --no-workspace
+      - run: uv add apache-flink==1.20.1 shapely attr setuptools
+      - run: uv add pytest --dev
+      - run: |
+          wget 
https://repo1.maven.org/maven2/org/datasyslab/geotools-wrapper/1.7.1-28.5/geotools-wrapper-1.7.1-28.5.jar
+          export SEDONA_PYFLINK_EXTRA_JARS=${PWD}/$(find flink-shaded/target 
-name sedona-flink*.jar),${PWD}/geotools-wrapper-1.7.1-28.5.jar
+          (cd python; PYTHONPATH=$(pwd) uv run pytest -v -s ./tests/flink)
diff --git a/docs/setup/flink/install-python.md 
b/docs/setup/flink/install-python.md
new file mode 100644
index 0000000000..b792a1cacc
--- /dev/null
+++ b/docs/setup/flink/install-python.md
@@ -0,0 +1,34 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+To install Apache Sedona Python, follow the steps below:
+
+Install the required Python packages.
+
+```
+pip install apache-sedona[flink] shapely attr
+```
+
+Download the required JAR files from Maven Central:
+
+* sedona-flink-shaded_2.12:jar:{{ sedona.current_version }}
+* geotools-wrapper-{{ sedona.current_geotools }}.jar
+
+Follow the official Flink documentation to install the JAR files in your Flink 
cluster or PyFlink application.
+https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/dependency_management/
diff --git a/docs/tutorial/flink/pyflink-sql.md 
b/docs/tutorial/flink/pyflink-sql.md
new file mode 100644
index 0000000000..20a58f59ba
--- /dev/null
+++ b/docs/tutorial/flink/pyflink-sql.md
@@ -0,0 +1,76 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+To set up PyFlink with Apache Sedona, please follow the [installation guide](../../setup/flink/install-python.md).
+When you finish it, you can run the following code to test if everything works.
+
+```python
+from sedona.flink import SedonaContext
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+flink_settings = EnvironmentSettings.in_streaming_mode()
+table_env = SedonaContext.create(stream_env, flink_settings)
+
+table_env.\
+    sql_query("SELECT ST_Point(1.0, 2.0)").\
+    execute()
+```
+
+PyFlink does not expose the possibility of transforming Scala's own 
user-defined types (UDT) to Python UDT.
+So, when you want to collect the result in Python, you need to use functions
+like `ST_AsText` or `ST_AsBinary` to convert the result to a string or binary.
+
+```python
+from shapely.wkb import loads
+
+result = table_env.\
+    sql_query("SELECT ST_ASBinary(ST_Point(1.0, 2.0))").\
+    execute().\
+    collect()
+
+[loads(bytes(el[0])) for el in result]
+```
+
+```
+[<POINT (1 2)>]
+```
+
+The same applies to user-defined scalar functions:
+
+```python
+from pyflink.table.udf import ScalarFunction, udf
+from shapely.wkb import loads
+
+class Buffer(ScalarFunction):
+    def eval(self, s):
+        geom = loads(s)
+        return geom.buffer(1).wkb
+
+table_env.create_temporary_function(
+    "ST_BufferPython", udf(Buffer(), result_type="Binary")
+)
+
+buffer_table = table_env.sql_query(
+    "SELECT ST_BufferPython(ST_ASBinary(ST_Point(1.0, 2.0))) AS buffer"
+)
+```
+
+For more SQL examples please follow the FlinkSQL section [FlinkSQL](sql.md).
diff --git a/mkdocs.yml b/mkdocs.yml
index 5aed88a737..913c7091c6 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -47,6 +47,7 @@ nav:
           - Install on Azure Synapse Analytics: 
setup/azure-synapse-analytics.md
       - Install with Apache Flink:
           - Install Sedona Scala/Java: setup/flink/install-scala.md
+          - Install Sedona Python: setup/flink/install-python.md
       - Install with Snowflake:
           - Install Sedona SQL: setup/snowflake/install.md
       - Release notes: setup/release-notes.md
@@ -80,6 +81,7 @@ nav:
               - Storing large raster geometries in Parquet files: 
tutorial/storing-blobs-in-parquet.md
       - Sedona with Apache Flink:
           - Spatial SQL app (Flink): tutorial/flink/sql.md
+          - Spatial SQL app (PyFlink): tutorial/flink/pyflink-sql.md
       - Sedona with Snowflake:
           - Spatial SQL app (Snowflake): tutorial/snowflake/sql.md
       - Examples:
diff --git a/python/sedona/flink/__init__.py b/python/sedona/flink/__init__.py
new file mode 100644
index 0000000000..4f64fb96e0
--- /dev/null
+++ b/python/sedona/flink/__init__.py
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+
+try:
+    import pyflink
+    from sedona.flink.context import SedonaContext
+except ImportError:
+    warnings.warn(
+        "Apache Sedona requires Pyflink. Please install PyFlink before using 
Sedona flink.",
+        DeprecationWarning,
+        stacklevel=2,
+    )
+
+__all__ = ["SedonaContext"]
diff --git a/python/sedona/flink/context.py b/python/sedona/flink/context.py
new file mode 100644
index 0000000000..35d15f44c9
--- /dev/null
+++ b/python/sedona/flink/context.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import warnings
+
+try:
+    from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+    from pyflink.datastream import StreamExecutionEnvironment
+    from pyflink.java_gateway import get_gateway
+except ImportError:
+    StreamTableEnvironment = None
+    StreamExecutionEnvironment = None
+    EnvironmentSettings = None
+    warnings.warn(
+        "Apache Sedona requires Pyflink. Please install PyFlink before using 
Sedona flink.",
+        DeprecationWarning,
+        stacklevel=2,
+    )
+
+
+class SedonaContext:
+
+    @classmethod
+    def create(
+        cls, env: StreamExecutionEnvironment, settings: EnvironmentSettings
+    ) -> StreamTableEnvironment:
+        table_env = StreamTableEnvironment.create(env, settings)
+        gateway = get_gateway()
+
+        flink_sedona_context = 
gateway.jvm.org.apache.sedona.flink.SedonaContext
+
+        table_env_j = flink_sedona_context.create(
+            env._j_stream_execution_environment, table_env._j_tenv
+        )
+
+        table_env._j_tenv = table_env_j
+
+        return table_env
diff --git a/python/setup.py b/python/setup.py
index 5176d9e9b7..ee92cec460 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -63,6 +63,7 @@ setup(
         "spark": ["pyspark>=2.3.0"],
         "pydeck-map": ["geopandas", "pydeck==0.8.0"],
         "kepler-map": ["geopandas", "keplergl==0.3.2"],
+        "flink": ["apache-flink>=1.19.0"],
         "all": [
             "pyspark>=2.3.0",
             "geopandas",
diff --git a/python/tests/flink/conftest.py b/python/tests/flink/conftest.py
new file mode 100644
index 0000000000..44be77b8f2
--- /dev/null
+++ b/python/tests/flink/conftest.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+import pytest
+
+
+EXTRA_JARS = os.getenv("SEDONA_PYFLINK_EXTRA_JARS")
+
+
+def has_pyflink():
+    try:
+        import pyflink
+    except ImportError:
+        return False
+    return True
+
+
+if has_pyflink():
+    from sedona.flink import SedonaContext
+
+    try:
+        from pyflink.datastream import StreamExecutionEnvironment
+        from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+    except ImportError:
+        pytest.skip("PyFlink is not installed. Skipping tests that require 
PyFlink.")
+
+    @pytest.fixture(scope="module")
+    def flink_settings():
+        return EnvironmentSettings.in_streaming_mode()
+
+    @pytest.fixture(scope="module")
+    def stream_env() -> StreamExecutionEnvironment:
+        env = StreamExecutionEnvironment.get_execution_environment()
+        jars = EXTRA_JARS.split(",") if EXTRA_JARS else []
+        for jar in jars:
+            env.add_jars(f"file://{jar}")
+
+        return env
+
+    @pytest.fixture(scope="module")
+    def table_env(
+        stream_env: StreamExecutionEnvironment, flink_settings: 
EnvironmentSettings
+    ) -> StreamTableEnvironment:
+        return SedonaContext.create(stream_env, flink_settings)
diff --git a/python/tests/flink/test_flink_registration.py 
b/python/tests/flink/test_flink_registration.py
new file mode 100644
index 0000000000..847788bc00
--- /dev/null
+++ b/python/tests/flink/test_flink_registration.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from shapely.wkb import loads
+import pytest
+
+from tests.flink.conftest import has_pyflink
+
+if not has_pyflink():
+    pytest.skip(
+        "PyFlink is not installed. Skipping tests that require PyFlink.",
+        allow_module_level=True,
+    )
+
+
+def test_register(table_env):
+    result = (
+        table_env.sql_query("SELECT ST_ASBinary(ST_Point(1.0, 2.0))")
+        .execute()
+        .collect()
+    )
+
+    assert 1 == len([el for el in result])
+
+
+def test_register_udf(table_env):
+    from pyflink.table.udf import ScalarFunction, udf
+
+    class Buffer(ScalarFunction):
+        def eval(self, s):
+            geom = loads(s)
+            return geom.buffer(1).wkb
+
+    table_env.create_temporary_function(
+        "ST_BufferPython", udf(Buffer(), result_type="Binary")
+    )
+
+    buffer_table = table_env.sql_query(
+        "SELECT ST_BufferPython(ST_ASBinary(ST_Point(1.0, 2.0))) AS buffer"
+    )
+
+    table_env.create_temporary_view("buffer_table", buffer_table)
+
+    result = (
+        table_env.sql_query("SELECT ST_Area(ST_GeomFromWKB(buffer)) FROM 
buffer_table")
+        .execute()
+        .collect()
+    )
+
+    items = [el for el in result]
+    area = items[0][0]
+
+    assert 3.12 < area < 3.14
+    assert 1 == len(items)

Reply via email to