I'm trying to submit multiple DELETE statements to the TableEnvironment
in the following way:

val settings = EnvironmentSettings.newInstance.inBatchMode.build()
val env = TableEnvironment.create(settings)
createSchema(env)
val queries = getDeleteQueries
queries.foreach(q => env.executeSql(q).await())

According to the documentation on DELETE statements
<https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/delete/#delete-statements>,
this should just work. In reality, I get the following exception after the first query finishes:

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
    at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]

When I check the code, I see that there is indeed a validation ensuring that
no more than one job can be submitted:

private void validateAllowedExecution() {
    if (enforceSingleJobExecution && jobCounter > 0) {
        throw new FlinkRuntimeException(
                "Cannot have more than one execute() or executeAsync() call in a single environment.");
    }
    jobCounter++;
}
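
To make the failure mode concrete, here is a minimal, self-contained Java sketch of how I understand this guard to behave. It is only a model, not Flink code: the class name SingleJobGuardSketch is hypothetical, IllegalStateException stands in for FlinkRuntimeException, and I am assuming the enforceSingleJobExecution flag is enabled in my deployment, which the exception above suggests.

```java
// Hypothetical stand-alone model of the single-job guard; not the Flink class itself.
final class SingleJobGuardSketch {
    private final boolean enforceSingleJobExecution;
    private int jobCounter = 0;

    SingleJobGuardSketch(boolean enforceSingleJobExecution) {
        this.enforceSingleJobExecution = enforceSingleJobExecution;
    }

    // Mirrors the quoted check: any submission after the first is rejected
    // when the flag is set, regardless of whether the first job has finished.
    private void validateAllowedExecution() {
        if (enforceSingleJobExecution && jobCounter > 0) {
            throw new IllegalStateException(
                    "Cannot have more than one execute() or executeAsync() call in a single environment.");
        }
        jobCounter++;
    }

    // Stand-in for a job submission; it only runs the guard.
    void executeAsync() {
        validateAllowedExecution();
    }

    int submittedJobs() {
        return jobCounter;
    }

    public static void main(String[] args) {
        SingleJobGuardSketch env = new SingleJobGuardSketch(true);
        env.executeAsync(); // first DELETE: accepted
        try {
            env.executeAsync(); // second DELETE: rejected by the guard
        } catch (IllegalStateException e) {
            System.out.println("Second submission rejected: " + e.getMessage());
        }
    }
}
```

In this model, awaiting the first job does not help, because the counter is never reset. That matches what I observe: the exception is thrown only after the first query has finished.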

When I tried to submit multiple statements separated by ; I got the
following exception:

Caused by: java.lang.IllegalArgumentException: only single statement supported
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:104) ~[?:?]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]

My job is submitted to Kubernetes via the flink-kubernetes-operator
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.10/>

Flink version is 1.19.1

Flink Kubernetes operator version is 1.10.0

My queries look like:

DELETE FROM hdfs_table
WHERE id IN (
  SELECT DISTINCT a.id
  FROM jdbc_table_a a
  JOIN jdbc_table_b b ON a.b_id = b.id
  WHERE b.should_be_deleted
)

Two questions I would appreciate help with:

   1. What am I missing in the configuration that's preventing me from
   submitting multiple DELETE statements?
   2. Is it possible to submit multiple DELETE statements in parallel, so that
   the JDBC tables are reused rather than loaded again for every query?
