Mohsen Rezaei created FLINK-38578:
-------------------------------------
Summary: Invalid KinesisStreamsSource
AWS_CREDENTIALS_PROVIDER_OPTION conversion
Key: FLINK-38578
URL: https://issues.apache.org/jira/browse/FLINK-38578
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Affects Versions: aws-connector-5.0.0
Reporter: Mohsen Rezaei
Kinesis source requires configurations to be passed as
{{org.apache.flink.configuration.Configuration}}, e.g. in the
[createReader(...)|https://github.com/apache/flink-connector-aws/blob/97505df8540cba9fc56da1ce20c65ad64b3ec943/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java#L159-L159],
and directly converts the provided configs to a {{java.util.Properties}},
which becomes problematic for the AWS_CREDENTIALS_PROVIDER_OPTION which has a
required value of the enum class
[{{AWSConfigConstants.CredentialProvider}}|https://github.com/apache/flink-connector-aws/blob/97505df8540cba9fc56da1ce20c65ad64b3ec943/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java#L33-L33].
When this [gets looked up against the same enum
class|https://github.com/apache/flink-connector-aws/blob/97505df8540cba9fc56da1ce20c65ad64b3ec943/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java#L101-L101],
it throws the following error:
{code}
Caused by: java.lang.NullPointerException: Name is null
at java.base/java.lang.Enum.valueOf(Unknown Source)
at
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.config.AWSConfigConstants$CredentialProvider.valueOf(AWSConfigConstants.java:33)
at
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.getCredentialProviderType(AWSGeneralUtil.java:101)
at
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.getCredentialsProvider(AWSGeneralUtil.java:140)
at
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.getCredentialsProvider(AWSGeneralUtil.java:134)
at
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateCredentialProvider(AWSGeneralUtil.java:487)
at
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsConfiguration(AWSGeneralUtil.java:431)
at
org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsCredentials(AWSGeneralUtil.java:479)
at
org.apache.flink.connector.kinesis.source.KinesisStreamsSource.createKinesisStreamProxy(KinesisStreamsSource.java:314)
at
org.apache.flink.connector.kinesis.source.KinesisStreamsSource.createKinesisStreamProxy(KinesisStreamsSource.java:287)
at
org.apache.flink.connector.kinesis.source.KinesisStreamsSource.restoreEnumerator(KinesisStreamsSource.java:189)
at
org.apache.flink.connector.kinesis.source.KinesisStreamsSource.createEnumerator(KinesisStreamsSource.java:180)
at
org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:229)
... 8 common frames omitted
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)