[ 
https://issues.apache.org/jira/browse/AIRFLOW-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670871#comment-16670871
 ] 

ASF GitHub Bot commented on AIRFLOW-3231:
-----------------------------------------

kaxil closed pull request #4097: [AIRFLOW-3231] Basic operators for Google 
Cloud SQL
URL: https://github.com/apache/incubator-airflow/pull/4097
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/airflow/contrib/example_dags/example_gcp_sql.py 
b/airflow/contrib/example_dags/example_gcp_sql.py
new file mode 100644
index 0000000000..a484456f6e
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_sql.py
@@ -0,0 +1,134 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
"""
Example Airflow DAG that deploys, updates, patches and deletes a Cloud SQL
instance in Google Cloud Platform.

This DAG relies on the following Airflow variables
(https://airflow.apache.org/concepts.html#variables):
* PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
* INSTANCE_NAME - Name of the Cloud SQL instance.
"""

import datetime

import airflow
from airflow import models

from airflow.contrib.operators.gcp_sql_operator import \
    CloudSqlInstanceCreateOperator, \
    CloudSqlInstancePatchOperator, \
    CloudSqlInstanceDeleteOperator

# [START howto_operator_cloudsql_arguments]
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
INSTANCE_NAME = models.Variable.get('INSTANCE_NAME', '')
# [END howto_operator_cloudsql_arguments]

# The bodies below represent Cloud SQL instance resources:
# https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances

# Full instance resource used on creation.
# [START howto_operator_cloudsql_create_body]
body = {
    "name": INSTANCE_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
        "backupConfiguration": {
            "binaryLogEnabled": True,
            "enabled": True,
            "startTime": "05:00"
        },
        "activationPolicy": "ALWAYS",
        "dataDiskSizeGb": 30,
        "dataDiskType": "PD_SSD",
        "databaseFlags": [],
        "ipConfiguration": {
            "ipv4Enabled": True,
            "requireSsl": True,
        },
        "locationPreference": {
            "zone": "europe-west4-a"
        },
        "maintenanceWindow": {
            "hour": 5,
            "day": 7,
            "updateTrack": "canary"
        },
        "pricingPlan": "PER_USE",
        "replicationType": "ASYNCHRONOUS",
        "storageAutoResize": False,
        "storageAutoResizeLimit": 0,
        "userLabels": {
            "my-key": "my-value"
        }
    },
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
}
# [END howto_operator_cloudsql_create_body]

# Partial resource used for the patch (only the listed settings are changed).
# [START howto_operator_cloudsql_patch_body]
patch_body = {
    "name": INSTANCE_NAME,
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {
            "hour": 3,
            "day": 6,
            "updateTrack": "canary"
        },
        "userLabels": {
            "my-key-patch": "my-value-patch"
        }
    }
}
# [END howto_operator_cloudsql_patch_body]

default_args = {
    'start_date': airflow.utils.dates.days_ago(1)
}

with models.DAG(
    'example_gcp_sql',
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1)
) as dag:
    # [START howto_operator_cloudsql_create]
    sql_instance_create_task = CloudSqlInstanceCreateOperator(
        project_id=PROJECT_ID,
        body=body,
        instance=INSTANCE_NAME,
        task_id='sql_instance_create_task'
    )
    # [END howto_operator_cloudsql_create]
    # [START howto_operator_cloudsql_patch]
    sql_instance_patch_task = CloudSqlInstancePatchOperator(
        project_id=PROJECT_ID,
        body=patch_body,
        instance=INSTANCE_NAME,
        task_id='sql_instance_patch_task'
    )
    # [END howto_operator_cloudsql_patch]
    # [START howto_operator_cloudsql_delete]
    sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
        project_id=PROJECT_ID,
        instance=INSTANCE_NAME,
        task_id='sql_instance_delete_task'
    )
    # [END howto_operator_cloudsql_delete]

    # Create, then patch, then delete the instance.
    sql_instance_create_task >> sql_instance_patch_task >> \
        sql_instance_delete_task
diff --git a/airflow/contrib/hooks/gcp_sql_hook.py 
b/airflow/contrib/hooks/gcp_sql_hook.py
new file mode 100644
index 0000000000..e0b3f92d8f
--- /dev/null
+++ b/airflow/contrib/hooks/gcp_sql_hook.py
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from googleapiclient.discovery import build
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
# How many times googleapiclient retries a "retriable" request before
# surfacing the failure.
NUM_RETRIES = 5

# Polling interval (seconds) between active checks of an operation's status.
TIME_TO_SLEEP_IN_SECONDS = 1
+
+
class CloudSqlOperationStatus:
    """Possible status values reported for an asynchronous Cloud SQL operation."""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    DONE = "DONE"
    UNKNOWN = "UNKNOWN"
