This is an automated email from the ASF dual-hosted git repository.
turbaszek pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 87c1246 Add Dataplex operators (#20377)
87c1246 is described below
commit 87c1246b79769f20214a339aadc6a8270d453953
Author: Wojciech Januszek <[email protected]>
AuthorDate: Mon Mar 14 19:07:49 2022 +0000
Add Dataplex operators (#20377)
---
.../google/cloud/example_dags/example_dataplex.py | 122 ++++++
airflow/providers/google/cloud/hooks/dataplex.py | 247 ++++++++++++
airflow/providers/google/cloud/links/dataplex.py | 76 ++++
.../providers/google/cloud/operators/dataplex.py | 428 +++++++++++++++++++++
airflow/providers/google/cloud/sensors/dataplex.py | 119 ++++++
airflow/providers/google/provider.yaml | 16 +
.../operators/cloud/dataplex.rst | 105 +++++
docs/spelling_wordlist.txt | 3 +
setup.py | 1 +
.../providers/google/cloud/hooks/test_dataplex.py | 120 ++++++
.../google/cloud/operators/test_dataplex.py | 179 +++++++++
.../google/cloud/operators/test_dataplex_system.py | 45 +++
.../google/cloud/sensors/test_dataplex.py | 103 +++++
13 files changed, 1564 insertions(+)
diff --git a/airflow/providers/google/cloud/example_dags/example_dataplex.py
b/airflow/providers/google/cloud/example_dags/example_dataplex.py
new file mode 100644
index 0000000..aabe17a
--- /dev/null
+++ b/airflow/providers/google/cloud/example_dags/example_dataplex.py
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that shows how to use Dataplex.
+"""
+
+import datetime
+import os
+
+from airflow import models
+from airflow.models.baseoperator import chain
+from airflow.providers.google.cloud.operators.dataplex import (
+ DataplexCreateTaskOperator,
+ DataplexDeleteTaskOperator,
+ DataplexGetTaskOperator,
+ DataplexListTasksOperator,
+)
+from airflow.providers.google.cloud.sensors.dataplex import
DataplexTaskStateSensor
+
+PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "INVALID PROJECT ID")
+REGION = os.environ.get("GCP_REGION", "INVALID REGION")
+LAKE_ID = os.environ.get("GCP_LAKE_ID", "INVALID LAKE ID")
+SERVICE_ACC = os.environ.get("GCP_DATAPLEX_SERVICE_ACC",
"[email protected]")
+BUCKET = os.environ.get("GCP_DATAPLEX_BUCKET", "INVALID BUCKET NAME")
+SPARK_FILE_NAME = os.environ.get("SPARK_FILE_NAME", "INVALID FILE NAME")
+SPARK_FILE_FULL_PATH = f"gs://{BUCKET}/{SPARK_FILE_NAME}"
+DATAPLEX_TASK_ID = "task001"
+TRIGGER_SPEC_TYPE = "ON_DEMAND"
+
+# [START howto_dataplex_configuration]
+EXAMPLE_TASK_BODY = {
+ "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
+ "execution_spec": {"service_account": SERVICE_ACC},
+ "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
+}
+# [END howto_dataplex_configuration]
+
+with models.DAG(
+ "example_dataplex", start_date=datetime.datetime(2021, 1, 1),
schedule_interval="@once", catchup=False
+) as dag:
+ # [START howto_dataplex_create_task_operator]
+ create_dataplex_task = DataplexCreateTaskOperator(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ body=EXAMPLE_TASK_BODY,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ task_id="create_dataplex_task",
+ )
+ # [END howto_dataplex_create_task_operator]
+
+ # [START howto_dataplex_async_create_task_operator]
+ create_dataplex_task_async = DataplexCreateTaskOperator(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ body=EXAMPLE_TASK_BODY,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ asynchronous=True,
+ task_id="create_dataplex_task_async",
+ )
+ # [END howto_dataplex_async_create_task_operator]
+
+ # [START howto_dataplex_delete_task_operator]
+ delete_dataplex_task = DataplexDeleteTaskOperator(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ task_id="delete_dataplex_task",
+ )
+ # [END howto_dataplex_delete_task_operator]
+
+ # [START howto_dataplex_list_tasks_operator]
+ list_dataplex_task = DataplexListTasksOperator(
+ project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID,
task_id="list_dataplex_task"
+ )
+ # [END howto_dataplex_list_tasks_operator]
+
+ # [START howto_dataplex_get_task_operator]
+ get_dataplex_task = DataplexGetTaskOperator(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ task_id="get_dataplex_task",
+ )
+ # [END howto_dataplex_get_task_operator]
+
+ # [START howto_dataplex_task_state_sensor]
+ dataplex_task_state = DataplexTaskStateSensor(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ task_id="dataplex_task_state",
+ )
+ # [END howto_dataplex_task_state_sensor]
+
+ chain(
+ create_dataplex_task,
+ get_dataplex_task,
+ list_dataplex_task,
+ delete_dataplex_task,
+ create_dataplex_task_async,
+ dataplex_task_state,
+ )
diff --git a/airflow/providers/google/cloud/hooks/dataplex.py
b/airflow/providers/google/cloud/hooks/dataplex.py
new file mode 100644
index 0000000..a553dac
--- /dev/null
+++ b/airflow/providers/google/cloud/hooks/dataplex.py
@@ -0,0 +1,247 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module contains Google Dataplex hook."""
+from typing import Any, Dict, Optional, Sequence, Tuple, Union
+
+from google.api_core.operation import Operation
+from google.api_core.retry import Retry
+from google.cloud.dataplex_v1 import DataplexServiceClient
+from google.cloud.dataplex_v1.types import Task
+from googleapiclient.discovery import Resource
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
+
+
class DataplexHook(GoogleBaseHook):
    """
    Hook for Google Dataplex.

    :param api_version: The version of the api that will be requested for example 'v3'.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
        request must have domain-wide delegation enabled.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    _conn = None  # type: Optional[Resource]

    def __init__(
        self,
        api_version: str = "v1",
        gcp_conn_id: str = "google_cloud_default",
        delegate_to: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
    ) -> None:
        super().__init__(
            gcp_conn_id=gcp_conn_id,
            delegate_to=delegate_to,
            impersonation_chain=impersonation_chain,
        )
        self.api_version = api_version

    def get_dataplex_client(self) -> DataplexServiceClient:
        """Returns DataplexServiceClient."""
        client_options = {'api_endpoint': 'dataplex.googleapis.com:443'}

        return DataplexServiceClient(
            credentials=self._get_credentials(), client_info=self.client_info, client_options=client_options
        )

    def wait_for_operation(self, timeout: Optional[float], operation: Operation):
        """Waits for long-lasting operation to complete."""
        try:
            return operation.result(timeout=timeout)
        except Exception:
            # Re-raise the operation's own error so the task log shows the
            # server-side failure reason instead of a generic result() error.
            error = operation.exception(timeout=timeout)
            raise AirflowException(error)

    @GoogleBaseHook.fallback_to_default_project_id
    def create_task(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        body: Union[Dict[str, Any], Task],
        dataplex_task_id: str,
        validate_only: Optional[bool] = None,
        retry: Optional[Retry] = None,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Any:
        """
        Creates a task resource within a lake.

        :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
        :param region: Required. The ID of the Google Cloud region that the task belongs to.
        :param lake_id: Required. The ID of the Google Cloud lake that the task belongs to.
        :param body: Required. The Request body contains an instance of Task.
        :param dataplex_task_id: Required. Task identifier.
        :param validate_only: Optional. Only validate the request, but do not perform mutations.
            The default is false.
        :param retry: A retry object used to retry requests. If `None` is specified, requests
            will not be retried.
        :param timeout: The amount of time, in seconds, to wait for the request to complete.
            Note that if `retry` is specified, the timeout applies to each individual attempt.
        :param metadata: Additional metadata that is provided to the method.
        """
        parent = f'projects/{project_id}/locations/{region}/lakes/{lake_id}'

        request: Dict[str, Any] = {
            'parent': parent,
            'task_id': dataplex_task_id,
            'task': body,
        }
        # Fix: ``validate_only`` was previously accepted but never forwarded to
        # the API, so "validate but do not mutate" requests still created real
        # resources.  Only set the field when explicitly requested.
        if validate_only is not None:
            request['validate_only'] = validate_only

        client = self.get_dataplex_client()
        result = client.create_task(
            request=request,
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        return result

    @GoogleBaseHook.fallback_to_default_project_id
    def delete_task(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        dataplex_task_id: str,
        retry: Optional[Retry] = None,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Any:
        """
        Delete the task resource.

        :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
        :param region: Required. The ID of the Google Cloud region that the task belongs to.
        :param lake_id: Required. The ID of the Google Cloud lake that the task belongs to.
        :param dataplex_task_id: Required. The ID of the Google Cloud task to be deleted.
        :param retry: A retry object used to retry requests. If `None` is specified, requests
            will not be retried.
        :param timeout: The amount of time, in seconds, to wait for the request to complete.
            Note that if `retry` is specified, the timeout applies to each individual attempt.
        :param metadata: Additional metadata that is provided to the method.
        """
        name = f'projects/{project_id}/locations/{region}/lakes/{lake_id}/tasks/{dataplex_task_id}'

        client = self.get_dataplex_client()
        result = client.delete_task(
            request={
                'name': name,
            },
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        return result

    @GoogleBaseHook.fallback_to_default_project_id
    def list_tasks(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        page_size: Optional[int] = None,
        page_token: Optional[str] = None,
        filter: Optional[str] = None,
        order_by: Optional[str] = None,
        retry: Optional[Retry] = None,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Any:
        """
        Lists tasks under the given lake.

        :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
        :param region: Required. The ID of the Google Cloud region that the task belongs to.
        :param lake_id: Required. The ID of the Google Cloud lake that the task belongs to.
        :param page_size: Optional. Maximum number of tasks to return. The service may return fewer than this
            value. If unspecified, at most 10 tasks will be returned. The maximum value is 1000;
            values above 1000 will be coerced to 1000.
        :param page_token: Optional. Page token received from a previous ListZones call. Provide this to
            retrieve the subsequent page. When paginating, all other parameters provided to ListZones must
            match the call that provided the page token.
        :param filter: Optional. Filter request.
        :param order_by: Optional. Order by fields for the result.
        :param retry: A retry object used to retry requests. If `None` is specified, requests
            will not be retried.
        :param timeout: The amount of time, in seconds, to wait for the request to complete.
            Note that if `retry` is specified, the timeout applies to each individual attempt.
        :param metadata: Additional metadata that is provided to the method.
        """
        parent = f'projects/{project_id}/locations/{region}/lakes/{lake_id}'

        client = self.get_dataplex_client()
        result = client.list_tasks(
            request={
                'parent': parent,
                'page_size': page_size,
                'page_token': page_token,
                'filter': filter,
                'order_by': order_by,
            },
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        return result

    @GoogleBaseHook.fallback_to_default_project_id
    def get_task(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        dataplex_task_id: str,
        retry: Optional[Retry] = None,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> Any:
        """
        Get task resource.

        :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
        :param region: Required. The ID of the Google Cloud region that the task belongs to.
        :param lake_id: Required. The ID of the Google Cloud lake that the task belongs to.
        :param dataplex_task_id: Required. The ID of the Google Cloud task to be retrieved.
        :param retry: A retry object used to retry requests. If `None` is specified, requests
            will not be retried.
        :param timeout: The amount of time, in seconds, to wait for the request to complete.
            Note that if `retry` is specified, the timeout applies to each individual attempt.
        :param metadata: Additional metadata that is provided to the method.
        """
        name = f'projects/{project_id}/locations/{region}/lakes/{lake_id}/tasks/{dataplex_task_id}'
        client = self.get_dataplex_client()
        result = client.get_task(
            request={
                'name': name,
            },
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )
        return result
diff --git a/airflow/providers/google/cloud/links/dataplex.py
b/airflow/providers/google/cloud/links/dataplex.py
new file mode 100644
index 0000000..8c08d83
--- /dev/null
+++ b/airflow/providers/google/cloud/links/dataplex.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Dataplex links."""
+
+from typing import TYPE_CHECKING
+
+from airflow.providers.google.cloud.links.base import BaseGoogleLink
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
DATAPLEX_BASE_LINK = "https://console.cloud.google.com/dataplex/process/tasks"
DATAPLEX_TASK_LINK = (
    DATAPLEX_BASE_LINK + "/{lake_id}.{task_id};location={region}/jobs?project={project_id}"
)
DATAPLEX_TASKS_LINK = DATAPLEX_BASE_LINK + "?project={project_id}&qLake={lake_id}.{region}"


class DataplexTaskLink(BaseGoogleLink):
    """Helper class for constructing Dataplex Task link"""

    name = "Dataplex Task"
    key = "task_conf"
    format_str = DATAPLEX_TASK_LINK

    @staticmethod
    def persist(
        context: "Context",
        task_instance,
    ):
        """Store the link parameters for one task in XCom so the UI can render it."""
        link_params = {
            "lake_id": task_instance.lake_id,
            "task_id": task_instance.dataplex_task_id,
            "region": task_instance.region,
            "project_id": task_instance.project_id,
        }
        task_instance.xcom_push(context=context, key=DataplexTaskLink.key, value=link_params)
+
+
class DataplexTasksLink(BaseGoogleLink):
    """Helper class for constructing Dataplex Tasks link"""

    name = "Dataplex Tasks"
    key = "tasks_conf"
    format_str = DATAPLEX_TASKS_LINK

    @staticmethod
    def persist(
        context: "Context",
        task_instance,
    ):
        """Store the lake-level link parameters in XCom so the UI can render the tasks list link."""
        link_params = {
            "project_id": task_instance.project_id,
            "lake_id": task_instance.lake_id,
            "region": task_instance.region,
        }
        task_instance.xcom_push(context=context, key=DataplexTasksLink.key, value=link_params)
diff --git a/airflow/providers/google/cloud/operators/dataplex.py
b/airflow/providers/google/cloud/operators/dataplex.py
new file mode 100644
index 0000000..21a89ea
--- /dev/null
+++ b/airflow/providers/google/cloud/operators/dataplex.py
@@ -0,0 +1,428 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module contains Google Dataplex operators."""
+from time import sleep
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple,
Union
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+from google.api_core.retry import Retry, exponential_sleep_generator
+from google.cloud.dataplex_v1.types import Task
+from googleapiclient.errors import HttpError
+
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
+from airflow.providers.google.cloud.links.dataplex import DataplexTaskLink,
DataplexTasksLink
+
+
class DataplexCreateTaskOperator(BaseOperator):
    """
    Creates a task resource within a lake.

    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
    :param region: Required. The ID of the Google Cloud region that the task belongs to.
    :param lake_id: Required. The ID of the Google Cloud lake that the task belongs to.
    :param body: Required. The Request body contains an instance of Task.
    :param dataplex_task_id: Required. Task identifier.
    :param validate_only: Optional. Only validate the request, but do not perform mutations. The default is
        false.
    :param api_version: The version of the api that will be requested for example 'v3'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
        request must have domain-wide delegation enabled.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    :param asynchronous: Flag informing should the Dataplex task be created asynchronously.
        This is useful for long running creating tasks and
        waiting on them asynchronously using the DataplexTaskStateSensor
    """

    template_fields = (
        "project_id",
        "dataplex_task_id",
        "body",
        "validate_only",
        "delegate_to",
        "impersonation_chain",
    )
    template_fields_renderers = {'body': 'json'}
    operator_extra_links = (DataplexTaskLink(),)

    def __init__(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        body: Dict[str, Any],
        dataplex_task_id: str,
        validate_only: Optional[bool] = None,
        api_version: str = "v1",
        retry: Optional[Retry] = None,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        delegate_to: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        asynchronous: bool = False,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.region = region
        self.lake_id = lake_id
        self.body = body
        self.dataplex_task_id = dataplex_task_id
        self.validate_only = validate_only
        self.api_version = api_version
        self.retry = retry
        self.timeout = timeout
        self.metadata = metadata
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.impersonation_chain = impersonation_chain
        self.asynchronous = asynchronous

    def execute(self, context: "Context") -> Union[dict, bool]:
        # Fix: the annotation was ``-> dict`` but the asynchronous branch
        # returns a bool (operation.done()); the synchronous and 409 paths
        # return the task rendered as a dict.
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        self.log.info("Creating Dataplex task %s", self.dataplex_task_id)
        DataplexTaskLink.persist(context=context, task_instance=self)

        try:
            operation = hook.create_task(
                project_id=self.project_id,
                region=self.region,
                lake_id=self.lake_id,
                body=self.body,
                dataplex_task_id=self.dataplex_task_id,
                validate_only=self.validate_only,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            if not self.asynchronous:
                self.log.info("Waiting for Dataplex task %s to be created", self.dataplex_task_id)
                task = hook.wait_for_operation(timeout=self.timeout, operation=operation)
                self.log.info("Task %s created successfully", self.dataplex_task_id)
            else:
                is_done = operation.done()
                self.log.info("Is operation done already? %s", is_done)
                return is_done
        except HttpError as err:
            # 409 means the task already exists; treat the call as idempotent
            # and poll until the existing task leaves the CREATING state.
            if err.resp.status not in (409, '409'):
                raise
            self.log.info("Task %s already exists", self.dataplex_task_id)
            # Wait for task to be ready
            for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
                task = hook.get_task(
                    project_id=self.project_id,
                    region=self.region,
                    lake_id=self.lake_id,
                    dataplex_task_id=self.dataplex_task_id,
                    retry=self.retry,
                    timeout=self.timeout,
                    metadata=self.metadata,
                )
                # NOTE(review): mapping-style access on a proto-plus Task —
                # confirm the installed client supports task['state'] and that
                # the comparison against the string 'CREATING' matches the
                # enum's behavior; task.state may be the safer spelling.
                if task['state'] != 'CREATING':
                    break
                sleep(time_to_wait)

        return Task.to_dict(task)
+
+
class DataplexDeleteTaskOperator(BaseOperator):
    """
    Delete the task resource.

    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
    :param region: Required. The ID of the Google Cloud region that the task belongs to.
    :param lake_id: Required. The ID of the Google Cloud lake that the task belongs to.
    :param dataplex_task_id: Required. Task identifier.
    :param api_version: The version of the api that will be requested for example 'v3'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
        request must have domain-wide delegation enabled.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields = ("project_id", "dataplex_task_id", "delegate_to", "impersonation_chain")

    def __init__(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        dataplex_task_id: str,
        api_version: str = "v1",
        retry: Optional[Retry] = None,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        delegate_to: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Resource coordinates.
        self.project_id, self.region, self.lake_id = project_id, region, lake_id
        self.dataplex_task_id = dataplex_task_id
        # Request tuning.
        self.api_version, self.retry, self.timeout = api_version, retry, timeout
        self.metadata = metadata
        # Connection / auth.
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.impersonation_chain = impersonation_chain

    def execute(self, context: "Context") -> None:
        """Delete the task and block until the long-running operation finishes."""
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        self.log.info("Deleting Dataplex task %s", self.dataplex_task_id)

        delete_operation = hook.delete_task(
            project_id=self.project_id,
            region=self.region,
            lake_id=self.lake_id,
            dataplex_task_id=self.dataplex_task_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        hook.wait_for_operation(timeout=self.timeout, operation=delete_operation)
        self.log.info("Dataplex task %s deleted successfully!", self.dataplex_task_id)
+
+
class DataplexListTasksOperator(BaseOperator):
    """
    Lists tasks under the given lake.

    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
    :param region: Required. The ID of the Google Cloud region that the task belongs to.
    :param lake_id: Required. The ID of the Google Cloud lake that the task belongs to.
    :param page_size: Optional. Maximum number of tasks to return. The service may return fewer than this
        value. If unspecified, at most 10 tasks will be returned. The maximum value is 1000; values above 1000
        will be coerced to 1000.
    :param page_token: Optional. Page token received from a previous ListZones call. Provide this to retrieve
        the subsequent page. When paginating, all other parameters provided to ListZones must match the call
        that provided the page token.
    :param filter: Optional. Filter request.
    :param order_by: Optional. Order by fields for the result.
    :param api_version: The version of the api that will be requested for example 'v3'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
        request must have domain-wide delegation enabled.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields = (
        "project_id",
        "page_size",
        "page_token",
        "filter",
        "order_by",
        "delegate_to",
        "impersonation_chain",
    )
    operator_extra_links = (DataplexTasksLink(),)

    def __init__(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        page_size: Optional[int] = None,
        page_token: Optional[str] = None,
        filter: Optional[str] = None,
        order_by: Optional[str] = None,
        api_version: str = "v1",
        retry: Optional[Retry] = None,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        delegate_to: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Lake to list from.
        self.project_id, self.region, self.lake_id = project_id, region, lake_id
        # Pagination / filtering options.
        self.page_size, self.page_token = page_size, page_token
        self.filter, self.order_by = filter, order_by
        # Request tuning.
        self.api_version, self.retry, self.timeout = api_version, retry, timeout
        self.metadata = metadata
        # Connection / auth.
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.impersonation_chain = impersonation_chain

    def execute(self, context: "Context") -> List[dict]:
        """List the lake's tasks and return them as plain dicts (XCom-friendly)."""
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        self.log.info("Listing Dataplex tasks from lake %s", self.lake_id)
        DataplexTasksLink.persist(context=context, task_instance=self)

        task_pager = hook.list_tasks(
            project_id=self.project_id,
            region=self.region,
            lake_id=self.lake_id,
            page_size=self.page_size,
            page_token=self.page_token,
            filter=self.filter,
            order_by=self.order_by,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        return [Task.to_dict(entry) for entry in task_pager]
+
+
class DataplexGetTaskOperator(BaseOperator):
    """
    Get task resource.

    :param project_id: Required. The ID of the Google Cloud project that the task belongs to.
    :param region: Required. The ID of the Google Cloud region that the task belongs to.
    :param lake_id: Required. The ID of the Google Cloud lake that the task belongs to.
    :param dataplex_task_id: Required. Task identifier.
    :param api_version: The version of the api that will be requested for example 'v3'.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :param delegate_to: The account to impersonate, if any. For this to work, the service account making the
        request must have domain-wide delegation enabled.
    :param impersonation_chain: Optional service account to impersonate using short-term
        credentials, or chained list of accounts required to get the access_token
        of the last account in the list, which will be impersonated in the request.
        If set as a string, the account must grant the originating account
        the Service Account Token Creator IAM role.
        If set as a sequence, the identities from the list must grant
        Service Account Token Creator IAM role to the directly preceding identity, with first
        account from the list granting this role to the originating account (templated).
    """

    template_fields = ("project_id", "dataplex_task_id", "delegate_to", "impersonation_chain")
    operator_extra_links = (DataplexTaskLink(),)

    def __init__(
        self,
        project_id: str,
        region: str,
        lake_id: str,
        dataplex_task_id: str,
        api_version: str = "v1",
        retry: Optional[Retry] = None,
        timeout: Optional[float] = None,
        metadata: Sequence[Tuple[str, str]] = (),
        gcp_conn_id: str = "google_cloud_default",
        delegate_to: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        *args,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Resource coordinates.
        self.project_id, self.region, self.lake_id = project_id, region, lake_id
        self.dataplex_task_id = dataplex_task_id
        # Request tuning.
        self.api_version, self.retry, self.timeout = api_version, retry, timeout
        self.metadata = metadata
        # Connection / auth.
        self.gcp_conn_id = gcp_conn_id
        self.delegate_to = delegate_to
        self.impersonation_chain = impersonation_chain

    def execute(self, context: "Context") -> dict:
        """Fetch the task and return it as a plain dict (XCom-friendly)."""
        hook = DataplexHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            api_version=self.api_version,
            impersonation_chain=self.impersonation_chain,
        )
        self.log.info("Retrieving Dataplex task %s", self.dataplex_task_id)
        DataplexTaskLink.persist(context=context, task_instance=self)

        fetched_task = hook.get_task(
            project_id=self.project_id,
            region=self.region,
            lake_id=self.lake_id,
            dataplex_task_id=self.dataplex_task_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        return Task.to_dict(fetched_task)
diff --git a/airflow/providers/google/cloud/sensors/dataplex.py
b/airflow/providers/google/cloud/sensors/dataplex.py
new file mode 100644
index 0000000..056b613
--- /dev/null
+++ b/airflow/providers/google/cloud/sensors/dataplex.py
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""This module contains Google Dataplex sensors."""
+from typing import TYPE_CHECKING, Optional, Sequence, Tuple, Union
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+from google.api_core.retry import Retry
+
+from airflow.exceptions import AirflowException
+from airflow.providers.google.cloud.hooks.dataplex import DataplexHook
+from airflow.sensors.base import BaseSensorOperator
+
+
+class TaskState:
+ """Dataplex Task states"""
+
+ STATE_UNSPECIFIED = 0
+ ACTIVE = 1
+ CREATING = 2
+ DELETING = 3
+ ACTION_REQUIRED = 4
+
+
+class DataplexTaskStateSensor(BaseSensorOperator):
+ """
+ Check the status of the Dataplex task
+
+ :param project_id: Required. The ID of the Google Cloud project that the
task belongs to.
+ :param region: Required. The ID of the Google Cloud region that the task
belongs to.
+ :param lake_id: Required. The ID of the Google Cloud lake that the task
belongs to.
+ :param dataplex_task_id: Required. Task identifier.
+ :param api_version: The version of the api that will be requested for
example 'v3'.
+ :param retry: A retry object used to retry requests. If `None` is
specified, requests
+ will not be retried.
+ :param metadata: Additional metadata that is provided to the method.
+ :param gcp_conn_id: The connection ID to use when fetching connection info.
+ :param delegate_to: The account to impersonate, if any. For this to work,
the service account making the
+ request must have domain-wide delegation enabled.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields = ['dataplex_task_id']
+
+ def __init__(
+ self,
+ project_id: str,
+ region: str,
+ lake_id: str,
+ dataplex_task_id: str,
+ api_version: str = "v1",
+ retry: Optional[Retry] = None,
+ metadata: Sequence[Tuple[str, str]] = (),
+ gcp_conn_id: str = "google_cloud_default",
+ delegate_to: Optional[str] = None,
+ impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+ *args,
+ **kwargs,
+ ) -> None:
+ super().__init__(*args, **kwargs)
+ self.project_id = project_id
+ self.region = region
+ self.lake_id = lake_id
+ self.dataplex_task_id = dataplex_task_id
+ self.api_version = api_version
+ self.retry = retry
+ self.metadata = metadata
+ self.gcp_conn_id = gcp_conn_id
+ self.delegate_to = delegate_to
+ self.impersonation_chain = impersonation_chain
+
+ def poke(self, context: "Context") -> bool:
+ self.log.info("Waiting for task %s to be %s", self.dataplex_task_id,
TaskState.ACTIVE)
+ hook = DataplexHook(
+ gcp_conn_id=self.gcp_conn_id,
+ delegate_to=self.delegate_to,
+ api_version=self.api_version,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ task = hook.get_task(
+ project_id=self.project_id,
+ region=self.region,
+ lake_id=self.lake_id,
+ dataplex_task_id=self.dataplex_task_id,
+ retry=self.retry,
+ metadata=self.metadata,
+ )
+ task_status = task.state
+
+ if task_status == TaskState.DELETING:
+ raise AirflowException(f"Task is going to be deleted
{self.dataplex_task_id}")
+
+ self.log.info("Current status of the Dataplex task %s => %s",
self.dataplex_task_id, task_status)
+
+ return task_status == TaskState.ACTIVE
diff --git a/airflow/providers/google/provider.yaml
b/airflow/providers/google/provider.yaml
index dbb4a70..70c0472 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -235,6 +235,11 @@ integrations:
- /docs/apache-airflow-providers-google/operators/cloud/datafusion.rst
logo: /integration-logos/gcp/Google-Data-Fusion.png
tags: [gcp]
+ - integration-name: Google Dataplex
+ external-doc-url: https://cloud.google.com/dataplex/
+ how-to-guide:
+ - /docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
+ tags: [gcp]
- integration-name: Google Dataprep
external-doc-url: https://cloud.google.com/dataprep/
how-to-guide:
@@ -399,6 +404,9 @@ operators:
- integration-name: Google Data Fusion
python-modules:
- airflow.providers.google.cloud.operators.datafusion
+ - integration-name: Google Dataplex
+ python-modules:
+ - airflow.providers.google.cloud.operators.dataplex
- integration-name: Google Dataprep
python-modules:
- airflow.providers.google.cloud.operators.dataprep
@@ -513,6 +521,9 @@ sensors:
- integration-name: Google Data Fusion
python-modules:
- airflow.providers.google.cloud.sensors.datafusion
+ - integration-name: Google Dataplex
+ python-modules:
+ - airflow.providers.google.cloud.sensors.dataplex
- integration-name: Google Dataproc
python-modules:
- airflow.providers.google.cloud.sensors.dataproc
@@ -585,6 +596,9 @@ hooks:
- integration-name: Google Data Fusion
python-modules:
- airflow.providers.google.cloud.hooks.datafusion
+ - integration-name: Google Dataplex
+ python-modules:
+ - airflow.providers.google.cloud.hooks.dataplex
- integration-name: Google Dataprep
python-modules:
- airflow.providers.google.cloud.hooks.dataprep
@@ -853,6 +867,8 @@ extra-links:
- airflow.providers.google.cloud.operators.datafusion.DataFusionInstanceLink
- airflow.providers.google.cloud.operators.datafusion.DataFusionPipelineLink
- airflow.providers.google.cloud.operators.datafusion.DataFusionPipelinesLink
+ - airflow.providers.google.cloud.links.dataplex.DataplexTaskLink
+ - airflow.providers.google.cloud.links.dataplex.DataplexTasksLink
- airflow.providers.google.cloud.links.dataproc.DataprocLink
- airflow.providers.google.cloud.links.dataproc.DataprocListLink
-
airflow.providers.google.cloud.operators.dataproc_metastore.DataprocMetastoreDetailedLink
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
new file mode 100644
index 0000000..00b1949
--- /dev/null
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -0,0 +1,105 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Google Dataplex Operators
+=========================
+
+Dataplex is an intelligent data fabric that provides unified analytics
+and data management across your data lakes, data warehouses, and data marts.
+
+For more information about the task visit `Dataplex product documentation
<https://cloud.google.com/dataplex/docs/reference>`__
+
+Create a Task
+-------------
+
+Before you create a dataplex task you need to define its body.
+For more information about the available fields to pass when creating a task,
visit `Dataplex create task API.
<https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.lakes.tasks#Task>`__
+
+A simple task configuration can look as follows:
+
+.. exampleinclude::
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+ :language: python
+ :dedent: 0
+ :start-after: [START howto_dataplex_configuration]
+ :end-before: [END howto_dataplex_configuration]
+
+With this configuration we can create the task both synchronously &
asynchronously:
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateTaskOperator`
+
+.. exampleinclude::
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_create_task_operator]
+ :end-before: [END howto_dataplex_create_task_operator]
+
+.. exampleinclude::
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_async_create_task_operator]
+ :end-before: [END howto_dataplex_async_create_task_operator]
+
+Delete a task
+-------------
+
+To delete a task you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteTaskOperator`
+
+.. exampleinclude::
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_delete_task_operator]
+ :end-before: [END howto_dataplex_delete_task_operator]
+
+List tasks
+----------
+
+To list tasks you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexListTasksOperator`
+
+.. exampleinclude::
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_list_tasks_operator]
+ :end-before: [END howto_dataplex_list_tasks_operator]
+
+Get a task
+----------
+
+To get a task you can use:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetTaskOperator`
+
+.. exampleinclude::
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_get_task_operator]
+ :end-before: [END howto_dataplex_get_task_operator]
+
+Wait for a task
+---------------
+
+To wait for a task created asynchronously you can use:
+
+:class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexTaskStateSensor`
+
+.. exampleinclude::
/../../airflow/providers/google/cloud/example_dags/example_dataplex.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_dataplex_task_state_sensor]
+ :end-before: [END howto_dataplex_task_state_sensor]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index b6242d9..81e0a7b 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -98,6 +98,7 @@ Dataflow
Dataframe
Datalake
Datanodes
+Dataplex
Dataprep
Dataproc
Dataset
@@ -442,6 +443,7 @@ Zsh
Zymergen
abc
accessor
+accountmaking
aci
ack
ackIds
@@ -666,6 +668,7 @@ dataflow
dataframe
dataframes
datapipe
+dataplex
datapoint
dataproc
dataset
diff --git a/setup.py b/setup.py
index 667819e..f31f2cd 100644
--- a/setup.py
+++ b/setup.py
@@ -328,6 +328,7 @@ google = [
'google-cloud-build>=3.0.0',
'google-cloud-container>=0.1.1,<2.0.0',
'google-cloud-datacatalog>=3.0.0',
+ 'google-cloud-dataplex>=0.1.0',
'google-cloud-dataproc>=3.1.0',
'google-cloud-dataproc-metastore>=1.2.0,<2.0.0',
'google-cloud-dlp>=0.11.0,<2.0.0',
diff --git a/tests/providers/google/cloud/hooks/test_dataplex.py
b/tests/providers/google/cloud/hooks/test_dataplex.py
new file mode 100644
index 0000000..6be8afb
--- /dev/null
+++ b/tests/providers/google/cloud/hooks/test_dataplex.py
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase, mock
+
+from airflow.providers.google.cloud.operators.dataplex import DataplexHook
+from tests.providers.google.cloud.utils.base_gcp_mock import
mock_base_gcp_hook_default_project_id
+
+BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
+DATAPLEX_STRING = "airflow.providers.google.cloud.hooks.dataplex.{}"
+
+PROJECT_ID = "project-id"
+REGION = "region"
+LAKE_ID = "lake-id"
+BODY = {"body": "test"}
+DATAPLEX_TASK_ID = "testTask001"
+
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+
+class TestDataplexHook(TestCase):
+ def setUp(self):
+ with mock.patch(
+ BASE_STRING.format("GoogleBaseHook.__init__"),
+ new=mock_base_gcp_hook_default_project_id,
+ ):
+ self.hook = DataplexHook(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ def test_create_task(self, mock_client):
+ self.hook.create_task(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ body=BODY,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ validate_only=None,
+ )
+
+ parent = f'projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}'
+ mock_client.return_value.create_task.assert_called_once_with(
+ request=dict(
+ parent=parent,
+ task_id=DATAPLEX_TASK_ID,
+ task=BODY,
+ ),
+ retry=None,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ def test_delete_task(self, mock_client):
+ self.hook.delete_task(
+ project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID,
dataplex_task_id=DATAPLEX_TASK_ID
+ )
+
+ name =
f'projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/tasks/{DATAPLEX_TASK_ID}'
+ mock_client.return_value.delete_task.assert_called_once_with(
+ request=dict(
+ name=name,
+ ),
+ retry=None,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ def test_list_tasks(self, mock_client):
+ self.hook.list_tasks(project_id=PROJECT_ID, region=REGION,
lake_id=LAKE_ID)
+
+ parent = f'projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}'
+ mock_client.return_value.list_tasks.assert_called_once_with(
+ request=dict(
+ parent=parent,
+ page_size=None,
+ page_token=None,
+ filter=None,
+ order_by=None,
+ ),
+ retry=None,
+ timeout=None,
+ metadata=(),
+ )
+
+ @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client"))
+ def test_get_task(self, mock_client):
+ self.hook.get_task(
+ project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID,
dataplex_task_id=DATAPLEX_TASK_ID
+ )
+
+ name =
f'projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/tasks/{DATAPLEX_TASK_ID}'
+ mock_client.return_value.get_task.assert_called_once_with(
+ request=dict(
+ name=name,
+ ),
+ retry=None,
+ timeout=None,
+ metadata=(),
+ )
diff --git a/tests/providers/google/cloud/operators/test_dataplex.py
b/tests/providers/google/cloud/operators/test_dataplex.py
new file mode 100644
index 0000000..d8c958f
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_dataplex.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase, mock
+
+from airflow.providers.google.cloud.operators.dataplex import (
+ DataplexCreateTaskOperator,
+ DataplexDeleteTaskOperator,
+ DataplexGetTaskOperator,
+ DataplexListTasksOperator,
+)
+
+HOOK_STR = "airflow.providers.google.cloud.operators.dataplex.DataplexHook"
+TASK_STR = "airflow.providers.google.cloud.operators.dataplex.Task"
+
+PROJECT_ID = "project-id"
+REGION = "region"
+LAKE_ID = "lake-id"
+BODY = {"body": "test"}
+DATAPLEX_TASK_ID = "testTask001"
+
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+API_VERSION = "v1"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+
+class TestDataplexCreateTaskOperator(TestCase):
+ @mock.patch(HOOK_STR)
+ @mock.patch(TASK_STR)
+ def test_execute(self, task_mock, hook_mock):
+ op = DataplexCreateTaskOperator(
+ task_id="create_dataplex_task",
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ body=BODY,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ validate_only=None,
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.wait_for_operation.return_value = None
+ task_mock.return_value.to_dict.return_value = None
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.create_task.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ body=BODY,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ validate_only=None,
+ retry=None,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataplexDeleteTaskOperator(TestCase):
+ @mock.patch(HOOK_STR)
+ def test_execute(self, hook_mock):
+ op = DataplexDeleteTaskOperator(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ task_id="delete_dataplex_task",
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ op.execute(context=None)
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.delete_task.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ retry=None,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataplexListTasksOperator(TestCase):
+ @mock.patch(HOOK_STR)
+ def test_execute(self, hook_mock):
+ op = DataplexListTasksOperator(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ task_id="list_dataplex_task",
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.list_tasks.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ page_size=None,
+ page_token=None,
+ filter=None,
+ order_by=None,
+ retry=None,
+ timeout=None,
+ metadata=(),
+ )
+
+
+class TestDataplexGetTaskOperator(TestCase):
+ @mock.patch(HOOK_STR)
+ @mock.patch(TASK_STR)
+ def test_execute(self, task_mock, hook_mock):
+ op = DataplexGetTaskOperator(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ task_id="get_dataplex_task",
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.wait_for_operation.return_value = None
+ task_mock.return_value.to_dict.return_value = None
+ op.execute(context=mock.MagicMock())
+ hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ api_version=API_VERSION,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ hook_mock.return_value.get_task.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ retry=None,
+ timeout=None,
+ metadata=(),
+ )
diff --git a/tests/providers/google/cloud/operators/test_dataplex_system.py
b/tests/providers/google/cloud/operators/test_dataplex_system.py
new file mode 100644
index 0000000..1497281
--- /dev/null
+++ b/tests/providers/google/cloud/operators/test_dataplex_system.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from airflow.providers.google.cloud.example_dags.example_dataplex import
BUCKET, SPARK_FILE_NAME
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER,
GoogleSystemTest, provide_gcp_context
+
+GCS_URI = f"gs://{BUCKET}"
+
+spark_file = """
+#!/usr/bin/python
+print("### Hello, dataplex! ###")
+"""
+
+
[email protected]("mysql", "postgres")
[email protected]_file(GCP_GCS_KEY)
+class DataplexExampleDagsTest(GoogleSystemTest):
+ def setUp(self):
+ super().setUp()
+ self.create_gcs_bucket(BUCKET)
+ self.upload_content_to_gcs(lines=spark_file, bucket=GCS_URI,
filename=SPARK_FILE_NAME)
+
+ def tearDown(self):
+ self.delete_gcs_bucket(BUCKET)
+ super().tearDown()
+
+ @provide_gcp_context(GCP_GCS_KEY)
+ def test_run_example_dag(self):
+ self.run_dag(dag_id="example_dataplex", dag_folder=CLOUD_DAG_FOLDER)
diff --git a/tests/providers/google/cloud/sensors/test_dataplex.py
b/tests/providers/google/cloud/sensors/test_dataplex.py
new file mode 100644
index 0000000..f28028f
--- /dev/null
+++ b/tests/providers/google/cloud/sensors/test_dataplex.py
@@ -0,0 +1,103 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+import pytest
+
+from airflow import AirflowException
+from airflow.providers.google.cloud.sensors.dataplex import
DataplexTaskStateSensor, TaskState
+
+DATAPLEX_HOOK = "airflow.providers.google.cloud.sensors.dataplex.DataplexHook"
+
+TASK_ID = "test-sensor"
+PROJECT_ID = "project-id"
+REGION = "region"
+LAKE_ID = "lake-id"
+BODY = {"body": "test"}
+DATAPLEX_TASK_ID = "testTask001"
+
+GCP_CONN_ID = "google_cloud_default"
+DELEGATE_TO = "test-delegate-to"
+API_VERSION = "v1"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+
+
+class TestDataplexTaskStateSensor(unittest.TestCase):
+ def create_task(self, state: int):
+ task = mock.Mock()
+ task.state = state
+ return task
+
+ @mock.patch(DATAPLEX_HOOK)
+ def test_done(self, mock_hook):
+ task = self.create_task(TaskState.ACTIVE)
+ mock_hook.return_value.get_task.return_value = task
+
+ sensor = DataplexTaskStateSensor(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ result = sensor.poke(context={})
+
+ mock_hook.return_value.get_task.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ retry=None,
+ metadata=(),
+ )
+
+ assert result
+
+ @mock.patch(DATAPLEX_HOOK)
+ def test_deleting(self, mock_hook):
+ task = self.create_task(TaskState.DELETING)
+ mock_hook.return_value.get_task.return_value = task
+
+ sensor = DataplexTaskStateSensor(
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ api_version=API_VERSION,
+ gcp_conn_id=GCP_CONN_ID,
+ delegate_to=DELEGATE_TO,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+
+ with pytest.raises(AirflowException, match="Task is going to be
deleted"):
+ sensor.poke(context={})
+
+ mock_hook.return_value.get_task.assert_called_once_with(
+ project_id=PROJECT_ID,
+ region=REGION,
+ lake_id=LAKE_ID,
+ dataplex_task_id=DATAPLEX_TASK_ID,
+ retry=None,
+ metadata=(),
+ )