Guozhen Yang created FLINK-32423:
------------------------------------

             Summary: Flink-sql-runner-example application fails if multiple 
execute() called in one sql file
                 Key: FLINK-32423
                 URL: https://issues.apache.org/jira/browse/FLINK-32423
             Project: Flink
          Issue Type: Improvement
          Components: Kubernetes Operator
            Reporter: Guozhen Yang


h2. Summary:

The flink-sql-runner-example application fails if execute() is called more 
than once in a single SQL file.
h2. Background:

We have a series of batch jobs running on a table partitioned by date. The jobs 
need to run sequentially in chronological order: batch job #2, which processes 
the 2023-06-02 partition, starts only after batch job #1 has finished 
processing the 2023-06-01 partition. So we loop through the dates and submit 
multiple jobs in a single application, and the Flink application is deployed in 
application mode with HA turned off.

According to the [Flink documentation|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/overview/#application-mode], 
Application Mode allows the submission of applications consisting of multiple 
jobs, but High-Availability is not supported in these cases.
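
The date loop described above can be sketched roughly as follows. This is only 
an illustration: the class name, the table names (sink_table, source_table) and 
the dt column are hypothetical placeholders, and the actual submission call is 
left as a comment so the sketch runs without Flink on the classpath.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

class PartitionBackfill {
    /** Returns every date from start to end (both inclusive) in chronological order. */
    static List<LocalDate> partitions(LocalDate start, LocalDate end) {
        List<LocalDate> dates = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            dates.add(d);
        }
        return dates;
    }

    /** Hypothetical statement template; table and column names are placeholders. */
    static String statementFor(LocalDate partition) {
        return "INSERT INTO sink_table SELECT * FROM source_table WHERE dt = '"
                + partition + "'";
    }

    public static void main(String[] args) {
        LocalDate start = LocalDate.parse("2023-06-01");
        LocalDate end = LocalDate.parse("2023-06-03");
        for (LocalDate partition : partitions(start, end)) {
            // In a real application each statement would be submitted and awaited
            // here (for example TableEnvironment.executeSql(...).await()) before
            // the loop moves on to the next partition.
            System.out.println(statementFor(partition));
        }
    }
}
```

Each loop iteration corresponds to one job, so an application like this needs 
more than one execute() call per run.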
h2. The problem:

An application consisting of multiple jobs fails when the second job is 
executed.

The stack trace is shown below:
{noformat}
2023-06-21 03:21:44,720 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.16.2.jar:1.16.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_0e3d2618-241c-420f-a71d-2f4d1edcb5a1.jar:1.16.2]
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 14 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
    ... 13 more
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:903) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
    ... 13 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
    at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:217) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:205) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:884) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1382) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730) ~[flink-table-api-java-uber-1.16.2.jar:1.16.2]
    at org.apache.flink.examples.SqlRunner.main(SqlRunner.java:52) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.2.jar:1.16.2]
    ... 13 more
{noformat}
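
The innermost cause comes from a check that rejects a second execute() or 
executeAsync() call on the same environment. The sketch below is a simplified 
stand-alone model of such a guard, not Flink's actual code: the class name and 
the jobCounter field are assumptions made for illustration, and 
IllegalStateException stands in for FlinkRuntimeException so that the example 
runs without Flink on the classpath.

```java
class SingleExecutionGuard {
    // When true, only the first execute() call is allowed.
    private final boolean enforceSingleJobExecution;
    // Number of jobs submitted so far (hypothetical field name).
    private int jobCounter = 0;

    SingleExecutionGuard(boolean enforceSingleJobExecution) {
        this.enforceSingleJobExecution = enforceSingleJobExecution;
    }

    /** Models one execute()/executeAsync() call; returns the number of jobs submitted so far. */
    int execute() {
        if (enforceSingleJobExecution && jobCounter > 0) {
            throw new IllegalStateException(
                    "Cannot have more than one execute() or executeAsync() call "
                            + "in a single environment.");
        }
        jobCounter++;
        return jobCounter;
    }

    public static void main(String[] args) {
        // Without enforcement, a script such as "select 1; select 1;" can run both statements.
        SingleExecutionGuard relaxed = new SingleExecutionGuard(false);
        relaxed.execute();
        relaxed.execute();

        // With enforcement, the second statement is rejected.
        SingleExecutionGuard strict = new SingleExecutionGuard(true);
        strict.execute();
        try {
            strict.execute();
        } catch (IllegalStateException e) {
            System.out.println("Second call rejected: " + e.getMessage());
        }
    }
}
```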
h2. How to reproduce:

1. Start a minikube cluster.
2. Add a new script file _two-selects.sql_ to the 
[examples/flink-sql-runner-example/sql-scripts folder|https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example].
The contents of _two-selects.sql_ are shown below.
{noformat}
select 1;
select 1;
{noformat}
3. Follow the 
[instructions|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/README.md] 
to build the flink-sql-runner-example image.
4. Use the minikube image load command to load the image.
5. Modify the [flinkdep yaml file|https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/sql-example.yaml]: 
change spec.job.args to args: 
["/opt/flink/usrlib/sql-scripts/two-selects.sql"]. Then apply the flinkdep yaml 
file.
6. The application fails.
h2. Possible reason:

According to the [flink-kubernetes-operator documentation|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/controller-flow/#application-reconciler], 
Flink by default generates deterministic job IDs based on the clusterId.
{quote}Flink by default generates deterministic jobids based on the clusterId 
(which is the CR name in our case). This causes checkpoint path conflicts if 
the job is ever restarted from an empty state (stateless upgrade). We therefore 
generate a random jobid to avoid this.
{quote}
I found that flink-kubernetes-operator always sets the job ID when submitting 
an application. [The corresponding code of setJobIdIfNecessary is here.|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java#L191C18-L191C37]

But according to [Flink's code|https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java#L213], 
there are two situations.

1. HA is not activated and the job ID is not set when submitting the 
application (line 213 to 217). runApplicationAsync is called with 
enforceSingleJobExecution=false. So multi-job execution is viable.
2. Otherwise, i.e. HA is activated or the job ID is already set (line 218 to 
233). If the job ID is not set, it is derived from the cluster ID. After the 
job ID is fixed, runApplicationAsync is called with 
enforceSingleJobExecution=true. So multi-job execution is not viable.

If flink-kubernetes-operator always sets the job ID when submitting an 
application, the condition of situation #1 never matches. So an application 
submitted with flink-kubernetes-operator cannot execute multiple jobs, even if 
it is deployed in application mode with HA turned off.
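
The branching described above can be modelled in a few lines. This is a 
simplified sketch of the decision only, assuming the two situations are 
exhaustive; the class and method names are hypothetical and the real 
ApplicationDispatcherBootstrap reads both inputs from the Flink configuration.

```java
import java.util.Optional;

class JobIdDecision {
    /**
     * Returns the value that would be passed as enforceSingleJobExecution,
     * given whether HA is activated and whether a job ID was preconfigured.
     */
    static boolean enforceSingleJobExecution(
            boolean haActivated, Optional<String> configuredJobId) {
        // Situation 1: no HA and no preset job ID, so multiple jobs are allowed.
        if (!haActivated && !configuredJobId.isPresent()) {
            return false;
        }
        // Situation 2: HA is activated or a job ID is already set.
        return true;
    }

    public static void main(String[] args) {
        // Plain application mode without HA and without a preset job ID.
        System.out.println(enforceSingleJobExecution(false, Optional.empty()));
        // Same deployment, but a job ID was set before submission
        // ("some-job-id" is a placeholder value).
        System.out.println(enforceSingleJobExecution(false, Optional.of("some-job-id")));
    }
}
```

With a preset job ID the first branch is unreachable regardless of the HA 
setting, which matches the behaviour reported in this ticket.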



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
