This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 7ae3a55194 Add early job_id xcom_push for google provider Beam Pipeline operators (#42982) 7ae3a55194 is described below commit 7ae3a5519446c6e3e090ec44036883764a3dffc5 Author: olegkachur-e <qws...@gmail.com> AuthorDate: Mon Oct 14 03:49:35 2024 +0200 Add early job_id xcom_push for google provider Beam Pipeline operators (#42982) - To let GCP Beam Sensor operators 'sense' the pipeline changes, by having the dataflow job_id xcom_pushed as soon as it is available. Related issue: https://github.com/apache/airflow/issues/30007. Co-authored-by: Oleg Kachur <kac...@google.com> --- .../airflow/providers/apache/beam/operators/beam.py | 19 +++++++++++++++++-- providers/tests/apache/beam/operators/test_beam.py | 21 +++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/providers/src/airflow/providers/apache/beam/operators/beam.py b/providers/src/airflow/providers/apache/beam/operators/beam.py index 41c55ede2a..65f2333658 100644 --- a/providers/src/airflow/providers/apache/beam/operators/beam.py +++ b/providers/src/airflow/providers/apache/beam/operators/beam.py @@ -187,7 +187,20 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC): self.gcp_conn_id = gcp_conn_id self.beam_hook: BeamHook self.dataflow_hook: DataflowHook | None = None - self.dataflow_job_id: str | None = None + self._dataflow_job_id: str | None = None + self._execute_context: Context | None = None + + @property + def dataflow_job_id(self): + return self._dataflow_job_id + + @dataflow_job_id.setter + def dataflow_job_id(self, new_value): + if all([new_value, not self._dataflow_job_id, self._execute_context]): + # push job_id as soon as it's ready, to let Sensors work before the job finished + # and job_id pushed as returned value item. 
+ self.xcom_push(context=self._execute_context, key="dataflow_job_id", value=new_value) + self._dataflow_job_id = new_value def _cast_dataflow_config(self): if isinstance(self.dataflow_config, dict): @@ -346,6 +359,7 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator): def execute(self, context: Context): """Execute the Apache Beam Python Pipeline.""" + self._execute_context = context self._cast_dataflow_config() self.pipeline_options.setdefault("labels", {}).update( {"airflow-version": "v" + version.replace(".", "-").replace("+", "-")} @@ -540,6 +554,7 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator): def execute(self, context: Context): """Execute the Apache Beam Python Pipeline.""" + self._execute_context = context self._cast_dataflow_config() ( self.is_dataflow, @@ -738,7 +753,7 @@ class BeamRunGoPipelineOperator(BeamBasePipelineOperator): """Execute the Apache Beam Pipeline.""" if not exactly_one(self.go_file, self.launcher_binary): raise ValueError("Exactly one of `go_file` and `launcher_binary` must be set") - + self._execute_context = context self._cast_dataflow_config() if self.dataflow_config.impersonation_chain: self.log.warning( diff --git a/providers/tests/apache/beam/operators/test_beam.py b/providers/tests/apache/beam/operators/test_beam.py index 6d1b4b5d1b..fd2e706c29 100644 --- a/providers/tests/apache/beam/operators/test_beam.py +++ b/providers/tests/apache/beam/operators/test_beam.py @@ -110,6 +110,27 @@ class TestBeamBasePipelineOperator: ) assert f"{TASK_ID} completed with response Pipeline has finished SUCCESSFULLY" in caplog.text + def test_early_dataflow_id_xcom_push(self, default_options, pipeline_options): + with mock.patch.object(BeamBasePipelineOperator, "xcom_push") as mock_xcom_push: + op = BeamBasePipelineOperator( + **self.default_op_kwargs, + default_pipeline_options=copy.deepcopy(default_options), + pipeline_options=copy.deepcopy(pipeline_options), + dataflow_config={}, + ) + sample_df_job_id = 
"sample_df_job_id_value" + op._execute_context = MagicMock() + + assert op.dataflow_job_id is None + + op.dataflow_job_id = sample_df_job_id + mock_xcom_push.assert_called_once_with( + context=op._execute_context, key="dataflow_job_id", value=sample_df_job_id + ) + mock_xcom_push.reset_mock() + op.dataflow_job_id = "sample_df_job_same_value_id" + mock_xcom_push.assert_not_called() + class TestBeamRunPythonPipelineOperator: @pytest.fixture(autouse=True)