This is an automated email from the ASF dual-hosted git repository. imbruced pushed a commit to branch SEDONA-725-restructure-spark-python-package in repository https://gitbox.apache.org/repos/asf/sedona.git
commit db7af60eaec022ad832ca2b99ba623ee6cf40fd3 Author: pawelkocinski <[email protected]> AuthorDate: Tue Apr 29 22:09:26 2025 +0200 SEDONA-725 rearrange the spark module --- .github/workflows/pyflink.yml | 78 --------------------------- python/sedona/flink/__init__.py | 14 ----- python/sedona/flink/context.py | 35 ------------ python/setup.py | 1 - python/tests/flink/conftest.py | 43 --------------- python/tests/flink/test_flink_registration.py | 45 ---------------- 6 files changed, 216 deletions(-) diff --git a/.github/workflows/pyflink.yml b/.github/workflows/pyflink.yml deleted file mode 100644 index 7e9732190d..0000000000 --- a/.github/workflows/pyflink.yml +++ /dev/null @@ -1,78 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -name: Sedona Pyflink Test - -on: - push: - branches: - - master - paths: - - 'common/**' - - 'flink/**' - - 'flink-shaded/**' - - 'pom.xml' - - 'python/**' - - '.github/workflows/pyflink.yml' - pull_request: - branches: - - '*' - paths: - - 'common/**' - - 'flink/**' - - 'flink-shaded/**' - - 'pom.xml' - - 'python/**' - - '.github/workflows/pyflink.yml' - -concurrency: - group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: true - -jobs: - test: - runs-on: ubuntu-22.04 - strategy: - matrix: - include: - - python: '3.10' - steps: - - uses: actions/checkout@v4 - - uses: actions/setup-java@v4 - with: - distribution: 'zulu' - java-version: '8' - - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python }} - - run: sudo apt-get -y install python3-pip python-dev-is-python3 - - run: mvn package -pl "org.apache.sedona:sedona-flink-shaded_2.12" -am -DskipTests - - run: sudo pip3 install -U setuptools - - run: sudo pip3 install -U wheel - - run: sudo pip3 install -U virtualenvwrapper - - run: python3 -m pip install uv - - run: cd python - - run: rm pyproject.toml - - run: uv init --no-workspace - - run: uv add apache-flink==1.20.1 shapely attr setuptools - - run: uv add pytest --dev - - run: | - wget https://repo1.maven.org/maven2/org/datasyslab/geotools-wrapper/1.7.1-28.5/geotools-wrapper-1.7.1-28.5.jar - export SEDONA_PYFLINK_EXTRA_JARS=${PWD}/$(find flink-shaded/target -name sedona-flink*.jar),${PWD}/geotools-wrapper-1.7.1-28.5.jar - pwd - ls -l - (cd python; PYTHONPATH=$(pwd) uv run pytest -v -m flink -s ./tests/flink) diff --git a/python/sedona/flink/__init__.py b/python/sedona/flink/__init__.py deleted file mode 100644 index 4219449814..0000000000 --- a/python/sedona/flink/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -import logging -import warnings - -try: - import pyflink - from sedona.flink.context import SedonaContext -except ImportError: - warnings.warn( - "Apache Sedona requires Pyflink. 
Please install PyFlink before using Sedona flink.", - DeprecationWarning, - stacklevel=2, - ) - -__all__ = ["SedonaContext"] diff --git a/python/sedona/flink/context.py b/python/sedona/flink/context.py deleted file mode 100644 index 2f14d20b19..0000000000 --- a/python/sedona/flink/context.py +++ /dev/null @@ -1,35 +0,0 @@ -import warnings - -try: - from pyflink.table import EnvironmentSettings, StreamTableEnvironment - from pyflink.datastream import StreamExecutionEnvironment - from pyflink.java_gateway import get_gateway -except ImportError: - StreamTableEnvironment = None - StreamExecutionEnvironment = None - EnvironmentSettings = None - warnings.warn( - "Apache Sedona requires Pyflink. Please install PyFlink before using Sedona flink.", - DeprecationWarning, - stacklevel=2, - ) - - -class SedonaContext: - - @classmethod - def create( - cls, env: StreamExecutionEnvironment, settings: EnvironmentSettings - ) -> StreamTableEnvironment: - table_env = StreamTableEnvironment.create(env, settings) - gateway = get_gateway() - - flink_sedona_context = gateway.jvm.org.apache.sedona.flink.SedonaContext - - table_env_j = flink_sedona_context.create( - env._j_stream_execution_environment, table_env._j_tenv - ) - - table_env._j_tenv = table_env_j - - return table_env diff --git a/python/setup.py b/python/setup.py index 695966631e..214a1f0029 100644 --- a/python/setup.py +++ b/python/setup.py @@ -63,7 +63,6 @@ setup( "spark": ["pyspark>=2.3.0"], "pydeck-map": ["geopandas", "pydeck==0.8.0"], "kepler-map": ["geopandas", "keplergl==0.3.2"], - "flink": ["apache-flink>=1.19.0"], "all": [ "pyspark>=2.3.0", "geopandas", diff --git a/python/tests/flink/conftest.py b/python/tests/flink/conftest.py deleted file mode 100644 index 2163f3c514..0000000000 --- a/python/tests/flink/conftest.py +++ /dev/null @@ -1,43 +0,0 @@ -import os - -import pytest - - -EXTRA_JARS = os.getenv("SEDONA_PYFLINK_EXTRA_JARS") - - -def has_pyflink(): - try: - import pyflink - except ImportError: - return False - 
return True - - -if has_pyflink(): - from sedona.flink import SedonaContext - - try: - from pyflink.datastream import StreamExecutionEnvironment - from pyflink.table import EnvironmentSettings, StreamTableEnvironment - except ImportError: - pytest.skip("PyFlink is not installed. Skipping tests that require PyFlink.") - - @pytest.fixture(scope="module") - def flink_settings(): - return EnvironmentSettings.in_streaming_mode() - - @pytest.fixture(scope="module") - def stream_env() -> StreamExecutionEnvironment: - env = StreamExecutionEnvironment.get_execution_environment() - jars = EXTRA_JARS.split(",") if EXTRA_JARS else [] - for jar in jars: - env.add_jars(f"file://{jar}") - - return env - - @pytest.fixture(scope="module") - def table_env( - stream_env: StreamExecutionEnvironment, flink_settings: EnvironmentSettings - ) -> StreamTableEnvironment: - return SedonaContext.create(stream_env, flink_settings) diff --git a/python/tests/flink/test_flink_registration.py b/python/tests/flink/test_flink_registration.py deleted file mode 100644 index 02c6c08261..0000000000 --- a/python/tests/flink/test_flink_registration.py +++ /dev/null @@ -1,45 +0,0 @@ -from shapely.wkb import loads -import pytest - - -@pytest.mark.flink -def test_register(table_env): - result = ( - table_env.sql_query("SELECT ST_ASBinary(ST_Point(1.0, 2.0))") - .execute() - .collect() - ) - - assert 1 == len([el for el in result]) - - -@pytest.mark.flink -def test_register_udf(table_env): - from pyflink.table.udf import ScalarFunction, udf - - class Buffer(ScalarFunction): - def eval(self, s): - geom = loads(s) - return geom.buffer(1).wkb - - table_env.create_temporary_function( - "ST_BufferPython", udf(Buffer(), result_type="Binary") - ) - - buffer_table = table_env.sql_query( - "SELECT ST_BufferPython(ST_ASBinary(ST_Point(1.0, 2.0))) AS buffer" - ) - - table_env.create_temporary_view("buffer_table", buffer_table) - - result = ( - table_env.sql_query("SELECT ST_Area(ST_GeomFromWKB(buffer)) FROM buffer_table") - 
.execute() - .collect() - ) - - items = [el for el in result] - area = items[0][0] - - assert 3.12 < area < 3.14 - assert 1 == len(items)
