kotaaaa commented on issue #22907:
URL: https://github.com/apache/airflow/issues/22907#issuecomment-1094989998
@eladkal, thank you for your reply!
In my case, the process forks based on the result of `select count(*) from {bigquery's table}`, so I created a class like the one below.
I think this is not specific to my case: in general, the result of a query against a BigQuery table may decide whether a process should be skipped or should proceed.
(The code below is actually for Airflow v1.)
```
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowSkipException
from airflow.models import BaseOperator
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.utils.decorators import apply_defaults


class SQLCheckSkipOperator(BaseOperator):
    ...

    def execute(self, context=None):
        self.log.info("Executing SQL check: %s", self.sql)
        # get_first() returns the first row of the result, or None if there is none
        records = self.get_db_hook().get_first(self.sql)
        self.log.info("Record: %s", records)
        if not records:
            # Skip this task; with the default trigger rule, downstream tasks are skipped too
            raise AirflowSkipException("The query returned None")
        elif not all(bool(r) for r in records):
            # Any falsy column (e.g. count(*) == 0) also skips instead of failing
            raise AirflowSkipException(
                "Test failed.\nQuery:\n{query}\nResults:\n{records!s}".format(
                    query=self.sql, records=records
                )
            )
        self.log.info("Success.")

    ...


class BigQueryCheckSkipOperator(SQLCheckSkipOperator):
    ...
```
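The sketch below lets you try the skip rule in `execute()` without a BigQuery connection. It makes two assumptions. First, Python's built-in `sqlite3` stands in for BigQuery, with `fetchone()` playing the role of the hook's `get_first()`. Second, a hypothetical `SkipTask` exception replaces `AirflowSkipException`. The names `check_or_skip` and `run_check` are illustrative only and are not Airflow APIs.

```python
import sqlite3
from typing import Optional, Sequence


class SkipTask(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowSkipException."""


def check_or_skip(records: Optional[Sequence], sql: str) -> None:
    """Same decision rule as SQLCheckSkipOperator.execute() above."""
    if not records:
        # No row came back at all (get_first()/fetchone() returned None).
        raise SkipTask("The query returned None")
    if not all(bool(r) for r in records):
        # At least one column is falsy, e.g. COUNT(*) == 0.
        raise SkipTask(
            "Test failed.\nQuery:\n{query}\nResults:\n{records!s}".format(
                query=sql, records=records
            )
        )


def run_check(conn: sqlite3.Connection, sql: str) -> str:
    # Assumption: fetchone() stands in for the hook's get_first() (first row or None).
    records = conn.execute(sql).fetchone()
    try:
        check_or_skip(records, sql)
    except SkipTask:
        return "skipped"
    return "proceed"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER)")
sql = "SELECT COUNT(*) FROM events"
print(run_check(conn, sql))  # prints "skipped": COUNT(*) is 0
conn.execute("INSERT INTO events VALUES (1)")
print(run_check(conn, sql))  # prints "proceed": COUNT(*) is 1
```

Because `select count(*)` always returns exactly one row, the "no rows" branch only fires for queries such as a filtered `SELECT` that can return nothing. For a count query, it is the falsy-column branch that turns an empty table into a skip.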