kaxil closed pull request #4192: [AIRFLOW-3345] Add Google Cloud Storage (GCS) operators for ACL URL: https://github.com/apache/incubator-airflow/pull/4192
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/example_dags/example_gcs_acl.py b/airflow/contrib/example_dags/example_gcs_acl.py new file mode 100644 index 0000000000..7247199a4f --- /dev/null +++ b/airflow/contrib/example_dags/example_gcs_acl.py @@ -0,0 +1,76 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG that creates a new ACL entry on the specified bucket and object. + +This DAG relies on the following OS environment variables + +* GCS_ACL_BUCKET - Name of a bucket. +* GCS_ACL_OBJECT - Name of the object. For information about how to URL encode object + names to be path safe, see: + https://cloud.google.com/storage/docs/json_api/#encoding +* GCS_ACL_ENTITY - The entity holding the permission. +* GCS_ACL_BUCKET_ROLE - The access permission for the entity for the bucket. +* GCS_ACL_OBJECT_ROLE - The access permission for the entity for the object. 
+""" +import os + +import airflow +from airflow import models +from airflow.contrib.operators.gcs_acl_operator import \ + GoogleCloudStorageBucketCreateAclEntryOperator, \ + GoogleCloudStorageObjectCreateAclEntryOperator + +# [START howto_operator_gcs_acl_args_common] +GCS_ACL_BUCKET = os.environ.get('GCS_ACL_BUCKET', 'example-bucket') +GCS_ACL_OBJECT = os.environ.get('GCS_ACL_OBJECT', 'example-object') +GCS_ACL_ENTITY = os.environ.get('GCS_ACL_ENTITY', 'example-entity') +GCS_ACL_BUCKET_ROLE = os.environ.get('GCS_ACL_BUCKET_ROLE', 'example-bucket-role') +GCS_ACL_OBJECT_ROLE = os.environ.get('GCS_ACL_OBJECT_ROLE', 'example-object-role') +# [END howto_operator_gcs_acl_args_common] + +default_args = { + 'start_date': airflow.utils.dates.days_ago(1) +} + +with models.DAG( + 'example_gcs_acl', + default_args=default_args, + schedule_interval=None # Change to match your use case +) as dag: + # [START howto_operator_gcs_bucket_create_acl_entry_task] + gcs_bucket_create_acl_entry_task = GoogleCloudStorageBucketCreateAclEntryOperator( + bucket=GCS_ACL_BUCKET, + entity=GCS_ACL_ENTITY, + role=GCS_ACL_BUCKET_ROLE, + task_id="gcs_bucket_create_acl_entry_task" + ) + # [END howto_operator_gcs_bucket_create_acl_entry_task] + # [START howto_operator_gcs_object_create_acl_entry_task] + gcs_object_create_acl_entry_task = GoogleCloudStorageObjectCreateAclEntryOperator( + bucket=GCS_ACL_BUCKET, + object_name=GCS_ACL_OBJECT, + entity=GCS_ACL_ENTITY, + role=GCS_ACL_OBJECT_ROLE, + task_id="gcs_object_create_acl_entry_task" + ) + # [END howto_operator_gcs_object_create_acl_entry_task] + + gcs_bucket_create_acl_entry_task >> gcs_object_create_acl_entry_task diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py index dc92b0cb3e..f848d25dce 100644 --- a/airflow/contrib/hooks/gcs_hook.py +++ b/airflow/contrib/hooks/gcs_hook.py @@ -568,6 +568,104 @@ def create_bucket(self, 'Bucket creation failed. 
Error was: {}'.format(ex.content) ) + def insert_bucket_acl(self, bucket, entity, role, user_project): + # type: (str, str, str, str) -> None + """ + Creates a new ACL entry on the specified bucket. + See: https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert + + :param bucket: Name of a bucket. + :type bucket: str + :param entity: The entity holding the permission, in one of the following forms: + - user-userId + - user-email + - group-groupId + - group-email + - domain-domain + - project-team-projectId + - allUsers + - allAuthenticatedUsers + :type entity: str + :param role: The access permission for the entity. + Acceptable values are: "OWNER", "READER", "WRITER". + :type role: str + :param user_project: (Optional) The project to be billed for this request. + Required for Requester Pays buckets. + :type user_project: str + """ + self.log.info('Creating a new ACL entry in bucket: %s', bucket) + service = self.get_conn() + try: + response = service.bucketAccessControls().insert( + bucket=bucket, + body={ + "entity": entity, + "role": role + }, + userProject=user_project + ).execute() + if response: + self.log.info('A new ACL entry created in bucket: %s', bucket) + except errors.HttpError as ex: + raise AirflowException( + 'Bucket ACL entry creation failed. Error was: {}'.format(ex.content) + ) + + def insert_object_acl(self, bucket, object_name, entity, role, generation, + user_project): + # type: (str, str, str, str, str, str) -> None + """ + Creates a new ACL entry on the specified object. + See: https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert + + :param bucket: Name of a bucket. + :type bucket: str + :param object_name: Name of the object. 
For information about how to URL encode + object names to be path safe, see: + https://cloud.google.com/storage/docs/json_api/#encoding + :type object_name: str + :param entity: The entity holding the permission, in one of the following forms: + - user-userId + - user-email + - group-groupId + - group-email + - domain-domain + - project-team-projectId + - allUsers + - allAuthenticatedUsers + :type entity: str + :param role: The access permission for the entity. + Acceptable values are: "OWNER", "READER". + :type role: str + :param generation: (Optional) If present, selects a specific revision of this + object (as opposed to the latest version, the default). + :type generation: str + :param user_project: (Optional) The project to be billed for this request. + Required for Requester Pays buckets. + :type user_project: str + """ + self.log.info('Creating a new ACL entry for object: %s in bucket: %s', + object_name, bucket) + service = self.get_conn() + try: + response = service.objectAccessControls().insert( + bucket=bucket, + object=object_name, + body={ + "entity": entity, + "role": role + }, + generation=generation, + userProject=user_project + ).execute() + if response: + self.log.info('A new ACL entry created for object: %s in bucket: %s', + object_name, bucket) + except errors.HttpError as ex: + raise AirflowException( + 'Object ACL entry creation failed. Error was: {}'.format(ex.content) + ) + def _parse_gcs_url(gsurl): """ diff --git a/airflow/contrib/operators/gcs_acl_operator.py b/airflow/contrib/operators/gcs_acl_operator.py new file mode 100644 index 0000000000..d918444131 --- /dev/null +++ b/airflow/contrib/operators/gcs_acl_operator.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class GoogleCloudStorageBucketCreateAclEntryOperator(BaseOperator): + """ + Creates a new ACL entry on the specified bucket. + + :param bucket: Name of a bucket. + :type bucket: str + :param entity: The entity holding the permission, in one of the following forms: + - user-userId + - user-email + - group-groupId + - group-email + - domain-domain + - project-team-projectId + - allUsers + - allAuthenticatedUsers + :type entity: str + :param role: The access permission for the entity. + Acceptable values are: "OWNER", "READER", "WRITER". + :type role: str + :param user_project: (Optional) The project to be billed for this request. + Required for Requester Pays buckets. + :type user_project: str + :param google_cloud_storage_conn_id: The connection ID to use when + connecting to Google Cloud Storage. 
+ :type google_cloud_storage_conn_id: str + """ + # [START gcs_bucket_create_acl_template_fields] + template_fields = ('bucket', 'entity', 'role', 'user_project') + # [END gcs_bucket_create_acl_template_fields] + + @apply_defaults + def __init__(self, bucket, entity, role, user_project=None, + google_cloud_storage_conn_id='google_cloud_default', *args, **kwargs): + super(GoogleCloudStorageBucketCreateAclEntryOperator, self).__init__(*args, + **kwargs) + self.bucket = bucket + self.entity = entity + self.role = role + self.user_project = user_project + self.google_cloud_storage_conn_id = google_cloud_storage_conn_id + + def execute(self, context): + hook = GoogleCloudStorageHook( + google_cloud_storage_conn_id=self.google_cloud_storage_conn_id + ) + hook.insert_bucket_acl(bucket=self.bucket, entity=self.entity, role=self.role, + user_project=self.user_project) + + +class GoogleCloudStorageObjectCreateAclEntryOperator(BaseOperator): + """ + Creates a new ACL entry on the specified object. + + :param bucket: Name of a bucket. + :type bucket: str + :param object_name: Name of the object. For information about how to URL encode object + names to be path safe, see: + https://cloud.google.com/storage/docs/json_api/#encoding + :type object_name: str + :param entity: The entity holding the permission, in one of the following forms: + - user-userId + - user-email + - group-groupId + - group-email + - domain-domain + - project-team-projectId + - allUsers + - allAuthenticatedUsers + :type entity: str + :param role: The access permission for the entity. + Acceptable values are: "OWNER", "READER". + :type role: str + :param generation: (Optional) If present, selects a specific revision of this object + (as opposed to the latest version, the default). + :type generation: str + :param user_project: (Optional) The project to be billed for this request. + Required for Requester Pays buckets. 
+ :type user_project: str + :param google_cloud_storage_conn_id: The connection ID to use when + connecting to Google Cloud Storage. + :type google_cloud_storage_conn_id: str + """ + # [START gcs_object_create_acl_template_fields] + template_fields = ('bucket', 'object_name', 'entity', 'role', 'generation', + 'user_project') + # [END gcs_object_create_acl_template_fields] + + @apply_defaults + def __init__(self, + bucket, + object_name, + entity, + role, + generation=None, + user_project=None, + google_cloud_storage_conn_id='google_cloud_default', + *args, **kwargs): + super(GoogleCloudStorageObjectCreateAclEntryOperator, self).__init__(*args, + **kwargs) + self.bucket = bucket + self.object_name = object_name + self.entity = entity + self.role = role + self.generation = generation + self.user_project = user_project + self.google_cloud_storage_conn_id = google_cloud_storage_conn_id + + def execute(self, context): + hook = GoogleCloudStorageHook( + google_cloud_storage_conn_id=self.google_cloud_storage_conn_id + ) + hook.insert_object_acl(bucket=self.bucket, object_name=self.object_name, + entity=self.entity, role=self.role, + generation=self.generation, user_project=self.user_project) diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst index 7d8053b41a..dc0f4c99d3 100644 --- a/docs/howto/operator.rst +++ b/docs/howto/operator.rst @@ -921,3 +921,92 @@ See `Google Cloud Sql Proxy documentation <https://cloud.google.com/sql/docs/postgres/sql-proxy>`_ for details about Cloud Sql Proxy. +Google Cloud Storage Operators +------------------------------ + +GoogleCloudStorageBucketCreateAclEntryOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Creates a new ACL entry on the specified bucket. + +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. 
literalinclude:: ../../airflow/contrib/example_dags/example_gcs_acl.py + :language: python + :start-after: [START howto_operator_gcs_acl_args_common] + :end-before: [END howto_operator_gcs_acl_args_common] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcs_acl.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gcs_bucket_create_acl_entry_task] + :end-before: [END howto_operator_gcs_bucket_create_acl_entry_task] + +Templating +"""""""""" + +.. literalinclude:: ../../airflow/contrib/operators/gcs_acl_operator.py + :language: python + :dedent: 4 + :start-after: [START gcs_bucket_create_acl_template_fields] + :end-before: [END gcs_bucket_create_acl_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud Storage BucketAccessControls insert documentation +<https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert>`_ +for details. + +GoogleCloudStorageObjectCreateAclEntryOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Creates a new ACL entry on the specified object. + +For parameter definition take a look at +:class:`~airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageObjectCreateAclEntryOperator` + +Arguments +""""""""" + +Some arguments in the example DAG are taken from the OS environment variables: + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcs_acl.py + :language: python + :start-after: [START howto_operator_gcs_acl_args_common] + :end-before: [END howto_operator_gcs_acl_args_common] + +Using the operator +"""""""""""""""""" + +.. literalinclude:: ../../airflow/contrib/example_dags/example_gcs_acl.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_gcs_object_create_acl_entry_task] + :end-before: [END howto_operator_gcs_object_create_acl_entry_task] + +Templating +"""""""""" + +.. 
literalinclude:: ../../airflow/contrib/operators/gcs_acl_operator.py + :language: python + :dedent: 4 + :start-after: [START gcs_object_create_acl_template_fields] + :end-before: [END gcs_object_create_acl_template_fields] + +More information +"""""""""""""""" + +See `Google Cloud Storage ObjectAccessControls insert documentation +<https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert>`_ +for details. + diff --git a/docs/integration.rst b/docs/integration.rst index d4fffa550d..091f462ea5 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -973,9 +973,11 @@ Storage Operators """"""""""""""""" - :ref:`FileToGoogleCloudStorageOperator` : Uploads a file to Google Cloud Storage. +- :ref:`GoogleCloudStorageBucketCreateAclEntryOperator` : Creates a new ACL entry on the specified bucket. - :ref:`GoogleCloudStorageCreateBucketOperator` : Creates a new cloud storage bucket. -- :ref:`GoogleCloudStorageListOperator` : List all objects from the bucket with the give string prefix and delimiter in name. - :ref:`GoogleCloudStorageDownloadOperator` : Downloads a file from Google Cloud Storage. +- :ref:`GoogleCloudStorageListOperator` : List all objects from the bucket with the give string prefix and delimiter in name. +- :ref:`GoogleCloudStorageObjectCreateAclEntryOperator` : Creates a new ACL entry on the specified object. - :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google cloud storage into BigQuery. - :ref:`GoogleCloudStorageToGoogleCloudStorageOperator` : Copies objects from a bucket to another, with renaming if requested. @@ -988,6 +990,13 @@ FileToGoogleCloudStorageOperator .. _GoogleCloudStorageCreateBucketOperator: +GoogleCloudStorageBucketCreateAclEntryOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator + +.. 
_GoogleCloudStorageBucketCreateAclEntryOperator: + GoogleCloudStorageCreateBucketOperator ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -1009,6 +1018,13 @@ GoogleCloudStorageListOperator .. _GoogleCloudStorageToBigQueryOperator: +GoogleCloudStorageObjectCreateAclEntryOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autoclass:: airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageObjectCreateAclEntryOperator + +.. _GoogleCloudStorageObjectCreateAclEntryOperator: + GoogleCloudStorageToBigQueryOperator ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/tests/contrib/operators/test_gcp_base.py b/tests/contrib/operators/test_gcp_base.py index 60e5abeb9f..7e786c5b4b 100644 --- a/tests/contrib/operators/test_gcp_base.py +++ b/tests/contrib/operators/test_gcp_base.py @@ -51,6 +51,7 @@ GCP_CLOUDSQL_KEY = 'gcp_cloudsql.json' GCP_BIGTABLE_KEY = 'gcp_bigtable.json' GCP_SPANNER_KEY = 'gcp_spanner.json' +GCP_GCS_KEY = 'gcp_gcs.json' SKIP_TEST_WARNING = """ The test is only run when there is GCP connection available! " diff --git a/tests/contrib/operators/test_gcs_acl_operator.py b/tests/contrib/operators/test_gcs_acl_operator.py new file mode 100644 index 0000000000..562c653574 --- /dev/null +++ b/tests/contrib/operators/test_gcs_acl_operator.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import unittest + +from airflow.contrib.operators.gcs_acl_operator import \ + GoogleCloudStorageBucketCreateAclEntryOperator, \ + GoogleCloudStorageObjectCreateAclEntryOperator +from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \ + SKIP_TEST_WARNING, GCP_GCS_KEY + +try: + from unittest import mock +except ImportError: + try: + import mock + except ImportError: + mock = None + + +class GoogleCloudStorageAclTest(unittest.TestCase): + @mock.patch('airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageHook') + def test_bucket_create_acl(self, mock_hook): + operator = GoogleCloudStorageBucketCreateAclEntryOperator( + bucket="test-bucket", + entity="test-entity", + role="test-role", + user_project="test-user-project", + task_id="id" + ) + operator.execute(None) + mock_hook.return_value.insert_bucket_acl.assert_called_once_with( + bucket="test-bucket", + entity="test-entity", + role="test-role", + user_project="test-user-project" + ) + + @mock.patch('airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageHook') + def test_object_create_acl(self, mock_hook): + operator = GoogleCloudStorageObjectCreateAclEntryOperator( + bucket="test-bucket", + object_name="test-object", + entity="test-entity", + role="test-role", + generation="test-generation", + user_project="test-user-project", + task_id="id" + ) + operator.execute(None) + mock_hook.return_value.insert_object_acl.assert_called_once_with( + bucket="test-bucket", + object_name="test-object", + entity="test-entity", + role="test-role", + generation="test-generation", + 
user_project="test-user-project" + ) + + +@unittest.skipIf( + BaseGcpIntegrationTestCase.skip_check(GCP_GCS_KEY), SKIP_TEST_WARNING) +class CloudStorageExampleDagsIntegrationTest(BaseGcpIntegrationTestCase): + def __init__(self, method_name='runTest'): + super(CloudStorageExampleDagsIntegrationTest, self).__init__( + method_name, + dag_id='example_gcs_acl', + gcp_key=GCP_GCS_KEY) + + def test_run_example_dag_gcs_acl(self): + self._run_dag() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
