This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6160327370c feat(elasticsearch): enable e2e remote logging test for 
elasticsearch (#63519)
6160327370c is described below

commit 6160327370c24f34b11924bd55521bdb18c718df
Author: Owen Leung <[email protected]>
AuthorDate: Sun Mar 15 14:37:49 2026 +0800

    feat(elasticsearch): enable e2e remote logging test for elasticsearch 
(#63519)
    
    * feat(elasticsearch): enable e2e remote logging test for elasticsearch
    
    * feat(elasticsearch): remove leftover comments
    
    * feat(elasticsearch): fix ruff format
    
    * feat(elasticsearch): improve readability
---
 .github/workflows/airflow-e2e-tests.yml            |   4 +-
 airflow-e2e-tests/docker/elasticsearch.yml         |  38 ++++++++
 .../tests/airflow_e2e_tests/conftest.py            |  26 +++++-
 .../tests/airflow_e2e_tests/constants.py           |   1 +
 .../airflow_e2e_tests/e2e_test_utils/clients.py    |   5 +
 .../remote_log_elasticsearch_tests/__init__.py     |  16 ++++
 .../test_remote_logging_elasticsearch.py           | 103 +++++++++++++++++++++
 .../images/output_testing_airflow-e2e-tests.svg    |   2 +-
 .../images/output_testing_airflow-e2e-tests.txt    |   2 +-
 .../airflow_breeze/commands/testing_commands.py    |   5 +-
 10 files changed, 196 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/airflow-e2e-tests.yml 
b/.github/workflows/airflow-e2e-tests.yml
index e9d9811ae29..cee6f855dc8 100644
--- a/.github/workflows/airflow-e2e-tests.yml
+++ b/.github/workflows/airflow-e2e-tests.yml
@@ -49,7 +49,7 @@ on:  # yamllint disable-line rule:truthy
         type: string
         required: true
       e2e_test_mode:
-        description: "Test mode - basic, remote_log, or xcom_object_storage"
+        description: "Test mode - basic, remote_log, remote_log_elasticsearch, 
or xcom_object_storage"
         type: string
         default: "basic"
 
@@ -80,7 +80,7 @@ on:  # yamllint disable-line rule:truthy
         type: string
         default: ""
       e2e_test_mode:
-        description: "Test mode - basic, remote_log, or xcom_object_storage"
+        description: "Test mode - basic, remote_log, remote_log_elasticsearch, 
or xcom_object_storage"
         type: string
         default: "basic"
 
diff --git a/airflow-e2e-tests/docker/elasticsearch.yml 
b/airflow-e2e-tests/docker/elasticsearch.yml
new file mode 100644
index 00000000000..601f7166c5f
--- /dev/null
+++ b/airflow-e2e-tests/docker/elasticsearch.yml
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+services:
+  elasticsearch:
+    image: docker.elastic.co/elasticsearch/elasticsearch:8.19.0
+    environment:
+      - discovery.type=single-node
+      - xpack.security.enabled=false
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+    volumes:
+      - elasticsearch-db-volume:/usr/share/elasticsearch/data
+    ports:
+      - "9200:9200"
+    healthcheck:
+      test: ["CMD-SHELL", "curl -f http://localhost:9200/_cluster/health || 
exit 1"]
+      interval: 10s
+      timeout: 30s
+      start_period: 60s
+      retries: 50
+    restart: "on-failure"
+
+volumes:
+  elasticsearch-db-volume:
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
index ad071d4e02b..32e1fb0e21c 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
@@ -33,6 +33,7 @@ from airflow_e2e_tests.constants import (
     DOCKER_IMAGE,
     E2E_DAGS_FOLDER,
     E2E_TEST_MODE,
+    ELASTICSEARCH_PATH,
     LOCALSTACK_PATH,
     LOGS_FOLDER,
     TEST_REPORT_FILE,
@@ -58,6 +59,11 @@ def _copy_localstack_files(tmp_dir):
     os.chmod(tmp_dir / "init-aws.sh", current_permissions | 0o111)
 
 
+def _copy_elasticsearch_files(tmp_dir):
+    """Copy Elasticsearch compose file into the temp directory."""
+    copyfile(ELASTICSEARCH_PATH, tmp_dir / "elasticsearch.yml")
+
+
 def _setup_s3_integration(dot_env_file, tmp_dir):
     _copy_localstack_files(tmp_dir)
 
@@ -74,6 +80,21 @@ def _setup_s3_integration(dot_env_file, tmp_dir):
     os.environ["ENV_FILE_PATH"] = str(dot_env_file)
 
 
+def _setup_elasticsearch_integration(dot_env_file, tmp_dir):
+    _copy_elasticsearch_files(tmp_dir)
+
+    dot_env_file.write_text(
+        f"AIRFLOW_UID={os.getuid()}\n"
+        "AIRFLOW__LOGGING__REMOTE_LOGGING=true\n"
+        "AIRFLOW__ELASTICSEARCH__HOST=http://elasticsearch:9200\n"
+        "AIRFLOW__ELASTICSEARCH__WRITE_STDOUT=false\n"
+        "AIRFLOW__ELASTICSEARCH__JSON_FORMAT=true\n"
+        "AIRFLOW__ELASTICSEARCH__WRITE_TO_ES=true\n"
+        "AIRFLOW__ELASTICSEARCH__TARGET_INDEX=airflow-e2e-logs\n"
+    )
+    os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+
+
 def _setup_xcom_object_storage_integration(dot_env_file, tmp_dir):
     _copy_localstack_files(tmp_dir)
 
@@ -124,6 +145,9 @@ def spin_up_airflow_environment(tmp_path_factory: 
pytest.TempPathFactory):
     if E2E_TEST_MODE == "remote_log":
         compose_file_names.append("localstack.yml")
         _setup_s3_integration(dot_env_file, tmp_dir)
+    elif E2E_TEST_MODE == "remote_log_elasticsearch":
+        compose_file_names.append("elasticsearch.yml")
+        _setup_elasticsearch_integration(dot_env_file, tmp_dir)
     elif E2E_TEST_MODE == "xcom_object_storage":
         compose_file_names.append("localstack.yml")
         _setup_xcom_object_storage_integration(dot_env_file, tmp_dir)
@@ -166,7 +190,7 @@ def _print_logs(compose_instance: DockerCompose):
         if service:
             stdout, _ = compose_instance.get_logs(service)
             console.print(f"::group:: {service} Logs")
-            console.print(f"[red]{stdout}")
+            console.print(stdout, style="red", soft_wrap=True)
             console.print("::endgroup::")
 
 
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
index 764b4e56dc5..67fec2c5580 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
@@ -40,6 +40,7 @@ E2E_DAGS_FOLDER = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / 
"tests" / "airflow_e
 LOGS_FOLDER = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "logs"
 TEST_REPORT_FILE = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / 
"_e2e_test_report.json"
 LOCALSTACK_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / 
"localstack.yml"
+ELASTICSEARCH_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" / 
"elasticsearch.yml"
 E2E_TEST_MODE = os.environ.get("E2E_TEST_MODE", "basic")
 AWS_INIT_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "scripts" / 
"init-aws.sh"
 
diff --git 
a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py 
b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
index 3a11101d4ac..b3e70973b2e 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
@@ -43,6 +43,11 @@ def get_s3_client():
     )
 
 
+def get_elasticsearch_session():
+    """Return a requests session configured for talking to the local 
Elasticsearch container."""
+    return create_request_session_with_retries(status_forcelist=[429, 500, 
502, 503, 504])
+
+
 def create_request_session_with_retries(status_forcelist: list[int]):
     """Create a requests Session with retry logic for handling transient 
errors."""
     Retry.DEFAULT_BACKOFF_MAX = 32
diff --git 
a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/__init__.py
 
b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++ 
b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git 
a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
 
b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
new file mode 100644
index 00000000000..5c07a04203a
--- /dev/null
+++ 
b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import time
+from datetime import datetime, timezone
+
+import pytest
+
+from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient, 
get_elasticsearch_session
+
+
+class TestRemoteLoggingElasticsearch:
+    airflow_client = AirflowClient()
+    dag_id = "example_xcom_test"
+    task_id = "bash_pull"
+    retry_interval_in_seconds = 5
+    max_retries = 12
+    target_index = "airflow-e2e-logs"
+    expected_message = "finished"
+    elasticsearch_url = "http://localhost:9200"
+    expected_log_id_prefix = f"{dag_id}-{task_id}-"
+
+    def _matches_expected_log(self, log_source: dict, run_id: str) -> bool:
+        log_id = log_source.get("log_id", "")
+        log_message = log_source.get("event", "")
+        if not isinstance(log_message, str):
+            log_message = log_source.get("message", "")
+        return (
+            log_id.startswith(self.expected_log_id_prefix)
+            and run_id in log_id
+            and self.expected_message in log_message
+        )
+
+    def test_remote_logging_elasticsearch(self):
+        """Test that a DAG using remote logging to Elasticsearch completes 
successfully."""
+
+        self.airflow_client.un_pause_dag(TestRemoteLoggingElasticsearch.dag_id)
+
+        resp = self.airflow_client.trigger_dag(
+            TestRemoteLoggingElasticsearch.dag_id,
+            json={"logical_date": datetime.now(timezone.utc).isoformat()},
+        )
+        run_id = resp["dag_run_id"]
+        state = self.airflow_client.wait_for_dag_run(
+            dag_id=TestRemoteLoggingElasticsearch.dag_id,
+            run_id=run_id,
+        )
+
+        assert state == "success", (
+            f"DAG {TestRemoteLoggingElasticsearch.dag_id} did not complete 
successfully. Final state: {state}"
+        )
+
+        elasticsearch_session = get_elasticsearch_session()
+        matching_logs = []
+        for _ in range(self.max_retries):
+            response = elasticsearch_session.post(
+                f"{self.elasticsearch_url}/{self.target_index}/_search",
+                json={"size": 200, "query": {"match_all": {}}},
+            )
+            if response.status_code == 404:
+                time.sleep(self.retry_interval_in_seconds)
+                continue
+            response.raise_for_status()
+            hits = response.json()["hits"]["hits"]
+            matching_logs = [
+                hit["_source"] for hit in hits if 
self._matches_expected_log(hit["_source"], run_id)
+            ]
+            if matching_logs:
+                break
+            time.sleep(self.retry_interval_in_seconds)
+
+        if not matching_logs:
+            pytest.fail(
+                f"Expected Elasticsearch logs for run_id {run_id} in index 
{self.target_index}, "
+                f"but none contained log_id starting with 
{self.expected_log_id_prefix!r} and event "
+                f"or message containing {self.expected_message!r}"
+            )
+
+        task_logs = self.airflow_client.get_task_logs(
+            dag_id=TestRemoteLoggingElasticsearch.dag_id,
+            task_id=self.task_id,
+            run_id=run_id,
+        )
+
+        events = [item.get("event", "") for item in task_logs.get("content", 
[]) if isinstance(item, dict)]
+        assert any(self.expected_message in event for event in events), (
+            f"Expected task logs to contain {self.expected_message!r}, got 
events: {events}"
+        )
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg 
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
index 9c48a6c8cb4..d31043d8a8e 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
@@ -131,7 +131,7 @@
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="288.4" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-11)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="288.4" textLength="366" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-11)">--github-repository&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;</text><text
 class="breeze-testing-airflow-e2e-tests-r6" x="414.8" y="288.4" 
textLength="24.4" clip-path [...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="312.8" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="312.8" 
textLength="73.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">(TEXT)</text><text 
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="312.8" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text 
class="breez [...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="337.2" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="337.2" textLength="366" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">--e2e-test-mode&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;</text><text
 class="breeze-testing-airflow-e2e-tests-r1" x="463.6" y="337.2" textLen [...]
-</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="361.6" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="361.6" 
textLength="463.6" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">(basic|remote_log|xcom_object_storage)</text><text
 class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="361.6" 
textLength="12.2" clip-path="url(#breeze-testing-airflow-e2e-tests-lin [...]
+</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="361.6" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="361.6" 
textLength="768.6" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">(basic|remote_log|remote_log_elasticsearch|xcom_object_storage)</text><text
 class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="361.6" 
textLength="12.2" clip-path="url(#breeze-test [...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="386" 
textLength="1464" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯</text><text
 class="breeze-testing-airflow-e2e-tests-r1" x="1464" y="386" textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="410.4" 
textLength="24.4" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">╭─</text><text 
class="breeze-testing-airflow-e2e-tests-r5" x="24.4" y="410.4" 
textLength="195.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">&#160;Common&#160;options&#160;</text><text
 class="breeze-testing-airflow-e2e-tests-r5" x="219.6" y="410.4" 
textLength="1220" clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">─ 
[...]
 </text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="434.8" 
textLength="12.2" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">│</text><text 
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="434.8" 
textLength="109.8" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">--verbose</text><text
 class="breeze-testing-airflow-e2e-tests-r6" x="158.6" y="434.8" 
textLength="24.4" 
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">-v</text><text 
class="br [...]
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt 
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
index 1329a6ca615..44763f1b463 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
@@ -1 +1 @@
-7ae6755f2c4ee578b3ebf11d051e8809
+88bb06bc8e24442ad0878e6c05942ee7
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py 
b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index ad589145cfa..391e3736fdd 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -1402,7 +1402,10 @@ option_e2e_test_mode = click.option(
     default="basic",
     show_default=True,
     envvar="E2E_TEST_MODE",
-    type=click.Choice(["basic", "remote_log", "xcom_object_storage"], 
case_sensitive=False),
+    type=click.Choice(
+        ["basic", "remote_log", "remote_log_elasticsearch", 
"xcom_object_storage"],
+        case_sensitive=False,
+    ),
 )
 
 

Reply via email to