Hi,

It looks like there was an error in the asynchronous job that sends the records 
to Kafka. This is probably collateral damage from losing the connection to 
ZooKeeper.
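
For readers unfamiliar with how an asynchronous send error surfaces: in the 
quoted stack trace the exception is thrown from 
`FlinkKafkaProducerBase.checkErroneous`, called from 
`FlinkKafkaProducer010.invoke`. That is consistent with a pattern where a 
background send fails, the failure is stored, and it is rethrown while a later 
record is being processed. Below is a minimal Python sketch of that pattern. It 
is not Flink's or Kafka's code; `AsyncSink`, `flaky_send`, `drain` and `demo` 
are hypothetical names made up for the illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import threading


class AsyncSink:
    """Toy sink (hypothetical): sends in the background, reports failures late."""

    def __init__(self, send_fn):
        self._send_fn = send_fn          # stand-in for a real client's send call
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._lock = threading.Lock()
        self._async_error = None         # first failure seen by a background send
        self._pending = []

    def _send_and_record(self, record):
        # Runs on the worker thread: a failure is stored, not raised to the caller.
        try:
            self._send_fn(record)
        except Exception as err:
            with self._lock:
                if self._async_error is None:
                    self._async_error = err

    def _check_erroneous(self):
        # Rethrow a stored background failure on the caller's thread.
        with self._lock:
            err, self._async_error = self._async_error, None
        if err is not None:
            raise RuntimeError(f"Failed to send data: {err}") from err

    def invoke(self, record):
        self._check_erroneous()          # an earlier failure surfaces here
        self._pending.append(self._pool.submit(self._send_and_record, record))

    def drain(self):
        # Wait for in-flight sends without checking for errors (demo helper).
        for future in self._pending:
            future.result()
        self._pending.clear()

    def close(self):
        self._pool.shutdown(wait=True)


def flaky_send(record):
    # Hypothetical transport that loses its connection on record "b".
    if record == "b":
        raise ConnectionError(
            "The server disconnected before a response was received.")


def demo():
    sink = AsyncSink(flaky_send)
    try:
        sink.invoke("a")
        sink.invoke("b")   # fails in the background; this call returns normally
        sink.drain()       # let the background failure land
        sink.invoke("c")   # unrelated record, but the stored error is raised here
    except RuntimeError as err:
        return str(err)
    finally:
        sink.close()
    return None


if __name__ == "__main__":
    print(demo())
```

In this sketch the task that fails is the one processing record "c", even 
though the send that actually broke was for "b". Under that assumption, the 
failing stack frame in a log tells you where the error was noticed, not when 
the connection was lost.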

Piotrek

> On 15 May 2018, at 13:33, Ufuk Celebi <u...@apache.org> wrote:
> 
> Hey Tony,
> 
> thanks for the detailed report.
> 
> - In Flink 1.4, jobs are cancelled if the JM loses the connection to ZK and 
> are recovered when the connection is re-established (and one JM becomes 
> leader again).
> 
> - Regarding the KafkaProducer: I'm not sure from the log message whether 
> Flink closes the KafkaProducer because the job is cancelled or because there 
> is a connectivity issue to the Kafka cluster. Including Piotr (cc) in this 
> thread who has worked on the KafkaProducer in the past. If it is a 
> connectivity issue, it might also explain why you lost the connection to ZK.
> 
> Glad to hear that everything is back to normal. Keep us updated if something 
> unexpected happens again.
> 
> – Ufuk
> 
> 
> On Tue, May 15, 2018 at 6:28 AM, Tony Wei <tony19920...@gmail.com> wrote:
> Hi all,
> 
> I restarted the cluster, changed the log level to DEBUG, and raised the 
> parallelism of my streaming job from 32 to 40.
> However, the problem simply disappeared and I don't know why.
> I will keep these settings for a while. If the error happens again, I will 
> come back with more information. Thank you.
> 
> Best Regards,
> Tony Wei
> 
> 2018-05-14 14:24 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
> Hi all,
> 
> After I changed the `high-availability.zookeeper.client.session-timeout` and 
> `maxSessionTimeout` to 120000ms, the exception still occurred.
> 
> Here is the log snippet. It seems this has nothing to do with the ZooKeeper 
> client timeout, but I still don't know why the Kafka producer would be closed 
> without any task state change.
> 
> ```
> 2018-05-14 05:18:53,468 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
> session timed out, have not heard from server in 82828ms for sessionid 
> 0x305f957eb8d000a
> 2018-05-14 05:18:53,468 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
> session timed out, have not heard from server in 82828ms for sessionid 
> 0x305f957eb8d000a, closing socket connection and attempting reconnect
> 2018-05-14 05:18:53,571 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>   - State change: SUSPENDED
> 2018-05-14 05:18:53,574 WARN  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
> ZooKeeper.
> 2018-05-14 05:18:53,850 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL 
> configuration failed: javax.security.auth.login.LoginException: No JAAS 
> configuration section named 'Client' was found in specified JAAS 
> configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue 
> connection to Zookeeper server without SASL authentication, if Zookeeper 
> server allows it.
> 2018-05-14 05:18:53,850 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening 
> socket connection to server XXX.XXX.XXX.XXX:2181
> 2018-05-14 05:18:53,852 ERROR 
> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - 
> Authentication failed
> 2018-05-14 05:18:53,853 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket 
> connection established to XXX.XXX.XXX.XXX:2181, initiating session
> 2018-05-14 05:18:53,859 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 
> establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid = 
> 0x305f957eb8d000a, negotiated timeout = 120000
> 2018-05-14 05:18:53,860 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>   - State change: RECONNECTED
> 2018-05-14 05:18:53,860 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
> 2018-05-14 05:28:54,781 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>               - Closing the Kafka producer with timeoutMillis = 
> 9223372036854775807 ms.
> 2018-05-14 05:28:54,829 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>               - Closing the Kafka producer with timeoutMillis = 
> 9223372036854775807 ms.
> 2018-05-14 05:28:54,918 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> 
> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched from 
> RUNNING to FAILED.
> java.lang.Exception: Failed to send data to Kafka: The server disconnected 
> before a response was received.
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
>       at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>       at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>       at 
> org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
>       at 
> org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>       at 
> com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
>       at 
> com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
>       at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>       at 
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>       at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>       at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
>       at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>       at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>       at 
> com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:38)
>       at 
> com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:14)
>       at 
> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:243)
>       at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.NetworkException: The server 
> disconnected before a response was received.
> ```
> 
> Best Regards,
> Tony Wei
> 
> 2018-05-14 11:36 GMT+08:00 Tony Wei <tony19920...@gmail.com>:
> Hi all,
> 
> Recently, my Flink job hit a problem that caused it to fail and restart.
> 
> The log is shown in this screenshot
> 
> <exception.png>
> 
> or, as text, here:
> 
> ```
> 2018-05-11 13:21:04,582 WARN  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
> session timed out, have not heard from server in 61054ms for sessionid 
> 0x3054b165fe2006a
> 2018-05-11 13:21:04,583 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
> session timed out, have not heard from server in 61054ms for sessionid 
> 0x3054b165fe2006a, closing socket connection and attempting reconnect
> 2018-05-11 13:21:04,683 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>   - State change: SUSPENDED
> 2018-05-11 13:21:04,686 WARN  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
> ZooKeeper.
> 2018-05-11 13:21:04,689 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>               - Closing the Kafka producer with timeoutMillis = 
> 9223372036854775807 ms.
> 2018-05-11 13:21:04,694 INFO  org.apache.kafka.clients.producer.KafkaProducer 
>               - Closing the Kafka producer with timeoutMillis = 
> 9223372036854775807 ms.
> 2018-05-11 13:21:04,698 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd -> 
> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched from 
> RUNNING to FAILED.
> java.lang.Exception: Failed to send data to Kafka: The server disconnected 
> before a response was received.
> ```
> 
> The logs showed `org.apache.kafka.clients.producer.KafkaProducer - Closing the 
> Kafka producer with timeoutMillis = 9223372036854775807 ms.` This timeout 
> value is Long.MAX_VALUE. The message is logged when someone calls 
> `producer.close()`.
> 
> I also saw the log messages 
> `org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Client 
> session timed out, have not heard from server in 61054ms for sessionid 
> 0x3054b165fe2006a, closing socket connection and attempting reconnect`
> and `org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService 
>  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
> ZooKeeper.`
> 
> I have checked ZooKeeper and Kafka, and there were no errors during that 
> period.
> I was wondering whether the TM stops its tasks when it loses the ZooKeeper 
> client connection in HA mode. Since I didn't find any documentation or mailing 
> list thread discussing this, I'm not sure whether this is what caused the 
> Kafka producer to be closed.
> Could someone who knows HA well help? Or does someone know what happened in 
> my job?
> 
> My Flink cluster is version 1.4.0 with 2 masters and 10 slaves. My ZooKeeper 
> cluster is version 3.4.11 with 3 nodes.
> The `high-availability.zookeeper.client.session-timeout` is at its default 
> value: 60000 ms.
> The `maxSessionTimeout` in zoo.cfg was 40000ms.
> I already changed the `maxSessionTimeout` to 120000ms this morning.
> 
> This problem happened many times over the last weekend and made the lag on my 
> Kafka log grow. Please help me. Thank you very much!
> 
> Best Regards,
> Tony Wei