+
# noinspection PyAbstractClass
class CloudSqlHook(GoogleCloudBaseHook):
    """
    Hook for Google Cloud SQL APIs.

    :param api_version: Cloud SQL API version used (e.g. v1beta4).
    :type api_version: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param delegate_to: The account to impersonate, if any.
    :type delegate_to: str
    """
    # Cached googleapiclient service object, built lazily by get_conn().
    _conn = None

    def __init__(self,
                 api_version,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None):
        super(CloudSqlHook, self).__init__(gcp_conn_id, delegate_to)
        self.api_version = api_version

    def get_conn(self):
        """
        Retrieves connection to Cloud SQL.

        :return: Google Cloud SQL services object.
        :rtype: googleapiclient.discovery.Resource
        """
        if not self._conn:
            http_authorized = self._authorize()
            # cache_discovery=False avoids the discovery file-cache warning
            self._conn = build('sqladmin', self.api_version,
                               http=http_authorized, cache_discovery=False)
        return self._conn

    def get_instance(self, project_id, instance):
        """
        Retrieves a resource containing information about a Cloud SQL instance.

        :param project_id: Project ID of the project that contains the instance.
        :type project_id: str
        :param instance: Database instance ID. This does not include the project ID.
        :type instance: str
        :return: A Cloud SQL instance resource.
        :rtype: dict
        """
        return self.get_conn().instances().get(
            project=project_id,
            instance=instance
        ).execute(num_retries=NUM_RETRIES)

    def create_instance(self, project_id, body):
        """
        Creates a new Cloud SQL instance.

        :param project_id: Project ID of the project to which the newly created
            Cloud SQL instances should belong.
        :type project_id: str
        :param body: Body required by the Cloud SQL insert API, as described in
            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert#request-body
        :type body: dict
        :return: True if the operation succeeded, raises an error otherwise
        :rtype: bool
        """
        response = self.get_conn().instances().insert(
            project=project_id,
            body=body
        ).execute(num_retries=NUM_RETRIES)
        operation_name = response["name"]
        # insert is asynchronous - block until the operation finishes
        return self._wait_for_operation_to_complete(project_id, operation_name)

    def patch_instance(self, project_id, body, instance):
        """
        Updates settings of a Cloud SQL instance.

        Caution: This is not a partial update, so you must include values for
        all the settings that you want to retain.

        :param project_id: Project ID of the project that contains the instance.
        :type project_id: str
        :param body: Body required by the Cloud SQL patch API, as described in
            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch#request-body
        :type body: dict
        :param instance: Cloud SQL instance ID. This does not include the project ID.
        :type instance: str
        :return: True if the operation succeeded, raises an error otherwise
        :rtype: bool
        """
        response = self.get_conn().instances().patch(
            project=project_id,
            instance=instance,
            body=body
        ).execute(num_retries=NUM_RETRIES)
        operation_name = response["name"]
        return self._wait_for_operation_to_complete(project_id, operation_name)

    def delete_instance(self, project_id, instance):
        """
        Deletes a Cloud SQL instance.

        :param project_id: Project ID of the project that contains the instance.
        :type project_id: str
        :param instance: Cloud SQL instance ID. This does not include the project ID.
        :type instance: str
        :return: True if the operation succeeded, raises an error otherwise
        :rtype: bool
        """
        response = self.get_conn().instances().delete(
            project=project_id,
            instance=instance,
        ).execute(num_retries=NUM_RETRIES)
        operation_name = response["name"]
        return self._wait_for_operation_to_complete(project_id, operation_name)

    def _wait_for_operation_to_complete(self, project_id, operation_name):
        """
        Waits for the named operation to complete - checks status of the
        asynchronous call.

        :param project_id: Project ID of the project that contains the instance.
        :type project_id: str
        :param operation_name: Name of the operation.
        :type operation_name: str
        :return: True if the operation succeeded, raises an error otherwise
        :rtype: bool
        """
        service = self.get_conn()
        while True:
            operation_response = service.operations().get(
                project=project_id,
                operation=operation_name,
            ).execute(num_retries=NUM_RETRIES)
            if operation_response.get("status") == CloudSqlOperationStatus.DONE:
                error = operation_response.get("error")
                if error:
                    # Extracting the errors list as string and trimming square braces
                    error_msg = str(error.get("errors"))[1:-1]
                    raise AirflowException(error_msg)
                # No meaningful info to return from the response in case of success
                return True
            time.sleep(TIME_TO_SLEEP_IN_SECONDS)
diff --git a/airflow/contrib/operators/gcp_sql_operator.py 
b/airflow/contrib/operators/gcp_sql_operator.py
new file mode 100644
index 0000000000..0ba7a300c9
--- /dev/null
+++ b/airflow/contrib/operators/gcp_sql_operator.py
@@ -0,0 +1,297 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from googleapiclient.errors import HttpError
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_sql_hook import CloudSqlHook
+from airflow.contrib.utils.gcp_field_validator import GcpBodyFieldValidator
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
# Keys used when reading the settings version out of an instance resource.
SETTINGS = 'settings'
SETTINGS_VERSION = 'settingsVersion'

