Hi.

Do we have any idea what is causing this exception?

Thanks,
Yitzchak.

On Tue, Jul 23, 2019 at 12:59 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yitzchak,
>
> Thanks for reaching out.
> I'm not an expert on the Kafka consumer, but I think the number of
> partitions and the number of source tasks might be interesting to know.
>
> Maybe Gordon (in CC) has an idea of what's going wrong here.
>
> Best, Fabian
>
> Am Di., 23. Juli 2019 um 08:50 Uhr schrieb Yitzchak Lieberman <
> yitzch...@sentinelone.com>:
>
>> Hi.
>>
>> Another question: what will happen during a triggered checkpoint if one
>> of the Kafka brokers is unavailable?
>> I would appreciate your insights.
>>
>> Thanks.
>>
>> On Mon, Jul 22, 2019 at 12:42 PM Yitzchak Lieberman <
>> yitzch...@sentinelone.com> wrote:
>>
>>> Hi.
>>>
>>> I'm running a Flink application (version 1.8.0) that
>>> uses FlinkKafkaConsumer to fetch topic data and perform transformations on
>>> the data, with the state backend configured as below:
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.enableCheckpointing(5_000, CheckpointingMode.AT_LEAST_ONCE);
>>> env.setStateBackend((StateBackend) new FsStateBackend("file:///test"));
>>> env.getCheckpointConfig().setCheckpointTimeout(30_000);
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
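>>>
>>> For context, a minimal sketch of roughly how the consumer is wired into
>>> the job; the broker list, group id, topic name and String schema below are
>>> placeholders, not the actual values:
>>>
>>> Properties props = new Properties();
>>> // Placeholder broker list (not the real broker addresses)
>>> props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
>>> // Placeholder consumer group id
>>> props.setProperty("group.id", "my-consumer-group");
>>> FlinkKafkaConsumer<String> consumer =
>>>         new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
>>> env.addSource(consumer).print();
>>> env.execute();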
>>>
>>> My problem is with the Kafka brokers: the cluster has 5 brokers in total,
>>> of which 3 are operating and 2 are down.
>>> I was able to consume the data, but when a checkpoint is triggered it
>>> throws this exception:
>>>
>>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18]
>>> o.a.f.r.c.CheckpointCoordinator - Decline checkpoint 6 by task
>>> 457b1f801fee89d6f9544409877e29d8 of job 1c46aa5719bac1f0bea436d460b79db1.
>>> [INFO ] 2019-07-22 12:29:14.636 [flink-akka.actor.default-dispatcher-28]
>>> o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution
>>> state FAILED to JobManager for task Source: Custom Source -> Sink: Print to
>>> Std. Out 457b1f801fee89d6f9544409877e29d8.
>>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18]
>>> o.a.f.r.c.CheckpointCoordinator - Discarding checkpoint 6 of job
>>> 1c46aa5719bac1f0bea436d460b79db1.
>>> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
>>> Task Source: Custom Source -> Sink: Print to Std. Out (2/4) was not running
>>> at
>>> org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1198)
>>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>> at
>>> org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:700)
>>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> ~[?:1.8.0_201]
>>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274)
>>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189)
>>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147)
>>> ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>> at
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>> ~[akka-actor_2.11-2.4.20.jar:?]
>>> at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:502)
>>> ~[akka-actor_2.11-2.4.20.jar:?]
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala)
>>> ~[akka-actor_2.11-2.4.20.jar:?]
>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>> ~[akka-actor_2.11-2.4.20.jar:?]
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>> ~[akka-actor_2.11-2.4.20.jar:?]
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>> ~[akka-actor_2.11-2.4.20.jar:?]
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>> ~[akka-actor_2.11-2.4.20.jar:?]
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>> ~[akka-actor_2.11-2.4.20.jar:?]
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>> ~[akka-actor_2.11-2.4.20.jar:?]
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> [scala-library-2.11.12.jar:?]
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> [scala-library-2.11.12.jar:?]
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> [scala-library-2.11.12.jar:?]
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> [scala-library-2.11.12.jar:?]
>>> [INFO ] 2019-07-22 12:29:14.637 [flink-akka.actor.default-dispatcher-28]
>>> o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out
>>> (2/4) (457b1f801fee89d6f9544409877e29d8) switched from RUNNING to FAILED.
>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
>>> fetching topic metadata
>>>
>>> My question is: why does the checkpoint (as I understand, this is what it
>>> tries to do) fetch topic metadata from the brokers that are down?
>>>
>>> Thanks,
>>> Yitzchak.
>>>
>>
