Thanks :)

On Thu, Jun 16, 2016 at 3:21 PM, Hironori Ogibayashi
<> wrote:
> Ufuk,
> Yes, of course. I will be sure to update the thread when I get more information.
> Hironori
> 2016-06-16 1:56 GMT+09:00 Ufuk Celebi <>:
>> Hey Hironori,
>> thanks for reporting this. Could you please update this thread when
>> you have more information from the Kafka list?
>> – Ufuk
>> On Wed, Jun 15, 2016 at 2:48 PM, Hironori Ogibayashi
>> <> wrote:
>>> Kostas,
>>> Thank you for your advice. I have posted my question to the Kafka mailing
>>> list.
>>> I think the Kafka brokers are fine, because there are no errors on the
>>> producer side at 15,000 msg/sec and,
>>> according to OS metrics, all of my brokers receive almost the same amount of
>>> network traffic.
>>> Thanks,
>>> Hironori
>>> 2016-06-14 22:40 GMT+09:00 Kostas Kloudas <>:
>>>> Hello Hironori,
>>>> The logs just show that you get stuck in the Kafka consumer polling loop,
>>>> which does not allow the consumer lock to be released. Thus the Flink
>>>> part of the consumer is never actually called.
>>>> To my understanding this does not seem to be a Flink issue,
>>>> or at least the logs do not show one.
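
A minimal sketch of the situation described above, for readers who want to reproduce the blocking pattern without a Kafka cluster. It is not Flink or Kafka code: the class PollLockSketch and all of its methods are hypothetical, and a ReentrantLock stands in for the monitor on the KafkaConsumer object so that the wait can be given a timeout. One thread takes the lock and "polls" without returning, while a second caller tries to take the same lock to commit offsets.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; none of these names exist in Flink or Kafka.
class PollLockSketch {
    // Stands in for the lock on the KafkaConsumer seen in the stack traces.
    private final ReentrantLock consumerLock = new ReentrantLock();
    private final CountDownLatch pollStarted = new CountDownLatch(1);
    private final CountDownLatch pollMayReturn = new CountDownLatch(1);
    private Thread poller;

    // Starts a thread that takes the lock and "polls" until letPollReturn() is called.
    void startStuckPoll() throws InterruptedException {
        poller = new Thread(() -> {
            consumerLock.lock();
            try {
                pollStarted.countDown();
                pollMayReturn.await(); // stands in for a poll() that does not return
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                consumerLock.unlock();
            }
        }, "poller");
        poller.setDaemon(true);
        poller.start();
        pollStarted.await(); // the poller now owns the lock
    }

    // Lets the simulated poll return and waits until the poller has released the lock.
    void letPollReturn() throws InterruptedException {
        pollMayReturn.countDown();
        poller.join();
    }

    // Simulated offset commit: succeeds only if the lock becomes free within waitMillis.
    boolean tryCommit(long waitMillis) throws InterruptedException {
        if (!consumerLock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
            return false; // still blocked behind the poll
        }
        try {
            return true; // the real commit would happen here
        } finally {
            consumerLock.unlock();
        }
    }

    // Returns {commit succeeded while poll was stuck, commit succeeded after poll returned}.
    static boolean[] runDemo() {
        PollLockSketch s = new PollLockSketch();
        try {
            s.startStuckPoll();
            boolean whileStuck = s.tryCommit(100);
            s.letPollReturn();
            boolean afterReturn = s.tryCommit(100);
            return new boolean[] {whileStuck, afterReturn};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = runDemo();
        System.out.println("commit while poll is stuck: " + r[0]);
        System.out.println("commit after poll returned: " + r[1]);
    }
}
```

In this sketch the commit fails for as long as the simulated poll holds the lock and succeeds once the poll returns, which is the same shape as the BLOCKED and RUNNABLE threads in the jstack output further down.
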
>>>> From googling a bit, I found this:
>>>> which relates the problem to network issues.
>>>> Have you tried posting the problem also to the Kafka mailing list?
>>>> Could it be that the Kafka broker fails and tries to reconnect but does not
>>>> make it?
>>>> Kostas
>>>> On Jun 14, 2016, at 2:59 PM, Hironori Ogibayashi <>
>>>> wrote:
>>>> Kostas,
>>>> I have attached a log file from one of the taskManagers (the same host
>>>> on which I ran jstack).
>>>> I noticed that there are lots of "Marking the coordinator 2147482645
>>>> dead" messages in the log.
>>>> MyContinuousProcessingTimeTriggerGlobal in the log is my custom
>>>> trigger, which is based on ContinuousProcessingTimeTrigger but cleans
>>>> up windows when it receives specific log records.
>>>> Thanks,
>>>> Hironori
>>>> 2016-06-14 21:23 GMT+09:00 Kostas Kloudas <>:
>>>> Hi Hironori,
>>>> Could you also provide the logs of the taskManager?
>>>> As you described, it seems that the consumer is stuck in the polling loop,
>>>> although Flink polls with
>>>> a timeout. This would normally mean that periodically it should release the
>>>> lock for the checkpoints to go through.
>>>> The logs of the task manager can help clarify why this does not
>>>> happen.
>>>> Thanks,
>>>> Kostas
>>>> On Jun 14, 2016, at 12:48 PM, Hironori Ogibayashi <>
>>>> wrote:
>>>> Kostas,
>>>> Thank you for your response.
>>>> Yes, I am using the latest Flink, which is 1.0.3.
>>>> Thanks,
>>>> Hironori
>>>> 2016-06-14 19:02 GMT+09:00 Kostas Kloudas <>:
>>>> Hello Hironori,
>>>> Are you using the latest Flink version?
>>>> There were some changes in the FlinkConsumer in the latest releases.
>>>> Thanks,
>>>> Kostas
>>>> On Jun 14, 2016, at 11:52 AM, Hironori Ogibayashi <>
>>>> wrote:
>>>> Hello,
>>>> I am running a Flink job that reads topics from Kafka and writes results
>>>> to Redis. I use FsStateBackend with HDFS.
>>>> I noticed that taking a checkpoint takes several minutes and sometimes
>>>> expires.
>>>> ---
>>>> 2016-06-14 17:25:40,734 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Completed checkpoint 1456 (in 257956 ms)
>>>> 2016-06-14 17:25:40,735 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Triggering checkpoint 1457 @ 1465892740734
>>>> 2016-06-14 17:35:40,735 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Checkpoint 1457 expired before completing.
>>>> 2016-06-14 17:35:40,736 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Triggering checkpoint 1458 @ 1465893340735
>>>> 2016-06-14 17:45:40,736 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Checkpoint 1458 expired before completing.
>>>> 2016-06-14 17:45:40,737 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Triggering checkpoint 1459 @ 1465893940736
>>>> 2016-06-14 17:55:40,738 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Checkpoint 1459 expired before completing.
>>>> 2016-06-14 17:55:40,739 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
>>>> Triggering checkpoint 1460 @ 1465894540738
>>>> ---
>>>> According to the WebUI, the checkpoint size is just 1MB. Why does
>>>> checkpointing take so long?
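
As a small aid for reading the log excerpt above, here is a sketch that computes the time between a "Triggering checkpoint" line and the matching "expired before completing" line from their log timestamps. The class CheckpointGap and its method are hypothetical helpers, not part of Flink, and the timestamp pattern is assumed from the lines shown in this thread.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for reading the log excerpt; not part of Flink.
class CheckpointGap {
    // Timestamp layout assumed from the log lines quoted in this thread.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Milliseconds elapsed between two log timestamps.
    static long millisBetween(String from, String to) {
        LocalDateTime start = LocalDateTime.parse(from, LOG_TIME);
        LocalDateTime end = LocalDateTime.parse(to, LOG_TIME);
        return Duration.between(start, end).toMillis();
    }

    public static void main(String[] args) {
        // Checkpoint 1457: triggered at 17:25:40,735, expired at 17:35:40,735.
        long ms = millisBetween("2016-06-14 17:25:40,735", "2016-06-14 17:35:40,735");
        System.out.println("checkpoint 1457 expired after " + ms + " ms");
    }
}
```

For checkpoint 1457 this gives 600000 ms, and checkpoints 1458 and 1459 show the same gap. That suggests each one ran into a 10-minute checkpoint timeout rather than failing for a size-related reason. I believe 10 minutes is Flink's default timeout, but that is an assumption worth checking against the configuration of the job.
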
>>>> I took a jstack dump during checkpointing. It looks like the
>>>> checkpointing thread is blocked in commitOffsets.
>>>> ----
>>>> "Async calls on Source: Custom Source -> Flat Map (2/3)" daemon
>>>> prio=10 tid=0x00007f2b14010800 nid=0x1b89a waiting for monitor entry
>>>> [0x00007f2b3ddfc000]
>>>> java.lang.Thread.State: BLOCKED (on object monitor)
>>>>      at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.commitOffsets(
>>>>      - waiting to lock <0x0000000659111b58> (a
>>>> org.apache.kafka.clients.consumer.KafkaConsumer)
>>>>      at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.notifyCheckpointComplete(
>>>>      at
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(
>>>>      at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(
>>>>      - locked <0x0000000659111cc8> (a java.lang.Object)
>>>>      at org.apache.flink.runtime.taskmanager.Task$
>>>>      at
>>>> java.util.concurrent.Executors$
>>>>      at
>>>>      at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>      at
>>>> java.util.concurrent.ThreadPoolExecutor$
>>>>      at
>>>> ---
>>>> The thread holding the lock is this one:
>>>> ---
>>>> "Thread-9" daemon prio=10 tid=0x00007f2b2440d000 nid=0x1b838 runnable
>>>> [0x00007f2b3dbfa000]
>>>> java.lang.Thread.State: RUNNABLE
>>>>      at Method)
>>>>      at
>>>>      at
>>>>      at
>>>>      - locked <0x0000000659457dc8> (a$2)
>>>>      - locked <0x0000000659457db8> (a 
>>>> java.util.Collections$UnmodifiableSet)
>>>>      - locked <0x0000000659457108> (a
>>>>      at
>>>>      at
>>>>      at
>>>>      at org.apache.kafka.clients.NetworkClient.poll(
>>>>      at
>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(
>>>>      at
>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
>>>>      at
>>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
>>>>      at
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(
>>>>      at
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>>>>      at
>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$
>>>>      - locked <0x0000000659111b58> (a
>>>> org.apache.kafka.clients.consumer.KafkaConsumer)
>>>> ---
>>>> If someone could advise me on the cause or on how to investigate
>>>> further, that would be appreciated.
>>>> Thanks,
>>>> Hironori
>>>> <flink-flink-taskmanager-0-FLINK1503.log.gz>