# Validation specification for the Cloud SQL instance body, consumed by
# GcpBodyFieldValidator. It mirrors (a subset of) the v1beta4 instance schema:
# https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances
CLOUD_SQL_VALIDATION = [
    {"name": "name", "allow_empty": False},
    {"name": "settings", "type": "dict", "fields": [
        {"name": "tier", "allow_empty": False},
        {"name": "backupConfiguration", "type": "dict", "fields": [
            {"name": "binaryLogEnabled", "optional": True},
            {"name": "enabled", "optional": True},
            {"name": "replicationLogArchivingEnabled", "optional": True},
            {"name": "startTime", "allow_empty": False, "optional": True},
        ], "optional": True},
        {"name": "activationPolicy", "allow_empty": False, "optional": True},
        {"name": "authorizedGaeApplications", "type": "list", "optional": True},
        {"name": "crashSafeReplicationEnabled", "optional": True},
        {"name": "dataDiskSizeGb", "optional": True},
        {"name": "dataDiskType", "allow_empty": False, "optional": True},
        {"name": "databaseFlags", "type": "list", "optional": True},
        {"name": "ipConfiguration", "type": "dict", "fields": [
            {"name": "authorizedNetworks", "type": "list", "fields": [
                {"name": "expirationTime", "optional": True},
                {"name": "name", "allow_empty": False, "optional": True},
                {"name": "value", "allow_empty": False, "optional": True},
            ], "optional": True},
            {"name": "ipv4Enabled", "optional": True},
            {"name": "privateNetwork", "allow_empty": False, "optional": True},
            {"name": "requireSsl", "optional": True},
        ], "optional": True},
        {"name": "locationPreference", "type": "dict", "fields": [
            {"name": "followGaeApplication", "allow_empty": False, "optional": True},
            {"name": "zone", "allow_empty": False, "optional": True},
        ], "optional": True},
        {"name": "maintenanceWindow", "type": "dict", "fields": [
            {"name": "hour", "optional": True},
            {"name": "day", "optional": True},
            {"name": "updateTrack", "allow_empty": False, "optional": True},
        ], "optional": True},
        {"name": "pricingPlan", "allow_empty": False, "optional": True},
        {"name": "replicationType", "allow_empty": False, "optional": True},
        {"name": "storageAutoResize", "optional": True},
        {"name": "storageAutoResizeLimit", "optional": True},
        {"name": "userLabels", "type": "dict", "optional": True},
    ]},
    {"name": "databaseVersion", "allow_empty": False, "optional": True},
    {"name": "failoverReplica", "type": "dict", "fields": [
        {"name": "name", "allow_empty": False},
    ], "optional": True},
    {"name": "masterInstanceName", "allow_empty": False, "optional": True},
    {"name": "onPremisesConfiguration", "type": "dict", "optional": True},
    {"name": "region", "allow_empty": False, "optional": True},
    {"name": "replicaConfiguration", "type": "dict", "fields": [
        {"name": "failoverTarget", "optional": True},
        {"name": "mysqlReplicaConfiguration", "type": "dict", "fields": [
            {"name": "caCertificate", "allow_empty": False, "optional": True},
            {"name": "clientCertificate", "allow_empty": False, "optional": True},
            {"name": "clientKey", "allow_empty": False, "optional": True},
            {"name": "connectRetryInterval", "optional": True},
            {"name": "dumpFilePath", "allow_empty": False, "optional": True},
            {"name": "masterHeartbeatPeriod", "optional": True},
            {"name": "password", "allow_empty": False, "optional": True},
            {"name": "sslCipher", "allow_empty": False, "optional": True},
            {"name": "username", "allow_empty": False, "optional": True},
            {"name": "verifyServerCertificate", "optional": True},
        ], "optional": True},
    ], "optional": True},
]
+
+
class CloudSqlBaseOperator(BaseOperator):
    """
    Abstract base operator for Google Cloud SQL operators to inherit from.

    :param project_id: Project ID of the Google Cloud Platform project to operate it.
    :type project_id: str
    :param instance: Cloud SQL instance ID. This does not include the project ID.
    :type instance: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    """
    @apply_defaults
    def __init__(self,
                 project_id,
                 instance,
                 gcp_conn_id='google_cloud_default',
                 api_version='v1beta4',
                 *args, **kwargs):
        self.project_id = project_id
        self.instance = instance
        self.gcp_conn_id = gcp_conn_id
        self.api_version = api_version
        # Fail fast on bad arguments, before the hook is even created.
        self._validate_inputs()
        self._hook = CloudSqlHook(gcp_conn_id=self.gcp_conn_id,
                                  api_version=self.api_version)
        super(CloudSqlBaseOperator, self).__init__(*args, **kwargs)

    def _validate_inputs(self):
        # Subclasses extend this to validate their own required parameters.
        if not self.project_id:
            raise AirflowException("The required parameter 'project_id' is empty")
        if not self.instance:
            raise AirflowException("The required parameter 'instance' is empty")

    def _check_if_instance_exists(self, instance):
        """Return the instance resource if it exists, False on a 404 from the API."""
        try:
            return self._hook.get_instance(self.project_id, instance)
        except HttpError as e:
            status = e.resp.status
            if status == 404:
                return False
            # Bare raise preserves the original traceback ("raise e" would not
            # under Python 2).
            raise

    def execute(self, context):
        # Intentionally a no-op; concrete operators override this.
        pass

    @staticmethod
    def _get_settings_version(instance):
        # Extracts settings.settingsVersion from an instance resource dict.
        return instance.get(SETTINGS).get(SETTINGS_VERSION)
