This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 87c1246  Add Dataplex operators (#20377)
87c1246 is described below

commit 87c1246b79769f20214a339aadc6a8270d453953
Author: Wojciech Januszek <[email protected]>
AuthorDate: Mon Mar 14 19:07:49 2022 +0000

    Add Dataplex operators (#20377)
---
 .../google/cloud/example_dags/example_dataplex.py  | 122 ++++++
 airflow/providers/google/cloud/hooks/dataplex.py   | 247 ++++++++++++
 airflow/providers/google/cloud/links/dataplex.py   |  76 ++++
 .../providers/google/cloud/operators/dataplex.py   | 428 +++++++++++++++++++++
 airflow/providers/google/cloud/sensors/dataplex.py | 119 ++++++
 airflow/providers/google/provider.yaml             |  16 +
 .../operators/cloud/dataplex.rst                   | 105 +++++
 docs/spelling_wordlist.txt                         |   3 +
 setup.py                                           |   1 +
 .../providers/google/cloud/hooks/test_dataplex.py  | 120 ++++++
 .../google/cloud/operators/test_dataplex.py        | 179 +++++++++
 .../google/cloud/operators/test_dataplex_system.py |  45 +++
 .../google/cloud/sensors/test_dataplex.py          | 103 +++++
 13 files changed, 1564 insertions(+)

diff --git a/airflow/providers/google/cloud/example_dags/example_dataplex.py 
b/airflow/providers/google/cloud/example_dags/example_dataplex.py
new file mode 100644
index 0000000..aabe17a
--- /dev/null
+++ b/airflow/providers/google/cloud/example_dags/example_dataplex.py
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that shows how to use Dataplex.
+"""
+
+import datetime
+import os
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCreateTaskOperator,
+    DataplexDeleteTaskOperator,
+    DataplexGetTaskOperator,
+    DataplexListTasksOperator,
+)
+from airflow.providers.google.cloud.sensors.dataplex import 
DataplexTaskStateSensor
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "INVALID PROJECT ID")
+REGION = os.environ.get("GCP_REGION", "INVALID REGION")
+LAKE_ID = os.environ.get("GCP_LAKE_ID", "INVALID LAKE ID")
+SERVICE_ACC = os.environ.get("GCP_DATAPLEX_SERVICE_ACC", 
"[email protected]")
+BUCKET = os.environ.get("GCP_DATAPLEX_BUCKET", "INVALID BUCKET NAME")
+SPARK_FILE_NAME = os.environ.get("SPARK_FILE_NAME", "INVALID FILE NAME")
+SPARK_FILE_FULL_PATH = f"gs://{BUCKET}/{SPARK_FILE_NAME}"
+DATAPLEX_TASK_ID = "task001"
+TRIGGER_SPEC_TYPE = "ON_DEMAND"
+
+# [START howto_dataplex_configuration]
+EXAMPLE_TASK_BODY = {
+    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
+    "execution_spec": {"service_account": SERVICE_ACC},
+    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
+}
+# [END howto_dataplex_configuration]
+
+with models.DAG(
+    "example_dataplex", start_date=datetime.datetime(2021, 1, 1), 
schedule_interval="@once", catchup=False
+) as dag:
+    # [START howto_dataplex_create_task_operator]
+    create_dataplex_task = DataplexCreateTaskOperator(
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        body=EXAMPLE_TASK_BODY,
+        dataplex_task_id=DATAPLEX_TASK_ID,
+        task_id="create_dataplex_task",
+    )
+    # [END howto_dataplex_create_task_operator]
+
+    # [START howto_dataplex_async_create_task_operator]
+    create_dataplex_task_async = DataplexCreateTaskOperator(
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        body=EXAMPLE_TASK_BODY,
+        dataplex_task_id=DATAPLEX_TASK_ID,
+        asynchronous=True,
+        task_id="create_dataplex_task_async",
+    )
+    # [END howto_dataplex_async_create_task_operator]
+
+    # [START howto_dataplex_delete_task_operator]
+    delete_dataplex_task = DataplexDeleteTaskOperator(
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        dataplex_task_id=DATAPLEX_TASK_ID,
+        task_id="delete_dataplex_task",
+    )
+    # [END howto_dataplex_delete_task_operator]
+
+    # [START howto_dataplex_list_tasks_operator]
+    list_dataplex_task = DataplexListTasksOperator(
+        project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, 
task_id="list_dataplex_task"
+    )
+    # [END howto_dataplex_list_tasks_operator]
+
+    # [START howto_dataplex_get_task_operator]
+    get_dataplex_task = DataplexGetTaskOperator(
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        dataplex_task_id=DATAPLEX_TASK_ID,
+        task_id="get_dataplex_task",
+    )
+    # [END howto_dataplex_get_task_operator]
+
+    # [START howto_dataplex_task_state_sensor]
+    dataplex_task_state = DataplexTaskStateSensor(
+        project_id=PROJECT_ID,
+        region=REGION,
+        lake_id=LAKE_ID,
+        dataplex_task_id=DATAPLEX_TASK_ID,
+        task_id="dataplex_task_state",
+    )
+    # [END howto_dataplex_task_state_sensor]
+
+    chain(
+        create_dataplex_task,
+        get_dataplex_task,
+        list_dataplex_task,
+        delete_dataplex_task,
+        create_dataplex_task_async,
+        dataplex_task_state,
+    )
diff --git a/airflow/providers/google/cloud/hooks/dataplex.py 
b/airflow/providers/google/cloud/hooks/dataplex.py
new file mode 100644
index 0000000..a553dac
--- /dev/null
+++ b/airflow/providers/google/cloud/hooks/dataplex.py
@@ -0,0 +1,247 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module contains Google Dataplex hook."""
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.operation import Operation
+from google.api_core.retry import Retry
+from google.cloud.dataplex_v1 import DataplexServiceClient
+from google.cloud.dataplex_v1.types import Task
+from googleapiclient.discovery import Resource
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+
+class DataplexHook(GoogleBaseHook):
+    """
+    Hook for Google Dataplex.
+
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, 
the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    _conn = None  # type: Optional[Resource]
+
+    def __init__(
+        self,
+        api_version: str = "v1",
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+    ) -> None:
+        super().__init__(
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            impersonation_chain=impersonation_chain,
+        )
+        self.api_version = api_version
+
+    def get_dataplex_client(self) -> DataplexServiceClient:
+        """Returns DataplexServiceClient."""
+        client_options = {'api_endpoint': 'dataplex.googleapis.com:443'}
+
+        return DataplexServiceClient(
+            credentials=self._get_credentials(), client_info=self.client_info, 
client_options=client_options
+        )
+
+    def wait_for_operation(self, timeout: Optional[float], operation: 
Operation):
+        """Waits for long-lasting operation to complete."""
+        try:
+            return operation.result(timeout=timeout)
+        except Exception:
+            error = operation.exception(timeout=timeout)
+            raise AirflowException(error)
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_task(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        body: Union[Dict[str, Any], Task],
+        dataplex_task_id: str,
+        validate_only: Optional[bool] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Creates a task resource within a lake.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the task belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
task belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake that the 
task belongs to.
+        :param body: Required. The Request body contains an instance of Task.
+        :param dataplex_task_id: Required. Task identifier.
+        :param validate_only: Optional. Only validate the request, but do not 
perform mutations.
+            The default is false.
+        :param retry: A retry object used to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        parent = f'projects/{project_id}/locations/{region}/lakes/{lake_id}'
+
+        client = self.get_dataplex_client()
+        result = client.create_task(
+            request={
+                'parent': parent,
+                'task_id': dataplex_task_id,
+                'task': body,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_task(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        dataplex_task_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Delete the task resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the task belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
task belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake that the 
task belongs to.
+        :param dataplex_task_id: Required. The ID of the Google Cloud task to 
be deleted.
+        :param retry: A retry object used to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        name = 
f'projects/{project_id}/locations/{region}/lakes/{lake_id}/tasks/{dataplex_task_id}'
+
+        client = self.get_dataplex_client()
+        result = client.delete_task(
+            request={
+                'name': name,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_tasks(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        filter: Optional[str] = None,
+        order_by: Optional[str] = None,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Lists tasks under the given lake.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the task belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
task belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake that the 
task belongs to.
+        :param page_size: Optional. Maximum number of tasks to return. The 
service may return fewer than this
+            value. If unspecified, at most 10 tasks will be returned. The 
maximum value is 1000;
+            values above 1000 will be coerced to 1000.
+        :param page_token: Optional. Page token received from a previous 
ListTasks call. Provide this to
+            retrieve the subsequent page. When paginating, all other 
parameters provided to ListTasks must
+            match the call that provided the page token.
+        :param filter: Optional. Filter request.
+        :param order_by: Optional. Order by fields for the result.
+        :param retry: A retry object used to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        parent = f'projects/{project_id}/locations/{region}/lakes/{lake_id}'
+
+        client = self.get_dataplex_client()
+        result = client.list_tasks(
+            request={
+                'parent': parent,
+                'page_size': page_size,
+                'page_token': page_token,
+                'filter': filter,
+                'order_by': order_by,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_task(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        dataplex_task_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+    ) -> Any:
+        """
+        Get task resource.
+
+        :param project_id: Required. The ID of the Google Cloud project that 
the task belongs to.
+        :param region: Required. The ID of the Google Cloud region that the 
task belongs to.
+        :param lake_id: Required. The ID of the Google Cloud lake that the 
task belongs to.
+        :param dataplex_task_id: Required. The ID of the Google Cloud task to 
be retrieved.
+        :param retry: A retry object used to retry requests. If `None` is 
specified, requests
+            will not be retried.
+        :param timeout: The amount of time, in seconds, to wait for the 
request to complete.
+            Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+        :param metadata: Additional metadata that is provided to the method.
+        """
+        name = 
f'projects/{project_id}/locations/{region}/lakes/{lake_id}/tasks/{dataplex_task_id}'
+        client = self.get_dataplex_client()
+        result = client.get_task(
+            request={
+                'name': name,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+        return result
diff --git a/airflow/providers/google/cloud/links/dataplex.py 
b/airflow/providers/google/cloud/links/dataplex.py
new file mode 100644
index 0000000..8c08d83
--- /dev/null
+++ b/airflow/providers/google/cloud/links/dataplex.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Dataplex links."""
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+DATAPLEX_BASE_LINK = "https://console.cloud.google.com/dataplex/process/tasks"
+DATAPLEX_TASK_LINK = DATAPLEX_BASE_LINK + 
"/{lake_id}.{task_id};location={region}/jobs?project={project_id}"
+DATAPLEX_TASKS_LINK = DATAPLEX_BASE_LINK + 
"?project={project_id}&qLake={lake_id}.{region}"
+
+
+class DataplexTaskLink(BaseGoogleLink):
+    """Helper class for constructing Dataplex Task link"""
+
+    name = "Dataplex Task"
+    key = "task_conf"
+    format_str = DATAPLEX_TASK_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=DataplexTaskLink.key,
+            value={
+                "lake_id": task_instance.lake_id,
+                "task_id": task_instance.dataplex_task_id,
+                "region": task_instance.region,
+                "project_id": task_instance.project_id,
+            },
+        )
+
+
+class DataplexTasksLink(BaseGoogleLink):
+    """Helper class for constructing Dataplex Tasks link"""
+
+    name = "Dataplex Tasks"
+    key = "tasks_conf"
+    format_str = DATAPLEX_TASKS_LINK
+
+    @staticmethod
+    def persist(
+        context: "Context",
+        task_instance,
+    ):
+        task_instance.xcom_push(
+            context=context,
+            key=DataplexTasksLink.key,
+            value={
+                "project_id": task_instance.project_id,
+                "lake_id": task_instance.lake_id,
+                "region": task_instance.region,
+            },
+        )
diff --git a/airflow/providers/google/cloud/operators/dataplex.py 
b/airflow/providers/google/cloud/operators/dataplex.py
new file mode 100644
index 0000000..21a89ea
--- /dev/null
+++ b/airflow/providers/google/cloud/operators/dataplex.py
@@ -0,0 +1,428 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module contains Google Dataplex operators."""
+from time import sleep
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, 
Union
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+from google.api_core.retry import Retry, exponential_sleep_generator
+from google.cloud.dataplex_v1.types import Task
+from googleapiclient.errors import HttpError
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
+from airflow.providers.google.cloud.links.dataplex import DataplexTaskLink, 
DataplexTasksLink
+
+
+class DataplexCreateTaskOperator(BaseOperator):
+    """
+    Creates a task resource within a lake.
+
+    :param project_id: Required. The ID of the Google Cloud project that the 
task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task 
belongs to.
+    :param lake_id: Required. The ID of the Google Cloud lake that the task 
belongs to.
+    :param body:  Required. The Request body contains an instance of Task.
+    :param dataplex_task_id: Required. Task identifier.
+    :param validate_only: Optional. Only validate the request, but do not 
perform mutations. The default is
+        false.
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param retry: A retry object used to retry requests. If `None` is 
specified, requests
+        will not be retried.
+    :param timeout: The amount of time, in seconds, to wait for the request to 
complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Additional metadata that is provided to the method.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, 
the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    :param asynchronous: Flag informing should the Dataplex task be created 
asynchronously.
+        This is useful for long running creating tasks and
+        waiting on them asynchronously using the DataplexTaskSensor
+    """
+
+    template_fields = (
+        "project_id",
+        "dataplex_task_id",
+        "body",
+        "validate_only",
+        "delegate_to",
+        "impersonation_chain",
+    )
+    template_fields_renderers = {'body': 'json'}
+    operator_extra_links = (DataplexTaskLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        body: Dict[str, Any],
+        dataplex_task_id: str,
+        validate_only: Optional[bool] = None,
+        api_version: str = "v1",
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        asynchronous: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.lake_id = lake_id
+        self.body = body
+        self.dataplex_task_id = dataplex_task_id
+        self.validate_only = validate_only
+        self.api_version = api_version
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+        self.asynchronous = asynchronous
+
+    def execute(self, context: "Context") -> dict:
+        hook = DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Creating Dataplex task %s", self.dataplex_task_id)
+        DataplexTaskLink.persist(context=context, task_instance=self)
+
+        try:
+            operation = hook.create_task(
+                project_id=self.project_id,
+                region=self.region,
+                lake_id=self.lake_id,
+                body=self.body,
+                dataplex_task_id=self.dataplex_task_id,
+                validate_only=self.validate_only,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            if not self.asynchronous:
+                self.log.info("Waiting for Dataplex task %s to be created", 
self.dataplex_task_id)
+                task = hook.wait_for_operation(timeout=self.timeout, 
operation=operation)
+                self.log.info("Task %s created successfully", 
self.dataplex_task_id)
+            else:
+                is_done = operation.done()
+                self.log.info("Is operation done already? %s", is_done)
+                return is_done
+        except HttpError as err:
+            if err.resp.status not in (409, '409'):
+                raise
+            self.log.info("Task %s already exists", self.dataplex_task_id)
+            # Wait for task to be ready
+            for time_to_wait in exponential_sleep_generator(initial=10, 
maximum=120):
+                task = hook.get_task(
+                    project_id=self.project_id,
+                    region=self.region,
+                    lake_id=self.lake_id,
+                    dataplex_task_id=self.dataplex_task_id,
+                    retry=self.retry,
+                    timeout=self.timeout,
+                    metadata=self.metadata,
+                )
+                if task['state'] != 'CREATING':
+                    break
+                sleep(time_to_wait)
+
+        return Task.to_dict(task)
+
+
+class DataplexDeleteTaskOperator(BaseOperator):
+    """
+    Delete the task resource.
+
+    :param project_id: Required. The ID of the Google Cloud project that the 
task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task 
belongs to.
+    :param lake_id: Required. The ID of the Google Cloud lake that the task 
belongs to.
+    :param dataplex_task_id: Required. Task identifier.
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param retry: A retry object used to retry requests. If `None` is 
specified, requests
+        will not be retried.
+    :param timeout: The amount of time, in seconds, to wait for the request to 
complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Additional metadata that is provided to the method.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, 
the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields = ("project_id", "dataplex_task_id", "delegate_to", 
"impersonation_chain")
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        dataplex_task_id: str,
+        api_version: str = "v1",
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.lake_id = lake_id
+        self.dataplex_task_id = dataplex_task_id
+        self.api_version = api_version
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> None:
+        hook = DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Deleting Dataplex task %s", self.dataplex_task_id)
+
+        operation = hook.delete_task(
+            project_id=self.project_id,
+            region=self.region,
+            lake_id=self.lake_id,
+            dataplex_task_id=self.dataplex_task_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        hook.wait_for_operation(timeout=self.timeout, operation=operation)
+        self.log.info("Dataplex task %s deleted successfully!", 
self.dataplex_task_id)
+
+
+class DataplexListTasksOperator(BaseOperator):
+    """
+    Lists tasks under the given lake.
+
+    :param project_id: Required. The ID of the Google Cloud project that the 
task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task 
belongs to.
+    :param lake_id: Required. The ID of the Google Cloud lake that the task 
belongs to.
+    :param page_size: Optional. Maximum number of tasks to return. The service 
may return fewer than this
+        value. If unspecified, at most 10 tasks will be returned. The maximum 
value is 1000; values above 1000
+        will be coerced to 1000.
+    :param page_token: Optional. Page token received from a previous ListTasks 
call. Provide this to retrieve
+        the subsequent page. When paginating, all other parameters provided to 
ListTasks must match the call
+        that provided the page token.
+    :param filter: Optional. Filter request.
+    :param order_by: Optional. Order by fields for the result.
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param retry: A retry object used to retry requests. If `None` is 
specified, requests
+        will not be retried.
+    :param timeout: The amount of time, in seconds, to wait for the request to 
complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Additional metadata that is provided to the method.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, 
the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields = (
+        "project_id",
+        "page_size",
+        "page_token",
+        "filter",
+        "order_by",
+        "delegate_to",
+        "impersonation_chain",
+    )
+    operator_extra_links = (DataplexTasksLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        page_size: Optional[int] = None,
+        page_token: Optional[str] = None,
+        filter: Optional[str] = None,
+        order_by: Optional[str] = None,
+        api_version: str = "v1",
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.lake_id = lake_id
+        self.page_size = page_size
+        self.page_token = page_token
+        self.filter = filter
+        self.order_by = order_by
+        self.api_version = api_version
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> List[dict]:
+        hook = DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Listing Dataplex tasks from lake %s", self.lake_id)
+        DataplexTasksLink.persist(context=context, task_instance=self)
+
+        tasks = hook.list_tasks(
+            project_id=self.project_id,
+            region=self.region,
+            lake_id=self.lake_id,
+            page_size=self.page_size,
+            page_token=self.page_token,
+            filter=self.filter,
+            order_by=self.order_by,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return [Task.to_dict(task) for task in tasks]
+
+
+class DataplexGetTaskOperator(BaseOperator):
+    """
+    Get task resource.
+
+    :param project_id: Required. The ID of the Google Cloud project that the 
task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task 
belongs to.
+    :param lake_id: Required. The ID of the Google Cloud lake that the task 
belongs to.
+    :param dataplex_task_id: Required. Task identifier.
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param retry: A retry object used to retry requests. If `None` is 
specified, requests
+        will not be retried.
+    :param timeout: The amount of time, in seconds, to wait for the request to 
complete.
+        Note that if `retry` is specified, the timeout applies to each 
individual attempt.
+    :param metadata: Additional metadata that is provided to the method.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, 
the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields = ("project_id", "dataplex_task_id", "delegate_to", 
"impersonation_chain")
+    operator_extra_links = (DataplexTaskLink(),)
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        dataplex_task_id: str,
+        api_version: str = "v1",
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.lake_id = lake_id
+        self.dataplex_task_id = dataplex_task_id
+        self.api_version = api_version
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context: "Context") -> dict:
+        hook = DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.log.info("Retrieving Dataplex task %s", self.dataplex_task_id)
+        DataplexTaskLink.persist(context=context, task_instance=self)
+
+        task = hook.get_task(
+            project_id=self.project_id,
+            region=self.region,
+            lake_id=self.lake_id,
+            dataplex_task_id=self.dataplex_task_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        return Task.to_dict(task)
diff --git a/airflow/providers/google/cloud/sensors/dataplex.py 
b/airflow/providers/google/cloud/sensors/dataplex.py
new file mode 100644
index 0000000..056b613
--- /dev/null
+++ b/airflow/providers/google/cloud/sensors/dataplex.py
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module contains Google Dataplex sensors."""
+from typing import TYPE_CHECKING, Optional, Sequence, Tuple, Union
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+from google.api_core.retry import Retry
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
+from airflow.sensors.base import BaseSensorOperator
+
+
+class TaskState:
+    """Dataplex Task states"""
+
+    STATE_UNSPECIFIED = 0
+    ACTIVE = 1
+    CREATING = 2
+    DELETING = 3
+    ACTION_REQUIRED = 4
+
+
+class DataplexTaskStateSensor(BaseSensorOperator):
+    """
+    Check the status of the Dataplex task
+
+    :param project_id: Required. The ID of the Google Cloud project that the 
task belongs to.
+    :param region: Required. The ID of the Google Cloud region that the task 
belongs to.
+    :param lake_id: Required. The ID of the Google Cloud lake that the task 
belongs to.
+    :param dataplex_task_id: Required. Task identifier.
+    :param api_version: The version of the api that will be requested for 
example 'v3'.
+    :param retry: A retry object used to retry requests. If `None` is 
specified, requests
+        will not be retried.
+    :param metadata: Additional metadata that is provided to the method.
+    :param gcp_conn_id: The connection ID to use when fetching connection info.
+    :param delegate_to: The account to impersonate, if any. For this to work, 
the service account making the
+        request must have domain-wide delegation enabled.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields = ['dataplex_task_id']
+
+    def __init__(
+        self,
+        project_id: str,
+        region: str,
+        lake_id: str,
+        dataplex_task_id: str,
+        api_version: str = "v1",
+        retry: Optional[Retry] = None,
+        metadata: Sequence[Tuple[str, str]] = (),
+        gcp_conn_id: str = "google_cloud_default",
+        delegate_to: Optional[str] = None,
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+        self.project_id = project_id
+        self.region = region
+        self.lake_id = lake_id
+        self.dataplex_task_id = dataplex_task_id
+        self.api_version = api_version
+        self.retry = retry
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.delegate_to = delegate_to
+        self.impersonation_chain = impersonation_chain
+
+    def poke(self, context: "Context") -> bool:
+        self.log.info("Waiting for task %s to be %s", self.dataplex_task_id, 
TaskState.ACTIVE)
+        hook = DataplexHook(
+            gcp_conn_id=self.gcp_conn_id,
+            delegate_to=self.delegate_to,
+            api_version=self.api_version,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        task = hook.get_task(
+            project_id=self.project_id,
+            region=self.region,
+            lake_id=self.lake_id,
+            dataplex_task_id=self.dataplex_task_id,
+            retry=self.retry,
+            metadata=self.metadata,
+        )
+        task_status = task.state
+
+        if task_status == TaskState.DELETING:
+            raise AirflowException(f"Task is going to be deleted 
{self.dataplex_task_id}")
+
+        self.log.info("Current status of the Dataplex task %s => %s", 
self.dataplex_task_id, task_status)
+
+        return task_status == TaskState.ACTIVE
diff --git a/airflow/providers/google/provider.yaml 
b/airflow/providers/google/provider.yaml
index dbb4a70..70c0472 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -235,6 +235,11 @@ integrations:
       - /docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
     logo: /integration-logos/gcp/Google-Data-Fusion.png
     tags: [gcp]
+  - integration-name: Google Dataplex
+    external-doc-url: https://cloud.google.com/dataplex/
+    how-to-guide:
+      - /docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
+    tags: [gcp]
   - integration-name: Google Dataprep
     external-doc-url: https://cloud.google.com/dataprep/
     how-to-guide:
@@ -399,6 +404,9 @@ operators:
   - integration-name: Google Data Fusion
     python-modules:
       - airflow.providers.google.cloud.operators.datafusion
+  - integration-name: Google Dataplex
+    python-modules:
+      - airflow.providers.google.cloud.operators.dataplex
   - integration-name: Google Dataprep
     python-modules:
       - airflow.providers.google.cloud.operators.dataprep
@@ -513,6 +521,9 @@ sensors:
   - integration-name: Google Data Fusion
     python-modules:
       - airflow.providers.google.cloud.sensors.datafusion
+  - integration-name: Google Dataplex
+    python-modules:
+      - airflow.providers.google.cloud.sensors.dataplex
   - integration-name: Google Dataproc
     python-modules:
       - airflow.providers.google.cloud.sensors.dataproc
@@ -585,6 +596,9 @@ hooks:
   - integration-name: Google Data Fusion
     python-modules:
       - airflow.providers.google.cloud.hooks.datafusion
+  - integration-name: Google Dataplex
+    python-modules:
+      - airflow.providers.google.cloud.hooks.dataplex
   - integration-name: Google Dataprep
     python-modules:
       - airflow.providers.google.cloud.hooks.dataprep
@@ -853,6 +867,8 @@ extra-links:
   - airflow.providers.google.cloud.operators.datafusion.DataFusionInstanceLink
   - airflow.providers.google.cloud.operators.datafusion.DataFusionPipelineLink
   - airflow.providers.google.cloud.operators.datafusion.DataFusionPipelinesLink
+  - airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
+  - airflow.providers.google.cloud.links.dataplex.DataplexTasksLink
   - airflow.providers.google.cloud.links.dataproc.DataprocLink
   - airflow.providers.google.cloud.links.dataproc.DataprocListLink
   - 
airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDetailedLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst 
b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
new file mode 100644
index 0000000..00b1949
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -0,0 +1,105 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Google Dataplex Operators
+=========================
+
+Dataplex is an intelligent data fabric that provides unified analytics
+and data management across your data lakes, data warehouses, and data marts.
+
+For more information about the task visit `Dataplex product documentation 
<https://cloud.google.com/dataplex/docs/reference>`__
+
+Create a Task
+-------------
+
+Before you create a dataplex task you need to define its body.
+For more information about the available fields to pass when creating a task, 
visit `Dataplex create task API. 
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.tasks#Task>`__
+
+A simple task configuration can look as follows:
+
+.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_dataplex_configuration]
+    :end-before: [END howto_dataplex_configuration]
+
+With this configuration we can create the task both synchronously & 
asynchronously:
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateTaskOperator`
+
+.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_create_task_operator]
+    :end-before: [END howto_dataplex_create_task_operator]
+
+.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_async_create_task_operator]
+    :end-before: [END howto_dataplex_async_create_task_operator]
+
+Delete a task
+-------------
+
+To delete a task you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteTaskOperator`
+
+.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_delete_task_operator]
+    :end-before: [END howto_dataplex_delete_task_operator]
+
+List tasks
+----------
+
+To list tasks you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexListTasksOperator`
+
+.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_list_tasks_operator]
+    :end-before: [END howto_dataplex_list_tasks_operator]
+
+Get a task
+----------
+
+To get a task you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetTaskOperator`
+
+.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_get_task_operator]
+    :end-before: [END howto_dataplex_get_task_operator]
+
+Wait for a task
+---------------
+
+To wait for a task created asynchronously you can use:
+
+:class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexTaskStateSensor`
+
+.. exampleinclude:: 
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_dataplex_task_state_sensor]
+    :end-before: [END howto_dataplex_task_state_sensor]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index b6242d9..81e0a7b 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -98,6 +98,7 @@ Dataflow
 Dataframe
 Datalake
 Datanodes
+Dataplex
 Dataprep
 Dataproc
 Dataset
@@ -442,6 +443,7 @@ Zsh
 Zymergen
 abc
 accessor
+accountmaking
 aci
 ack
 ackIds
@@ -666,6 +668,7 @@ dataflow
 dataframe
 dataframes
 datapipe
+dataplex
 datapoint
 dataproc
 dataset
diff --git a/setup.py b/setup.py
index 667819e..f31f2cd 100644
--- a/setup.py
+++ b/setup.py
@@ -328,6 +328,7 @@ google = [
     'google-cloud-build>=3.0.0',
     'google-cloud-container>=0.1.1,<2.0.0',
     'google-cloud-datacatalog>=3.0.0',
+    'google-cloud-dataplex>=0.1.0',
     'google-cloud-dataproc>=3.1.0',
     'google-cloud-dataproc-metastore>=1.2.0,<2.0.0',
     'google-cloud-dlp>=0.11.0,<2.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_dataplex.py 
b/tests/providers/google/cloud/hooks/test_dataplex.py
new file mode 100644
index 0000000..6be8afb
--- /dev/null
+++ b/tests/providers/google/cloud/hooks/test_dataplex.py
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase, mock
+
+from airflow.providers.google.cloud.operators.dataplex import DataplexHook
+from tests.providers.google.cloud.utils.base_gcp_mock import 
mock_base_gcp_hook_default_project_id
+
+BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
+DATAPLEX_STRING = "airflow.providers.google.cloud.hooks.dataplex.{}"
+
+PROJECT_ID = "project-id"
+REGION = "region"
+LAKE_ID = "lake-id"
+BODY = {"body": "test"}
+DATAPLEX_TASK_ID = "testTask001"
+
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+
+class TestDataplexHook(TestCase):
+    def setUp(self):
+        with mock.patch(
+            BASE_STRING.format("GoogleBaseHook.__init__"),
+            new=mock_base_gcp_hook_default_project_id,
+        ):
+            self.hook = DataplexHook(
+                gcp_conn_id=GCP_CONN_ID,
+                delegate_to=DELEGATE_TO,
+                impersonation_chain=IMPERSONATION_CHAIN,
+            )
+
+    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    def test_create_task(self, mock_client):
+        self.hook.create_task(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            body=BODY,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            validate_only=None,
+        )
+
+        parent = f'projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}'
+        mock_client.return_value.create_task.assert_called_once_with(
+            request=dict(
+                parent=parent,
+                task_id=DATAPLEX_TASK_ID,
+                task=BODY,
+            ),
+            retry=None,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    def test_delete_task(self, mock_client):
+        self.hook.delete_task(
+            project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, 
dataplex_task_id=DATAPLEX_TASK_ID
+        )
+
+        name = 
f'projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/tasks/{DATAPLEX_TASK_ID}'
+        mock_client.return_value.delete_task.assert_called_once_with(
+            request=dict(
+                name=name,
+            ),
+            retry=None,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    def test_list_tasks(self, mock_client):
+        self.hook.list_tasks(project_id=PROJECT_ID, region=REGION, 
lake_id=LAKE_ID)
+
+        parent = f'projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}'
+        mock_client.return_value.list_tasks.assert_called_once_with(
+            request=dict(
+                parent=parent,
+                page_size=None,
+                page_token=None,
+                filter=None,
+                order_by=None,
+            ),
+            retry=None,
+            timeout=None,
+            metadata=(),
+        )
+
+    @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+    def test_get_task(self, mock_client):
+        self.hook.get_task(
+            project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, 
dataplex_task_id=DATAPLEX_TASK_ID
+        )
+
+        name = 
f'projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/tasks/{DATAPLEX_TASK_ID}'
+        mock_client.return_value.get_task.assert_called_once_with(
+            request=dict(
+                name=name,
+            ),
+            retry=None,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/tests/providers/google/cloud/operators/test_dataplex.py 
b/tests/providers/google/cloud/operators/test_dataplex.py
new file mode 100644
index 0000000..d8c958f
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_dataplex.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase, mock
+
+from airflow.providers.google.cloud.operators.dataplex import (
+    DataplexCreateTaskOperator,
+    DataplexDeleteTaskOperator,
+    DataplexGetTaskOperator,
+    DataplexListTasksOperator,
+)
+
+HOOK_STR = "airflow.providers.google.cloud.operators.dataplex.DataplexHook"
+TASK_STR = "airflow.providers.google.cloud.operators.dataplex.Task"
+
+PROJECT_ID = "project-id"
+REGION = "region"
+LAKE_ID = "lake-id"
+BODY = {"body": "test"}
+DATAPLEX_TASK_ID = "testTask001"
+
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+API_VERSION = "v1"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+
+class TestDataplexCreateTaskOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    @mock.patch(TASK_STR)
+    def test_execute(self, task_mock, hook_mock):
+        op = DataplexCreateTaskOperator(
+            task_id="create_dataplex_task",
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            body=BODY,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            validate_only=None,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.wait_for_operation.return_value = None
+        task_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.create_task.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            body=BODY,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            validate_only=None,
+            retry=None,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexDeleteTaskOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        op = DataplexDeleteTaskOperator(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            task_id="delete_dataplex_task",
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context=None)
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.delete_task.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            retry=None,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexListTasksOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    def test_execute(self, hook_mock):
+        op = DataplexListTasksOperator(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            task_id="list_dataplex_task",
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.list_tasks.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            page_size=None,
+            page_token=None,
+            filter=None,
+            order_by=None,
+            retry=None,
+            timeout=None,
+            metadata=(),
+        )
+
+
+class TestDataplexGetTaskOperator(TestCase):
+    @mock.patch(HOOK_STR)
+    @mock.patch(TASK_STR)
+    def test_execute(self, task_mock, hook_mock):
+        op = DataplexGetTaskOperator(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            task_id="get_dataplex_task",
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.wait_for_operation.return_value = None
+        task_mock.return_value.to_dict.return_value = None
+        op.execute(context=mock.MagicMock())
+        hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            api_version=API_VERSION,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        hook_mock.return_value.get_task.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            retry=None,
+            timeout=None,
+            metadata=(),
+        )
diff --git a/tests/providers/google/cloud/operators/test_dataplex_system.py 
b/tests/providers/google/cloud/operators/test_dataplex_system.py
new file mode 100644
index 0000000..1497281
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_dataplex_system.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from airflow.providers.google.cloud.example_dags.example_dataplex import BUCKET, SPARK_FILE_NAME
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
+
+GCS_URI = f"gs://{BUCKET}"
+
+spark_file = """
+#!/usr/bin/python
+print("### Hello, dataplex! ###")
+"""
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_GCS_KEY)
+class DataplexExampleDagsTest(GoogleSystemTest):
+    def setUp(self):
+        super().setUp()
+        self.create_gcs_bucket(BUCKET)
+        self.upload_content_to_gcs(lines=spark_file, bucket=GCS_URI, filename=SPARK_FILE_NAME)
+
+    def tearDown(self):
+        self.delete_gcs_bucket(BUCKET)
+        super().tearDown()
+
+    @provide_gcp_context(GCP_GCS_KEY)
+    def test_run_example_dag(self):
+        self.run_dag(dag_id="example_dataplex", dag_folder=CLOUD_DAG_FOLDER)
diff --git a/tests/providers/google/cloud/sensors/test_dataplex.py b/tests/providers/google/cloud/sensors/test_dataplex.py
new file mode 100644
index 0000000..f28028f
--- /dev/null
+++ b/tests/providers/google/cloud/sensors/test_dataplex.py
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+import pytest
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.sensors.dataplex import DataplexTaskStateSensor, TaskState
+
+DATAPLEX_HOOK = "airflow.providers.google.cloud.sensors.dataplex.DataplexHook"
+
+TASK_ID = "test-sensor"
+PROJECT_ID = "project-id"
+REGION = "region"
+LAKE_ID = "lake-id"
+BODY = {"body": "test"}
+DATAPLEX_TASK_ID = "testTask001"
+
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+API_VERSION = "v1"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+
+class TestDataplexTaskStateSensor(unittest.TestCase):
+    def create_task(self, state: int):
+        task = mock.Mock()
+        task.state = state
+        return task
+
+    @mock.patch(DATAPLEX_HOOK)
+    def test_done(self, mock_hook):
+        task = self.create_task(TaskState.ACTIVE)
+        mock_hook.return_value.get_task.return_value = task
+
+        sensor = DataplexTaskStateSensor(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        result = sensor.poke(context={})
+
+        mock_hook.return_value.get_task.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            retry=None,
+            metadata=(),
+        )
+
+        assert result
+
+    @mock.patch(DATAPLEX_HOOK)
+    def test_deleting(self, mock_hook):
+        task = self.create_task(TaskState.DELETING)
+        mock_hook.return_value.get_task.return_value = task
+
+        sensor = DataplexTaskStateSensor(
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            api_version=API_VERSION,
+            gcp_conn_id=GCP_CONN_ID,
+            delegate_to=DELEGATE_TO,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+
+        with pytest.raises(AirflowException, match="Task is going to be deleted"):
+            sensor.poke(context={})
+
+        mock_hook.return_value.get_task.assert_called_once_with(
+            project_id=PROJECT_ID,
+            region=REGION,
+            lake_id=LAKE_ID,
+            dataplex_task_id=DATAPLEX_TASK_ID,
+            retry=None,
+            metadata=(),
+        )

Reply via email to