Hi. It turned out that the cause was non-replicated topics (replication factor = 1) in Kafka.
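For context on why that surfaced as "Timeout expired while fetching topic metadata": with replication factor 1, a partition whose single replica lives on a down broker has no available leader, so metadata requests for that topic cannot complete and eventually time out. A quick way to spot such topics is to filter the `kafka-topics.sh --describe` listing (available with `--bootstrap-server` since Kafka 2.2). The broker address, topic names, and sample output below are made-up placeholders:

```shell
# Made-up sample of a topic listing. Against a live cluster it would come from:
#   kafka-topics.sh --describe --bootstrap-server broker1:9092
describe_output='Topic:events PartitionCount:4 ReplicationFactor:1 Configs:
Topic:audit PartitionCount:2 ReplicationFactor:3 Configs:'

# Keep only topics whose replication factor is exactly 1 (-w avoids matching
# ReplicationFactor:10, :12, etc.).
printf '%s\n' "$describe_output" | grep -w 'ReplicationFactor:1'
```

Any topic printed by that filter would be unavailable for metadata whenever its single replica's broker is down.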
On Wed, Jul 24, 2019 at 4:20 PM Yitzchak Lieberman <yitzch...@sentinelone.com> wrote:

> Hi.
>
> Do we have an idea for this exception?
>
> Thanks,
> Yitzchak.
>
> On Tue, Jul 23, 2019 at 12:59 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Yitzchak,
>>
>> Thanks for reaching out.
>> I'm not an expert on the Kafka consumer, but I think the number of
>> partitions and the number of source tasks might be interesting to know.
>>
>> Maybe Gordon (in CC) has an idea of what's going wrong here.
>>
>> Best,
>> Fabian
>>
>> On Tue, Jul 23, 2019 at 08:50, Yitzchak Lieberman
>> <yitzch...@sentinelone.com> wrote:
>>
>>> Hi.
>>>
>>> Another question: what will happen during a triggered checkpoint if one
>>> of the Kafka brokers is unavailable?
>>> I would appreciate your insights.
>>>
>>> Thanks.
>>>
>>> On Mon, Jul 22, 2019 at 12:42 PM Yitzchak Lieberman
>>> <yitzch...@sentinelone.com> wrote:
>>>
>>>> Hi.
>>>>
>>>> I'm running a Flink application (version 1.8.0) that uses
>>>> FlinkKafkaConsumer to fetch topic data and perform transformations on
>>>> the data, with the state backend configured as below:
>>>>
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.enableCheckpointing(5_000, CheckpointingMode.AT_LEAST_ONCE);
>>>> env.setStateBackend((StateBackend) new FsStateBackend("file:///test"));
>>>> env.getCheckpointConfig().setCheckpointTimeout(30_000);
>>>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>>>>
>>>> My problem is with the Kafka brokers: the cluster has 5 brokers in
>>>> total, of which 3 are operating and 2 are down.
>>>> I was able to consume the data, but when the checkpoint was triggered
>>>> it threw this exception:
>>>>
>>>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18]
>>>> o.a.f.r.c.CheckpointCoordinator - Decline checkpoint 6 by task
>>>> 457b1f801fee89d6f9544409877e29d8 of job 1c46aa5719bac1f0bea436d460b79db1.
>>>> [INFO ] 2019-07-22 12:29:14.636 [flink-akka.actor.default-dispatcher-28]
>>>> o.a.f.r.t.TaskExecutor - Un-registering task and sending final execution
>>>> state FAILED to JobManager for task Source: Custom Source -> Sink: Print
>>>> to Std. Out 457b1f801fee89d6f9544409877e29d8.
>>>> [INFO ] 2019-07-22 12:29:14.634 [flink-akka.actor.default-dispatcher-18]
>>>> o.a.f.r.c.CheckpointCoordinator - Discarding checkpoint 6 of job
>>>> 1c46aa5719bac1f0bea436d460b79db1.
>>>> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
>>>> Task Source: Custom Source -> Sink: Print to Std. Out (2/4) was not running
>>>> at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1198) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>> at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:700) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) ~[?:?]
>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_201]
>>>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_201]
>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:274) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:189) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:147) ~[flink-runtime_2.11-1.8.0.jar:1.8.0]
>>>> at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) ~[akka-actor_2.11-2.4.20.jar:?]
>>>> at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:502) ~[akka-actor_2.11-2.4.20.jar:?]
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala) ~[akka-actor_2.11-2.4.20.jar:?]
>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:?]
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) ~[akka-actor_2.11-2.4.20.jar:?]
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495) ~[akka-actor_2.11-2.4.20.jar:?]
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) ~[akka-actor_2.11-2.4.20.jar:?]
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:?]
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:?]
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.12.jar:?]
>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.12.jar:?]
>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:?]
>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:?]
>>>> [INFO ] 2019-07-22 12:29:14.637 [flink-akka.actor.default-dispatcher-28]
>>>> o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std.
>>>> Out (2/4) (457b1f801fee89d6f9544409877e29d8) switched from RUNNING to FAILED.
>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
>>>> fetching topic metadata
>>>>
>>>> My question is (given what I think the checkpoint tries to do): why is
>>>> it trying to fetch topic metadata from the brokers that are down?
>>>>
>>>> Thanks,
>>>> Yitzchak.
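A note on the fix mentioned at the top of the thread: the replication factor of an existing topic cannot be changed with `kafka-topics.sh --alter`; it takes a partition reassignment. A minimal sketch, assuming a topic named `events` with two partitions and broker ids 1-3 (all hypothetical values):

```shell
# Hypothetical reassignment spreading each partition's replicas over three
# brokers, which raises the replication factor from 1 to 3.
cat > reassignment.json <<'EOF'
{"version": 1, "partitions": [
  {"topic": "events", "partition": 0, "replicas": [1, 2, 3]},
  {"topic": "events", "partition": 1, "replicas": [2, 3, 1]}
]}
EOF

# Then apply it against the cluster (newer Kafka releases accept
# --bootstrap-server; older ones use --zookeeper instead):
#   kafka-reassign-partitions.sh --bootstrap-server broker1:9092 \
#     --reassignment-json-file reassignment.json --execute
```

Rotating the replica lists (`[1,2,3]`, `[2,3,1]`, ...) spreads partition leadership across brokers instead of piling it onto one.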