Prabhu Joseph created FLINK-34132:
-------------------------------------
Summary: Batch WordCount job fails when run with AdaptiveBatch scheduler
Key: FLINK-34132
URL: https://issues.apache.org/jira/browse/FLINK-34132
Project: Flink
Issue Type: Bug
Affects Versions: 1.18.1, 1.17.1
Reporter: Prabhu Joseph
The batch WordCount example job fails when run with the AdaptiveBatch scheduler.
*Repro Steps*
{code}
flink-yarn-session -Djobmanager.scheduler=adaptive -d
flink run -d /usr/lib/flink/examples/batch/WordCount.jar \
  --input s3://prabhuflinks3/INPUT --output s3://prabhuflinks3/OUT
{code}
*Error logs*
{code}
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
	at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
	at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1067)
	at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
	at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:106)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
	... 12 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1062)
	... 20 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
	... 3 more
Caused by: java.lang.IllegalStateException: At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(AdaptiveBatchSchedulerFactory.java:324)
	at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.createInstance(AdaptiveBatchSchedulerFactory.java:127)
	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:124)
	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:393)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:362)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	... 3 more
{code}
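The precondition that fails in the log can be pictured as an all-edges check. The sketch below is a simplified, stdlib-only model written for illustration, not Flink's code: the local `PartitionType` enum and `allExchangesSupported` are hypothetical, `checkAllExchangesAreSupported` is only named after the frame in the trace, and the supported set (BLOCKING, HYBRID_FULL, HYBRID_SELECTIVE) is taken from the error message. It shows that a single pipelined edge is enough to reject the whole job.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Simplified model of the adaptive batch scheduler's exchange check (illustrative, not Flink code). */
class AdaptiveBatchExchangeCheck {

    // Local stand-in; constant names mirror exchange types mentioned in the log plus PIPELINED.
    enum PartitionType { PIPELINED, BLOCKING, HYBRID_FULL, HYBRID_SELECTIVE }

    // Exchange types the error message says the adaptive batch scheduler accepts.
    static final Set<PartitionType> SUPPORTED = EnumSet.of(
            PartitionType.BLOCKING, PartitionType.HYBRID_FULL, PartitionType.HYBRID_SELECTIVE);

    /** True only if every edge in the job uses a supported exchange type. */
    static boolean allExchangesSupported(List<PartitionType> edgeTypes) {
        return edgeTypes.stream().allMatch(SUPPORTED::contains);
    }

    /** Throws IllegalStateException on failure, in the style of Preconditions.checkState. */
    static void checkAllExchangesAreSupported(List<PartitionType> edgeTypes) {
        if (!allExchangesSupported(edgeTypes)) {
            throw new IllegalStateException(
                    "All edges must be BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE, got " + edgeTypes);
        }
    }

    public static void main(String[] args) {
        List<PartitionType> mixed = Arrays.asList(PartitionType.BLOCKING, PartitionType.PIPELINED);
        List<PartitionType> allBlocking = Arrays.asList(PartitionType.BLOCKING, PartitionType.BLOCKING);
        System.out.println(allExchangesSupported(mixed));       // false: one PIPELINED edge
        System.out.println(allExchangesSupported(allBlocking)); // true
        try {
            checkAllExchangesAreSupported(mixed);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```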
*Analysis*
I set execution.batch-shuffle-mode to each of ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL, and ALL_EXCHANGES_HYBRID_SELECTIVE, but every run failed with the same error message.
The WordCount program runs successfully when the following is set in the code:
{code}
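// env is the ExecutionEnvironment in WordCount.main; ExecutionMode is org.apache.flink.api.common.ExecutionMode
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);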
{code}
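One plausible explanation, stated as an assumption still to be verified: the DataSet-based WordCount may take its exchange types from the ExecutionConfig execution mode rather than from execution.batch-shuffle-mode. According to the ExecutionMode Javadoc, BATCH runs shuffles and broadcasts in a batch fashion but still pipelines local one-to-one forwards, whereas BATCH_FORCED makes every exchange batch. The toy model below uses local enums that only mirror those names (it is not Flink code), and the exchange list for the WordCount-like job is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model of DataSet ExecutionMode vs. exchange behavior (illustrative, not Flink code). */
class ExecutionModeModel {

    // Local stand-in whose constants mirror org.apache.flink.api.common.ExecutionMode.
    enum ExecutionMode { PIPELINED, PIPELINED_FORCED, BATCH, BATCH_FORCED }

    // SHUFFLE/BROADCAST move data over the network; FORWARD is a local one-to-one exchange.
    enum Exchange { SHUFFLE, BROADCAST, FORWARD }

    /** Simplified reading of the ExecutionMode Javadoc: is this exchange blocking under the mode? */
    static boolean isBlocking(ExecutionMode mode, Exchange exchange) {
        switch (mode) {
            case BATCH_FORCED:
                return true;                          // every exchange, including local forwards
            case BATCH:
                return exchange != Exchange.FORWARD;  // shuffles and broadcasts only
            default:
                return false;                         // pipelined modes, simplified to "never blocking"
        }
    }

    /** True if every exchange in the job would be blocking, as the scheduler check requires. */
    static boolean allBlocking(ExecutionMode mode, List<Exchange> exchanges) {
        return exchanges.stream().allMatch(e -> isBlocking(mode, e));
    }

    public static void main(String[] args) {
        // Hypothetical exchanges of a WordCount-like job: source -> flatMap (forward),
        // groupBy/sum (shuffle), sum -> sink (forward).
        List<Exchange> wordCountLike =
                Arrays.asList(Exchange.FORWARD, Exchange.SHUFFLE, Exchange.FORWARD);
        for (ExecutionMode mode : ExecutionMode.values()) {
            System.out.println(mode + " -> all exchanges blocking: " + allBlocking(mode, wordCountLike));
        }
    }
}
```

Under these assumptions only BATCH_FORCED reports true, which is consistent with the workaround above. PIPELINED is simplified here to fully pipelined, although the real mode may still run deadlock-prone exchanges in batch fashion.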
We need to investigate why execution.batch-shuffle-mode is not being honored for this job. If this behavior is intentional, the error message, which recommends exactly that setting, is misleading and should be corrected. Additionally, the WordCount example should be fixed so that it runs without code changes under both the batch and the adaptive schedulers.
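As a possible starting point for the error-message fix, here is a hypothetical sketch of a more actionable diagnostic that names the offending edges and adds an API-specific hint. `ExchangeDiagnostics`, `diagnose`, the edge-name format, and the hint wording are all illustrative assumptions, and the DataSet hint reflects only the workaround observed in this report.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;

/** Hypothetical sketch of a more actionable adaptive-batch error message (not Flink code). */
class ExchangeDiagnostics {

    // Exchange types accepted according to the error message in this report.
    static final Set<String> SUPPORTED =
            new HashSet<>(Arrays.asList("BLOCKING", "HYBRID_FULL", "HYBRID_SELECTIVE"));

    /** Returns a message listing every unsupported edge, or empty if all edges pass. */
    static Optional<String> diagnose(Map<String, String> edgeTypes, boolean isDataSetProgram) {
        StringJoiner offending = new StringJoiner(", ", "[", "]");
        int count = 0;
        for (Map.Entry<String, String> edge : edgeTypes.entrySet()) {
            if (!SUPPORTED.contains(edge.getValue())) {
                offending.add(edge.getKey() + "=" + edge.getValue());
                count++;
            }
        }
        if (count == 0) {
            return Optional.empty();
        }
        StringBuilder msg = new StringBuilder("Adaptive batch scheduler found unsupported exchanges ")
                .append(offending).append('.');
        if (isDataSetProgram) {
            // Hint based only on the workaround observed in this report.
            msg.append(" For DataSet programs, 'execution.batch-shuffle-mode' may have no effect;"
                    + " consider ExecutionMode.BATCH_FORCED.");
        } else {
            msg.append(" Set 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or"
                    + " 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.");
        }
        return Optional.of(msg.toString());
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the (hypothetical) edges in insertion order in the message.
        Map<String, String> edges = new LinkedHashMap<>();
        edges.put("source->flatMap", "PIPELINED");
        edges.put("flatMap->sum", "BLOCKING");
        diagnose(edges, true).ifPresent(System.out::println);
    }
}
```

Listing the offending edges would let a user see at a glance which operator connection is still pipelined, instead of only being told to change a setting that, in this report, had no visible effect.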