This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ae3a55194 Add early job_id xcom_push for google provider Beam Pipeline operators (#42982)
7ae3a55194 is described below

commit 7ae3a5519446c6e3e090ec44036883764a3dffc5
Author: olegkachur-e <qws...@gmail.com>
AuthorDate: Mon Oct 14 03:49:35 2024 +0200

    Add early job_id xcom_push for google provider Beam Pipeline operators (#42982)
    
    - To let GCP Beam Sensor operators 'sense' pipeline changes,
      the Dataflow job_id is now xcom_pushed as soon as it is available.
    
    Related issue: https://github.com/apache/airflow/issues/30007.
    
    Co-authored-by: Oleg Kachur <kac...@google.com>
---
 .../airflow/providers/apache/beam/operators/beam.py | 19 +++++++++++++++++--
 providers/tests/apache/beam/operators/test_beam.py  | 21 +++++++++++++++++++++
 2 files changed, 38 insertions(+), 2 deletions(-)
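
For context on how the new key is meant to be consumed: a Dataflow sensor can
pull "dataflow_job_id" as soon as the operator has pushed it, instead of
waiting for the operator's return value. The sketch below is a minimal,
hypothetical DAG excerpt, not part of this commit; the dag/task ids, GCS path,
region, and the wait_until_finished wiring are illustrative assumptions.

    import pendulum

    from airflow import DAG
    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
    from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

    with DAG(
        dag_id="beam_dataflow_sensing",  # assumed dag id
        start_date=pendulum.datetime(2024, 10, 1, tz="UTC"),
        schedule=None,
    ):
        start_pipeline = BeamRunPythonPipelineOperator(
            task_id="start_pipeline",
            py_file="gs://my-bucket/my_pipeline.py",  # assumed pipeline file
            runner="DataflowRunner",
            # Illustrative: return once the job is submitted instead of
            # blocking until it reaches a terminal state.
            dataflow_config={"location": "us-central1", "wait_until_finished": False},
        )

        wait_until_running = DataflowJobStatusSensor(
            task_id="wait_until_running",
            # Pull the job_id pushed early by the new setter, rather than
            # parsing the operator's return value.
            job_id="{{ ti.xcom_pull(task_ids='start_pipeline', key='dataflow_job_id') }}",
            expected_statuses={"JOB_STATE_RUNNING"},
            location="us-central1",  # assumed region
        )

        start_pipeline >> wait_until_running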

diff --git a/providers/src/airflow/providers/apache/beam/operators/beam.py b/providers/src/airflow/providers/apache/beam/operators/beam.py
index 41c55ede2a..65f2333658 100644
--- a/providers/src/airflow/providers/apache/beam/operators/beam.py
+++ b/providers/src/airflow/providers/apache/beam/operators/beam.py
@@ -187,7 +187,20 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
         self.gcp_conn_id = gcp_conn_id
         self.beam_hook: BeamHook
         self.dataflow_hook: DataflowHook | None = None
-        self.dataflow_job_id: str | None = None
+        self._dataflow_job_id: str | None = None
+        self._execute_context: Context | None = None
+
+    @property
+    def dataflow_job_id(self):
+        return self._dataflow_job_id
+
+    @dataflow_job_id.setter
+    def dataflow_job_id(self, new_value):
+        if all([new_value, not self._dataflow_job_id, self._execute_context]):
+            # push job_id as soon as it's ready, to let Sensors work before the job finished
+            # and job_id pushed as returned value item.
+            self.xcom_push(context=self._execute_context, key="dataflow_job_id", value=new_value)
+        self._dataflow_job_id = new_value
 
     def _cast_dataflow_config(self):
         if isinstance(self.dataflow_config, dict):
@@ -346,6 +359,7 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
 
     def execute(self, context: Context):
         """Execute the Apache Beam Python Pipeline."""
+        self._execute_context = context
         self._cast_dataflow_config()
         self.pipeline_options.setdefault("labels", {}).update(
             {"airflow-version": "v" + version.replace(".", "-").replace("+", 
"-")}
@@ -540,6 +554,7 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
 
     def execute(self, context: Context):
         """Execute the Apache Beam Python Pipeline."""
+        self._execute_context = context
         self._cast_dataflow_config()
         (
             self.is_dataflow,
@@ -738,7 +753,7 @@ class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
         """Execute the Apache Beam Pipeline."""
         if not exactly_one(self.go_file, self.launcher_binary):
             raise ValueError("Exactly one of `go_file` and `launcher_binary` 
must be set")
-
+        self._execute_context = context
         self._cast_dataflow_config()
         if self.dataflow_config.impersonation_chain:
             self.log.warning(
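
The hunk above turns dataflow_job_id into a property, so the first non-empty
assignment, wherever it happens (e.g. from the hook's process-line callback),
fires a one-time XCom push. A standalone toy sketch of that setter-side-effect
pattern, with illustrative names rather than Airflow's real classes:

    class PipelineTask:
        """Toy stand-in for the operator; only the setter pattern matters."""

        def __init__(self):
            self._job_id = None

        @property
        def job_id(self):
            return self._job_id

        @job_id.setter
        def job_id(self, new_value):
            # Fire only on the first non-empty assignment, mirroring the
            # all([new_value, not self._dataflow_job_id, ...]) guard above.
            if new_value and self._job_id is None:
                print(f"xcom_push stand-in: {new_value}")
            self._job_id = new_value

    task = PipelineTask()
    task.job_id = "job-123"  # triggers the one-time side effect
    task.job_id = "job-456"  # no second push; the value is still updated
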
diff --git a/providers/tests/apache/beam/operators/test_beam.py b/providers/tests/apache/beam/operators/test_beam.py
index 6d1b4b5d1b..fd2e706c29 100644
--- a/providers/tests/apache/beam/operators/test_beam.py
+++ b/providers/tests/apache/beam/operators/test_beam.py
@@ -110,6 +110,27 @@ class TestBeamBasePipelineOperator:
         )
         assert f"{TASK_ID} completed with response Pipeline has finished 
SUCCESSFULLY" in caplog.text
 
+    def test_early_dataflow_id_xcom_push(self, default_options, pipeline_options):
+        with mock.patch.object(BeamBasePipelineOperator, "xcom_push") as mock_xcom_push:
+            op = BeamBasePipelineOperator(
+                **self.default_op_kwargs,
+                default_pipeline_options=copy.deepcopy(default_options),
+                pipeline_options=copy.deepcopy(pipeline_options),
+                dataflow_config={},
+            )
+            sample_df_job_id = "sample_df_job_id_value"
+            op._execute_context = MagicMock()
+
+            assert op.dataflow_job_id is None
+
+            op.dataflow_job_id = sample_df_job_id
+            mock_xcom_push.assert_called_once_with(
+                context=op._execute_context, key="dataflow_job_id", value=sample_df_job_id
+            )
+            mock_xcom_push.reset_mock()
+            op.dataflow_job_id = "sample_df_job_same_value_id"
+            mock_xcom_push.assert_not_called()
+
 
 class TestBeamRunPythonPipelineOperator:
     @pytest.fixture(autouse=True)
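
The added test can be exercised in isolation from a development checkout
(assuming the providers test layout shown in the diff):

    pytest providers/tests/apache/beam/operators/test_beam.py::TestBeamBasePipelineOperator::test_early_dataflow_id_xcom_push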