+
+
class CloudSqlInstanceCreateOperator(CloudSqlBaseOperator):
    """
    Creates a new Cloud SQL instance.
    If an instance with the same name exists, no action will be taken and
    the operator will succeed.

    :param project_id: Project ID of the project to which the newly created Cloud SQL
        instances should belong.
    :type project_id: str
    :param body: Body required by the Cloud SQL insert API, as described in
        https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert
        #request-body
    :type body: dict
    :param instance: Cloud SQL instance ID. This does not include the project ID.
    :type instance: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    :param validate_body: True if body should be validated, False otherwise.
    :type validate_body: bool
    """
    # [START gcp_sql_create_template_fields]
    template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
    # [END gcp_sql_create_template_fields]

    @apply_defaults
    def __init__(self,
                 project_id,
                 body,
                 instance,
                 gcp_conn_id='google_cloud_default',
                 api_version='v1beta4',
                 validate_body=True,
                 *args, **kwargs):
        self.body = body
        self.validate_body = validate_body
        super(CloudSqlInstanceCreateOperator, self).__init__(
            project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
            api_version=api_version, *args, **kwargs)

    def _validate_inputs(self):
        super(CloudSqlInstanceCreateOperator, self)._validate_inputs()
        if not self.body:
            raise AirflowException("The required parameter 'body' is empty")

    def _validate_body_fields(self):
        # Body validation may be switched off via the validate_body flag.
        if self.validate_body:
            GcpBodyFieldValidator(
                CLOUD_SQL_VALIDATION,
                api_version=self.api_version).validate(self.body)

    def execute(self, context):
        self._validate_body_fields()
        # Idempotent create: an already-existing instance is treated as success.
        if self._check_if_instance_exists(self.instance):
            self.log.info("Cloud SQL instance with ID {} already exists. "
                          "Aborting create.".format(self.instance))
            return True
        return self._hook.create_instance(self.project_id, self.body)
+
+
class CloudSqlInstancePatchOperator(CloudSqlBaseOperator):
    """
    Updates settings of a Cloud SQL instance.

    Caution: This is a partial update, so only included values for the settings will be
    updated.

    In the request body, supply the relevant portions of an instance resource, according
    to the rules of patch semantics.
    https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch

    :param project_id: Project ID of the project that contains the instance.
    :type project_id: str
    :param body: Body required by the Cloud SQL patch API, as described in
        https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch#request-body
    :type body: dict
    :param instance: Cloud SQL instance ID. This does not include the project ID.
    :type instance: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    """
    # [START gcp_sql_patch_template_fields]
    template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
    # [END gcp_sql_patch_template_fields]

    @apply_defaults
    def __init__(self,
                 project_id,
                 body,
                 instance,
                 gcp_conn_id='google_cloud_default',
                 api_version='v1beta4',
                 *args, **kwargs):
        self.body = body
        super(CloudSqlInstancePatchOperator, self).__init__(
            project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
            api_version=api_version, *args, **kwargs)

    def _validate_inputs(self):
        super(CloudSqlInstancePatchOperator, self)._validate_inputs()
        if not self.body:
            raise AirflowException("The required parameter 'body' is empty")

    def execute(self, context):
        # Patching a non-existent instance is an error, not a no-op.
        if self._check_if_instance_exists(self.instance):
            return self._hook.patch_instance(
                self.project_id, self.body, self.instance)
        raise AirflowException('Cloud SQL instance with ID {} does not exist. '
                               'Please specify another instance to patch.'
                               .format(self.instance))
+
+
class CloudSqlInstanceDeleteOperator(CloudSqlBaseOperator):
    """
    Deletes a Cloud SQL instance.
    If the instance does not exist, no action will be taken and the operator
    will succeed.

    :param project_id: Project ID of the project that contains the instance to be deleted.
    :type project_id: str
    :param instance: Cloud SQL instance ID. This does not include the project ID.
    :type instance: str
    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
    :type gcp_conn_id: str
    :param api_version: API version used (e.g. v1).
    :type api_version: str
    """
    # [START gcp_sql_delete_template_fields]
    template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
    # [END gcp_sql_delete_template_fields]

    @apply_defaults
    def __init__(self,
                 project_id,
                 instance,
                 gcp_conn_id='google_cloud_default',
                 api_version='v1beta4',
                 *args, **kwargs):
        super(CloudSqlInstanceDeleteOperator, self).__init__(
            project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
            api_version=api_version, *args, **kwargs)

    def execute(self, context):
        if not self._check_if_instance_exists(self.instance):
            # Use the task logger (not print) so the message reaches the task
            # logs, consistent with CloudSqlInstanceCreateOperator.
            self.log.info("Cloud SQL instance with ID {} does not exist. "
                          "Aborting delete.".format(self.instance))
            return True
        else:
            return self._hook.delete_instance(self.project_id, self.instance)
diff --git a/airflow/contrib/utils/gcp_field_validator.py 
b/airflow/contrib/utils/gcp_field_validator.py
index 20f72d94b8..e8b59c855e 100644
--- a/airflow/contrib/utils/gcp_field_validator.py
+++ b/airflow/contrib/utils/gcp_field_validator.py
@@ -69,17 +69,23 @@
   Each of the fields in the array is then expected (unless marked as optional)
   and validated recursively. If an extra field is present in the dictionary, 
