Hi,

Sorry for my late response. I received no response on the Kafka mailing list and still cannot find the root cause. However, when I use FlinkKafkaConsumer082, I do not encounter this issue, so I will use FlinkKafkaConsumer082.
Thanks,
Hironori

2016-06-17 2:59 GMT+09:00 Ufuk Celebi <u...@apache.org>:
> Thanks :)
>
> On Thu, Jun 16, 2016 at 3:21 PM, Hironori Ogibayashi
> <ogibaya...@gmail.com> wrote:
>> Ufuk,
>>
>> Yes, of course. I will be sure to post an update when I get more information.
>>
>> Hironori
>>
>> 2016-06-16 1:56 GMT+09:00 Ufuk Celebi <u...@apache.org>:
>>> Hey Hironori,
>>>
>>> thanks for reporting this. Could you please update this thread when
>>> you have more information from the Kafka list?
>>>
>>> – Ufuk
>>>
>>> On Wed, Jun 15, 2016 at 2:48 PM, Hironori Ogibayashi
>>> <ogibaya...@gmail.com> wrote:
>>>> Kostas,
>>>>
>>>> Thank you for your advice. I have posted my question to the Kafka mailing
>>>> list.
>>>> I think the Kafka brokers are fine: there are no errors on the producer side at
>>>> 15,000 msg/sec, and judging from OS metrics, all of my brokers receive almost
>>>> the same amount of network traffic.
>>>>
>>>> Thanks,
>>>> Hironori
>>>>
>>>> 2016-06-14 22:40 GMT+09:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>>>>> Hello Hironori,
>>>>>
>>>>> The logs just show that you get stuck in the Kafka consumer polling loop,
>>>>> which does not allow the consumer lock to be released. Thus the Flink
>>>>> part of the consumer is never actually called.
>>>>>
>>>>> As far as I can tell, this is not a Flink issue, or at least the logs
>>>>> do not point to one.
>>>>>
>>>>> From googling a bit, I found this:
>>>>>
>>>>> http://stackoverflow.com/questions/35636739/kafka-consumer-marking-the-coordinator-2147483647-dead
>>>>>
>>>>> which relates the problem to network issues.
>>>>>
>>>>> Have you also tried posting the problem to the Kafka mailing list?
>>>>> Could it be that the Kafka broker fails and tries to reconnect but does not
>>>>> make it?
>>>>>
>>>>> Kostas
>>>>>
>>>>> On Jun 14, 2016, at 2:59 PM, Hironori Ogibayashi <ogibaya...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Kostas,
>>>>>
>>>>> I have attached a log file from one of the TaskManagers.
>>>>> (It is the same host on which I ran jstack.)
>>>>> I noticed that there are lots of "Marking the coordinator 2147482645
>>>>> dead" messages in the log.
>>>>> MyContinuousProcessingTimeTriggerGlobal in the log is my custom
>>>>> trigger, which is based on ContinuousProcessingTimeTrigger but cleans
>>>>> up windows when it receives specific log records.
>>>>>
>>>>> Thanks,
>>>>> Hironori
>>>>>
>>>>> 2016-06-14 21:23 GMT+09:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>>>>>
>>>>> Hi Hironori,
>>>>>
>>>>> Could you also provide the TaskManager logs?
>>>>>
>>>>> As you described, it seems that the consumer is stuck in the polling loop,
>>>>> even though Flink polls with a timeout. Normally this means that it should
>>>>> periodically release the lock so that checkpoints can go through.
>>>>>
>>>>> The TaskManager logs can help clarify why this does not happen.
>>>>>
>>>>> Thanks,
>>>>> Kostas
>>>>>
>>>>> On Jun 14, 2016, at 12:48 PM, Hironori Ogibayashi <ogibaya...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Kostas,
>>>>>
>>>>> Thank you for your response.
>>>>> Yes, I am using the latest Flink, which is 1.0.3.
>>>>>
>>>>> Thanks,
>>>>> Hironori
>>>>>
>>>>> 2016-06-14 19:02 GMT+09:00 Kostas Kloudas <k.klou...@data-artisans.com>:
>>>>>
>>>>> Hello Hironori,
>>>>>
>>>>> Are you using the latest Flink version?
>>>>> There were some changes to the Flink Kafka consumer in the latest releases.
>>>>>
>>>>> Thanks,
>>>>> Kostas
>>>>>
>>>>> On Jun 14, 2016, at 11:52 AM, Hironori Ogibayashi <ogibaya...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I am running a Flink job which reads topics from Kafka and writes results
>>>>> to Redis. I use FsStateBackend with HDFS.
>>>>>
>>>>> I noticed that checkpoints take several minutes to complete and sometimes
>>>>> expire.
>>>>> ---
>>>>> 2016-06-14 17:25:40,734 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1456 (in 257956 ms)
>>>>> 2016-06-14 17:25:40,735 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1457 @ 1465892740734
>>>>> 2016-06-14 17:35:40,735 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1457 expired before completing.
>>>>> 2016-06-14 17:35:40,736 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1458 @ 1465893340735
>>>>> 2016-06-14 17:45:40,736 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1458 expired before completing.
>>>>> 2016-06-14 17:45:40,737 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1459 @ 1465893940736
>>>>> 2016-06-14 17:55:40,738 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 1459 expired before completing.
>>>>> 2016-06-14 17:55:40,739 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1460 @ 1465894540738
>>>>> ---
>>>>>
>>>>> According to the Web UI, the checkpoint size is just 1 MB. Why does
>>>>> checkpointing take so long?
>>>>>
>>>>> I took a jstack dump during checkpointing. It looks like the checkpointing
>>>>> thread is blocked in commitOffsets.
>>>>>
>>>>> ----
>>>>> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry [0x00007f2b3ddfc000]
>>>>>    java.lang.Thread.State: BLOCKED (on object monitor)
>>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(FlinkKafkaConsumer09.java:392)
>>>>>         - waiting to lock <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(FlinkKafkaConsumerBase.java:169)
>>>>>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:179)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:596)
>>>>>         - locked <0x0000000659111cc8> (a java.lang.Object)
>>>>>         at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:945)
>>>>>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>>>>         at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>> ---
>>>>>
>>>>> The blocker is this thread:
>>>>>
>>>>> ---
>>>>> "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable [0x00007f2b3dbfa000]
>>>>>    java.lang.Thread.State: RUNNABLE
>>>>>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
>>>>>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
>>>>>         - locked <0x0000000659457dc8> (a sun.nio.ch.Util$2)
>>>>>         - locked <0x0000000659457db8> (a java.util.Collections$UnmodifiableSet)
>>>>>         - locked <0x0000000659457108> (a sun.nio.ch.EPollSelectorImpl)
>>>>>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
>>>>>         at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>>>>>         at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>>>>>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
>>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
>>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
>>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908)
>>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
>>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:449)
>>>>>         - locked <0x0000000659111b58> (a org.apache.kafka.clients.consumer.KafkaConsumer)
>>>>> ---
>>>>>
>>>>> If someone could advise me on the cause or on how to investigate
>>>>> further, I would appreciate it.
>>>>>
>>>>> Thanks,
>>>>> Hironori
>>>>>
>>>>> <flink-flink-taskmanager-0-FLINK1503.log.gz>
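The contention Kostas describes can be modeled in a few lines of plain Java. In the jstack output above, the consumer thread holds the `KafkaConsumer` monitor (`- locked <0x0000000659111b58>`) while polling, so `commitOffsets` in the checkpoint-notification thread waits for that same monitor. This is a minimal, hypothetical sketch, not Flink or Kafka code: `PollLockSketch`, `commitGetsLock` and the `Thread.sleep` that stands in for a network poll are invented for illustration. It also swaps Flink 1.0.3's `synchronized` block for a fair `ReentrantLock` so the hand-off order is predictable and the committing side can time out instead of blocking forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical model of the contention in the jstack output: a consumer thread
 * holds a lock while it polls, and a checkpoint-notification thread needs the
 * same lock to commit offsets. None of these names are Kafka or Flink APIs.
 */
public class PollLockSketch {

    /**
     * Runs a poll loop that holds the lock for pollMillis per iteration, then
     * tries to take the same lock for a "commit", waiting at most
     * commitTimeoutMillis. Returns true if the commit obtained the lock.
     */
    static boolean commitGetsLock(long pollMillis, int polls, long commitTimeoutMillis)
            throws InterruptedException {
        // Fair lock (an assumption of this sketch): once the committer is queued,
        // the poll loop cannot barge ahead of it when re-acquiring.
        ReentrantLock lock = new ReentrantLock(true);
        CountDownLatch pollingStarted = new CountDownLatch(1);

        Thread consumer = new Thread(() -> {
            for (int i = 0; i < polls; i++) {
                lock.lock();
                try {
                    pollingStarted.countDown();
                    Thread.sleep(pollMillis); // stands in for a blocking network poll
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                   // asked to stop; finally still releases the lock
                } finally {
                    lock.unlock();            // the lock is released between polls
                }
            }
        }, "consumer-thread");
        consumer.start();
        pollingStarted.await(); // the poll loop now holds the lock

        boolean acquired = lock.tryLock(commitTimeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            try {
                // a real offset commit would run here
            } finally {
                lock.unlock();
            }
        }
        consumer.interrupt(); // stop the poll loop once we have an answer
        consumer.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        // Short, bounded polls: the lock is released every ~50 ms.
        System.out.println("bounded polls, commit got lock: " + commitGetsLock(50, 20, 1_000));
        // One poll that blocks for 3 s: the commit gives up after 200 ms.
        System.out.println("stuck poll, commit got lock: " + commitGetsLock(3_000, 1, 200));
    }
}
```

With bounded polls the commit obtains the lock within one poll interval; with a poll that does not return, it cannot. The fair lock is a deliberate choice here: Java's intrinsic monitors make no fairness guarantee, so with `synchronized`, a loop that re-acquires the monitor immediately after releasing it can keep the committing thread waiting even when each individual poll is short.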