Hi,

It looks like there was an error in the asynchronous part of sending the records to Kafka. This is probably collateral damage from losing the connection to ZooKeeper.
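To illustrate what I mean by "asynchronous part", here is a minimal sketch of the pattern (not the actual FlinkKafkaProducerBase code; the class and field names are invented for illustration): the producer callback only records a send failure, and the sink rethrows it on a later invoke(), which is the checkErroneous() frame in your stack trace.

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Simplified sketch of an "async error reporting" Kafka sink, for illustration only.
class AsyncErrorReportingSink {

    private final KafkaProducer<byte[], byte[]> producer;
    // Failure reported by the producer's background I/O thread, if any.
    private volatile Exception asyncException;

    AsyncErrorReportingSink(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void invoke(ProducerRecord<byte[], byte[]> record) {
        // Fail the task now if an earlier asynchronous send already failed.
        checkErroneous();
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null && asyncException == null) {
                    // e.g. NetworkException: "The server disconnected before a response was received."
                    asyncException = e;
                }
            }
        });
    }

    private void checkErroneous() {
        if (asyncException != null) {
            throw new RuntimeException(
                "Failed to send data to Kafka: " + asyncException.getMessage(), asyncException);
        }
    }
}
```

The consequence is that the record named in the task failure is usually not the one that caused it; the exception surfaces whenever the next element passes through the sink, possibly long after the broker disconnected.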
Piotrek

> On 15 May 2018, at 13:33, Ufuk Celebi <u...@apache.org> wrote:
>
> Hey Tony,
>
> thanks for the detailed report.
>
> - In Flink 1.4, jobs are cancelled if the JM loses the connection to ZK and
>   recovered when the connection is re-established (and one JM becomes leader
>   again).
>
> - Regarding the KafkaProducer: I'm not sure from the log message whether
>   Flink closes the KafkaProducer because the job is cancelled or because
>   there is a connectivity issue to the Kafka cluster. Including Piotr (cc)
>   in this thread, who has worked on the KafkaProducer in the past. If it is
>   a connectivity issue, it might also explain why you lost the connection
>   to ZK.
>
> Glad to hear that everything is back to normal. Keep us updated if something
> unexpected happens again.
>
> – Ufuk
>
>
> On Tue, May 15, 2018 at 6:28 AM, Tony Wei <tony19920...@gmail.com> wrote:
>
> Hi all,
>
> I restarted the cluster, changed the log level to DEBUG, and raised the
> parallelism of my streaming job from 32 to 40. However, the problem just
> disappeared and I don't know why. I will keep these settings for a while;
> if the error happens again, I will bring back more information. Thank you.
>
> Best Regards,
> Tony Wei
>
> 2018-05-14 14:24 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>
> Hi all,
>
> After I changed `high-availability.zookeeper.client.session-timeout` and
> `maxSessionTimeout` to 120000 ms, the exception still occurred.
>
> Here is the log snippet. It seems this has nothing to do with the ZooKeeper
> client timeout, but I still don't know why the Kafka producer was closed
> without any task state change.
>
> ```
> 2018-05-14 05:18:53,468 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a
> 2018-05-14 05:18:53,468 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 82828ms for sessionid 0x305f957eb8d000a, closing socket connection and attempting reconnect
> 2018-05-14 05:18:53,571 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
> 2018-05-14 05:18:53,574 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2018-05-14 05:18:53,850 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
> 2018-05-14 05:18:53,850 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening socket connection to server XXX.XXX.XXX.XXX:2181
> 2018-05-14 05:18:53,852 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState - Authentication failed
> 2018-05-14 05:18:53,853 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket connection established to XXX.XXX.XXX.XXX:2181, initiating session
> 2018-05-14 05:18:53,859 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid = 0x305f957eb8d000a, negotiated timeout = 120000
> 2018-05-14 05:18:53,860 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: RECONNECTED
> 2018-05-14 05:18:53,860 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
> 2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched from RUNNING to FAILED.
> java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>     at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
>     at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
>     at com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
>     at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>     at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>     at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>     at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>     at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:38)
>     at com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:14)
>     at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:243)
>     at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
> ```
>
> Best Regards,
> Tony Wei
>
> 2018-05-14 11:36 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
>
> Hi all,
>
> Recently, my Flink job hit a problem that caused it to fail and restart.
>
> The log looks like this screenshot
>
> <exception.png>
>
> or this
>
> ```
> 2018-05-11 13:21:04,582 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a
> 2018-05-11 13:21:04,583 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect
> 2018-05-11 13:21:04,683 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
> 2018-05-11 13:21:04,686 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched from RUNNING to FAILED.
> java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
> ```
>
> The logs show `org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.` This timeout value is Long.MAX_VALUE, and it appears when someone calls `producer.close()`.
>
> I also saw the log messages `org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 61054ms for sessionid 0x3054b165fe2006a, closing socket connection and attempting reconnect` and `org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.`
>
> I have checked ZooKeeper and Kafka, and there was no error on their side during that period. I was wondering whether the TM stops its tasks when it loses the ZooKeeper client in HA mode. Since I didn't find any documentation or mailing-list thread discussing this, I'm not sure whether that is the reason the Kafka producer was closed. Could someone who knows HA well, or who knows what happened in my job, please advise?
>
> My Flink cluster is version 1.4.0 with 2 masters and 10 slaves. My ZooKeeper cluster is version 3.4.11 with 3 nodes.
> `high-availability.zookeeper.client.session-timeout` is at its default value: 60000 ms.
> `maxSessionTimeout` in zoo.cfg is 40000 ms. I already changed maxSessionTimeout to 120000 ms this morning.
>
> This problem happened many times over the last weekend and caused my Kafka log delay to grow. Please help me. Thank you very much!
>
> Best Regards,
> Tony Wei
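For reference, the two timeouts discussed in this thread correspond to settings along these lines (a sketch using the values Tony mentions; the file locations and comments are assumptions, not taken from this cluster):

```
# flink-conf.yaml (Flink HA settings)
# Session timeout used by Flink's ZooKeeper client; default 60000 ms,
# raised here to 120000 ms as described above.
high-availability.zookeeper.client.session-timeout: 120000

# zoo.cfg (ZooKeeper server)
# Upper bound on the session timeout the server will negotiate; was 40000 ms,
# raised to 120000 ms (the later log shows "negotiated timeout = 120000").
maxSessionTimeout=120000
```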