Hey Jonas, May I ask what version of Kinesalite you're targeting? With 3.3.3 and STREAM_INITIAL_POSITION = "LATEST", I received a "The timestampInMillis parameter cannot be greater than the currentTimestampInMillis" which may be a misconfiguration on my setup, but with STREAM_INITIAL_POSITION = "TRIM_HORIZON" I was able to consume events from the stream.
This was with 1.14.0 of the Kinesis Flink connector. Kind regards, Mika On 02.12.2021 23:05, jonas eyob wrote:
Hi all, I have a really simple pipeline to consume events from a local kinesis (kinesalite) and print them out to stdout. But struggling to make sense of why it's failing almost immediately The pipeline code: /* Added this to verify it wasn't a problem with AWS CBOR which needs to be disabled */ System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true") System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true") System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking", "true") val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val consumerConfig = new Properties() consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1") consumerConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO") consumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY") consumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_ACCESS_KEY") consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST") consumerConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567") env .addSource( new FlinkKinesisConsumer[String]( "user-profile-events-local", new SimpleStringSchema, consumerConfig ) ) .print() env.execute("echo stream") When running this I am getting this: Error I get from running this locally: 22:27:23.372 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264) switched from INITIALIZING to RUNNING. Dec 02, 2021 10:27:23 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code. Dec 02, 2021 10:27:23 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code. Dec 02, 2021 10:27:23 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code. Dec 02, 2021 10:27:24 PM org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient createSocketFactoryRegistry WARNING: SSL Certificate checking for endpoints has been explicitly disabled. Dec 02, 2021 10:27:24 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code. Dec 02, 2021 10:27:24 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code. Dec 02, 2021 10:27:24 PM org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.internal.BasicProfileConfigLoader loadProfiles WARNING: Your profile name includes a 'profile ' prefix. This is considered part of the profile name in the Java SDK, so you will need to include this prefix in your profile name when you reference this profile from your Java code. 22:27:24.328 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (7e920c6918655278fbd09e7658847264) switched from RUNNING to FAILED with failure cause: org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 400; Error Code: UnknownOperationException; Request ID: 05cf3f90-53bf-11ec-8337-db4162c4712c; Proxy: null) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893) at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860) at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849) at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590) at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559) at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:515) at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:460) at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:335) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:925) at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:312) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323) Suppressed: java.lang.NullPointerException at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127) at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035) at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021) at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928) at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.lang.Thread.run(Thread.java:748) 22:27:24.329 [Source: Custom Source -> Sink: Print to Std. Out (1/1)#0] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 (7e920c6918655278fbd09e7658847264). 22:27:24.341 [flink-akka.actor.default-dispatcher-9] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out (1/1)#0 7e920c6918655278fbd09e7658847264. 22:27:24.359 [flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7e920c6918655278fbd09e7658847264) switched from RUNNING to FAILED on 948a82f7-e5e2-4e5f-b309-49ee92eb2006 @ localhost (dataPort=-1). org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 400; Error Code: UnknownOperationException; Request ID: 05cf3f90-53bf-11ec-8337-db4162c4712c; Proxy: null) at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2893) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2860) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2849) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1590) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1559) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:515) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:460) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:335) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:925) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:312) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0] Suppressed: java.lang.NullPointerException at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421) ~[flink-connector-kinesis_2.12-1.14.0.jar:1.14.0] at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-core-1.14.0.jar:1.14.0] at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:130) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:141) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:127) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1035) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:1021) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:928) ~[flink-streaming-java_2.12-1.14.0.jar:1.14.0] at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:940) ~[flink-runtime-1.14.0.jar:1.14.0] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-runtime-1.14.0.jar:1.14.0] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:940) ~[flink-runtime-1.14.0.jar:1.14.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-runtime-1.14.0.jar:1.14.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-runtime-1.14.0.jar:1.14.0] at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292] -- *Med Vänliga Hälsningar* *Jonas Eyob*
Mika Naylor https://autophagy.io