Hey,
The FlinkKinesisProducer is deprecated in favour of the KinesisSink. The new sink does not rely on KPL, so this would not be a problem here. Is there a reason you are using the FlinkKinesisProducer instead of KinesisSink? Thanks for the deep dive, generally speaking I agree it would be possible/useful to add a separate config for metrics. However since this connector is deprecated we will not be adding new features, unless there is a strong reason to do so. Thanks, Danny On Thu, Oct 12, 2023 at 5:45 PM Diem, Chase via user <user@flink.apache.org> wrote: > Hey Team, > > > > Looking for some thoughts here: > > - We have a Kinesis Producer that produces to a topic in another AWS > account > - The producer allows for configurations to set credentials for that > account: > > https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java#L494 > - We DO NOT have access produce to their Cloudwatch. We would prefer > to produce the metrics to our own account instead. > - The AWS *KinesisProducerConfiguration* supports setting separate > credentials for both the producer and Cloudwatch, but the > *KinesisConfigUtil* does not support it: > > https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L139 > > > > It would be nice to support properties for say: > > * metrics.aws.credentials.provider for custom Cloudwatch metrics > credentials provider, > > * metrics.aws.credentials.provider.role.provider for custom > Cloudwatch credentials provider for assuming a role, > > > > > > *AWS KinesisProducerConfiguration.java:* > > /** > > * {@link AWSCredentialsProvider} that supplies credentials used to > upload > > * metrics to CloudWatch. > > * <p> > > * If not given, the credentials used to put records > > * to Kinesis are also used for CloudWatch. > > * > > * @see #setCredentialsProvider(AWSCredentialsProvider) > > */ > > public KinesisProducerConfiguration > setMetricsCredentialsProvider(AWSCredentialsProvider > metricsCredentialsProvider) { > > this.metricsCredentialsProvider = metricsCredentialsProvider; > > return this; > > } > > > > *Flink KinesisConfigUtil.java:* > > KinesisProducerConfiguration kpc = > KinesisProducerConfiguration.fromProperties(config); > > kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION)); > > > > kpc.setCredentialsProvider(AWSUtil.getCredentialsProvider(config)); > > > > // we explicitly lower the credential refresh delay (default is 5 > seconds) > > // to avoid an ignorable interruption warning that occurs when > shutting down the > > // KPL client. See > https://github.com/awslabs/amazon-kinesis-producer/issues/10. > > kpc.setCredentialsRefreshDelay(100); > > > > > > > > >