This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e72cfd7d9e8 Add CloudComposerTriggerDAGRunOperator for Cloud Composer 
service (#55256)
e72cfd7d9e8 is described below

commit e72cfd7d9e8b73fb5bdad9c0a0bcc83fa0cad6a3
Author: Maksim <maks...@google.com>
AuthorDate: Fri Sep 5 22:50:52 2025 +0200

    Add CloudComposerTriggerDAGRunOperator for Cloud Composer service (#55256)
---
 .../google/docs/operators/cloud/cloud_composer.rst | 12 +++
 .../providers/google/cloud/hooks/cloud_composer.py | 66 ++++++++++++++++-
 .../google/cloud/operators/cloud_composer.py       | 85 +++++++++++++++++++++-
 .../cloud/composer/example_cloud_composer.py       | 12 +++
 .../unit/google/cloud/hooks/test_cloud_composer.py | 26 +++++++
 .../google/cloud/operators/test_cloud_composer.py  | 36 +++++++++
 6 files changed, 235 insertions(+), 2 deletions(-)

diff --git a/providers/google/docs/operators/cloud/cloud_composer.rst 
b/providers/google/docs/operators/cloud/cloud_composer.rst
index cd0ada3c38f..88381b48438 100644
--- a/providers/google/docs/operators/cloud/cloud_composer.rst
+++ b/providers/google/docs/operators/cloud/cloud_composer.rst
@@ -197,3 +197,15 @@ or you can define the same sensor in the deferrable mode:
     :dedent: 4
     :start-after: [START howto_sensor_dag_run_deferrable_mode]
     :end-before: [END howto_sensor_dag_run_deferrable_mode]
+
+Trigger a DAG run
+-----------------
+
+To trigger a DAG run in another Composer environment, use:
+:class:`~airflow.providers.google.cloud.operators.cloud_composer.CloudComposerTriggerDAGRunOperator`
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/composer/example_cloud_composer.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_trigger_dag_run]
+    :end-before: [END howto_operator_trigger_dag_run]
diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py 
b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py
index 5026b63005e..9c963e47b0f 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_composer.py
@@ -18,12 +18,15 @@
 from __future__ import annotations
 
 import asyncio
+import json
 import time
 from collections.abc import MutableSequence, Sequence
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
+from urllib.parse import urljoin
 
 from google.api_core.client_options import ClientOptions
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
+from google.auth.transport.requests import AuthorizedSession
 from google.cloud.orchestration.airflow.service_v1 import (
     EnvironmentsAsyncClient,
     EnvironmentsClient,
@@ -76,6 +79,34 @@ class CloudComposerHook(GoogleBaseHook, OperationHelper):
             client_options=self.client_options,
         )
 
+    def make_composer_airflow_api_request(
+        self,
+        method: str,
+        airflow_uri: str,
+        path: str,
+        data: Any | None = None,
+        timeout: float | None = None,
+    ):
+        """
+        Make a request to Cloud Composer environment's web server.
+
+        :param method: The request method to use ('GET', 'OPTIONS', 'HEAD', 
'POST', 'PUT', 'PATCH', 'DELETE').
+        :param airflow_uri: The URI of the Apache Airflow Web UI hosted within 
this environment.
+        :param path: The path to send the request.
+        :param data: Dictionary, list of tuples, bytes, or file-like object to 
send in the body of the request.
+        :param timeout: The timeout for this request.
+        """
+        authed_session = AuthorizedSession(self.get_credentials())
+
+        resp = authed_session.request(
+            method=method,
+            url=urljoin(airflow_uri, path),
+            data=data,
+            headers={"Content-Type": "application/json"},
+            timeout=timeout,
+        )
+        return resp
+
     def get_operation(self, operation_name):
         return 
self.get_environment_client().transport.operations_client.get_operation(name=operation_name)
 
@@ -408,6 +439,39 @@ class CloudComposerHook(GoogleBaseHook, OperationHelper):
             self.log.info("Waiting for result...")
             time.sleep(poll_interval)
 
+    def trigger_dag_run(
+        self,
+        composer_airflow_uri: str,
+        composer_dag_id: str,
+        composer_dag_conf: dict | None = None,
+        timeout: float | None = None,
+    ) -> dict:
+        """
+        Trigger DAG run for provided Apache Airflow Web UI hosted within 
Composer environment.
+
+        :param composer_airflow_uri: The URI of the Apache Airflow Web UI 
hosted within Composer environment.
+        :param composer_dag_id: The ID of DAG which will be triggered.
+        :param composer_dag_conf: Configuration parameters for the DAG run.
+        :param timeout: The timeout for this request.
+        """
+        response = self.make_composer_airflow_api_request(
+            method="POST",
+            airflow_uri=composer_airflow_uri,
+            path=f"/api/v1/dags/{composer_dag_id}/dagRuns",
+            data=json.dumps(
+                {
+                    "conf": composer_dag_conf or {},
+                }
+            ),
+            timeout=timeout,
+        )
+
+        if response.status_code != 200:
+            self.log.error(response.text)
+            response.raise_for_status()
+
+        return response.json()
+
 
 class CloudComposerAsyncHook(GoogleBaseHook):
     """Hook for Google Cloud Composer async APIs."""
diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py
 
b/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py
index 8d5a13561d4..5783dd816ad 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/operators/cloud_composer.py
@@ -21,7 +21,7 @@ import shlex
 from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any
 
-from google.api_core.exceptions import AlreadyExists
+from google.api_core.exceptions import AlreadyExists, NotFound
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.cloud.orchestration.airflow.service_v1 import ImageVersion
 from google.cloud.orchestration.airflow.service_v1.types import Environment, 
ExecuteAirflowCommandResponse
@@ -798,3 +798,86 @@ class 
CloudComposerRunAirflowCLICommandOperator(GoogleCloudBaseOperator):
         """Merge output to one string."""
         result_str = "\n".join(line_dict["content"] for line_dict in 
result["output"])
         return result_str
+
+
+class CloudComposerTriggerDAGRunOperator(GoogleCloudBaseOperator):
+    """
+    Trigger DAG run for provided Composer environment.
+
+    :param project_id: The ID of the Google Cloud project that the service 
belongs to.
+    :param region: The ID of the Google Cloud region that the service belongs 
to.
+    :param environment_id: The ID of the Google Cloud environment that the 
service belongs to.
+    :param composer_dag_id: The ID of DAG which will be triggered.
+    :param composer_dag_conf: Configuration parameters for the DAG run.
+    :param timeout: The timeout for this request.
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud 
Platform.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields = (
+        "project_id",
+        "region",
+        "environment_id",
+        "composer_dag_id",
+        "impersonation_chain",
+    )
+
+    def __init__(
+        self,
+        *,
+        project_id: str,
+        region: str,
+        environment_id: str,
+        composer_dag_id: str,
+        composer_dag_conf: dict | None = None,
+        timeout: float | None = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.environment_id = environment_id
+        self.composer_dag_id = composer_dag_id
+        self.composer_dag_conf = composer_dag_conf or {}
+        self.timeout = timeout
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: Context):
+        hook = CloudComposerHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        try:
+            environment = hook.get_environment(
+                project_id=self.project_id,
+                region=self.region,
+                environment_id=self.environment_id,
+                timeout=self.timeout,
+            )
+        except NotFound as not_found_err:
+            self.log.info("The Composer environment %s does not exist.", 
self.environment_id)
+            raise AirflowException(not_found_err)
+        composer_airflow_uri = environment.config.airflow_uri
+
+        self.log.info(
+            "Triggering the DAG %s on the %s environment...", 
self.composer_dag_id, self.environment_id
+        )
+        dag_run = hook.trigger_dag_run(
+            composer_airflow_uri=composer_airflow_uri,
+            composer_dag_id=self.composer_dag_id,
+            composer_dag_conf=self.composer_dag_conf,
+            timeout=self.timeout,
+        )
+        self.log.info("The DAG %s was triggered with Run ID: %s", 
self.composer_dag_id, dag_run["dag_run_id"])
+
+        return dag_run
diff --git 
a/providers/google/tests/system/google/cloud/composer/example_cloud_composer.py 
b/providers/google/tests/system/google/cloud/composer/example_cloud_composer.py
index 48cccbaedb1..21e635a3853 100644
--- 
a/providers/google/tests/system/google/cloud/composer/example_cloud_composer.py
+++ 
b/providers/google/tests/system/google/cloud/composer/example_cloud_composer.py
@@ -40,6 +40,7 @@ from airflow.providers.google.cloud.operators.cloud_composer 
import (
     CloudComposerListEnvironmentsOperator,
     CloudComposerListImageVersionsOperator,
     CloudComposerRunAirflowCLICommandOperator,
+    CloudComposerTriggerDAGRunOperator,
     CloudComposerUpdateEnvironmentOperator,
 )
 from airflow.providers.google.cloud.sensors.cloud_composer import 
CloudComposerDAGRunSensor
@@ -218,6 +219,16 @@ with DAG(
     )
     # [END howto_sensor_dag_run_deferrable_mode]
 
+    # [START howto_operator_trigger_dag_run]
+    trigger_dag_run = CloudComposerTriggerDAGRunOperator(
+        task_id="trigger_dag_run",
+        project_id=PROJECT_ID,
+        region=REGION,
+        environment_id=ENVIRONMENT_ID,
+        composer_dag_id="airflow_monitoring",
+    )
+    # [END howto_operator_trigger_dag_run]
+
     # [START howto_operator_delete_composer_environment]
     delete_env = CloudComposerDeleteEnvironmentOperator(
         task_id="delete_env",
@@ -250,6 +261,7 @@ with DAG(
         [update_env, defer_update_env],
         [run_airflow_cli_cmd, defer_run_airflow_cli_cmd],
         [dag_run_sensor, defer_dag_run_sensor],
+        trigger_dag_run,
         # TEST TEARDOWN
         [delete_env, defer_delete_env],
     )
diff --git 
a/providers/google/tests/unit/google/cloud/hooks/test_cloud_composer.py 
b/providers/google/tests/unit/google/cloud/hooks/test_cloud_composer.py
index a7ef0e347fe..4a371794793 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_cloud_composer.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_cloud_composer.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import json
 from unittest import mock
 from unittest.mock import AsyncMock
 
@@ -56,6 +57,10 @@ TEST_METADATA = [("key", "value")]
 TEST_PARENT = "test-parent"
 TEST_NAME = "test-name"
 
+TEST_COMPOSER_AIRFLOW_URI = "test-composer-airflow-uri"
+TEST_COMPOSER_DAG_ID = "test-composer-dag-id"
+TEST_COMPOSER_DAG_CONF = {"test-key": "test-value"}
+
 BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
 COMPOSER_STRING = "airflow.providers.google.cloud.hooks.cloud_composer.{}"
 
@@ -257,6 +262,27 @@ class TestCloudComposerHook:
             metadata=TEST_METADATA,
         )
 
+    
@mock.patch(COMPOSER_STRING.format("CloudComposerHook.make_composer_airflow_api_request"))
+    def test_trigger_dag_run(self, mock_composer_airflow_api_request) -> None:
+        self.hook.get_credentials = mock.MagicMock()
+        self.hook.trigger_dag_run(
+            composer_airflow_uri=TEST_COMPOSER_AIRFLOW_URI,
+            composer_dag_id=TEST_COMPOSER_DAG_ID,
+            composer_dag_conf=TEST_COMPOSER_DAG_CONF,
+            timeout=TEST_TIMEOUT,
+        )
+        mock_composer_airflow_api_request.assert_called_once_with(
+            method="POST",
+            airflow_uri=TEST_COMPOSER_AIRFLOW_URI,
+            path=f"/api/v1/dags/{TEST_COMPOSER_DAG_ID}/dagRuns",
+            data=json.dumps(
+                {
+                    "conf": TEST_COMPOSER_DAG_CONF,
+                }
+            ),
+            timeout=TEST_TIMEOUT,
+        )
+
 
 class TestCloudComposerAsyncHook:
     def setup_method(self, method):
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py 
b/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py
index e6da6cf5cf9..0ca1738961c 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_composer.py
@@ -29,6 +29,7 @@ from airflow.providers.google.cloud.operators.cloud_composer 
import (
     CloudComposerListEnvironmentsOperator,
     CloudComposerListImageVersionsOperator,
     CloudComposerRunAirflowCLICommandOperator,
+    CloudComposerTriggerDAGRunOperator,
     CloudComposerUpdateEnvironmentOperator,
 )
 from airflow.providers.google.cloud.triggers.cloud_composer import (
@@ -67,6 +68,9 @@ TEST_METADATA = [("key", "value")]
 TEST_PARENT = "test-parent"
 TEST_NAME = "test-name"
 
+TEST_COMPOSER_DAG_ID = "test-composer-dag-id"
+TEST_COMPOSER_DAG_CONF = {"test-key": "test-value"}
+
 COMPOSER_STRING = "airflow.providers.google.cloud.operators.cloud_composer.{}"
 COMPOSER_TRIGGERS_STRING = 
"airflow.providers.google.cloud.triggers.cloud_composer.{}"
 
@@ -375,3 +379,35 @@ class TestCloudComposerRunAirflowCLICommandOperator:
 
         assert isinstance(exc.value.trigger, 
CloudComposerAirflowCLICommandTrigger)
         assert exc.value.method_name == GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME
+
+
+class TestCloudComposerTriggerDAGRunOperator:
+    @mock.patch(COMPOSER_STRING.format("CloudComposerHook"))
+    def test_execute(self, mock_hook) -> None:
+        op = CloudComposerTriggerDAGRunOperator(
+            task_id=TASK_ID,
+            project_id=TEST_GCP_PROJECT,
+            region=TEST_GCP_REGION,
+            environment_id=TEST_ENVIRONMENT_ID,
+            composer_dag_id=TEST_COMPOSER_DAG_ID,
+            composer_dag_conf=TEST_COMPOSER_DAG_CONF,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            timeout=TEST_TIMEOUT,
+        )
+        op.execute(mock.MagicMock())
+        mock_hook.assert_called_once_with(
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            impersonation_chain=TEST_IMPERSONATION_CHAIN,
+        )
+        mock_hook.return_value.get_environment.assert_called_once_with(
+            project_id=TEST_GCP_PROJECT,
+            region=TEST_GCP_REGION,
+            environment_id=TEST_ENVIRONMENT_ID,
+            timeout=TEST_TIMEOUT,
+        )
+        mock_hook.return_value.trigger_dag_run.assert_called_once_with(
+            
composer_airflow_uri=mock_hook.return_value.get_environment.return_value.config.airflow_uri,
+            composer_dag_id=TEST_COMPOSER_DAG_ID,
+            composer_dag_conf=TEST_COMPOSER_DAG_CONF,
+            timeout=TEST_TIMEOUT,
+        )

Reply via email to