[ https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai updated FLINK-10721:
----------------------------------------
    Fix Version/s:     (was: 1.6.3)
                   1.6.4

> kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10721
>                 URL: https://issues.apache.org/jira/browse/FLINK-10721
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.6.2
>            Reporter: zhaoshijie
>            Assignee: Chesnay Schepler
>            Priority: Major
>             Fix For: 1.6.4, 1.7.2, 1.8.0
>
> In the FlinkKafkaConsumerBase run method, at line 721 (master branch), if kafkaFetcher.runFetchLoop() throws an exception, the follow-up code does not execute. This can happen when the discoveryLoopThread throws an exception and its finally block calls the cancel method; cancel invokes kafkaFetcher.cancel, which in the Kafka09Fetcher implementation calls handover.close, causing handover.pollNext to throw a ClosedException. Because the code after runFetchLoop() is then skipped, the recorded discoveryLoopError is never rethrown and the real culprit exception is swallowed.
> The failure log looks like this:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
> 	at org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> Should we modify it as follows?
> {code:java}
> try {
> 	kafkaFetcher.runFetchLoop();
> } catch (Exception e) {
> 	// if discoveryLoopErrorRef is not null, we should throw the real culprit exception
> 	if (discoveryLoopErrorRef.get() != null) {
> 		throw new RuntimeException(discoveryLoopErrorRef.get());
> 	} else {
> 		throw e;
> 	}
> }
> {code}
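>
> For illustration, here is a minimal, self-contained sketch of this pattern. The class below, its discovery-thread setup, and the exception messages are made up for the example and are not the actual connector code; only the catch block mirrors the change proposed above.
> {code:java}
> import java.util.concurrent.atomic.AtomicReference;
>
> // Stand-alone sketch: the "real" error raised on a background discovery thread
> // is stored in an AtomicReference and rethrown in preference to the secondary
> // failure that the fetch loop sees once it has been cancelled.
> public class DiscoveryErrorSketch {
>
> 	// stand-in for the discoveryLoopErrorRef used in the snippet above
> 	private final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
>
> 	// stand-in for kafkaFetcher.runFetchLoop(): once the handover has been closed
> 	// by cancel(), the real fetch loop fails with a ClosedException; here we just
> 	// fail with a generic secondary exception
> 	private void runFetchLoop() throws Exception {
> 		throw new IllegalStateException("handover closed");
> 	}
>
> 	public void run() throws Exception {
> 		// the background "discovery" thread fails first and records its error
> 		Thread discoveryThread = new Thread(() ->
> 				discoveryLoopErrorRef.set(new RuntimeException("real culprit: partition discovery failed")));
> 		discoveryThread.start();
> 		discoveryThread.join();
>
> 		try {
> 			runFetchLoop();
> 		} catch (Exception e) {
> 			// rethrow the recorded discovery error instead of the secondary failure
> 			if (discoveryLoopErrorRef.get() != null) {
> 				throw new RuntimeException(discoveryLoopErrorRef.get());
> 			} else {
> 				throw e;
> 			}
> 		}
> 	}
>
> 	public static void main(String[] args) throws Exception {
> 		// fails with a RuntimeException caused by the discovery error,
> 		// rather than with the "handover closed" failure alone
> 		new DiscoveryErrorSketch().run();
> 	}
> }
> {code}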