warning is
   printed in log file (but the validation succeeds - see the 
Forward-compatibility notes)
+* List fields: (key = "type", value="list"):
+  Field of this type should be a list. Only the type correctness is validated.
+  The contents of a list are not subject to validation.
 * Union fields (key = "type", value="union"): field of this type should 
contain nested
   fields in form of an array of dicts. One of the fields (and only one) should 
be
   present (unless the union is marked as optional). If more than one union 
field is
   present, FieldValidationException is raised. If none of the union fields is
   present - warning is printed in the log (see below Forward-compatibility 
notes).
+* Fields validated for non-emptiness: (key = "allow_empty") - this applies 
only to
+  fields the value of which is a string, and it allows to check for 
non-emptiness of
+  the field (allow_empty=False).
 * Regexp-validated fields: (key = "regexp") - fields of this type are assumed 
to be
   strings and they are validated with the regexp specified. Remember that the 
regexps
   should ideally contain ^ at the beginning and $ at the end to make sure that
   the whole field content is validated. Typically such regexp
   validations should be used carefully and sparingly (see Forward-compatibility
-  notes below). Most of regexp validation should be at most r'^.+$'.
+  notes below).
 * Custom-validated fields: (key = "custom_validation") - fields of this type 
are validated
   using method specified via custom_validation field. Any exception thrown in 
the custom
   validation will be turned into FieldValidationException and will cause 
validation to
@@ -130,7 +136,7 @@
 
 from airflow import LoggingMixin, AirflowException
 
-COMPOSITE_FIELD_TYPES = ['union', 'dict']
+COMPOSITE_FIELD_TYPES = ['union', 'dict', 'list']
 
 
 class GcpFieldValidationException(AirflowException):
