In my jobmanager log I see this exception , probably is the root cause why the whole job is killed…is there any memory problem in jobmanager ? any clue for this error below? I ran the yarn-session
And my flink-conf.yaml is pretty much unmodified jobmanager.heap.mb: 256 taskmanager.heap.mb: 512 taskmanager.numberOfTaskSlots: 2 taskmanager.memory.preallocate: false parallelism.default: 1 invoke of yarn-session.sh HADOOP_CONF_DIR=/etc/hadoop/conf $FLINK_HOME/bin/yarn-session.sh -d -n 2 -s 1 -tm 2048 Thanks The error in job manager is as below 2017-04-14 04:26:33,570 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Flat Map -> Sink: Unnamed (1/1) (0c515e082713970381dcd34da87cfcf4) switched from RUNNING to FAILED. java.lang.RuntimeException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:425) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:797) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:775) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:253) at com.elliemae.stream.StreamerImpl$StreamerCoFlatMapFunction.flatMap2(StreamerImpl.java:234) at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement2(CoStreamFlatMap.java:56) at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:213) at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:87) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: Error sending data back to client (ip-172-31-31-172/172.31.31.172:46147) at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:64) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422) ... 12 more Caused by: com.esotericsoftware.kryo.KryoException: java.net.SocketException: Connection reset at com.esotericsoftware.kryo.io.Output.flush(Output.java:165) at com.esotericsoftware.kryo.io.Output.require(Output.java:142) at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:444) at com.esotericsoftware.kryo.io.Output.writeString(Output.java:345) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:173) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:166) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:95) at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:203) at org.apache.flink.contrib.streaming.CollectSink.invoke(CollectSink.java:61) ... 14 more Caused by: java.net.SocketException: Connection reset at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115) at java.net.SocketOutputStream.write(SocketOutputStream.java:155) at java.io.DataOutputStream.write(DataOutputStream.java:107) at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39) at com.esotericsoftware.kryo.io.Output.flush(Output.java:163) ... 25 more 2017-04-14 04:26:33,572 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (6c9267453a74d0ef58e678351ac0b49c) switched from state RUNNING to FAILING. 2017-04-14 04:49:10,707 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@ip-172-31-31-172.us-west-2.compute.internal:37737] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://fl...@ip-172-31-31-172.us-west-2.compute.internal:37737]] Caused by: [Connection refused: ip-172-31-31-172.us-west-2.compute.internal/172.31.31.172:37737] 2017-04-14 04:49:10,712 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (b9af6dd459e576a669a12b08a830c24e) switched from DEPLOYING to RUNNING. 2017-04-14 04:49:10,713 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (c5fe4b6df26e03f4c2604ec89a92ad8f) switched from DEPLOYING to RUNNING. 2017-04-14 04:49:10,715 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (7f614d51527252cbb9ffcfc0d660fe4e) switched from DEPLOYING to RUNNING. 2017-04-14 04:49:10,717 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Flat Map (1/1) (cd1f1ecf9408b4ac58bea6be99e5c89a) switched from DEPLOYING to RUNNING. 2017-04-14 04:49:10,718 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from DEPLOYING to RUNNING. 2017-04-14 04:49:10,740 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Flat Map -> Sink: Unnamed (1/1) (82713fda323b6325815b1b228a3d93f8) switched from RUNNING to FAILED. java.io.IOException: Cannot connect to the client to send back the stream at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:80) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) Caused by: java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:589) at java.net.Socket.connect(Socket.java:538) at java.net.Socket.<init>(Socket.java:434) at java.net.Socket.<init>(Socket.java:244) at org.apache.flink.contrib.streaming.CollectSink.open(CollectSink.java:75) ... 6 more From: Ted Yu <yuzhih...@gmail.com> Date: Thursday, April 13, 2017 at 6:01 PM To: Sathi Chowdhury <sathi.chowdh...@elliemae.com> Cc: "user@flink.apache.org" <user@flink.apache.org> Subject: Re: Flink errors out and job fails--IOException from CollectSink.open() Here is the line where NPE was thrown: mainThread.interrupt(); // the main thread may be sleeping for the discovery interval I wonder if runFetcher() encountered running being false - otherwise mainThread should not have been null. Looks like we should check whether mainThread is null when shutting down. On Thu, Apr 13, 2017 at 5:46 PM, Sathi Chowdhury <sathi.chowdh...@elliemae.com<mailto:sathi.chowdh...@elliemae.com>> wrote: The taskmanger log does not point a line in my code ..but it seems like the error occurs while it is trying to fetch kinesis record inside connector jar red sequence number 49572261908151269541343187919820576263466496304458235906 2017-04-13 23:28:23,470 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 306254130228844617117037146688591390310,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49572254078827945986407789245674345090539511066904232082,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM 2017-04-13 23:28:23,471 WARN org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Error while closing Kinesis data fetcher java.lang.NullPointerException at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472) at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246) at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) 2017-04-13 23:28:23,471 WARN org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Error while closing Kinesis data fetcher java.lang.NullPointerException at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472) at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246) at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) 2017-04-13 23:28:23,472 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b) switched from CANCELING to CANCELED. 2017-04-13 23:28:23,471 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Subtask 0 is seeding the fetcher with restored shard KinesisStreamShard{streamName='dev-ingest-kinesis-us-west-2', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 204169420152563078078024764459060926873,EndingHashKey: 238197656844656924424362225202237748018},SequenceNumberRange: {StartingSequenceNumber: 49572254078761043750812197376249737935721565982386290786,}}'}, starting state set to the restored sequence number LATEST_SEQUENCE_NUM 2017-04-13 23:28:23,472 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (1/1) (8a7301a437cb2d052208ee42c994104b). From: Sathi Chowdhury <sathi.chowdh...@elliemae.com<mailto:sathi.chowdh...@elliemae.com>> Date: Thursday, April 13, 2017 at 5:44 PM To: Ted Yu <yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" <user@flink.apache.org<mailto:user@flink.apache.org>> Subject: Re: Flink errors out and job fails--IOException from CollectSink.open() Hi Ted, Sorry for my big font earlier…was not intended ☺ I am on flink 1.2.0 I built flink-connector-kinesis_2.10-1.2.0.jar from source and included in the fatjar I am running. Followed this http://www.kidder.io/2017/02/15/flink-kinesis-streaming-connector/<https://na01.safelinks.protection.outlook.com/?url=http%3A%2F%2Fwww.kidder.io%2F2017%2F02%2F15%2Fflink-kinesis-streaming-connector%2F&data=01%7C01%7C%7Cfc97be832d1a4062d75608d482cf675f%7C0d009d13c2cd47d891dd2ae838b00d4b%7C0&sdata=avPl4%2FU5DTjZW09Zby9CeUttUNpiGWH%2Bvnycy9PhUDA%3D&reserved=0> From code I read a kinesis stream using consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, region); consumerConfig.setProperty(ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION, "LATEST"); consumerConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO"); consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "10"); consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "200"); consumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000"); DataStream<Map<String, Object>> stream = env.addSource(new FlinkKinesisConsumer<>(inputStream, new MyJsonDeserializationSchema(), consumerConfig)); While I push the json event to the Kinesis stream intermittently I see this NPE and flink job fails 2017-04-14 00:31:54,672 WARN org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer - Error while closing Kinesis data fetcher java.lang.NullPointerException at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.shutdownFetcher(KinesisDataFetcher.java:472) at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:246) at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:256) at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127) at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) Thanks Sathi From: Ted Yu <yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>> Date: Thursday, April 13, 2017 at 5:02 PM To: Sathi Chowdhury <sathi.chowdh...@elliemae.com<mailto:sathi.chowdh...@elliemae.com>> Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" <user@flink.apache.org<mailto:user@flink.apache.org>> Subject: Re: Flink errors out and job fails--IOException from CollectSink.open() Can you give us a bit more information ? release of flink snippet of your code Thanks =============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. ============= =============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. ============= =============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============