Prabhu Joseph created FLINK-34132:
-------------------------------------

             Summary: Batch WordCount job fails when run with AdaptiveBatch scheduler
                 Key: FLINK-34132
                 URL: https://issues.apache.org/jira/browse/FLINK-34132
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.18.1, 1.17.1
            Reporter: Prabhu Joseph

Batch WordCount job fails when run with AdaptiveBatch scheduler.

*Repro Steps*
{code}
flink-yarn-session -Djobmanager.scheduler=adaptive -d

flink run -d /usr/lib/flink/examples/batch/WordCount.jar --input s3://prabhuflinks3/INPUT --output s3://prabhuflinks3/OUT
{code}

*Error logs*
{code}
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
    at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1067)
    at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:106)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 12 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1062)
    ... 20 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    ... 3 more
Caused by: java.lang.IllegalStateException: At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(AdaptiveBatchSchedulerFactory.java:324)
    at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.createInstance(AdaptiveBatchSchedulerFactory.java:127)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:124)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:393)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:362)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 3 more
{code}

*Analysis*

I set execution.batch-shuffle-mode to each of ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL and ALL_EXCHANGES_HYBRID_SELECTIVE, but all of them failed with the same error message.
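
To make the error message concrete, the precondition it describes can be pictured as a check over the exchange type of every edge in the job graph. The sketch below is a simplified stand-alone model and not Flink's actual code: the class EdgeTypeCheckSketch, the EdgeType enum and both methods are hypothetical stand-ins that only echo the wording of the exception. It shows that a single PIPELINED edge is enough to fail the check, so if the job's edges were to stay pipelined regardless of the configured execution.batch-shuffle-mode (an assumption that still has to be verified), all three values would produce the same exception.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Simplified stand-alone model of the precondition; NOT Flink's implementation.
class EdgeTypeCheckSketch {

    // Hypothetical stand-in for the exchange types named in the error message,
    // plus PIPELINED as an example of an unsupported type.
    enum EdgeType { PIPELINED, BLOCKING, HYBRID_FULL, HYBRID_SELECTIVE }

    // Edge types the error message lists as acceptable.
    private static final Set<EdgeType> SUPPORTED =
            EnumSet.of(EdgeType.BLOCKING, EdgeType.HYBRID_FULL, EdgeType.HYBRID_SELECTIVE);

    // True only if every edge has one of the supported types.
    static boolean allEdgesSupported(List<EdgeType> edges) {
        return SUPPORTED.containsAll(edges);
    }

    // Mirrors the shape of a checkState-style precondition: throw if any edge is unsupported.
    static void checkAllEdgesSupported(List<EdgeType> edges) {
        if (!allEdgesSupported(edges)) {
            throw new IllegalStateException(
                    "All edges must be BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE, but got: " + edges);
        }
    }

    public static void main(String[] args) {
        // A job graph whose edges are all blocking passes the check.
        checkAllEdgesSupported(Arrays.asList(EdgeType.BLOCKING, EdgeType.BLOCKING));
        System.out.println("all-blocking job graph accepted");

        // One pipelined edge is enough to be rejected.
        try {
            checkAllEdgesSupported(Arrays.asList(EdgeType.BLOCKING, EdgeType.PIPELINED));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the outcome depends only on the edge types the job graph ends up with, not on which configuration key was set, which is why the exception alone does not tell us whether execution.batch-shuffle-mode was read at all.
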

The WordCount program runs fine when the following is set in the code:
{code}
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
{code}

We need to investigate why execution.batch-shuffle-mode is not being recognized and, if this behavior is intentional, correct the misleading error message. Additionally, the WordCount job should be fixed so that it runs with both the batch and adaptive schedulers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)