The taskmanger log does not point a line in my code ..but it seems like the 
error occurs while it is trying to fetch kinesis record inside connector jar

red sequence number 49572261908151269541343187919820576263466496304458235906
2017-04-13 23:28:23,470 INFO  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 
is seeding the fetcher with restored shard 
KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: 
shardId-000000000009,HashKeyRange: {StartingHashKey: 
306254130228844617117037146688591390310,EndingHashKey: 
340282366920938463463374607431768211455},SequenceNumberRange: 
{StartingSequenceNumber: 
49572254078827945986407789245674345090539511066904232082,}}'}, starting state 
set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,471 WARN  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error 
while closing Kinesis data fetcher
java.lang.NullPointerException
        at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
        at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
        at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,471 WARN  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error 
while closing Kinesis data fetcher
java.lang.NullPointerException
        at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)
        at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)
        at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b) 
switched from CANCELING to CANCELED.
2017-04-13 23:28:23,471 INFO  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Subtask 0 
is seeding the fetcher with restored shard 
KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: 
shardId-000000000006,HashKeyRange: {StartingHashKey: 
204169420152563078078024764459060926873,EndingHashKey: 
238197656844656924424362225202237748018},SequenceNumberRange: 
{StartingSequenceNumber: 
49572254078761043750812197376249737935721565982386290786,}}'}, starting state 
set to the restored sequence number LATEST_SEQUENCE_NUM
2017-04-13 23:28:23,472 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Freeing task resources for Source: Custom Source (1/1) 
(8a7301a437cb2d052208ee42c994104b).


From: Sathi Chowdhury <sathi.chowdh...@elliemae.com>
Date: Thursday, April 13, 2017 at 5:44 PM
To: Ted Yu <yuzhih...@gmail.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

Hi Ted, Sorry for my big font earlier…was not intended ☺

I am on flink 1.2.0
I built flink-connector-kinesis_2.10-1.2.0.jar from source and included in the 
fatjar I am running.
Followed this 
http://www.kidder.io/2017/02/15/flink-kinesis-streaming-connector/<https://na01.safelinks.protection.outlook.com/?url=http%3A%2F%2Fwww.kidder.io%2F2017%2F02%2F15%2Fflink-kinesis-streaming-connector%2F&data=01%7C01%7C%7Cfc97be832d1a4062d75608d482cf675f%7C0d009d13c2cd47d891dd2ae838b00d4b%7C0&sdata=avPl4%2FU5DTjZW09Zby9CeUttUNpiGWH%2Bvnycy9PhUDA%3D&reserved=0>


From code I read a kinesis stream using

consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region);
consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION,
 "LATEST");
consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"AUTO");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, 
"200");
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
 "2000");
DataStream<Map<String, Object>> stream = env.addSource(new 
FlinkKinesisConsumer<>(inputStream, new MyJsonDeserializationSchema(), 
consumerConfig));


While I push the json event to the Kinesis stream intermittently I see this NPE 
and flink job fails


2017-04-14 00:31:54,672 WARN  
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  - Error 
while closing Kinesis data fetcher

java.lang.NullPointerException

               at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472)

               at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246)

               at 
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256)

               at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)

               at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127)

               at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)

               at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)

               at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

               at java.lang.Thread.run(Thread.java:745)

Thanks
Sathi



From: Ted Yu <yuzhih...@gmail.com>
Date: Thursday, April 13, 2017 at 5:02 PM
To: Sathi Chowdhury <sathi.chowdh...@elliemae.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Flink errors out and job fails--IOException from CollectSink.open()

Can you give us a bit more information ?

release of flink
snippet of your code

Thanks
=============Notice to Recipient: This e-mail transmission, and any documents, 
files or previous e-mail messages attached to it may contain information that 
is confidential or legally privileged, and intended for the use of the 
individual or entity named above. If you are not the intended recipient, or a 
person responsible for delivering it to the intended recipient, you are hereby 
notified that you must not read this transmission and that any disclosure, 
copying, printing, distribution or use of any of the information contained in 
or attached to this transmission is STRICTLY PROHIBITED. If you have received 
this transmission in error, please immediately notify the sender by telephone 
or return e-mail and delete the original transmission and its attachments 
without reading or saving in any manner. Thank you. =============
=============Notice to Recipient: This e-mail transmission, and any documents, 
files or previous e-mail messages attached to it may contain information that 
is confidential or legally privileged, and intended for the use of the 
individual or entity named above. If you are not the intended recipient, or a 
person responsible for delivering it to the intended recipient, you are hereby 
notified that you must not read this transmission and that any disclosure, 
copying, printing, distribution or use of any of the information contained in 
or attached to this transmission is STRICTLY PROHIBITED. If you have received 
this transmission in error, please immediately notify the sender by telephone 
or return e-mail and delete the original transmission and its attachments 
without reading or saving in any manner. Thank you. =============

Reply via email to