Prabhu Joseph created FLINK-34132:
-------------------------------------

             Summary: Batch WordCount job fails when run with AdaptiveBatch scheduler
                 Key: FLINK-34132
                 URL: https://issues.apache.org/jira/browse/FLINK-34132
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.18.1, 1.17.1
            Reporter: Prabhu Joseph


Batch WordCount job fails when run with AdaptiveBatch scheduler.

*Repro Steps*

{code}
flink-yarn-session -Djobmanager.scheduler=adaptive -d

flink run -d /usr/lib/flink/examples/batch/WordCount.jar --input s3://prabhuflinks3/INPUT --output s3://prabhuflinks3/OUT
{code}

*Error logs*

{code}
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
        at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1067)
        at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:144)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:73)
        at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:106)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 12 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1062)
        ... 20 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
        at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
        ... 3 more
Caused by: java.lang.IllegalStateException: At the moment, adaptive batch scheduler requires batch workloads to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. To do that, you need to configure 'execution.batch-shuffle-mode' to 'ALL_EXCHANGES_BLOCKING' or 'ALL_EXCHANGES_HYBRID_FULL/ALL_EXCHANGES_HYBRID_SELECTIVE'.
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
        at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported(AdaptiveBatchSchedulerFactory.java:324)
        at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory.createInstance(AdaptiveBatchSchedulerFactory.java:127)
        at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:124)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:393)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:362)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
        at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        ... 3 more

{code}
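The job is rejected by the precondition in AdaptiveBatchSchedulerFactory.checkAllExchangesAreSupported. As a rough illustration only, the sketch below models that check in plain Java with no Flink dependency. The names AdaptiveBatchCheckSketch, Edge, ExchangeType, allExchangesSupported and requireSupportedExchanges are hypothetical and do not mirror Flink's internals. The accepted exchange types come from the error message, and the all-PIPELINED WordCount graph in main is an assumption, not something shown in the logs.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of the adaptive batch scheduler's edge-type precondition. */
final class AdaptiveBatchCheckSketch {

    /** Illustrative exchange types; not Flink's own ResultPartitionType enum. */
    enum ExchangeType { PIPELINED, BLOCKING, HYBRID_FULL, HYBRID_SELECTIVE }

    /** One producer -> consumer edge of a job graph. */
    static final class Edge {
        final String from;
        final String to;
        final ExchangeType type;

        Edge(String from, String to, ExchangeType type) {
            this.from = from;
            this.to = to;
            this.type = type;
        }
    }

    /** Edge types that the error message says are accepted. */
    static final Set<ExchangeType> SUPPORTED = EnumSet.of(
            ExchangeType.BLOCKING, ExchangeType.HYBRID_FULL, ExchangeType.HYBRID_SELECTIVE);

    /** True only if every edge uses an accepted exchange type. */
    static boolean allExchangesSupported(List<Edge> edges) {
        return edges.stream().allMatch(e -> SUPPORTED.contains(e.type));
    }

    /** Fails fast like a checkState precondition, naming the first offending edge. */
    static void requireSupportedExchanges(List<Edge> edges) {
        for (Edge e : edges) {
            if (!SUPPORTED.contains(e.type)) {
                throw new IllegalStateException(
                        "Edge " + e.from + " -> " + e.to + " uses unsupported type " + e.type);
            }
        }
    }

    public static void main(String[] args) {
        // Assumed WordCount-like graph in which every exchange is still pipelined.
        List<Edge> wordCount = Arrays.asList(
                new Edge("source", "tokenizer", ExchangeType.PIPELINED),
                new Edge("tokenizer", "sum", ExchangeType.PIPELINED),
                new Edge("sum", "sink", ExchangeType.PIPELINED));

        System.out.println(allExchangesSupported(wordCount)); // prints "false"
        try {
            requireSupportedExchanges(wordCount);
        } catch (IllegalStateException ex) {
            System.out.println("Rejected: " + ex.getMessage());
        }
    }
}
```

Because the check inspects every edge, a single pipelined exchange anywhere in the graph is enough to reject the job while the scheduler is being created, which matches where the trace fails (JobMaster.createScheduler during JobMaster start-up).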

*Analysis*

I set execution.batch-shuffle-mode to each of ALL_EXCHANGES_BLOCKING, ALL_EXCHANGES_HYBRID_FULL and ALL_EXCHANGES_HYBRID_SELECTIVE, but every run failed with the same error message.

The WordCount program runs fine when the following is set in the code:

{code}
env.getConfig().setExecutionMode(ExecutionMode.BATCH_FORCED);
{code}
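One plausible reading of this workaround, not confirmed by the report, is that the DataSet API derives its exchange types from ExecutionMode rather than from execution.batch-shuffle-mode. According to the ExecutionMode Javadoc, BATCH makes shuffles and broadcasts blocking but still pipelines local one-to-one forwards, while BATCH_FORCED makes every exchange blocking. The sketch below models that difference in plain Java. ExecutionModeSketch, EdgeKind, exchangeFor and allBlocking are hypothetical names, the mapping ignores the deadlock-avoidance exceptions of PIPELINED, and the three-edge WordCount shape is an assumption.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of how a DataSet ExecutionMode might choose edge types (per its Javadoc). */
final class ExecutionModeSketch {

    /** Same four values as org.apache.flink.api.common.ExecutionMode. */
    enum Mode { PIPELINED, PIPELINED_FORCED, BATCH, BATCH_FORCED }

    enum ExchangeType { PIPELINED, BLOCKING }

    /** Local one-producer/one-consumer forward vs. shuffle or broadcast. */
    enum EdgeKind { LOCAL_FORWARD, SHUFFLE }

    /** Simplified mapping; deadlock-avoidance special cases of PIPELINED are ignored. */
    static ExchangeType exchangeFor(Mode mode, EdgeKind kind) {
        switch (mode) {
            case BATCH_FORCED:
                return ExchangeType.BLOCKING;          // every exchange is blocking
            case BATCH:
                return kind == EdgeKind.SHUFFLE
                        ? ExchangeType.BLOCKING         // shuffles/broadcasts are blocking
                        : ExchangeType.PIPELINED;       // local forwards stay pipelined
            default:
                return ExchangeType.PIPELINED;          // PIPELINED and PIPELINED_FORCED
        }
    }

    /** True if every edge would be BLOCKING, i.e. an all-blocking precondition would pass. */
    static boolean allBlocking(Mode mode, List<EdgeKind> edges) {
        return edges.stream().allMatch(k -> exchangeFor(mode, k) == ExchangeType.BLOCKING);
    }

    public static void main(String[] args) {
        // Assumed WordCount shape: source -> tokenizer (forward), tokenizer -> sum (shuffle), sum -> sink (forward).
        List<EdgeKind> wordCount = Arrays.asList(
                EdgeKind.LOCAL_FORWARD, EdgeKind.SHUFFLE, EdgeKind.LOCAL_FORWARD);
        for (Mode m : Mode.values()) {
            System.out.println(m + " -> all edges blocking: " + allBlocking(m, wordCount));
        }
    }
}
```

Under these assumptions only BATCH_FORCED yields an all-blocking graph, which would be consistent with the observation that setting BATCH_FORCED lets the job run under the adaptive batch scheduler.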


We need to investigate why execution.batch-shuffle-mode is not honored for this job. If this behavior is intentional, the error message should be corrected, because the fix it suggests does not work here. Additionally, the WordCount example should be fixed so that it runs seamlessly with both the batch and adaptive schedulers.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)