I just stumbled on this same problem without any associated ZK issues. We had a
Kafka broker fail that caused this issue:
2018-07-18 02:48:13,497 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Produce:
<output_topic_name> (2/4) (7e7d61b286d90c51bbd20a15796633f2) switched from
RUNNING to FAILED.
java.lang.Exception: Failed to send data to Kafka: The server disconnected
before a response was received.
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.NetworkException: The server
disconnected before a response was received.
This is the kind of error we should be robust to: the Kafka cluster will
(reasonably quickly) recover and assign a new leader broker for a particular
partition (in this case, partition #2). Maybe retries should be the default
configuration? I believe the client uses the Kafka producer defaults (acks=1,
retries=0), but we typically run with acks=1 (or all) and retries=MAX_INT. Do I
need to do anything more than that to get a more robust producer?
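
A minimal sketch of the producer settings described above, assuming they are
passed as plain `Properties` to the `FlinkKafkaProducer010` constructor. The
class name, the broker address, and the `retry.backoff.ms` value are
placeholders for illustration, not values from our setup. Limiting in-flight
requests to 1 is an extra assumption: it keeps retried batches from being
reordered, at some cost in throughput.

```java
import java.util.Properties;

public class RobustProducerConfig {

    /**
     * Builds Kafka producer properties that retry transient send failures
     * (such as NetworkException during a broker failover) instead of
     * failing on the first error.
     */
    public static Properties robustProducerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait for all in-sync replicas to acknowledge each write.
        props.setProperty("acks", "all");
        // Retry failed sends effectively forever (MAX_INT).
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        // Placeholder pause between retries, to give leader election some time.
        props.setProperty("retry.backoff.ms", "500");
        // With retries enabled, more than one in-flight request per
        // connection can reorder records; 1 preserves ordering.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // "broker-1:9092" is a hypothetical address.
        Properties props = robustProducerProperties("broker-1:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object would then go to the sink in place of
whatever the job passes today; whether this alone is enough is exactly my
question.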
Ron
> On May 16, 2018, at 7:45 PM, Tony Wei <[email protected]> wrote:
>
> Hi Ufuk, Piotr
>
> Thanks for all of your replies. I knew that jobs are cancelled if the JM
> loses the connection to ZK, but the JM didn't lose its connection in my case.
> My job failed because of the exception from KafkaProducer. However, the TM
> lost its ZK connection both before and after that exception.
> So, as Piotr said, it looks like an error in the Kafka producer, and I will pay
> more attention to it to see if something unexpected happens again.
>
> Best Regards,
> Tony Wei
>
> 2018-05-15 19:56 GMT+08:00 Piotr Nowojski <[email protected]>:
> Hi,
>
> It looks like there was an error in the asynchronous job that sends the records
> to Kafka. This is probably collateral damage from losing the connection to
> ZooKeeper.
>
> Piotrek
>
>> On 15 May 2018, at 13:33, Ufuk Celebi <[email protected]> wrote:
>>
>> Hey Tony,
>>
>> thanks for the detailed report.
>>
>> - In Flink 1.4, jobs are cancelled if the JM loses the connection to ZK and
>> recovered when the connection is re-established (and one JM becomes leader
>> again).
>>
>> - Regarding the KafkaProducer: I'm not sure from the log message whether
>> Flink closes the KafkaProducer because the job is cancelled or because there
>> is a connectivity issue to the Kafka cluster. Including Piotr (cc) in this
>> thread who has worked on the KafkaProducer in the past. If it is a
>> connectivity issue, it might also explain why you lost the connection to ZK.
>>
>> Glad to hear that everything is back to normal. Keep us updated if something
>> unexpected happens again.
>>
>> – Ufuk
>>
>>
>> On Tue, May 15, 2018 at 6:28 AM, Tony Wei <[email protected]> wrote:
>> Hi all,
>>
>> I restarted the cluster, changed the log level to DEBUG, and raised the
>> parallelism of my streaming job from 32 to 40.
>> However, the problem just disappeared and I don't know why.
>> I will keep these settings for a while. If the error happens again, I will
>> bring back more information. Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-05-14 14:24 GMT+08:00 Tony Wei <[email protected]>:
>> Hi all,
>>
>> After I changed the `high-availability.zookeeper.client.session-timeout` and
>> `maxSessionTimeout` to 120000ms, the exception still occurred.
>>
>> Here is the log snippet. It seems this has nothing to do with the ZooKeeper
>> client timeout, but I still don't know why the Kafka producer would be closed
>> without any task state change.
>>
>> ```
>> 2018-05-14 05:18:53,468 WARN
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client
>> session timed out, have not heard from server in 82828ms for sessionid
>> 0x305f957eb8d000a
>> 2018-05-14 05:18:53,468 INFO
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client
>> session timed out, have not heard from server in 82828ms for sessionid
>> 0x305f957eb8d000a, closing socket connection and attempting reconnect
>> 2018-05-14 05:18:53,571 INFO
>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>> - State change: SUSPENDED
>> 2018-05-14 05:18:53,574 WARN
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>> ZooKeeper.
>> 2018-05-14 05:18:53,850 WARN
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - SASL
>> configuration failed: javax.security.auth.login.LoginException: No JAAS
>> configuration section named 'Client' was found in specified JAAS
>> configuration file: '/mnt/jaas-466390940757021791.conf'. Will continue
>> connection to Zookeeper server without SASL authentication, if Zookeeper
>> server allows it.
>> 2018-05-14 05:18:53,850 INFO
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Opening
>> socket connection to server XXX.XXX.XXX.XXX:2181
>> 2018-05-14 05:18:53,852 ERROR
>> org.apache.flink.shaded.curator.org.apache.curator.ConnectionState -
>> Authentication failed
>> 2018-05-14 05:18:53,853 INFO
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Socket
>> connection established to XXX.XXX.XXX.XXX:2181, initiating session
>> 2018-05-14 05:18:53,859 INFO
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session
>> establishment complete on server XXX.XXX.XXX.XXX:2181, sessionid =
>> 0x305f957eb8d000a, negotiated timeout = 120000
>> 2018-05-14 05:18:53,860 INFO
>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>> - State change: RECONNECTED
>> 2018-05-14 05:18:53,860 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>> Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.
>> 2018-05-14 05:28:54,781 INFO
>> org.apache.kafka.clients.producer.KafkaProducer - Closing the
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-14 05:28:54,829 INFO
>> org.apache.kafka.clients.producer.KafkaProducer - Closing the
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-14 05:28:54,918 INFO org.apache.flink.runtime.taskmanager.Task
>> - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd ->
>> Sink: kafka-sink-cd) (1/32) (e3462ff8bb565bb0cf4de49ffc2595fb) switched from
>> RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server disconnected
>> before a response was received.
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:288)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>> at
>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:464)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:441)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:415)
>> at
>> org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:62)
>> at
>> org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.collect(CopyingDirectedOutput.java:34)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> at
>> com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:39)
>> at
>> com.appier.rt.rt_match.flink.operator.MatchRuleOperator$$anonfun$flatMap1$4.apply(MatchRuleOperator.scala:38)
>> at
>> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> at
>> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>> at scala.collection.immutable.Map$Map2.foreach(Map.scala:137)
>> at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>> at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>> at
>> com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:38)
>> at
>> com.appier.rt.rt_match.flink.operator.MatchRuleOperator.flatMap1(MatchRuleOperator.scala:14)
>> at
>> org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.processElement1(CoStreamFlatMap.java:53)
>> at
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:243)
>> at
>> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.errors.NetworkException: The server
>> disconnected before a response was received.
>> ```
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-05-14 11:36 GMT+08:00 Tony Wei <[email protected]>:
>> Hi all,
>>
>> Recently, my Flink job hit a problem that caused it to fail and
>> restart.
>>
>> The log is shown in this screenshot
>>
>> <exception.png>
>>
>> or this
>>
>> ```
>> 2018-05-11 13:21:04,582 WARN
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client
>> session timed out, have not heard from server in 61054ms for sessionid
>> 0x3054b165fe2006a
>> 2018-05-11 13:21:04,583 INFO
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client
>> session timed out, have not heard from server in 61054ms for sessionid
>> 0x3054b165fe2006a, closing socket connection and attempting reconnect
>> 2018-05-11 13:21:04,683 INFO
>> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager
>> - State change: SUSPENDED
>> 2018-05-11 13:21:04,686 WARN
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>> ZooKeeper.
>> 2018-05-11 13:21:04,689 INFO
>> org.apache.kafka.clients.producer.KafkaProducer - Closing the
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-11 13:21:04,694 INFO
>> org.apache.kafka.clients.producer.KafkaProducer - Closing the
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.
>> 2018-05-11 13:21:04,698 INFO org.apache.flink.runtime.taskmanager.Task
>> - match-rule -> (get-ordinary -> Sink: kafka-sink, get-cd ->
>> Sink: kafka-sink-cd) (4/32) (65a4044ac963e083f2635fe24e7f2403) switched from
>> RUNNING to FAILED.
>> java.lang.Exception: Failed to send data to Kafka: The server disconnected
>> before a response was received.
>> ```
>>
>> The logs showed `org.apache.kafka.clients.producer.KafkaProducer - Closing the
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.` This timeout
>> value is Long.MAX_VALUE, which is logged when something calls `producer.close()`.
>>
>> I also saw these messages in the log:
>> `org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client
>> session timed out, have not heard from server in 61054ms for sessionid
>> 0x3054b165fe2006a, closing socket connection and attempting reconnect`
>> and
>> `org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService -
>> Connection to ZooKeeper suspended. Can no longer retrieve the leader from
>> ZooKeeper.`
>>
>> I have checked ZooKeeper and Kafka, and there were no errors during that period.
>> I was wondering whether the TM stops its tasks when it loses the ZooKeeper
>> client connection in HA mode. Since I didn't find any documentation or mailing
>> list thread discussing this, I'm not sure whether this is what caused the Kafka
>> producer to be closed.
>> Could someone who knows HA well help, or does anyone know what happened in my job?
>>
>> My Flink cluster version is 1.4.0 with 2 masters and 10 slaves. My ZooKeeper
>> cluster version is 3.4.11 with 3 nodes.
>> The `high-availability.zookeeper.client.session-timeout` is at its default value,
>> 60000 ms.
>> The `maxSessionTimeout` in zoo.cfg was 40000 ms.
>> I already changed `maxSessionTimeout` to 120000 ms this morning.
>>
>> This problem happened many times over the last weekend and caused my
>> Kafka log delay to grow. Please help me. Thank you very much!
>>
>> Best Regards,
>> Tony Wei