mik-laj commented on a change in pull request #21785: URL: https://github.com/apache/airflow/pull/21785#discussion_r821968801
########## File path: airflow/providers/alibaba/cloud/log/oss_task_handler.py ########## @@ -0,0 +1,186 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import os +import sys + +if sys.version_info >= (3, 8): + from functools import cached_property +else: + from cached_property import cached_property + +from airflow.configuration import conf +from airflow.providers.alibaba.cloud.hooks.oss import OSSHook +from airflow.utils.log.file_task_handler import FileTaskHandler +from airflow.utils.log.logging_mixin import LoggingMixin + + +class OSSTaskHandler(FileTaskHandler, LoggingMixin): + """ + OSSTaskHandler is a python log handler that handles and reads + task instance logs. It extends airflow FileTaskHandler and + uploads to and reads from OSS remote storage. 
+ """ + + def __init__(self, base_log_folder, oss_log_folder, filename_template): + self.log.info("Using oss_task_handler for remote logging...") + super().__init__(base_log_folder, filename_template) + (self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder) + self.log_relative_path = '' + self._hook = None + self.closed = False + self.upload_on_close = True + + @cached_property + def hook(self): + remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID') + self.log.info("remote_conn_id: %s", remote_conn_id) + try: + return OSSHook(oss_conn_id=remote_conn_id) + except Exception as e: + self.log.error(e, exc_info=True) + self.log.error( + 'Could not create an OSSHook with connection id "%s". ' + 'Please make sure that airflow[oss] is installed and ' + 'the OSS connection exists.', + remote_conn_id, + ) + + def set_context(self, ti): + super().set_context(ti) + # Local location and remote location is needed to open and + # upload local log file to OSS remote storage. + self.log_relative_path = self._render_filename(ti, ti.try_number) + self.upload_on_close = not ti.raw + + # Clear the file first so that duplicate data is not uploaded + # when re-using the same path (e.g. with rescheduled sensors) + if self.upload_on_close: + with open(self.handler.baseFilename, 'w'): + pass + + def close(self): + """Close and upload local log file to remote storage OSS.""" + # When application exit, system shuts down all handlers by + # calling close method. Here we check if logger is already + # closed to prevent uploading the log to remote storage multiple + # times when `logging.shutdown` is called. 
+ if self.closed: + return + + super().close() + + if not self.upload_on_close: + return + + local_loc = os.path.join(self.local_base, self.log_relative_path) + remote_loc = self.log_relative_path + if os.path.exists(local_loc): + # read log and remove old logs to get just the latest additions + with open(local_loc) as logfile: + log = logfile.read() + self.oss_write(log, remote_loc) + + # Mark closed so we don't double write if close is called twice + self.closed = True + + def _read(self, ti, try_number, metadata=None): + """ + Read logs of given task instance and try_number from OSS remote storage. + If failed, read the log from task instance host machine. + + :param ti: task instance object + :param try_number: task instance try_number to read logs from + :param metadata: log metadata, + can be used for streaming log reading and auto-tailing. + """ + # Explicitly getting log relative path is necessary as the given + # task instance might be different than task instance passed in + # in set_context method. + log_relative_path = self._render_filename(ti, try_number) + remote_loc = log_relative_path + + if self.oss_log_exists(remote_loc): + # If OSS remote file exists, we do not fetch logs from task instance + # local machine even if there are errors reading remote logs, as + # returned remote_log will contain error messages. 
+ remote_log = self.oss_read(remote_loc, return_error=True) + log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n' + return log, {'end_of_log': True} + else: + return super()._read(ti, try_number) + + def oss_log_exists(self, remote_log_location): + """ + Check if remote_log_location exists in remote storage + + :param remote_log_location: log's location in remote storage + :return: True if location exists else False + """ + oss_remote_log_location = self.base_folder + '/' + remote_log_location + try: + return self.hook.key_exist(self.bucket_name, oss_remote_log_location) + except Exception: + pass + return False + + def oss_read(self, remote_log_location, return_error=False): + """ + Returns the log found at the remote_log_location. Returns '' if no + logs are found or there is an error. + + :param remote_log_location: the log's location in remote storage + :param return_error: if True, returns a string error message if an + error occurs. Otherwise returns '' when an error occurs. + """ + try: + oss_remote_log_location = self.base_folder + '/' + remote_log_location + self.log.info("read remote log: " + oss_remote_log_location) Review comment: ```suggestion self.log.info("read remote log: %s", oss_remote_log_location) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
