Hi.

It turned out that the cause was non-replicated topics in Kafka
(replication factor = 1).
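
To illustrate why the replication factor matters here, below is a minimal
sketch in plain Java. It does not use the Kafka client. The class name, the
method names and the round-robin replica assignment are made up for
illustration; real Kafka assigns replicas differently and also tracks in-sync
replicas and leader election, which this sketch ignores. It only shows that
with 5 brokers, 2 of them down, a replication factor of 1 leaves some
partitions without any live replica, while a replication factor of 3 does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ReplicationCheck {

    /** Hypothetical round-robin assignment: partition p gets brokers p, p+1, ... (mod brokers). */
    public static Map<Integer, List<Integer>> assign(int partitions, int brokers, int replicationFactor) {
        Map<Integer, List<Integer>> replicasByPartition = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add((p + r) % brokers);
            }
            replicasByPartition.put(p, replicas);
        }
        return replicasByPartition;
    }

    /** Returns the partitions whose replicas all sit on brokers that are down. */
    public static List<Integer> unavailablePartitions(Map<Integer, List<Integer>> replicasByPartition,
                                                      Set<Integer> downBrokers) {
        List<Integer> unavailable = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> entry : replicasByPartition.entrySet()) {
            boolean anyLive = false;
            for (int broker : entry.getValue()) {
                if (!downBrokers.contains(broker)) {
                    anyLive = true;
                    break;
                }
            }
            if (!anyLive) {
                unavailable.add(entry.getKey());
            }
        }
        return unavailable;
    }

    public static void main(String[] args) {
        // 5 brokers (ids 0..4), brokers 3 and 4 are down, as in the setup described in this thread.
        Set<Integer> down = new HashSet<>(Arrays.asList(3, 4));

        // Replication factor 1: partitions 3 and 4 live only on the dead brokers.
        System.out.println(unavailablePartitions(assign(5, 5, 1), down)); // prints [3, 4]

        // Replication factor 3: every partition still has at least one live replica.
        System.out.println(unavailablePartitions(assign(5, 5, 3), down)); // prints []
    }
}
```

A consumer that needs metadata or data for one of the unavailable partitions
has no broker left to ask, which would be consistent with the timeout while
fetching topic metadata quoted below.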

On Wed, Jul 24, 2019 at 4:20 PM Yitzchak Lieberman <
yitzch...@sentinelone.com> wrote:

> Hi.
>
> Does anyone have an idea what causes this exception?
>
> Thanks,
> Yitzchak.
>
> On Tue, Jul 23, 2019 at 12:59 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Yitzchak,
>>
>> Thanks for reaching out.
>> I'm not an expert on the Kafka consumer, but I think the number of
>> partitions and the number of source tasks might be interesting to know.
>>
>> Maybe Gordon (in CC) has an idea of what's going wrong here.
>>
>> Best, Fabian
>>
>> On Tue, Jul 23, 2019 at 08:50, Yitzchak Lieberman <
>> yitzch...@sentinelone.com> wrote:
>>
>>> Hi.
>>>
>>> Another question: what happens during a triggered checkpoint if one
>>> of the Kafka brokers is unavailable?
>>> I would appreciate your insights.
>>>
>>> Thanks.
>>>
>>> On Mon, Jul 22, 2019 at 12:42 PM Yitzchak Lieberman <
>>> yitzch...@sentinelone.com> wrote:
>>>
>>>> Hi.
>>>>
>>>> I'm running a Flink application (version 1.8.0) that uses
>>>> FlinkKafkaConsumer to fetch topic data and transform it. The
>>>> checkpointing and state backend are configured as below:
>>>>
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> // Checkpoint every 5 s with at-least-once semantics.
>>>> env.enableCheckpointing(5_000, CheckpointingMode.AT_LEAST_ONCE);
>>>> // Store checkpoint state on the file system.
>>>> env.setStateBackend((StateBackend) new FsStateBackend("file:///test"));
>>>> // Abort a checkpoint that takes longer than 30 s.
>>>> env.getCheckpointConfig().setCheckpointTimeout(30_000);
>>>> // Keep externalized checkpoints when the job is cancelled.
>>>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>> // At most one checkpoint in flight, with at least 1 s between checkpoints.
>>>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>>>>
>>>> My problem is with the Kafka brokers: the cluster has 5 brokers in
>>>> total, of which 3 are running and 2 are down.
>>>> I was able to consume the data, but when a checkpoint was triggered the
>>>> job threw this exception:
>>>>
>>>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18] o.a.f.r.c.CheckpointCoordinator - Decline checkpoint 6 by task 457b1f801fee89d6f9544409877e29d8 of job 1c46aa5719bac1f0bea436d460b79db1.
>>>> [INFO ] 2019-07-22 12:29:14.636 [flink-akka.actor.default-dispatcher-28] o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 457b1f801fee89d6f9544409877e29d8.
>>>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18] o.a.f.r.c.CheckpointCoordinator - Discarding checkpoint 6 of job 1c46aa5719bac1f0bea436d460b79db1.
>>>> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: Custom Source -> Sink: Print to Std. Out (2/4) was not running
>>>>     at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1198) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:700) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>>     at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_201]
>>>>     at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) ~[akka-actor_2.11-2.4.20.jar:?]
>>>>     at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:502) ~[akka-actor_2.11-2.4.20.jar:?]
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala) ~[akka-actor_2.11-2.4.20.jar:?]
>>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:?]
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) ~[akka-actor_2.11-2.4.20.jar:?]
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495) ~[akka-actor_2.11-2.4.20.jar:?]
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) ~[akka-actor_2.11-2.4.20.jar:?]
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:?]
>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:?]
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.12.jar:?]
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.12.jar:?]
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:?]
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:?]
>>>> [INFO ] 2019-07-22 12:29:14.637 [flink-akka.actor.default-dispatcher-28] o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (2/4) (457b1f801fee89d6f9544409877e29d8) switched from RUNNING to FAILED.
>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>>>>
>>>> My question is (based on what I think the checkpoint tries to do): why
>>>> is it trying to fetch topic metadata from the brokers that are down?
>>>>
>>>> Thanks,
>>>> Yitzchak.
>>>>
>>>
