This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 97717133551 [v3-1-test] Make elasticsearch compatible with
remote_task_log (#62121) (#62940)
97717133551 is described below
commit 977171335517ebab47fd11061220bd964cc25986
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 5 17:05:51 2026 +0200
[v3-1-test] Make elasticsearch compatible with remote_task_log (#62121)
(#62940)
* Make elasticsearch compatible with remote_task_log and add min version
* Fix pyproject.toml to use elasticsearch>=6.5.0
---------
(cherry picked from commit 4f1faf61b0d55de65631b3c4ff6e24af4cf8519e)
Co-authored-by: Owen Leung <[email protected]>
Co-authored-by: vatsrahul1001 <[email protected]>
---
airflow-core/newsfragments/62121.bugfix.rst | 1 +
.../config_templates/airflow_local_settings.py | 38 +++++++++-------------
pyproject.toml | 4 +--
scripts/ci/prek/update_airflow_pyproject_toml.py | 1 +
4 files changed, 20 insertions(+), 24 deletions(-)
diff --git a/airflow-core/newsfragments/62121.bugfix.rst
b/airflow-core/newsfragments/62121.bugfix.rst
new file mode 100644
index 00000000000..a35d8ae3bb7
--- /dev/null
+++ b/airflow-core/newsfragments/62121.bugfix.rst
@@ -0,0 +1 @@
+Elasticsearch is now fully compatible with remote logging along side with
apache-airflow-providers-elasticsearch>=6.5.0. Please review elasticsearch
provider release notes for more information
https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/6.5.0/changelog.html
diff --git
a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
index bb7efe9b193..192c9cec526 100644
--- a/airflow-core/src/airflow/config_templates/airflow_local_settings.py
+++ b/airflow-core/src/airflow/config_templates/airflow_local_settings.py
@@ -282,35 +282,29 @@ if REMOTE_LOGGING:
)
remote_task_handler_kwargs = {}
elif ELASTICSEARCH_HOST:
- ELASTICSEARCH_END_OF_LOG_MARK: str =
conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
- ELASTICSEARCH_FRONTEND: str =
conf.get_mandatory_value("elasticsearch", "frontend")
+ from airflow.providers.elasticsearch.log.es_task_handler import
ElasticsearchRemoteLogIO
+
ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch",
"WRITE_STDOUT")
ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch",
"WRITE_TO_ES")
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch",
"JSON_FORMAT")
- ELASTICSEARCH_JSON_FIELDS: str =
conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
ELASTICSEARCH_TARGET_INDEX: str =
conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
ELASTICSEARCH_HOST_FIELD: str =
conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
ELASTICSEARCH_OFFSET_FIELD: str =
conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
+ ELASTICSEARCH_LOG_ID_TEMPLATE: str =
conf.get_mandatory_value("elasticsearch", "LOG_ID_TEMPLATE")
+
+ REMOTE_TASK_LOG = ElasticsearchRemoteLogIO(
+ host=ELASTICSEARCH_HOST,
+ target_index=ELASTICSEARCH_TARGET_INDEX,
+ write_stdout=ELASTICSEARCH_WRITE_STDOUT,
+ write_to_es=ELASTICSEARCH_WRITE_TO_ES,
+ offset_field=ELASTICSEARCH_OFFSET_FIELD,
+ host_field=ELASTICSEARCH_HOST_FIELD,
+ base_log_folder=BASE_LOG_FOLDER,
+ delete_local_copy=delete_local_copy,
+ json_format=ELASTICSEARCH_JSON_FORMAT,
+ log_id_template=ELASTICSEARCH_LOG_ID_TEMPLATE,
+ )
- ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
- "task": {
- "class":
"airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
- "formatter": "airflow",
- "base_log_folder": BASE_LOG_FOLDER,
- "end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
- "host": ELASTICSEARCH_HOST,
- "frontend": ELASTICSEARCH_FRONTEND,
- "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
- "write_to_es": ELASTICSEARCH_WRITE_TO_ES,
- "target_index": ELASTICSEARCH_TARGET_INDEX,
- "json_format": ELASTICSEARCH_JSON_FORMAT,
- "json_fields": ELASTICSEARCH_JSON_FIELDS,
- "host_field": ELASTICSEARCH_HOST_FIELD,
- "offset_field": ELASTICSEARCH_OFFSET_FIELD,
- },
- }
-
- DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
elif OPENSEARCH_HOST:
OPENSEARCH_END_OF_LOG_MARK: str =
conf.get_mandatory_value("opensearch", "END_OF_LOG_MARK")
OPENSEARCH_PORT: str = conf.get_mandatory_value("opensearch", "PORT")
diff --git a/pyproject.toml b/pyproject.toml
index 4ba31d7ca68..93704037982 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -207,7 +207,7 @@ packages = []
"apache-airflow-providers-edge3>=1.0.0"
]
"elasticsearch" = [
- "apache-airflow-providers-elasticsearch>=5.5.2"
+ "apache-airflow-providers-elasticsearch>=6.5.0" # Set from
MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
]
"exasol" = [
"apache-airflow-providers-exasol>=4.6.1"
@@ -424,7 +424,7 @@ packages = []
"apache-airflow-providers-discord>=3.9.0",
"apache-airflow-providers-docker>=3.14.1",
"apache-airflow-providers-edge3>=1.0.0",
- "apache-airflow-providers-elasticsearch>=5.5.2",
+ "apache-airflow-providers-elasticsearch>=6.5.0", # Set from
MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-exasol>=4.6.1",
"apache-airflow-providers-fab>=2.2.0; python_version !=\"3.13\"", # Set
from MIN_VERSION_OVERRIDE in update_airflow_pyproject_toml.py
"apache-airflow-providers-facebook>=3.7.0",
diff --git a/scripts/ci/prek/update_airflow_pyproject_toml.py
b/scripts/ci/prek/update_airflow_pyproject_toml.py
index b55642baa58..28df1fa1798 100755
--- a/scripts/ci/prek/update_airflow_pyproject_toml.py
+++ b/scripts/ci/prek/update_airflow_pyproject_toml.py
@@ -75,6 +75,7 @@ MIN_VERSION_OVERRIDE: dict[str, Version] = {
"openlineage": parse_version("2.3.0"),
"git": parse_version("0.0.2"),
"common.messaging": parse_version("2.0.0"),
+ "elasticsearch": parse_version("6.5.0"),
}