potiuk commented on a change in pull request #4167: [AIRFLOW-3220] Implement 
Instance Group Manager Operators for GCE
URL: https://github.com/apache/incubator-airflow/pull/4167#discussion_r232685539
 
 

 ##########
 File path: airflow/contrib/operators/gcp_compute_operator.py
 ##########
 @@ -181,3 +195,241 @@ def execute(self, context):
         self._validate_all_body_fields()
         return self._hook.set_machine_type(self.project_id, self.zone,
                                            self.resource_id, self.body)
+
+
+# Validation specification for the ``body_patch`` accepted by
+# GceInstanceTemplateCopyOperator; it is handed to GcpBodyFieldValidator.
+# Only "name" is mandatory - every other entry is marked optional. Several
+# nested structures are deliberately listed without a deeper field spec.
+GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION = [
+    dict(name="name", regexp="^.+$"),
+    dict(name="description", optional=True),
+    dict(name="properties", type='dict', optional=True, fields=[
+        dict(name="description", optional=True),
+        dict(name="tags", optional=True, fields=[
+            dict(name="items", optional=True)
+        ]),
+        dict(name="machineType", optional=True),
+        dict(name="canIpForward", optional=True),
+        dict(name="networkInterfaces", optional=True),  # not validating deeper
+        dict(name="disks", optional=True),  # not validating the array deeper
+        dict(name="metadata", optional=True, fields=[
+            dict(name="fingerprint", optional=True),
+            dict(name="items", optional=True),
+            dict(name="kind", optional=True),
+        ]),
+        dict(name="serviceAccounts", optional=True),  # not validating deeper
+        dict(name="scheduling", optional=True, fields=[
+            dict(name="onHostMaintenance", optional=True),
+            dict(name="automaticRestart", optional=True),
+            dict(name="preemptible", optional=True),
+            # The API field is spelled "nodeAffinities".
+            dict(name="nodeAffinities", optional=True),  # not validating deeper
+        ]),
+        dict(name="labels", optional=True),
+        dict(name="guestAccelerators", optional=True),  # not validating deeper
+        dict(name="minCpuPlatform", optional=True),
+    ]),
+]
+
+# Dotted paths handed to GcpBodyFieldSanitizer. The operator sanitizes a copy
+# of the source template with this list before merging the user patch into it.
+# NOTE(review): these look like server-generated / output-only attributes -
+# confirm against the instanceTemplates API reference before extending.
+GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE = [
+    # top-level attributes
+    "kind",
+    "id",
+    "name",
+    "creationTimestamp",
+    # attributes nested under properties.disks
+    "properties.disks.sha256",
+    "properties.disks.kind",
+    "properties.disks.sourceImageEncryptionKey.sha256",
+    "properties.disks.index",
+    "properties.disks.licenses",
+    # attributes nested under properties.networkInterfaces
+    "properties.networkInterfaces.kind",
+    "properties.networkInterfaces.accessConfigs.kind",
+    "properties.networkInterfaces.name",
+    # remaining attributes
+    "properties.metadata.kind",
+    "selfLink",
+]
+
+
+class GceInstanceTemplateCopyOperator(GceBaseOperator):
+    """
+    Copies the instance template, applying specified changes.
+
+    If a template with the target name (``body_patch['name']``) already exists,
+    the operator returns it unchanged instead of creating a new one.
+
+    :param project_id: Google Cloud Platform Project ID where the Compute Engine
+        Instance Template exists.
+    :type project_id: str
+    :param resource_id: Name of the Instance Template
+    :type resource_id: str
+    :param body_patch: Patch to the body of instanceTemplates object following rfc7386
+            PATCH semantics. The body_patch content follows
+            https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
+            Name field is required as we need to rename the template,
+            all the other fields are optional. It is important to follow PATCH semantics
+            - arrays are replaced fully, so if you need to update an array you should
+            provide the whole target array as patch element.
+    :type body_patch: dict
+    :param request_id: Optional, unique request_id that you might add to achieve
+           full idempotence (for example when client call times out repeating the request
+           with the same request id will not create a new instance template again).
+           It should be in UUID format as defined in RFC 4122.
+    :type request_id: str
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: API version used (for example v1 or beta).
+    :type api_version: str
+    :param validate_body: If set to False, body validation is not performed.
+    :type validate_body: bool
+    :raises AirflowException: if ``body_patch`` does not contain the 'name' key.
+    """
+    # [START gce_instance_template_copy_operator_template_fields]
+    template_fields = ('project_id', 'resource_id', 'request_id',
+                       'gcp_conn_id', 'api_version')
+    # [END gce_instance_template_copy_operator_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 resource_id,
+                 body_patch,
+                 request_id=None,
+                 gcp_conn_id='google_cloud_default',
+                 api_version='v1',
+                 validate_body=True,
+                 *args, **kwargs):
+        self.body_patch = body_patch
+        self.request_id = request_id
+        # Stays None when validate_body is False; checked before validating.
+        self._field_validator = None
+        if 'name' not in self.body_patch:
+            raise AirflowException("The body '{}' should contain at least "
+                                   "name for the new operator in the 'name' field".
+                                   format(body_patch))
+        if validate_body:
+            self._field_validator = GcpBodyFieldValidator(
+                GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION,
+                api_version=api_version)
+        self._field_sanitizer = GcpBodyFieldSanitizer(
+            GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE)
+        # Instance templates are not zonal, so 'global' is passed as the zone.
+        super(GceInstanceTemplateCopyOperator, self).__init__(
+            project_id=project_id, zone='global', resource_id=resource_id,
+            gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs)
+
+    def _validate_all_body_fields(self):
+        """Validate ``body_patch`` if a validator was configured in __init__."""
+        if self._field_validator:
+            self._field_validator.validate(self.body_patch)
+
+    def execute(self, context):
+        """
+        Create the renamed copy of the template, or return it if already present.
+
+        :param context: Airflow task context (not used by this method).
+        :return: the instance template named ``body_patch['name']`` as returned
+            by the hook.
+        :raises HttpError: if looking up the target template fails with any
+            status other than 404.
+        """
+        self._validate_all_body_fields()
+        try:
+            # Idempotence check (sort of) - we want to check if the new template
+            # is already created and if is, then we assume it was created by previous run
+            # of CopyTemplate operator - we do not check if content of the template
+            # is as expected. Templates are immutable so we cannot update it anyway
+            # and deleting/recreating is not worth the hassle especially
+            # that we cannot delete template if it is already used in some Instance
+            # Group Manager. We assume success if the template is simply present
+            existing_template = self._hook.get_instance_template(
+                project_id=self.project_id, resource_id=self.body_patch['name'])
+            # Pass the template name as a logging argument so the placeholder
+            # is actually filled in.
+            self.log.info("The %s template already existed. It was likely "
+                          "created by previous run of the operator. Assuming success.",
+                          self.body_patch['name'])
+            return existing_template
+        except HttpError as e:
+            # We actually expect to get 404 / Not Found here as the template should
+            # not yet exist
+            if e.resp.status != 404:
+                # Bare raise keeps the original traceback intact.
+                raise
+        old_body = self._hook.get_instance_template(project_id=self.project_id,
+                                                    resource_id=self.resource_id)
+        # Work on a copy so the body returned by the hook is left untouched.
+        new_body = deepcopy(old_body)
+        self._field_sanitizer.sanitize(new_body)
+        new_body = merge(new_body, self.body_patch)
+        self.log.info("Calling insert instance template with updated body: {}".
+                      format(new_body))
+        self._hook.insert_instance_template(project_id=self.project_id,
+                                            body=new_body,
+                                            request_id=self.request_id)
+        return self._hook.get_instance_template(project_id=self.project_id,
+                                                resource_id=self.body_patch['name'])
+
+class GceInstanceGroupManagerUpdateTemplateOperator(GceBaseOperator):
+    """
+    Patches the Instance Group Manager, replacing source template URL with the
+    destination one. API V1 does not have update/patch operations for Instance
+    Group Manager, so you must use beta or newer API version. Beta is the 
default.
+
+    :param project_id: Google Cloud Platform Project ID where the Compute 
Engine
+        instance exists.
+    :type project_id: str
+    :param resource_id: Name of the Instance Group Manager
+    :type resource_id: str
+    :param zone: Google Cloud Platform zone where the Instance Group Manager 
exists.
+    :type zone: str
+    :param request_id: Optional, unique request_id that you might add to 
achieve
+           full idempotence (for example when client call times out repeating 
the request
+           with the same request id will not create a new instance template 
again).
+           It should be in UUID format as defined in RFC 4122
+    :type request_id: str
+    :param update_policy: The update policy for this managed instance group. 
See
+     
https://cloud.google.com/compute/docs/reference/rest/beta/instanceGroupManagers/patch
+     for details of the updatePolicy fields. It's an optional field.
 
 Review comment:
   Fixed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to