Awesome, thanks! Looks good.

On Wed, Dec 16, 2020 at 12:55 PM Cranmer, Danny <cranm...@amazon.com> wrote:

> Hey Avi,
>
> I have reproduced the issue and found a solution. The problem is not MFA;
> it is that the BASIC credential provider does not use the session token:
>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L181
>
> If you want to supply AK/SK/token, then you will have to use another
> CredentialProviderType; below is an example using SYS_PROP. We could
> improve the Kinesis connector to detect the session token and construct a
> BasicSessionCredentials:
>
> https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/auth/BasicSessionCredentials.java
>
>     Properties systemProperties = System.getProperties();
>     systemProperties.setProperty("aws.accessKeyId", accessKey);
>     systemProperties.setProperty("aws.secretKey", secretKey);
>     systemProperties.setProperty("aws.sessionToken", sessionToken);
>
>     Properties producerConfig = new Properties();
>     producerConfig.setProperty(AWSConfigConstants.AWS_REGION, REGION);
>     producerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
>
> I will add this to the Jira also. Let me know if you have any issues.
>
> Thanks,
> Danny
>
> From: Avi Levi <a...@neosec.com>
> Date: Wednesday, 16 December 2020 at 08:09
> To: Robert Metzger <rmetz...@apache.org>
> Cc: user <user@flink.apache.org>
> Subject: RE: [EXTERNAL] Connecting to kinesis with mfa
>
> Thanks Robert, I actually tried all of the above but got the same
> unfortunate result.
>
> On Wed, Dec 16, 2020 at 8:24 AM Robert Metzger <rmetz...@apache.org> wrote:
>
> Hey Avi,
>
> Maybe providing secret/access key + session token together doesn't work,
> and you need to provide only one of the two?
>
> https://docs.aws.amazon.com/credref/latest/refdocs/setting-global-aws_session_token.html
>
> I'll also ping some AWS contributors active in Flink to take a look at
> this.
>
> Best,
> Robert
>
> On Tue, Dec 15, 2020 at 10:07 AM Avi Levi <a...@neosec.com> wrote:
>
> Hi guys,
>
> We are struggling to connect to Kinesis when MFA is activated. I
> configured everything according to the documentation but am still getting
> an exception:
>
>     val producerConfig = new Properties()
>     producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion)
>     producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey)
>     producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretAccessKey)
>     producerConfig.put(com.amazonaws.auth.profile.internal.ProfileKeyConstants.AWS_SESSION_TOKEN, awsSessionToken)
>
> with a very simple pipeline:
>
>     val producer = new FlinkKinesisProducer(new SimpleStringSchema(), producerConfig)
>     producer.setFailOnError(true)
>     producer.setDefaultStream(outputStreamName)
>     producer.setDefaultPartition("0")
>     env.fromElements("a", "b", "c").addSink(producer)
>     env.execute()
>
> which results in:
>
> 15:30:44,292 WARN org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.292188] [0x0000cb5f][0x000070000512c000] [warning] [AWS Log: WARN](AWSClient)If the signature check failed. This could be because of a time skew. Attempting to adjust the signer.
>
> 15:30:44,378 INFO org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.377865] [0x0000cb5b][0x00007000082c1000] [info] [shard_map.cc:87] Updating shard map for stream "ExampleOutputStream"
>
> 15:30:44,396 WARN org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.396208] [0x0000cb55][0x0000700002a3e000] [warning] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 'UnrecognizedClientException': The security token included in the request is invalid.
>
> 15:30:44,396 ERROR org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader - [2020-12-14 15:30:44.396256] [0x0000cb55][0x0000700002a3e000] [error] [AWS Log: ERROR](AWSClient)HTTP response code: 400
>
> Exception name: UnrecognizedClientException
>
> Error message: The security token included in the request is invalid.
>
> 6 response headers:
>
> connection : close
>
> I double-checked that all keys are correct by using the same keys that
> work perfectly when I execute commands from the CLI. Also, when MFA is
> removed from Kinesis, the pipeline works as expected. Finally, I opened a
> ticket for this as well: <https://issues.apache.org/jira/browse/FLINK-20602>.
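
For reference, the SYS_PROP workaround from Danny's reply can be sketched as a small stand-alone helper. This is only an illustration, not the connector's own code: the class and method names are hypothetical, and the two config key strings are assumed to be the values behind AWSConfigConstants.AWS_REGION and AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, written out as literals so the sketch compiles without Flink on the classpath. In a real job you would use the constants themselves.

```java
import java.util.Properties;

/**
 * Hypothetical helper illustrating the SYS_PROP workaround: the temporary
 * credentials go into JVM system properties, and the producer config only
 * names the region and the credential provider type.
 */
final class SysPropCredentialsSketch {

    // Assumed to equal AWSConfigConstants.AWS_REGION and
    // AWSConfigConstants.AWS_CREDENTIALS_PROVIDER in the Flink Kinesis connector.
    static final String AWS_REGION = "aws.region";
    static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";

    private SysPropCredentialsSketch() {}

    /** Publishes access key, secret key and session token as JVM system properties. */
    static void exportSessionCredentials(String accessKey, String secretKey, String sessionToken) {
        System.setProperty("aws.accessKeyId", accessKey);
        System.setProperty("aws.secretKey", secretKey);
        System.setProperty("aws.sessionToken", sessionToken);
    }

    /** Builds a producer config that carries no keys, only region and provider type. */
    static Properties buildProducerConfig(String region) {
        Properties producerConfig = new Properties();
        producerConfig.setProperty(AWS_REGION, region);
        producerConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
        return producerConfig;
    }
}
```

The returned Properties would then be passed to the FlinkKinesisProducer constructor in place of the config from the original question. One caveat worth checking in your setup: system properties are per JVM, so setting them in the job's main method is presumably only sufficient when the sink runs in that same JVM (for example, local execution); on a cluster they would likely need to be set on the task managers as well.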