potiuk closed pull request #4196: Support for cloud build
URL: https://github.com/apache/incubator-airflow/pull/4196
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull
request once it is merged, the diff is reproduced below for the sake
of provenance:
diff --git a/airflow/contrib/example_dags/example_gcp_compute_igm.py
b/airflow/contrib/example_dags/example_gcp_compute_igm.py
index 3e4543c60d..718b813aad 100644
--- a/airflow/contrib/example_dags/example_gcp_compute_igm.py
+++ b/airflow/contrib/example_dags/example_gcp_compute_igm.py
@@ -76,13 +76,13 @@
SOURCE_TEMPLATE_URL = os.environ.get(
'SOURCE_TEMPLATE_URL',
- "https://www.googleapis.com/compute/beta/projects/"
- "example-project/global/instanceTemplates/instance-template-test")
+ "https://www.googleapis.com/compute/beta/projects/" + PROJECT_ID +
+ "/global/instanceTemplates/instance-template-test")
DESTINATION_TEMPLATE_URL = os.environ.get(
'DESTINATION_TEMPLATE_URL',
- "https://www.googleapis.com/compute/beta/projects/"
- "example-airflow/global/instanceTemplates/" + NEW_TEMPLATE_NAME)
+ "https://www.googleapis.com/compute/beta/projects/" + PROJECT_ID +
+ "/global/instanceTemplates/" + NEW_TEMPLATE_NAME)
UPDATE_POLICY = {
"type": "OPPORTUNISTIC",
diff --git a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
index 76563d7596..6c1ad89818 100644
--- a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+++ b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
@@ -53,7 +53,7 @@
SOURCE_REPOSITORY = os.environ.get(
'SOURCE_REPOSITORY',
'https://source.developers.google.com/'
- 'projects/example-project/repos/hello-world/moveable-aliases/master')
+ 'projects/{}/repos/hello-world/moveable-aliases/master'.format(PROJECT_ID))
ZIP_PATH = os.environ.get('ZIP_PATH', '')
ENTRYPOINT = os.environ.get('ENTRYPOINT', 'helloWorld')
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID,
LOCATION,
diff --git a/airflow/contrib/example_dags/example_gcp_sql_query.py
b/airflow/contrib/example_dags/example_gcp_sql_query.py
index af7c5c2447..f4f80526e4 100644
--- a/airflow/contrib/example_dags/example_gcp_sql_query.py
+++ b/airflow/contrib/example_dags/example_gcp_sql_query.py
@@ -42,6 +42,7 @@
import os
import subprocess
+from os.path import expanduser
from six.moves.urllib.parse import quote_plus
@@ -61,11 +62,11 @@
POSTGRES_PUBLIC_IP = os.environ.get('POSTGRES_PUBLIC_IP', '0.0.0.0')
POSTGRES_PUBLIC_PORT = os.environ.get('POSTGRES_PUBLIC_PORT', 5432)
POSTGRES_CLIENT_CERT_FILE = os.environ.get('POSTGRES_CLIENT_CERT_FILE',
- "/tmp/client-cert.pem")
+ ".key/client-cert.pem")
POSTGRES_CLIENT_KEY_FILE = os.environ.get('POSTGRES_CLIENT_KEY_FILE',
- "/tmp/client-key.pem")
+ ".key/client-key.pem")
POSTGRES_SERVER_CA_FILE = os.environ.get('POSTGRES_SERVER_CA_FILE',
- "/tmp/server-ca.pem")
+ ".key/server-ca.pem")
MYSQL_INSTANCE_NAME = os.environ.get('MYSQL_INSTANCE_NAME', 'testmysql')
MYSQL_DATABASE_NAME = os.environ.get('MYSQL_DATABASE_NAME', 'mysqldb')
@@ -74,11 +75,11 @@
MYSQL_PUBLIC_IP = os.environ.get('MYSQL_PUBLIC_IP', '0.0.0.0')
MYSQL_PUBLIC_PORT = os.environ.get('MYSQL_PUBLIC_PORT', 3306)
MYSQL_CLIENT_CERT_FILE = os.environ.get('MYSQL_CLIENT_CERT_FILE',
- "/tmp/client-cert.pem")
+ ".key/client-cert.pem")
MYSQL_CLIENT_KEY_FILE = os.environ.get('MYSQL_CLIENT_KEY_FILE',
- "/tmp/client-key.pem")
+ ".key/client-key.pem")
MYSQL_SERVER_CA_FILE = os.environ.get('MYSQL_SERVER_CA_FILE',
- "/tmp/server-ca.pem")
+ ".key/server-ca.pem")
SQL = [
'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
@@ -97,6 +98,16 @@
# [START howto_operator_cloudsql_query_connections]
+HOME_DIR = expanduser("~")
+
+
+def get_absolute_path(path):
+ if path.startswith("/"):
+ return path
+ else:
+ return os.path.join(HOME_DIR, path)
+
+
postgres_kwargs = dict(
user=quote_plus(POSTGRES_USER),
password=quote_plus(POSTGRES_PASSWORD),
@@ -106,9 +117,9 @@
location=quote_plus(LOCATION),
instance=quote_plus(POSTGRES_INSTANCE_NAME),
database=quote_plus(POSTGRES_DATABASE_NAME),
- client_cert_file=quote_plus(POSTGRES_CLIENT_CERT_FILE),
- client_key_file=quote_plus(POSTGRES_CLIENT_KEY_FILE),
- server_ca_file=quote_plus(POSTGRES_SERVER_CA_FILE)
+ client_cert_file=quote_plus(get_absolute_path(POSTGRES_CLIENT_CERT_FILE)),
+ client_key_file=quote_plus(get_absolute_path(POSTGRES_CLIENT_KEY_FILE)),
+ server_ca_file=quote_plus(get_absolute_path(POSTGRES_SERVER_CA_FILE))
)
# The connections below are created using one of the standard approaches - via
environment
@@ -169,9 +180,9 @@
location=quote_plus(LOCATION),
instance=quote_plus(MYSQL_INSTANCE_NAME),
database=quote_plus(MYSQL_DATABASE_NAME),
- client_cert_file=quote_plus(MYSQL_CLIENT_CERT_FILE),
- client_key_file=quote_plus(MYSQL_CLIENT_KEY_FILE),
- server_ca_file=quote_plus(MYSQL_SERVER_CA_FILE)
+ client_cert_file=quote_plus(get_absolute_path(MYSQL_CLIENT_CERT_FILE)),
+ client_key_file=quote_plus(get_absolute_path(MYSQL_CLIENT_KEY_FILE)),
+ server_ca_file=quote_plus(get_absolute_path(MYSQL_SERVER_CA_FILE))
)
# MySQL: connect via proxy over TCP (specific proxy version)
diff --git a/cloudbuild-modules.txt b/cloudbuild-modules.txt
new file mode 100644
index 0000000000..cb43d62bda
--- /dev/null
+++ b/cloudbuild-modules.txt
@@ -0,0 +1,3 @@
+tests.contrib.operators.test_gcp_compute_operator
+tests.contrib.operators.test_gcp_function_operator
+tests.contrib.operators.test_gcp_sql_operator
diff --git a/cloudbuild.yml b/cloudbuild.yml
new file mode 100644
index 0000000000..0ab1932827
--- /dev/null
+++ b/cloudbuild.yml
@@ -0,0 +1,81 @@
+steps:
+steps:
+- name: 'gcr.io/cloud-builders/gcloud'
+ args: ['source', 'repos', 'clone', 'incubator-airflow-keys', '/keys']
+ volumes:
+ - name: 'keys'
+ path: '/keys'
+- name: 'gcr.io/cloud-builders/gcloud'
+ entrypoint: '/bin/bash'
+ args: ['-c', './decrypt_all_files.bash']
+ dir: '/keys'
+ volumes:
+ - name: 'keys'
+ path: '/keys'
+- name: 'ubuntu'
+ id: 'tag'
+ args: ['bash', '-c', 'date -u +%Y%m%dT%H%M_$BUILD_ID | tee _TAG']
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:latest'
+ id: 'docs'
+ waitFor: ['tag']
+ env: ["PYTHON_VERSION=2", "HOME=/root"]
+ args: [ "/bin/bash", "-c", "/airflow/_init.sh /airflow/_build_docs.sh"]
+ volumes:
+ - name: 'output'
+ path: '/airflow/output'
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:latest'
+ id: 'python2-test'
+ waitFor: ['tag']
+ env:
+ - "HOME=/root"
+ - "PYTHON_VERSION=2"
+ - "TEST_POSTFIX=python2"
+ args: [ "/bin/bash", "-c",
+ "/airflow/_init.sh /airflow/_run_ci_tests.sh"]
+ volumes:
+ - name: 'output'
+ path: '/airflow/output'
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:latest'
+ id: 'python3-test'
+ env:
+ - "HOME=/root"
+ - "PYTHON_VERSION=3"
+ - "TEST_POSTFIX=python3"
+ args: [ "/bin/bash", "-c",
+ "/airflow/_init.sh /airflow/_run_ci_tests.sh"]
+ volumes:
+ - name: 'output'
+ path: '/airflow/output'
+ - name: 'keys'
+ path: '/root/.key'
+ waitFor: ['tag']
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:latest'
+ id: 'merge_tests'
+ waitFor: ['docs', 'python2-test', 'python3-test']
+ env: ["PYTHON_VERSION=2", "HOME=/root" ]
+ args: [ "/bin/bash", "-c", "/airflow/_init.sh /airflow/_merge_tests.sh
python2 python3"]
+ volumes:
+ - name: 'output'
+ path: '/airflow/output'
+ - name: 'keys'
+ path: '/root/.key'
+- name: "gcr.io/cloud-builders/gsutil"
+ id: 'send_artifacts'
+ args: ['-m', 'cp', '-r', '.', 'gs://${PROJECT_ID}${_GCS_BUCKET_POSTFIX}/']
+ dir: '/output'
+ volumes:
+ - name: 'output'
+ path: '/output'
+- name: 'gcr.io/${PROJECT_ID}/airflow-breeze:latest'
+ id: 'verify_tests'
+ waitFor: ['send_artifacts']
+ env: ["PYTHON_VERSION=2", "HOME=/root" ]
+ args: [ "/bin/bash", "-c", "/airflow/_init.sh /airflow/_verify_tests.sh"]
+ volumes:
+ - name: 'output'
+ path: '/airflow/output'
+substitutions:
+ _GCS_BUCKET_POSTFIX: "-builds"
+options:
+ machineType: 'N1_HIGHCPU_8'
+timeout: 2800s
diff --git a/setup.py b/setup.py
index 7aeb5b59c9..20626ad93c 100644
--- a/setup.py
+++ b/setup.py
@@ -161,6 +161,7 @@ def write_version(filename=os.path.join(*['airflow',
]
# major update coming soon, clamp to 0.x
cloudant = ['cloudant>=0.5.9,<2.0']
+coverage = ['coverage']
crypto = ['cryptography>=0.9.3']
dask = [
'distributed>=1.17.1, <2'
@@ -200,6 +201,7 @@ def write_version(filename=os.path.join(*['airflow',
jdbc = ['jaydebeapi>=1.1.1']
jenkins = ['python-jenkins>=0.4.15']
jira = ['JIRA>1.0.7']
+junit2html = ['junit2html==22']
kerberos = ['pykerberos>=1.1.13',
'requests_kerberos>=0.10.0',
'thrift_sasl>=0.2.0',
@@ -233,6 +235,7 @@ def write_version(filename=os.path.join(*['airflow',
vertica = ['vertica-python>=0.5.1']
webhdfs = ['hdfs[dataframe,avro,kerberos]>=2.0.4']
winrm = ['pywinrm==0.2.2']
+xunitmerge = ['xunitmerge==1.0.4']
zendesk = ['zdesk']
all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant + druid
+ pinot \
@@ -268,7 +271,7 @@ def write_version(filename=os.path.join(*['airflow',
docker + ssh + kubernetes + celery + azure_blob_storage + redis +
gcp_api +
datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs +
jenkins +
druid + pinot + segment + snowflake + elasticsearch +
azure_data_lake +
- atlas)
+ atlas + coverage + junit2html + xunitmerge)
# Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
if PY3:
diff --git a/tests/contrib/operators/test_gcp_base.py
b/tests/contrib/operators/test_gcp_base.py
index 60e5abeb9f..1179189602 100644
--- a/tests/contrib/operators/test_gcp_base.py
+++ b/tests/contrib/operators/test_gcp_base.py
@@ -20,6 +20,7 @@
import os
import subprocess
import unittest
+from os.path import expanduser
from airflow import models, settings, configuration, AirflowException
from airflow.utils.timezone import datetime
@@ -41,10 +42,12 @@
OPERATORS_EXAMPLES_DAG_FOLDER = os.path.join(
AIRFLOW_MAIN_FOLDER, "airflow", "example_dags")
-TESTS_DAG_FOLDER = os.path.join(
+AIRFLOW_HOME = os.environ.get('AIRFLOW_HOME',
+ os.path.join(os.path.expanduser('~'), 'airflow'))
+UNIT_TEST_DAG_FOLDER = os.path.join(
AIRFLOW_MAIN_FOLDER, "tests", "dags")
-GCP_FOLDER_ENVIRONMENT_VARIABLE = "GCP_SERVICE_ACCOUNT_KEY_FOLDER"
+DAG_FOLDER = os.path.join(AIRFLOW_HOME, "dags")
GCP_COMPUTE_KEY = 'gcp_compute.json'
GCP_FUNCTION_KEY = 'gcp_function.json'
@@ -53,9 +56,11 @@
GCP_SPANNER_KEY = 'gcp_spanner.json'
SKIP_TEST_WARNING = """
-The test is only run when there is GCP connection available! "
-Set GCP_SERVICE_ACCOUNT_KEY_FOLDER environment variable if "
-you want to run them".
+
+The test is only run when there is GCP connection available!
+
+Set GCP_SERVICE_ACCOUNT_KEY_FOLDER environment variable if you want to run
them.
+
"""
@@ -75,13 +80,43 @@ def __init__(self,
self.project_extra = project_extra
self.full_key_path = None
- def _gcp_authenticate(self):
- key_dir_path = os.environ['GCP_SERVICE_ACCOUNT_KEY_FOLDER']
- self.full_key_path = os.path.join(key_dir_path, self.gcp_key)
+ @staticmethod
+ def execute_cmd(cmd):
+ print("Executing: '{}'".format(" ".join(cmd)))
+ return subprocess.call(cmd)
+
+ @staticmethod
+ def _get_key_path(key):
+ """
+ Returns key path - if GCP_SERVICE_ACCOUNT_KEY_FOLDER points to absolute
+ directory, it tries to find the key in this directory, otherwise
it assumes
+ that the folder is sub-directory of the HOME directory.
+ :param key: name of the key file to find.
+ :return: path of the key file or None if the key is not found
+ :rtype: str
+ """
+ if "GCP_SERVICE_ACCOUNT_KEY_FOLDER" not in os.environ:
+ return None
+ key_folder = os.environ["GCP_SERVICE_ACCOUNT_KEY_FOLDER"]
+ if not key_folder.startswith("/"):
+ home_dir = expanduser("~")
+ key_folder = os.path.join(home_dir, key_folder)
+ if not os.path.isdir(key_folder):
+ return None
+ key_path = os.path.join(key_folder, key)
+ if not os.path.isfile(key_path):
+ return None
+ return key_path
+
+ def gcp_authenticate(self):
+ """
+ Authenticate with service account specified.
+ """
+ self.full_key_path = self._get_key_path(self.gcp_key)
if not os.path.isfile(self.full_key_path):
raise Exception("The key {} could not be found. Please copy it to
the "
- "{} folder.".format(self.gcp_key, key_dir_path))
+ "{} path.".format(self.gcp_key,
self.full_key_path))
print("Setting the GCP key to {}".format(self.full_key_path))
# Checking if we can authenticate using service account credentials
provided
retcode = subprocess.call(['gcloud', 'auth',
'activate-service-account',
@@ -89,8 +124,13 @@ def _gcp_authenticate(self):
if retcode != 0:
raise AirflowException("The gcloud auth method was not
successful!")
self.update_connection_with_key_path()
- # Now we revoke all authentication here because we want to make sure
- # that all works fine with the credentials retrieved from the
gcp_connection
+
+ @staticmethod
+ def gcp_revoke_authentication():
+ """
+ Revoke default authentication. Tests should be run with default
authentication
+ revoked because the authentication from Connection table should be
used.
+ """
subprocess.call(['gcloud', 'auth', 'revoke'])
def update_connection_with_key_path(self):
@@ -135,8 +175,15 @@ def update_connection_with_dictionary(self):
finally:
session.close()
+ @staticmethod
+ def _get_dag_folder():
+ if os.environ.get('AIRFLOW__CORE__UNIT_TEST_MODE'):
+ return UNIT_TEST_DAG_FOLDER
+ else:
+ return DAG_FOLDER
+
def _symlink_dag(self):
- target_path = os.path.join(TESTS_DAG_FOLDER, self.dag_name)
+ target_path = os.path.join(self._get_dag_folder(), self.dag_name)
if os.path.exists(target_path):
os.remove(target_path)
os.symlink(
@@ -144,10 +191,11 @@ def _symlink_dag(self):
os.path.join(target_path))
def _rm_symlink_dag(self):
- os.remove(os.path.join(TESTS_DAG_FOLDER, self.dag_name))
+ os.remove(os.path.join(self._get_dag_folder(), self.dag_name))
def _run_dag(self):
- dag_bag = models.DagBag(dag_folder=TESTS_DAG_FOLDER,
include_examples=False)
+ dag_bag = models.DagBag(dag_folder=self._get_dag_folder(),
+ include_examples=False)
self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
dag = dag_bag.get_dag(self.dag_id)
dag.clear(reset_dag_runs=True)
@@ -155,7 +203,10 @@ def _run_dag(self):
def setUp(self):
configuration.conf.load_test_config()
- self._gcp_authenticate()
+ self.gcp_authenticate()
+ # We checked that authentication works - but then we revoke it to make
+ # sure we are not relying on the authentication
+ self.gcp_revoke_authentication()
self._symlink_dag()
def tearDown(self):
@@ -163,12 +214,5 @@ def tearDown(self):
@staticmethod
def skip_check(key):
- if GCP_FOLDER_ENVIRONMENT_VARIABLE not in os.environ:
- return True
- key_folder = os.environ[GCP_FOLDER_ENVIRONMENT_VARIABLE]
- if not os.path.isdir(key_folder):
- return True
- key_path = os.path.join(key_folder, key)
- if not os.path.isfile(key_path):
- return True
- return False
+ key_path = BaseGcpIntegrationTestCase._get_key_path(key)
+ return key_path is None
diff --git a/tests/contrib/operators/test_gcp_compute_operator.py
b/tests/contrib/operators/test_gcp_compute_operator.py
index 4a4e336b7c..f54d56575c 100644
--- a/tests/contrib/operators/test_gcp_compute_operator.py
+++ b/tests/contrib/operators/test_gcp_compute_operator.py
@@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.
import ast
+import os
import unittest
from copy import deepcopy
@@ -29,6 +30,8 @@
GceInstanceGroupManagerUpdateTemplateOperator
from airflow.models import TaskInstance, DAG
from airflow.utils import timezone
+from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
+ GCP_COMPUTE_KEY, SKIP_TEST_WARNING
try:
# noinspection PyProtectedMember
@@ -73,7 +76,7 @@ def test_instance_start(self, mock_hook):
# Setting all of the operator's input parameters as template dag_ids
# (could be anything else) just to test if the templating works for all
fields
@mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
- def test_instance_start_with_templates(self, mock_hook):
+ def test_instance_start_with_templates(self, _):
dag_id = 'test_dag_id'
configuration.load_test_config()
args = {
@@ -161,7 +164,7 @@ def test_instance_stop(self, mock_hook):
# Setting all of the operator's input parameters as templated dag_ids
# (could be anything else) just to test if the templating works for all
fields
@mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
- def test_instance_stop_with_templates(self, mock_hook):
+ def test_instance_stop_with_templates(self, _):
dag_id = 'test_dag_id'
configuration.load_test_config()
args = {
@@ -250,7 +253,7 @@ def test_set_machine_type(self, mock_hook):
# Setting all of the operator's input parameters as templated dag_ids
# (could be anything else) just to test if the templating works for all
fields
@mock.patch('airflow.contrib.operators.gcp_compute_operator.GceHook')
- def test_set_machine_type_with_templates(self, mock_hook):
+ def test_set_machine_type_with_templates(self, _):
dag_id = 'test_dag_id'
configuration.load_test_config()
args = {
@@ -367,7 +370,8 @@ def test_set_machine_type_should_handle_and_trim_gce_error(
self, get_conn, _execute_set_machine_type,
_check_zone_operation_status):
get_conn.return_value = {}
_execute_set_machine_type.return_value = {"name": "test-operation"}
- _check_zone_operation_status.return_value =
ast.literal_eval(self.MOCK_OP_RESPONSE)
+ _check_zone_operation_status.return_value = ast.literal_eval(
+ self.MOCK_OP_RESPONSE)
with self.assertRaises(AirflowException) as cm:
op = GceSetMachineTypeOperator(
project_id=PROJECT_ID,
@@ -1039,3 +1043,112 @@ def test_try_to_use_non_existing_template(self,
mock_hook):
gcp_conn_id='google_cloud_default')
mock_hook.return_value.patch_instance_group_manager.assert_not_called()
self.assertTrue(result)
+
+
+ITEST_INSTANCE = os.environ.get('INSTANCE', 'testinstance')
+ITEST_PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
+ITEST_INSTANCE_GROUP_MANAGER_NAME =
os.environ.get('INSTANCE_GROUP_MANAGER_NAME',
+ 'instance-group-test')
+ITEST_ZONE = os.environ.get('ZONE', 'europe-west1-b')
+ITEST_TEMPLATE_NAME = os.environ.get('TEMPLATE_NAME',
+ 'instance-template-test')
+ITEST_NEW_TEMPLATE_NAME = os.environ.get('NEW_TEMPLATE_NAME',
+ 'instance-template-test-new')
+
+
[email protected](
+ BaseGcpIntegrationTestCase.skip_check(GCP_COMPUTE_KEY), SKIP_TEST_WARNING)
+class GcpComputeExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+
+ @staticmethod
+ def delete_instance():
+ BaseGcpIntegrationTestCase.execute_cmd([
+ 'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID,
'--quiet',
+ 'instances', 'delete', ITEST_INSTANCE, '--zone', ITEST_ZONE,
+ ])
+
+ @staticmethod
+ def create_instance():
+ BaseGcpIntegrationTestCase.execute_cmd([
+ 'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID,
'--quiet',
+ 'instances', 'create', ITEST_INSTANCE,
+ '--zone', ITEST_ZONE
+ ])
+
+ def setUp(self):
+ super(GcpComputeExampleDagsIntegrationTest, self).setUp()
+ self.gcp_authenticate()
+ self.delete_instance()
+ self.create_instance()
+ self.gcp_revoke_authentication()
+
+ def tearDown(self):
+ self.gcp_authenticate()
+ self.delete_instance()
+ self.gcp_revoke_authentication()
+ super(GcpComputeExampleDagsIntegrationTest, self).tearDown()
+
+ def __init__(self, method_name='runTest'):
+ super(GcpComputeExampleDagsIntegrationTest, self).__init__(
+ method_name,
+ dag_id='example_gcp_compute',
+ gcp_key=GCP_COMPUTE_KEY)
+
+ def test_run_example_dag_compute_igm(self):
+ self._run_dag()
+
+
[email protected](
+ BaseGcpIntegrationTestCase.skip_check(GCP_COMPUTE_KEY), SKIP_TEST_WARNING)
+class GcpComputeIgmExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+
+ @staticmethod
+ def delete_instance_group_and_template():
+ BaseGcpIntegrationTestCase.execute_cmd([
+ 'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID,
'--quiet',
+ 'instance-groups', 'managed', 'delete',
ITEST_INSTANCE_GROUP_MANAGER_NAME,
+ ])
+ BaseGcpIntegrationTestCase.execute_cmd([
+ 'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID,
'--quiet',
+ 'instance-templates', 'delete', ITEST_NEW_TEMPLATE_NAME
+ ])
+ BaseGcpIntegrationTestCase.execute_cmd([
+ 'gcloud', 'beta', 'compute',
+ '--project', ITEST_PROJECT_ID, '--quiet',
+ 'instance-templates', 'delete', ITEST_TEMPLATE_NAME
+ ])
+
+ @staticmethod
+ def create_instance_group_and_template():
+ BaseGcpIntegrationTestCase.execute_cmd([
+ 'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID,
'--quiet',
+ 'instance-templates', 'create', ITEST_TEMPLATE_NAME
+ ])
+ BaseGcpIntegrationTestCase.execute_cmd([
+ 'gcloud', 'beta', 'compute', '--project', ITEST_PROJECT_ID,
'--quiet',
+ 'instance-groups', 'managed', 'create',
ITEST_INSTANCE_GROUP_MANAGER_NAME,
+ '--template', ITEST_TEMPLATE_NAME,
+ '--zone', ITEST_ZONE, '--size=1'
+ ])
+
+ def setUp(self):
+ super(GcpComputeIgmExampleDagsIntegrationTest, self).setUp()
+ self.gcp_authenticate()
+ self.delete_instance_group_and_template()
+ self.create_instance_group_and_template()
+ self.gcp_revoke_authentication()
+
+ def tearDown(self):
+ self.gcp_authenticate()
+ self.delete_instance_group_and_template()
+ self.gcp_revoke_authentication()
+ super(GcpComputeIgmExampleDagsIntegrationTest, self).tearDown()
+
+ def __init__(self, method_name='runTest'):
+ super(GcpComputeIgmExampleDagsIntegrationTest, self).__init__(
+ method_name,
+ dag_id='example_gcp_compute_igm',
+ gcp_key=GCP_COMPUTE_KEY)
+
+ def test_run_example_dag_compute_igm(self):
+ self._run_dag()
diff --git a/tests/contrib/operators/test_gcp_function_operator.py
b/tests/contrib/operators/test_gcp_function_operator.py
index 46d599bf7d..18ab25119f 100644
--- a/tests/contrib/operators/test_gcp_function_operator.py
+++ b/tests/contrib/operators/test_gcp_function_operator.py
@@ -29,6 +29,9 @@
from copy import deepcopy
+from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
+ GCP_FUNCTION_KEY, SKIP_TEST_WARNING
+
try:
# noinspection PyProtectedMember
from unittest import mock
@@ -639,3 +642,29 @@ def test_non_404_gcf_error_bubbled_up(self, mock_hook):
mock_hook.return_value.delete_function.assert_called_once_with(
'projects/project_name/locations/project_location/functions/function_name'
)
+
+
[email protected](
+ BaseGcpIntegrationTestCase.skip_check(GCP_FUNCTION_KEY), SKIP_TEST_WARNING)
+class
CloudFunctionsDeleteExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+ def __init__(self, method_name='runTest'):
+ super(CloudFunctionsDeleteExampleDagsIntegrationTest, self).__init__(
+ method_name,
+ dag_id='example_gcp_function_delete',
+ gcp_key=GCP_FUNCTION_KEY)
+
+ def test_run_example_dag_delete_query(self):
+ self._run_dag()
+
+
[email protected](
+ BaseGcpIntegrationTestCase.skip_check(GCP_FUNCTION_KEY), SKIP_TEST_WARNING)
+class
CloudFunctionsDeployDeleteExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+ def __init__(self, method_name='runTest'):
+ super(CloudFunctionsDeployDeleteExampleDagsIntegrationTest,
self).__init__(
+ method_name,
+ dag_id='example_gcp_function_deploy_delete',
+ gcp_key=GCP_FUNCTION_KEY)
+
+ def test_run_example_dag_deploy_delete_query(self):
+ self._run_dag()
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services