Owen-CH-Leung commented on code in PR #53821: URL: https://github.com/apache/airflow/pull/53821#discussion_r2334816641
########## providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py: ########## @@ -661,13 +602,79 @@ def _get_result(self, hit: dict[Any, Any], parent_class=None) -> Hit: callback: type[Hit] | Callable[..., Any] = getattr(doc_class, "from_es", doc_class) return callback(hit) - def _parse_raw_log(self, log: str) -> list[dict[str, Any]]: + def _get_log_message(self, hit: Hit) -> str: + """Get log message from hit, supporting both Airflow 2.x and 3.x formats.""" + if hasattr(hit, "event"): + return hit.event + if hasattr(hit, "message"): + return hit.message + return "" + + +@attrs.define(kw_only=True) +class ElasticsearchRemoteLogIO(LoggingMixin): # noqa: D101 + json_format: bool = False + write_stdout: bool = False + delete_local_copy: bool = False + host: str = "http://localhost:9200" + host_field: str = "host" + target_index: str = "airflow-logs" + offset_field: str = "offset" + write_to_es: bool = False + base_log_folder: Path = attrs.field(converter=Path) + + processors = () + + def __attrs_post_init__(self): + es_kwargs = get_es_kwargs_from_config() + self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs) + self.index_patterns_callable = conf.get("elasticsearch", "index_patterns_callable", fallback="") + self.PAGE = 0 + self.MAX_LINE_PER_PAGE = 1000 + self.index_patterns: str = conf.get("elasticsearch", "index_patterns") + self._doc_type_map: dict[Any, Any] = {} + self._doc_type: list[Any] = [] + + def upload(self, path: os.PathLike | str, ti: RuntimeTI): + """Write the log to ElasticSearch.""" + path = Path(path) + + if path.is_absolute(): + local_loc = path + else: + local_loc = self.base_log_folder.joinpath(path) + + # Convert the runtimeTI to the real TaskInstance that via fetching from DB + ti = TaskInstance.get_task_instance( + ti.dag_id, ti.run_id, ti.task_id, ti.map_index if ti.map_index is not None else -1 + ) # type: ignore[assignment] Review Comment: Thanks. 
I rebased from master and re-tested the Airflow upload feature. I'm hitting an error while trying to create the Runtime Task Instance: ``` File "/usr/python/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py", line 383, in _validate_commit raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!") RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS! ``` I think it's because the main process (i.e. the scheduler) is holding the global lock, and thus any process that attempts to commit will fail (strange that this didn't appear before). I'll spend some time digging into this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org