This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6160327370c feat(elasticsearch): enable e2e remote logging test for elasticsearch (#63519)
6160327370c is described below
commit 6160327370c24f34b11924bd55521bdb18c718df
Author: Owen Leung <[email protected]>
AuthorDate: Sun Mar 15 14:37:49 2026 +0800
feat(elasticsearch): enable e2e remote logging test for elasticsearch (#63519)
* feat(elasticsearch): enable e2e remote logging test for elasticsearch
* feat(elasticsearch): remove leftover comments
* feat(elasticsearch): fix ruff format
* feat(elasticsearch): improve readability
---
.github/workflows/airflow-e2e-tests.yml | 4 +-
airflow-e2e-tests/docker/elasticsearch.yml | 38 ++++++++
.../tests/airflow_e2e_tests/conftest.py | 26 +++++-
.../tests/airflow_e2e_tests/constants.py | 1 +
.../airflow_e2e_tests/e2e_test_utils/clients.py | 5 +
.../remote_log_elasticsearch_tests/__init__.py | 16 ++++
.../test_remote_logging_elasticsearch.py | 103 +++++++++++++++++++++
.../images/output_testing_airflow-e2e-tests.svg | 2 +-
.../images/output_testing_airflow-e2e-tests.txt | 2 +-
.../airflow_breeze/commands/testing_commands.py | 5 +-
10 files changed, 196 insertions(+), 6 deletions(-)
diff --git a/.github/workflows/airflow-e2e-tests.yml
b/.github/workflows/airflow-e2e-tests.yml
index e9d9811ae29..cee6f855dc8 100644
--- a/.github/workflows/airflow-e2e-tests.yml
+++ b/.github/workflows/airflow-e2e-tests.yml
@@ -49,7 +49,7 @@ on: # yamllint disable-line rule:truthy
type: string
required: true
e2e_test_mode:
- description: "Test mode - basic, remote_log, or xcom_object_storage"
+ description: "Test mode - basic, remote_log, remote_log_elasticsearch,
or xcom_object_storage"
type: string
default: "basic"
@@ -80,7 +80,7 @@ on: # yamllint disable-line rule:truthy
type: string
default: ""
e2e_test_mode:
- description: "Test mode - basic, remote_log, or xcom_object_storage"
+ description: "Test mode - basic, remote_log, remote_log_elasticsearch,
or xcom_object_storage"
type: string
default: "basic"
diff --git a/airflow-e2e-tests/docker/elasticsearch.yml
b/airflow-e2e-tests/docker/elasticsearch.yml
new file mode 100644
index 00000000000..601f7166c5f
--- /dev/null
+++ b/airflow-e2e-tests/docker/elasticsearch.yml
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+services:
+ elasticsearch:
+ image: docker.elastic.co/elasticsearch/elasticsearch:8.19.0
+ environment:
+ - discovery.type=single-node
+ - xpack.security.enabled=false
+ - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
+ volumes:
+ - elasticsearch-db-volume:/usr/share/elasticsearch/data
+ ports:
+ - "9200:9200"
+ healthcheck:
+ test: ["CMD-SHELL", "curl -f http://localhost:9200/_cluster/health ||
exit 1"]
+ interval: 10s
+ timeout: 30s
+ start_period: 60s
+ retries: 50
+ restart: "on-failure"
+
+volumes:
+ elasticsearch-db-volume:
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
index ad071d4e02b..32e1fb0e21c 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py
@@ -33,6 +33,7 @@ from airflow_e2e_tests.constants import (
DOCKER_IMAGE,
E2E_DAGS_FOLDER,
E2E_TEST_MODE,
+ ELASTICSEARCH_PATH,
LOCALSTACK_PATH,
LOGS_FOLDER,
TEST_REPORT_FILE,
@@ -58,6 +59,11 @@ def _copy_localstack_files(tmp_dir):
os.chmod(tmp_dir / "init-aws.sh", current_permissions | 0o111)
+def _copy_elasticsearch_files(tmp_dir):
+ """Copy Elasticsearch compose file into the temp directory."""
+ copyfile(ELASTICSEARCH_PATH, tmp_dir / "elasticsearch.yml")
+
+
def _setup_s3_integration(dot_env_file, tmp_dir):
_copy_localstack_files(tmp_dir)
@@ -74,6 +80,21 @@ def _setup_s3_integration(dot_env_file, tmp_dir):
os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+def _setup_elasticsearch_integration(dot_env_file, tmp_dir):
+ _copy_elasticsearch_files(tmp_dir)
+
+ dot_env_file.write_text(
+ f"AIRFLOW_UID={os.getuid()}\n"
+ "AIRFLOW__LOGGING__REMOTE_LOGGING=true\n"
+ "AIRFLOW__ELASTICSEARCH__HOST=http://elasticsearch:9200\n"
+ "AIRFLOW__ELASTICSEARCH__WRITE_STDOUT=false\n"
+ "AIRFLOW__ELASTICSEARCH__JSON_FORMAT=true\n"
+ "AIRFLOW__ELASTICSEARCH__WRITE_TO_ES=true\n"
+ "AIRFLOW__ELASTICSEARCH__TARGET_INDEX=airflow-e2e-logs\n"
+ )
+ os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+
+
def _setup_xcom_object_storage_integration(dot_env_file, tmp_dir):
_copy_localstack_files(tmp_dir)
@@ -124,6 +145,9 @@ def spin_up_airflow_environment(tmp_path_factory:
pytest.TempPathFactory):
if E2E_TEST_MODE == "remote_log":
compose_file_names.append("localstack.yml")
_setup_s3_integration(dot_env_file, tmp_dir)
+ elif E2E_TEST_MODE == "remote_log_elasticsearch":
+ compose_file_names.append("elasticsearch.yml")
+ _setup_elasticsearch_integration(dot_env_file, tmp_dir)
elif E2E_TEST_MODE == "xcom_object_storage":
compose_file_names.append("localstack.yml")
_setup_xcom_object_storage_integration(dot_env_file, tmp_dir)
@@ -166,7 +190,7 @@ def _print_logs(compose_instance: DockerCompose):
if service:
stdout, _ = compose_instance.get_logs(service)
console.print(f"::group:: {service} Logs")
- console.print(f"[red]{stdout}")
+ console.print(stdout, style="red", soft_wrap=True)
console.print("::endgroup::")
diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
index 764b4e56dc5..67fec2c5580 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/constants.py
@@ -40,6 +40,7 @@ E2E_DAGS_FOLDER = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" /
"tests" / "airflow_e
LOGS_FOLDER = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "logs"
TEST_REPORT_FILE = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" /
"_e2e_test_report.json"
LOCALSTACK_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" /
"localstack.yml"
+ELASTICSEARCH_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "docker" /
"elasticsearch.yml"
E2E_TEST_MODE = os.environ.get("E2E_TEST_MODE", "basic")
AWS_INIT_PATH = AIRFLOW_ROOT_PATH / "airflow-e2e-tests" / "scripts" /
"init-aws.sh"
diff --git
a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
index 3a11101d4ac..b3e70973b2e 100644
--- a/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
+++ b/airflow-e2e-tests/tests/airflow_e2e_tests/e2e_test_utils/clients.py
@@ -43,6 +43,11 @@ def get_s3_client():
)
+def get_elasticsearch_session():
+ """Return a requests session configured for talking to the local
Elasticsearch container."""
+ return create_request_session_with_retries(status_forcelist=[429, 500,
502, 503, 504])
+
+
def create_request_session_with_retries(status_forcelist: list[int]):
"""Create a requests Session with retry logic for handling transient
errors."""
Retry.DEFAULT_BACKOFF_MAX = 32
diff --git
a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/__init__.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/__init__.py
new file mode 100644
index 00000000000..13a83393a91
--- /dev/null
+++
b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git
a/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
new file mode 100644
index 00000000000..5c07a04203a
--- /dev/null
+++
b/airflow-e2e-tests/tests/airflow_e2e_tests/remote_log_elasticsearch_tests/test_remote_logging_elasticsearch.py
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import time
+from datetime import datetime, timezone
+
+import pytest
+
+from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient,
get_elasticsearch_session
+
+
+class TestRemoteLoggingElasticsearch:
+ airflow_client = AirflowClient()
+ dag_id = "example_xcom_test"
+ task_id = "bash_pull"
+ retry_interval_in_seconds = 5
+ max_retries = 12
+ target_index = "airflow-e2e-logs"
+ expected_message = "finished"
+ elasticsearch_url = "http://localhost:9200"
+ expected_log_id_prefix = f"{dag_id}-{task_id}-"
+
+ def _matches_expected_log(self, log_source: dict, run_id: str) -> bool:
+ log_id = log_source.get("log_id", "")
+ log_message = log_source.get("event", "")
+ if not isinstance(log_message, str):
+ log_message = log_source.get("message", "")
+ return (
+ log_id.startswith(self.expected_log_id_prefix)
+ and run_id in log_id
+ and self.expected_message in log_message
+ )
+
+ def test_remote_logging_elasticsearch(self):
+ """Test that a DAG using remote logging to Elasticsearch completes
successfully."""
+
+ self.airflow_client.un_pause_dag(TestRemoteLoggingElasticsearch.dag_id)
+
+ resp = self.airflow_client.trigger_dag(
+ TestRemoteLoggingElasticsearch.dag_id,
+ json={"logical_date": datetime.now(timezone.utc).isoformat()},
+ )
+ run_id = resp["dag_run_id"]
+ state = self.airflow_client.wait_for_dag_run(
+ dag_id=TestRemoteLoggingElasticsearch.dag_id,
+ run_id=run_id,
+ )
+
+ assert state == "success", (
+ f"DAG {TestRemoteLoggingElasticsearch.dag_id} did not complete
successfully. Final state: {state}"
+ )
+
+ elasticsearch_session = get_elasticsearch_session()
+ matching_logs = []
+ for _ in range(self.max_retries):
+ response = elasticsearch_session.post(
+ f"{self.elasticsearch_url}/{self.target_index}/_search",
+ json={"size": 200, "query": {"match_all": {}}},
+ )
+ if response.status_code == 404:
+ time.sleep(self.retry_interval_in_seconds)
+ continue
+ response.raise_for_status()
+ hits = response.json()["hits"]["hits"]
+ matching_logs = [
+ hit["_source"] for hit in hits if
self._matches_expected_log(hit["_source"], run_id)
+ ]
+ if matching_logs:
+ break
+ time.sleep(self.retry_interval_in_seconds)
+
+ if not matching_logs:
+ pytest.fail(
+ f"Expected Elasticsearch logs for run_id {run_id} in index
{self.target_index}, "
+ f"but none contained log_id starting with
{self.expected_log_id_prefix!r} and event "
+ f"or message containing {self.expected_message!r}"
+ )
+
+ task_logs = self.airflow_client.get_task_logs(
+ dag_id=TestRemoteLoggingElasticsearch.dag_id,
+ task_id=self.task_id,
+ run_id=run_id,
+ )
+
+ events = [item.get("event", "") for item in task_logs.get("content",
[]) if isinstance(item, dict)]
+ assert any(self.expected_message in event for event in events), (
+ f"Expected task logs to contain {self.expected_message!r}, got
events: {events}"
+ )
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
index 9c48a6c8cb4..d31043d8a8e 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.svg
@@ -131,7 +131,7 @@
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="288.4"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-11)">│</text><text
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="288.4" textLength="366"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-11)">--github-repository           </text><text
class="breeze-testing-airflow-e2e-tests-r6" x="414.8" y="288.4"
textLength="24.4" clip-path [...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="312.8"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="312.8"
textLength="73.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">(TEXT)</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="312.8"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-12)">│</text><text
class="breez [...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="337.2"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">│</text><text
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="337.2" textLength="366"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-13)">--e2e-test-mode               </text><text
class="breeze-testing-airflow-e2e-tests-r1" x="463.6" y="337.2" textLen [...]
-</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="361.6"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="361.6"
textLength="463.6"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">(basic|remote_log|xcom_object_storage)</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="361.6"
textLength="12.2" clip-path="url(#breeze-testing-airflow-e2e-tests-lin [...]
+</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="361.6"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">│</text><text
class="breeze-testing-airflow-e2e-tests-r7" x="463.6" y="361.6"
textLength="768.6"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-14)">(basic|remote_log|remote_log_elasticsearch|xcom_object_storage)</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="1451.8" y="361.6"
textLength="12.2" clip-path="url(#breeze-test [...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="386"
textLength="1464"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯</text><text
class="breeze-testing-airflow-e2e-tests-r1" x="1464" y="386" textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-15)">
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="410.4"
textLength="24.4"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">╭─</text><text
class="breeze-testing-airflow-e2e-tests-r5" x="24.4" y="410.4"
textLength="195.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)"> Common options </text><text
class="breeze-testing-airflow-e2e-tests-r5" x="219.6" y="410.4"
textLength="1220" clip-path="url(#breeze-testing-airflow-e2e-tests-line-16)">─
[...]
</text><text class="breeze-testing-airflow-e2e-tests-r5" x="0" y="434.8"
textLength="12.2"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">│</text><text
class="breeze-testing-airflow-e2e-tests-r4" x="24.4" y="434.8"
textLength="109.8"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">--verbose</text><text
class="breeze-testing-airflow-e2e-tests-r6" x="158.6" y="434.8"
textLength="24.4"
clip-path="url(#breeze-testing-airflow-e2e-tests-line-17)">-v</text><text
class="br [...]
diff --git a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
index 1329a6ca615..44763f1b463 100644
--- a/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
+++ b/dev/breeze/doc/images/output_testing_airflow-e2e-tests.txt
@@ -1 +1 @@
-7ae6755f2c4ee578b3ebf11d051e8809
+88bb06bc8e24442ad0878e6c05942ee7
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index ad589145cfa..391e3736fdd 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -1402,7 +1402,10 @@ option_e2e_test_mode = click.option(
default="basic",
show_default=True,
envvar="E2E_TEST_MODE",
- type=click.Choice(["basic", "remote_log", "xcom_object_storage"],
case_sensitive=False),
+ type=click.Choice(
+ ["basic", "remote_log", "remote_log_elasticsearch",
"xcom_object_storage"],
+ case_sensitive=False,
+ ),
)