This is an automated email from the ASF dual-hosted git repository. beto pushed a commit to branch aws-iam in repository https://gitbox.apache.org/repos/asf/superset.git
commit af16bd9a19884e7e93ce1c35a5d2af1c62a07a77 Author: Beto Dealmeida <[email protected]> AuthorDate: Mon Jan 26 10:41:15 2026 -0500 Provisioned Redshift clusters --- superset/db_engine_specs/aws_iam.py | 153 +++++++++++-- tests/unit_tests/db_engine_specs/test_aws_iam.py | 247 ++++++++++++++++++++- .../db_engine_specs/test_redshift_iam.py | 137 ++++++++++++ 3 files changed, 518 insertions(+), 19 deletions(-) diff --git a/superset/db_engine_specs/aws_iam.py b/superset/db_engine_specs/aws_iam.py index 29145b811a5..cdd597af741 100644 --- a/superset/db_engine_specs/aws_iam.py +++ b/superset/db_engine_specs/aws_iam.py @@ -70,6 +70,8 @@ class AWSIAMConfig(TypedDict, total=False): # Redshift Serverless fields workgroup_name: str db_name: str + # Redshift provisioned cluster fields + cluster_identifier: str class AWSIAMAuthMixin: @@ -318,7 +320,68 @@ class AWSIAMAuthMixin: except ClientError as ex: raise SupersetSecurityException( SupersetError( - message=f"Failed to get Redshift credentials: {ex}", + message=f"Failed to get Redshift Serverless credentials: {ex}", + error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, + level=ErrorLevel.ERROR, + ) + ) from ex + + @classmethod + def generate_redshift_cluster_credentials( + cls, + credentials: dict[str, Any], + cluster_identifier: str, + db_user: str, + db_name: str, + region: str, + auto_create: bool = False, + ) -> tuple[str, str]: + """ + Generate credentials for a provisioned Redshift cluster using temporary + STS credentials. + + :param credentials: STS credentials from assume_role + :param cluster_identifier: Redshift cluster identifier + :param db_user: Database username to get credentials for + :param db_name: Redshift database name + :param region: AWS region + :param auto_create: Whether to auto-create the database user if it doesn't exist + :returns: Tuple of (username, password) for Redshift connection + :raises SupersetSecurityException: If credential generation fails + """ + try: + import boto3 + from botocore.exceptions import ClientError + except ImportError as ex: + raise SupersetSecurityException( + SupersetError( + message="boto3 is required for AWS IAM authentication.", + error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR, + level=ErrorLevel.ERROR, + ) + ) from ex + + try: + client = boto3.client( + "redshift", + region_name=region, + aws_access_key_id=credentials["AccessKeyId"], + aws_secret_access_key=credentials["SecretAccessKey"], + aws_session_token=credentials["SessionToken"], + ) + + response = client.get_cluster_credentials( + ClusterIdentifier=cluster_identifier, + DbUser=db_user, + DbName=db_name, + AutoCreate=auto_create, + ) + return response["DbUser"], response["DbPassword"] + + except ClientError as ex: + raise SupersetSecurityException( + SupersetError( + message=f"Failed to get Redshift cluster credentials: {ex}", error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR, level=ErrorLevel.ERROR, ) @@ -441,7 +504,11 @@ class AWSIAMAuthMixin: iam_config: AWSIAMConfig, ) -> None: """ - Apply Redshift Serverless IAM authentication to connection parameters. + Apply Redshift IAM authentication to connection parameters. + + Supports both Redshift Serverless (workgroup_name) and provisioned + clusters (cluster_identifier). The method auto-detects which type + based on the configuration provided. Flow: assume role -> get Redshift credentials -> update connect_args -> SSL. @@ -455,20 +522,56 @@ class AWSIAMAuthMixin: region = iam_config.get("region") external_id = iam_config.get("external_id") session_duration = iam_config.get("session_duration", DEFAULT_SESSION_DURATION) + + # Serverless fields workgroup_name = iam_config.get("workgroup_name") + + # Provisioned cluster fields + cluster_identifier = iam_config.get("cluster_identifier") + db_username = iam_config.get("db_username") + + # Common field db_name = iam_config.get("db_name") - # Validate required fields + # Determine deployment type + is_serverless = bool(workgroup_name) + is_provisioned = bool(cluster_identifier) + + if is_serverless and is_provisioned: + raise SupersetSecurityException( + SupersetError( + message="AWS IAM configuration cannot have both workgroup_name " + "(Serverless) and cluster_identifier (provisioned). " + "Please specify only one.", + error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR, + level=ErrorLevel.ERROR, + ) + ) + + if not is_serverless and not is_provisioned: + raise SupersetSecurityException( + SupersetError( + message="AWS IAM configuration must include either workgroup_name " + "(for Redshift Serverless) or cluster_identifier " + "(for provisioned Redshift clusters).", + error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR, + level=ErrorLevel.ERROR, + ) + ) + + # Validate common required fields missing_fields = [] if not role_arn: missing_fields.append("role_arn") if not region: missing_fields.append("region") - if not workgroup_name: - missing_fields.append("workgroup_name") if not db_name: missing_fields.append("db_name") + # Validate provisioned cluster specific fields + if is_provisioned and not db_username: + missing_fields.append("db_username") + if missing_fields: raise SupersetSecurityException( SupersetError( @@ -482,14 +585,8 @@ class AWSIAMAuthMixin: # Type assertions after validation assert role_arn is not None assert region is not None - assert workgroup_name is not None assert db_name is not None - logger.debug( - "Applying Redshift IAM authentication for workgroup %s", - workgroup_name, - ) - # Step 1: Assume the IAM role credentials = cls.get_iam_credentials( role_arn=role_arn, @@ -498,13 +595,33 @@ class AWSIAMAuthMixin: session_duration=session_duration, ) - # Step 2: Get Redshift Serverless credentials - db_user, db_password = cls.generate_redshift_credentials( - credentials=credentials, - workgroup_name=workgroup_name, - db_name=db_name, - region=region, - ) + # Step 2: Get Redshift credentials based on deployment type + if is_serverless: + assert workgroup_name is not None + logger.debug( + "Applying Redshift Serverless IAM authentication for workgroup %s", + workgroup_name, + ) + db_user, db_password = cls.generate_redshift_credentials( + credentials=credentials, + workgroup_name=workgroup_name, + db_name=db_name, + region=region, + ) + else: + assert cluster_identifier is not None + assert db_username is not None + logger.debug( + "Applying Redshift provisioned cluster IAM authentication for %s", + cluster_identifier, + ) + db_user, db_password = cls.generate_redshift_cluster_credentials( + credentials=credentials, + cluster_identifier=cluster_identifier, + db_user=db_username, + db_name=db_name, + region=region, + ) # Step 3: Update connection parameters connect_args = params.setdefault("connect_args", {}) diff --git a/tests/unit_tests/db_engine_specs/test_aws_iam.py b/tests/unit_tests/db_engine_specs/test_aws_iam.py index 607b5a0fbf1..7b316f58aa6 100644 --- a/tests/unit_tests/db_engine_specs/test_aws_iam.py +++ b/tests/unit_tests/db_engine_specs/test_aws_iam.py @@ -646,7 +646,7 @@ def test_generate_redshift_credentials_client_error() -> None: region="us-east-1", ) - assert "Failed to get Redshift credentials" in str(exc_info.value) + assert "Failed to get Redshift Serverless credentials" in str(exc_info.value) def test_apply_redshift_iam_authentication() -> None: @@ -755,3 +755,248 @@ def test_apply_redshift_iam_authentication_missing_db_name() -> None: ) assert "db_name" in str(exc_info.value) + + +def test_generate_redshift_cluster_credentials() -> None: + from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin + + credentials = { + "AccessKeyId": "ASIA...", + "SecretAccessKey": "secret...", + "SessionToken": "token...", + } + + with patch("boto3.client") as mock_boto3_client: + mock_redshift = MagicMock() + mock_redshift.get_cluster_credentials.return_value = { + "DbUser": "IAM:superset_user", + "DbPassword": "redshift-cluster-temp-password", + } + mock_boto3_client.return_value = mock_redshift + + db_user, db_password = AWSIAMAuthMixin.generate_redshift_cluster_credentials( + credentials=credentials, + cluster_identifier="my-redshift-cluster", + db_user="superset_user", + db_name="analytics", + region="us-east-1", + ) + + assert db_user == "IAM:superset_user" + assert db_password == "redshift-cluster-temp-password" # noqa: S105 + mock_boto3_client.assert_called_once_with( + "redshift", + region_name="us-east-1", + aws_access_key_id="ASIA...", + aws_secret_access_key="secret...", # noqa: S106 + aws_session_token="token...", # noqa: S106 + ) + mock_redshift.get_cluster_credentials.assert_called_once_with( + ClusterIdentifier="my-redshift-cluster", + DbUser="superset_user", + DbName="analytics", + AutoCreate=False, + ) + + +def test_generate_redshift_cluster_credentials_with_auto_create() -> None: + from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin + + credentials = { + "AccessKeyId": "ASIA...", + "SecretAccessKey": "secret...", + "SessionToken": "token...", + } + + with patch("boto3.client") as mock_boto3_client: + mock_redshift = MagicMock() + mock_redshift.get_cluster_credentials.return_value = { + "DbUser": "IAM:new_user", + "DbPassword": "temp-password", + } + mock_boto3_client.return_value = mock_redshift + + AWSIAMAuthMixin.generate_redshift_cluster_credentials( + credentials=credentials, + cluster_identifier="my-cluster", + db_user="new_user", + db_name="dev", + region="us-west-2", + auto_create=True, + ) + + mock_redshift.get_cluster_credentials.assert_called_once_with( + ClusterIdentifier="my-cluster", + DbUser="new_user", + DbName="dev", + AutoCreate=True, + ) + + +def test_generate_redshift_cluster_credentials_client_error() -> None: + from botocore.exceptions import ClientError + + from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin + + credentials = { + "AccessKeyId": "ASIA...", + "SecretAccessKey": "secret...", + "SessionToken": "token...", + } + + with patch("boto3.client") as mock_boto3_client: + mock_redshift = MagicMock() + mock_redshift.get_cluster_credentials.side_effect = ClientError( + {"Error": {"Code": "ClusterNotFound", "Message": "Cluster not found"}}, + "GetClusterCredentials", + ) + mock_boto3_client.return_value = mock_redshift + + with pytest.raises(SupersetSecurityException) as exc_info: + AWSIAMAuthMixin.generate_redshift_cluster_credentials( + credentials=credentials, + cluster_identifier="nonexistent-cluster", + db_user="superset_user", + db_name="dev", + region="us-east-1", + ) + + assert "Failed to get Redshift cluster credentials" in str(exc_info.value) + + +def test_apply_redshift_iam_authentication_provisioned_cluster() -> None: + from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin, AWSIAMConfig + + mock_database = MagicMock() + mock_database.sqlalchemy_uri_decrypted = ( + "redshift+psycopg2://[email protected]" + ".redshift.amazonaws.com:5439/analytics" + ) + + iam_config: AWSIAMConfig = { + "enabled": True, + "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole", + "region": "us-east-1", + "cluster_identifier": "my-cluster", + "db_username": "superset_user", + "db_name": "analytics", + } + + params: dict[str, Any] = {} + + with ( + patch.object( + AWSIAMAuthMixin, + "get_iam_credentials", + return_value={ + "AccessKeyId": "ASIA...", + "SecretAccessKey": "secret...", + "SessionToken": "token...", + }, + ) as mock_get_creds, + patch.object( + AWSIAMAuthMixin, + "generate_redshift_cluster_credentials", + return_value=("IAM:superset_user", "cluster-temp-password"), + ) as mock_gen_creds, + ): + AWSIAMAuthMixin._apply_redshift_iam_authentication( + mock_database, params, iam_config + ) + + mock_get_creds.assert_called_once_with( + role_arn="arn:aws:iam::123456789012:role/RedshiftRole", + region="us-east-1", + external_id=None, + session_duration=3600, + ) + + mock_gen_creds.assert_called_once_with( + credentials={ + "AccessKeyId": "ASIA...", + "SecretAccessKey": "secret...", + "SessionToken": "token...", + }, + cluster_identifier="my-cluster", + db_user="superset_user", + db_name="analytics", + region="us-east-1", + ) + + assert params["connect_args"]["password"] == "cluster-temp-password" # noqa: S105 + assert params["connect_args"]["user"] == "IAM:superset_user" + assert params["connect_args"]["sslmode"] == "verify-ca" + + +def test_apply_redshift_iam_authentication_provisioned_missing_db_username() -> None: + from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin, AWSIAMConfig + + mock_database = MagicMock() + mock_database.sqlalchemy_uri_decrypted = "redshift+psycopg2://user@host:5439/dev" + + iam_config: AWSIAMConfig = { + "enabled": True, + "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole", + "region": "us-east-1", + "cluster_identifier": "my-cluster", + "db_name": "dev", + # Missing db_username - required for provisioned clusters + } + + params: dict[str, Any] = {} + + with pytest.raises(SupersetSecurityException) as exc_info: + AWSIAMAuthMixin._apply_redshift_iam_authentication( + mock_database, params, iam_config + ) + + assert "db_username" in str(exc_info.value) + + +def test_apply_redshift_iam_authentication_both_workgroup_and_cluster() -> None: + from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin, AWSIAMConfig + + mock_database = MagicMock() + mock_database.sqlalchemy_uri_decrypted = "redshift+psycopg2://user@host:5439/dev" + + iam_config: AWSIAMConfig = { + "enabled": True, + "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole", + "region": "us-east-1", + "workgroup_name": "my-workgroup", + "cluster_identifier": "my-cluster", + "db_name": "dev", + } + + params: dict[str, Any] = {} + + with pytest.raises(SupersetSecurityException) as exc_info: + AWSIAMAuthMixin._apply_redshift_iam_authentication( + mock_database, params, iam_config + ) + + assert "cannot have both" in str(exc_info.value) + + +def test_apply_redshift_iam_authentication_neither_workgroup_nor_cluster() -> None: + from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin, AWSIAMConfig + + mock_database = MagicMock() + mock_database.sqlalchemy_uri_decrypted = "redshift+psycopg2://user@host:5439/dev" + + iam_config: AWSIAMConfig = { + "enabled": True, + "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole", + "region": "us-east-1", + "db_name": "dev", + # Missing both workgroup_name and cluster_identifier + } + + params: dict[str, Any] = {} + + with pytest.raises(SupersetSecurityException) as exc_info: + AWSIAMAuthMixin._apply_redshift_iam_authentication( + mock_database, params, iam_config + ) + + assert "must include either workgroup_name" in str(exc_info.value) diff --git a/tests/unit_tests/db_engine_specs/test_redshift_iam.py b/tests/unit_tests/db_engine_specs/test_redshift_iam.py index 3f0da7e734e..9e17259f6d6 100644 --- a/tests/unit_tests/db_engine_specs/test_redshift_iam.py +++ b/tests/unit_tests/db_engine_specs/test_redshift_iam.py @@ -243,3 +243,140 @@ def test_redshift_mask_encrypted_extra() -> None: assert masked_config["aws_iam"]["region"] == "us-east-1" assert masked_config["aws_iam"]["workgroup_name"] == "my-workgroup" assert masked_config["aws_iam"]["db_name"] == "dev" + + +def test_redshift_update_params_with_iam_provisioned_cluster() -> None: + from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin + from superset.db_engine_specs.redshift import RedshiftEngineSpec + + database = MagicMock() + database.encrypted_extra = json.dumps( + { + "aws_iam": { + "enabled": True, + "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole", + "region": "us-east-1", + "cluster_identifier": "my-redshift-cluster", + "db_username": "superset_user", + "db_name": "analytics", + } + } + ) + database.sqlalchemy_uri_decrypted = ( + "redshift+psycopg2://[email protected]" + ".redshift.amazonaws.com:5439/analytics" + ) + + params: dict[str, Any] = {} + + with ( + patch.object( + AWSIAMAuthMixin, + "get_iam_credentials", + return_value={ + "AccessKeyId": "ASIA...", + "SecretAccessKey": "secret...", + "SessionToken": "token...", + }, + ), + patch.object( + AWSIAMAuthMixin, + "generate_redshift_cluster_credentials", + return_value=("IAM:superset_user", "cluster-temp-password"), + ), + ): + RedshiftEngineSpec.update_params_from_encrypted_extra(database, params) + + assert "connect_args" in params + assert params["connect_args"]["password"] == "cluster-temp-password" # noqa: S105 + assert params["connect_args"]["user"] == "IAM:superset_user" + assert params["connect_args"]["sslmode"] == "verify-ca" + + +def test_redshift_update_params_provisioned_cluster_with_external_id() -> None: + from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin + from superset.db_engine_specs.redshift import RedshiftEngineSpec + + database = MagicMock() + database.encrypted_extra = json.dumps( + { + "aws_iam": { + "enabled": True, + "role_arn": "arn:aws:iam::222222222222:role/CrossAccountRedshift", + "external_id": "superset-prod-12345", + "region": "us-west-2", + "cluster_identifier": "prod-cluster", + "db_username": "analytics_user", + "db_name": "prod_db", + "session_duration": 1800, + } + } + ) + database.sqlalchemy_uri_decrypted = ( + "redshift+psycopg2://[email protected]" + ".redshift.amazonaws.com:5439/prod_db" + ) + + params: dict[str, Any] = {} + + with ( + patch.object( + AWSIAMAuthMixin, + "get_iam_credentials", + return_value={ + "AccessKeyId": "ASIA...", + "SecretAccessKey": "secret...", + "SessionToken": "token...", + }, + ) as mock_get_creds, + patch.object( + AWSIAMAuthMixin, + "generate_redshift_cluster_credentials", + return_value=("IAM:analytics_user", "cluster-temp-password"), + ), + ): + RedshiftEngineSpec.update_params_from_encrypted_extra(database, params) + + mock_get_creds.assert_called_once_with( + role_arn="arn:aws:iam::222222222222:role/CrossAccountRedshift", + region="us-west-2", + external_id="superset-prod-12345", + session_duration=1800, + ) + + +def test_redshift_mask_encrypted_extra_provisioned_cluster() -> None: + from superset.db_engine_specs.redshift import RedshiftEngineSpec + + encrypted_extra = json.dumps( + { + "aws_iam": { + "enabled": True, + "role_arn": "arn:aws:iam::123456789012:role/SecretRole", + "external_id": "secret-external-id-12345", + "region": "us-east-1", + "cluster_identifier": "my-cluster", + "db_username": "superset_user", + "db_name": "analytics", + } + } + ) + + masked = RedshiftEngineSpec.mask_encrypted_extra(encrypted_extra) + assert masked is not None + + masked_config = json.loads(masked) + + # role_arn and external_id should be masked + assert ( + masked_config["aws_iam"]["role_arn"] + != "arn:aws:iam::123456789012:role/SecretRole" + ) + assert masked_config["aws_iam"]["external_id"] != "secret-external-id-12345" + + # Non-sensitive fields should remain unchanged + assert masked_config["aws_iam"]["enabled"] is True + assert masked_config["aws_iam"]["region"] == "us-east-1" + assert masked_config["aws_iam"]["cluster_identifier"] == "my-cluster" + assert masked_config["aws_iam"]["db_username"] == "superset_user" + assert masked_config["aws_iam"]["db_name"] == "analytics"