@@ -158,8 +164,8 @@ def _int_greater_than_zero(value):
 
 
 EXAMPLE_VALIDATION_SPECIFICATION = [
-    dict(name="name", regexp="^.+$"),
-    dict(name="description", regexp="^.+$", optional=True),
+    dict(name="name", allow_empty=False),
+    dict(name="description", allow_empty=False, optional=True),
     dict(name="availableMemoryMb", custom_validation=_int_greater_than_zero,
          optional=True),
     dict(name="labels", optional=True, type="dict"),
@@ -202,7 +208,7 @@ def _get_field_name_with_parent(field_name, parent):
 
     @staticmethod
     def _sanity_checks(children_validation_specs, field_type, full_field_path,
-                       regexp, custom_validation, value):
+                       regexp, allow_empty, custom_validation, value):
         # type: (dict, str, str, str, function, object) -> None
         if value is None and field_type != 'union':
             raise GcpFieldValidationException(
@@ -213,6 +219,11 @@ def _sanity_checks(children_validation_specs, field_type, 
full_field_path,
                 "The validation specification entry '{}' has both type and 
regexp. "
                 "The regexp is only allowed without type (i.e. assume type is 
'str' "
                 "that can be validated with regexp)".format(full_field_path))
+        if allow_empty is not None and field_type:
+            raise GcpValidationSpecificationException(
+                "The validation specification entry '{}' has both type and 
allow_empty. "
+                "The allow_empty is only allowed without type (i.e. assume 
type is 'str' "
+                "that can be validated with 
allow_empty)".format(full_field_path))
         if children_validation_specs and field_type not in 
COMPOSITE_FIELD_TYPES:
             raise GcpValidationSpecificationException(
                 "Nested fields are specified in field '{}' of type '{}'. "
@@ -234,6 +245,14 @@ def _validate_regexp(full_field_path, regexp, value):
                 "specification regexp: '{}'.".
                 format(full_field_path, value, regexp))
 
+    @staticmethod
+    def _validate_is_empty(full_field_path, value):
+        # type: (str, str) -> None
+        if not value:
+            raise GcpFieldValidationException(
+                "The body field '{}' can't be empty. Please provide a value."
+                .format(full_field_path, value))
+
     def _validate_dict(self, children_validation_specs, full_field_path, 
value):
         # type: (dict, str, dict) -> None
         for child_validation_spec in children_validation_specs:
@@ -306,6 +325,7 @@ def _validate_field(self, validation_spec, 
dictionary_to_validate, parent=None,
         field_type = validation_spec.get('type')
         optional = validation_spec.get('optional')
         regexp = validation_spec.get('regexp')
+        allow_empty = validation_spec.get('allow_empty')
         children_validation_specs = validation_spec.get('fields')
         required_api_version = validation_spec.get('api_version')
         custom_validation = validation_spec.get('custom_validation')
@@ -332,15 +352,18 @@ def _validate_field(self, validation_spec, 
dictionary_to_validate, parent=None,
                             field_type=field_type,
                             full_field_path=full_field_path,
                             regexp=regexp,
+                            allow_empty=allow_empty,
                             custom_validation=custom_validation,
                             value=value)
 
+        if allow_empty is False:
+            self._validate_is_empty(full_field_path, value)
         if regexp:
             self._validate_regexp(full_field_path, regexp, value)
         elif field_type == 'dict':
             if not isinstance(value, dict):
                 raise GcpFieldValidationException(
-                    "The field '{}' should be dictionary type according to "
+                    "The field '{}' should be of dictionary type according to 
the "
                     "specification '{}' but it is '{}'".
                     format(full_field_path, validation_spec, value))
             if children_validation_specs is None:
@@ -359,6 +382,12 @@ def _validate_field(self, validation_spec, 
dictionary_to_validate, parent=None,
                     "nested field defined.".format(full_field_path, 
validation_spec))
             self._validate_union(children_validation_specs, full_field_path,
                                  dictionary_to_validate)
+        elif field_type == 'list':
+            if not isinstance(value, list):
+                raise GcpFieldValidationException(
+                    "The field '{}' should be of list type according to the "
+                    "specification '{}' but it is '{}'".
+                    format(full_field_path, validation_spec, value))
         elif custom_validation:
             try:
                 custom_validation(value)
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index 6138d5eca7..025274a5a4 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -283,3 +283,155 @@ See `Adding the IAM service agent user role to the 
runtime service <https://clou
 If the source code for your function is in Google Source Repository, make sure 
that
 your service account has the Source Repository Viewer role so that the source 
code
 can be downloaded if necessary.
+
+.. _CloudSqlInstanceDeleteOperator:
+
+CloudSqlInstanceDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Deletes a Cloud SQL instance in Google Cloud Platform.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDeleteOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from Airflow variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_arguments]
+    :end-before: [END howto_operator_cloudsql_arguments]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_delete]
+    :end-before: [END howto_operator_cloudsql_delete]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+  :language: python
+  :dedent: 4
+  :start-after: [START gcp_sql_delete_template_fields]
+  :end-before: [END gcp_sql_delete_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud SQL API documentation for delete
+<https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/delete>`_.
+
+.. _CloudSqlInstanceCreateOperator:
+
+CloudSqlInstanceCreateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Creates a new Cloud SQL instance in Google Cloud Platform.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceCreateOperator`.
+
+If an instance with the same name exists, no action will be taken and the 
operator
+will succeed.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from Airflow variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_arguments]
+    :end-before: [END howto_operator_cloudsql_arguments]
+
+Example body defining the instance:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_create_body]
+    :end-before: [END howto_operator_cloudsql_create_body]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_create]
+    :end-before: [END howto_operator_cloudsql_create]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+  :language: python
+  :dedent: 4
+  :start-after: [START gcp_sql_create_template_fields]
+  :end-before: [END gcp_sql_create_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud SQL API documentation for insert <https://cloud.google
+.com/sql/docs/mysql/admin-api/v1beta4/instances/insert>`_.
+
+
+.. _CloudSqlInstancePatchOperator:
+
+CloudSqlInstancePatchOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Updates settings of a Cloud SQL instance in Google Cloud Platform (partial 
update).
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstancePatchOperator`.
+
+This is a partial update, so only values for the settings specified in the body
+will be set / updated. The rest of the existing instance's configuration will 
remain
+unchanged.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from Airflow variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_arguments]
+    :end-before: [END howto_operator_cloudsql_arguments]
+
+Example body defining the instance:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_patch_body]
+    :end-before: [END howto_operator_cloudsql_patch_body]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_patch]
+    :end-before: [END howto_operator_cloudsql_patch]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+  :language: python
+  :dedent: 4
+  :start-after: [START gcp_sql_patch_template_fields]
+  :end-before: [END gcp_sql_patch_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud SQL API documentation for patch <https://cloud.google
+.com/sql/docs/mysql/admin-api/v1beta4/instances/patch>`_.
diff --git a/docs/integration.rst b/docs/integration.rst
index 67298b15b6..6c116bc938 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -501,6 +501,43 @@ BigQueryHook
 .. autoclass:: airflow.contrib.hooks.bigquery_hook.BigQueryHook
     :members:
 
+Cloud SQL
+'''''''''
+
+Cloud SQL Operators
+"""""""""""""""""""
+
+- :ref:`CloudSqlInstanceDeleteOperator` : delete a Cloud SQL instance.
+- :ref:`CloudSqlInstanceCreateOperator` : create a new Cloud SQL instance.
+- :ref:`CloudSqlInstancePatchOperator` : patch a Cloud SQL instance.
+
+.. CloudSqlInstanceDeleteOperator:
+
+CloudSqlInstanceDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: 
airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDeleteOperator
+
+.. CloudSqlInstanceCreateOperator:
+
+CloudSqlInstanceCreateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: 
airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceCreateOperator
+
+.. CloudSqlInstancePatchOperator:
+
+CloudSqlInstancePatchOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: 
airflow.contrib.operators.gcp_sql_operator.CloudSqlInstancePatchOperator
+
+Cloud SQL Hook
+""""""""""""""""""""
+
+.. autoclass:: airflow.contrib.hooks.gcp_sql_hook.CloudSqlHook
+    :members:
+
 Compute Engine
 ''''''''''''''
 
diff --git a/tests/contrib/operators/test_gcp_function_operator.py 
b/tests/contrib/operators/test_gcp_function_operator.py
index 4192560dd9..46d599bf7d 100644
--- a/tests/contrib/operators/test_gcp_function_operator.py
+++ b/tests/contrib/operators/test_gcp_function_operator.py
@@ -344,7 +344,7 @@ def test_invalid_field_values(self, key, value, message, 
mock_hook):
          "Parameter 'sourceUploadUrl' is empty in the body and argument "
          "'zip_path' is missing or empty."),
         ({'sourceArchiveUrl': 'gs://adasda', 'sourceRepository': ''},
-         "The field 'source_code.sourceRepository' should be dictionary type"),
+         "The field 'source_code.sourceRepository' should be of dictionary 
type"),
         ({'sourceUploadUrl': '', 'sourceRepository': ''},
          "Parameter 'sourceUploadUrl' is empty in the body and argument 
'zip_path' "
          "is missing or empty."),
@@ -360,7 +360,7 @@ def test_invalid_field_values(self, key, value, message, 
mock_hook):
         ({'sourceUploadUrl': ''}, "Parameter 'sourceUploadUrl' is empty in the 
body "
                                   "and argument 'zip_path' is missing or 
empty."),
         ({'sourceRepository': ''}, "The field 'source_code.sourceRepository' "
-                                   "should be dictionary type"),
+                                   "should be of dictionary type"),
         ({'sourceRepository': {}}, "The required body field "
                                    "'source_code.sourceRepository.url' is 
missing"),
         ({'sourceRepository': {'url': ''}},
@@ -452,7 +452,7 @@ def test_valid_source_code_union_field(self, source_code, 
mock_hook):
                            'service': 'service_name',
                            'failurePolicy': {'retry': ''}}},
          "The field 'trigger.eventTrigger.failurePolicy.retry' "
-         "should be dictionary type")
+         "should be of dictionary type")
     ]
     )
     @mock.patch('airflow.contrib.operators.gcp_function_operator.GcfHook')
diff --git a/tests/contrib/operators/test_gcp_sql_operator.py 
b/tests/contrib/operators/test_gcp_sql_operator.py
new file mode 100644
index 0000000000..245631808a
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_sql_operator.py
@@ -0,0 +1,314 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from airflow import AirflowException
+from airflow.contrib.operators.gcp_sql_operator import 
CloudSqlInstanceCreateOperator, \
+    CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator
+
+try:
+    # noinspection PyProtectedMember
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+PROJECT_ID = "project-id"
+INSTANCE_NAME = "test-name"
+CREATE_BODY = {
+    "name": INSTANCE_NAME,
+    "settings": {
+        "tier": "db-n1-standard-1",
+        "backupConfiguration": {
+            "binaryLogEnabled": True,
+            "enabled": True,
+            "replicationLogArchivingEnabled": True,
+            "startTime": "05:00"
+        },
+        "activationPolicy": "ALWAYS",
+        "authorizedGaeApplications": [],
+        "crashSafeReplicationEnabled": True,
+        "dataDiskSizeGb": 30,
+        "dataDiskType": "PD_SSD",
+        "databaseFlags": [],
+        "ipConfiguration": {
+            "ipv4Enabled": True,
+            "authorizedNetworks": [
+                {
+                    "value": "192.168.100.0/24",
+                    "name": "network1",
+                    "expirationTime": "2012-11-15T16:19:00.094Z"
+                },
+            ],
+            "privateNetwork": "/vpc/resource/link",
+            "requireSsl": True
+        },
+        "locationPreference": {
+            "zone": "europe-west4-a",
+            "followGaeApplication": "/app/engine/application/to/follow"
+        },
+        "maintenanceWindow": {
+            "hour": 5,
+            "day": 7,
+            "updateTrack": "canary"
+        },
+        "pricingPlan": "PER_USE",
+        "replicationType": "ASYNCHRONOUS",
+        "storageAutoResize": False,
+        "storageAutoResizeLimit": 0,
+        "userLabels": {
+            "my-key": "my-value"
+        }
+    },
+    "databaseVersion": "MYSQL_5_7",
+    "failoverReplica": {
+        "name": "replica-1"
+    },
+    "masterInstanceName": "master-instance-1",
+    "onPremisesConfiguration": {},
+    "region": "europe-west4",
+    "replicaConfiguration": {
+        "mysqlReplicaConfiguration": {
+            "caCertificate": "cert-pem",
+            "clientCertificate": "cert-pem",
+            "clientKey": "cert-pem",
+            "connectRetryInterval": 30,
+            "dumpFilePath": "/path/to/dump",
+            "masterHeartbeatPeriod": 100,
+            "password": "secret_pass",
+            "sslCipher": "list-of-ciphers",
+            "username": "user",
+            "verifyServerCertificate": True
+        },
+    }
+}
+PATCH_BODY = {
+    "name": INSTANCE_NAME,
+    "settings": {
+        "tier": "db-n1-standard-2",
+        "dataDiskType": "PD_HDD"
+    },
+    "region": "europe-west4"
+}
+
+
+class CloudSqlTest(unittest.TestCase):
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstanceCreateOperator._check_if_instance_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_create(self, mock_hook, _check_if_instance_exists):
+        _check_if_instance_exists.return_value = False
+        mock_hook.return_value.create_instance.return_value = True
+        op = CloudSqlInstanceCreateOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            body=CREATE_BODY,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.create_instance.assert_called_once_with(
+            PROJECT_ID, CREATE_BODY
+        )
+        self.assertTrue(result)
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstanceCreateOperator._check_if_instance_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_create_idempotent(self, mock_hook, 
_check_if_instance_exists):
+        _check_if_instance_exists.return_value = True
+        mock_hook.return_value.create_instance.return_value = True
+        op = CloudSqlInstanceCreateOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            body=CREATE_BODY,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.create_instance.assert_not_called()
+        self.assertTrue(result)
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_create_should_throw_ex_when_empty_project_id(self, mock_hook):
+        with self.assertRaises(AirflowException) as cm:
+            op = CloudSqlInstanceCreateOperator(
+                project_id="",
+                body=CREATE_BODY,
+                instance=INSTANCE_NAME,
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn("The required parameter 'project_id' is empty", str(err))
+        mock_hook.assert_not_called()
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_create_should_throw_ex_when_empty_body(self, mock_hook):
+        with self.assertRaises(AirflowException) as cm:
+            op = CloudSqlInstanceCreateOperator(
+                project_id=PROJECT_ID,
+                body={},
+                instance=INSTANCE_NAME,
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn("The required parameter 'body' is empty", str(err))
+        mock_hook.assert_not_called()
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_create_should_throw_ex_when_empty_instance(self, mock_hook):
+        with self.assertRaises(AirflowException) as cm:
+            op = CloudSqlInstanceCreateOperator(
+                project_id=PROJECT_ID,
+                body=CREATE_BODY,
+                instance="",
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn("The required parameter 'instance' is empty", str(err))
+        mock_hook.assert_not_called()
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_create_should_validate_list_type(self, mock_hook):
+        wrong_list_type_body = {
+            "name": INSTANCE_NAME,
+            "settings": {
+                "tier": "db-n1-standard-1",
+                "ipConfiguration": {
+                    "authorizedNetworks": {}  # Should be a list, not a dict.
+                                              # Testing if the validation 
catches this.
+                }
+            }
+        }
+        with self.assertRaises(AirflowException) as cm:
+            op = CloudSqlInstanceCreateOperator(
+                project_id=PROJECT_ID,
+                body=wrong_list_type_body,
+                instance=INSTANCE_NAME,
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn("The field 'settings.ipConfiguration.authorizedNetworks' 
"
+                      "should be of list type according to the specification", 
str(err))
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_create_should_validate_non_empty_fields(self, mock_hook):
+        empty_tier_body = {
+            "name": INSTANCE_NAME,
+            "settings": {
+                "tier": "",  # Field can't be empty (defined in 
CLOUD_SQL_VALIDATION).
+                             # Testing if the validation catches this.
+            }
+        }
+        with self.assertRaises(AirflowException) as cm:
+            op = CloudSqlInstanceCreateOperator(
+                project_id=PROJECT_ID,
+                body=empty_tier_body,
+                instance=INSTANCE_NAME,
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn("The body field 'settings.tier' can't be empty. "
+                      "Please provide a value.", str(err))
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_patch(self, mock_hook):
+        mock_hook.return_value.patch_instance.return_value = True
+        op = CloudSqlInstancePatchOperator(
+            project_id=PROJECT_ID,
+            body=PATCH_BODY,
+            instance=INSTANCE_NAME,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.patch_instance.assert_called_once_with(
+            PROJECT_ID, PATCH_BODY, INSTANCE_NAME
+        )
+        self.assertTrue(result)
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstancePatchOperator._check_if_instance_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_patch_should_bubble_up_ex_if_not_exists(self, mock_hook,
+                                                              
_check_if_instance_exists):
+        _check_if_instance_exists.return_value = False
+        with self.assertRaises(AirflowException) as cm:
+            op = CloudSqlInstancePatchOperator(
+                project_id=PROJECT_ID,
+                body=PATCH_BODY,
+                instance=INSTANCE_NAME,
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn('specify another instance to patch', str(err))
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.patch_instance.assert_not_called()
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstanceDeleteOperator._check_if_instance_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_delete(self, mock_hook, _check_if_instance_exists):
+        _check_if_instance_exists.return_value = True
+        op = CloudSqlInstanceDeleteOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            task_id="id"
+        )
+        result = op.execute(None)
+        self.assertTrue(result)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.delete_instance.assert_called_once_with(
+            PROJECT_ID, INSTANCE_NAME
+        )
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstanceDeleteOperator._check_if_instance_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_delete_should_abort_and_succeed_if_not_exists(
+            self, mock_hook, _check_if_instance_exists):
+        _check_if_instance_exists.return_value = False
+        op = CloudSqlInstanceDeleteOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            task_id="id"
+        )
+        result = op.execute(None)
+        self.assertTrue(result)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.delete_instance.assert_not_called()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Basic operators for Google Cloud SQL (deploy / patch / delete)
> --------------------------------------------------------------
>
>                 Key: AIRFLOW-3231
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3231
>             Project: Apache Airflow
>          Issue Type: New Feature
>            Reporter: Szymon Przedwojski
>            Assignee: Szymon Przedwojski
>            Priority: Trivial
>
> In order to be able to interact with Google Cloud SQL, we need operators that 
> should be able to:
> Deploy Instance (Insert or Update): 
> ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert]
> [https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/update])
> Patch Instance 
> ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch]) 
> Delete Instance: 
> ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/delete])



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to