Hi all,

I have a really simple pipeline that consumes events from a local Kinesis (kinesalite) and prints them to stdout, but I'm struggling to make sense of why it fails almost immediately.

The pipeline code:

    // Imports (assumed; not shown in the original snippet, Flink 1.14 Scala API)
    import java.util.Properties

    import com.amazonaws.SDKGlobalConfiguration
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
    import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

    // Added this to verify it wasn't a problem with AWS CBOR, which needs to be disabled
    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
    System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking", "true")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Consumer settings pointing at the local kinesalite endpoint
    val consumerConfig = new Properties()
    consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")
    consumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY")
    consumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_ACCESS_KEY")
    consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
    consumerConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")

    // Read the stream as strings and echo each record to stdout
    env
      .addSource(
        new FlinkKinesisConsumer[String](
          "user-profile-events-local",
          new SimpleStringSchema,
          consumerConfig
        )
      )
      .print()

    env.execute("echo stream")

This is the error I get when running it locally:

22:27:23.372 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264) switched from INITIALIZING to RUNNING.
Dec 02, 2021 10:27:23 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
Dec 02, 2021 10:27:23 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
Dec 02, 2021 10:27:23 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
Dec 02, 2021 10:27:24 PM org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient createSocketFactoryRegistry
WARNING: SSL Certificate checking for endpoints has been explicitly disabled.
Dec 02, 2021 10:27:24 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
Dec 02, 2021 10:27:24 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
Dec 02, 2021 10:27:24 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles
WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code.
22:27:24.328 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (7e920c6918655278fbd09e7658847264) switched from RUNNING to FAILED with failure cause: org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 400; Error Code: UnknownOperationException; Request ID: 05cf3f90-53bf-11ec-8337-db4162c4712c; Proxy: null)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:515)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:460)
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:335)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:925)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:312)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
    Suppressed: java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
        at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928)
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.lang.Thread.run(Thread.java:748)
22:27:24.329 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (7e920c6918655278fbd09e7658847264).
22:27:24.341 [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 7e920c6918655278fbd09e7658847264.
22:27:24.359 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264) switched from RUNNING to FAILED on 948a82f7-e5e2-4e5f-b309-49ee92eb2006 @ localhost (dataPort=-1).
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 400; Error Code: UnknownOperationException; Request ID: 05cf3f90-53bf-11ec-8337-db4162c4712c; Proxy: null)
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:515) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:460) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:335) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:925) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:312) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
    Suppressed: java.lang.NullPointerException
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-core-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940) ~[flink-runtime-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-runtime-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940) ~[flink-runtime-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-runtime-1.14.0.jar:1.14.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-runtime-1.14.0.jar:1.14.0]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]

--
*Kind regards*
*Jonas Eyob*