Hi Kirill,

As far as I know, multiple statements cannot be run by calling an execute() method on each, so you are not missing anything. Take a look at StatementSet, which allows you to submit multiple SQL statements within one Flink app.
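Roughly, the idea is to group the statements into one StatementSet and submit them as a single job, instead of calling executeSql(...).await() once per statement. A minimal sketch of that pattern follows; the table names and DDL are just placeholders, and note that StatementSet#addInsertSql takes INSERT statements:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings.newInstance.inBatchMode.build()
val env = TableEnvironment.create(settings)

// Placeholder tables only, to keep the sketch self-contained.
env.executeSql("CREATE TABLE source_a (id BIGINT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')")
env.executeSql("CREATE TABLE sink_a (id BIGINT) WITH ('connector' = 'blackhole')")
env.executeSql("CREATE TABLE sink_b (id BIGINT) WITH ('connector' = 'blackhole')")

// Group several statements into one StatementSet and submit them as one job.
val stmtSet = env.createStatementSet()
stmtSet.addInsertSql("INSERT INTO sink_a SELECT id FROM source_a")
stmtSet.addInsertSql("INSERT INTO sink_b SELECT id FROM source_a")
stmtSet.execute().await()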
For example:
https://github.com/novakov-alexey/flink-ml-sandbox/blob/main/src/main/scala/com/example/customerChurn.scala#L223

Best regards,
Alexey

On Thu, Mar 20, 2025 at 4:57 PM Kirill Lyashko <kirill.v.lyas...@gmail.com> wrote:

> I'm trying to submit multiple DELETE statements to the TableEnvironment
> this way:
>
> val settings = EnvironmentSettings.newInstance.inBatchMode.build()
> val env = TableEnvironment.create(settings)
> createSchema(env)
> val queries = getDeleteQueries
> queries.foreach(q => env.executeSql(q).await())
>
> which, according to the documentation on delete-statements
> <https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/delete/#delete-statements>,
> should just work. But in reality I get the following exception after the
> first query finishes:
>
> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than
> one execute() or executeAsync() call in a single environment.
>     at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.19.1.jar:1.19.1]
>     at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.19.1.jar:1.19.1]
>     at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1032) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:876) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
>
> When I check the code I see that there is indeed a validation that no more
> than one job can be submitted:
>
> private void validateAllowedExecution() {
>     if (enforceSingleJobExecution && jobCounter > 0) {
>         throw new FlinkRuntimeException(
>                 "Cannot have more than one execute() or executeAsync() call in a single environment.");
>     }
>     jobCounter++;
> }
>
> When I tried to submit multiple statements separated by ; I got the
> following exception:
>
> Caused by: java.lang.IllegalArgumentException: only single statement supported
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[flink-dist-1.19.1.jar:1.19.1]
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:104) ~[?:?]
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728) ~[flink-table-api-java-uber-1.19.1.jar:1.19.1]
>
> My job is submitted to Kubernetes via the flink-kubernetes-operator
> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.10/>.
>
> Flink version is 1.19.1
>
> Flink Kubernetes operator version is 1.10.0
>
> My queries look like:
>
> DELETE FROM hdfs_table
> WHERE id IN (
>     SELECT DISTINCT a.id
>     FROM jdbc_table_a a
>     JOIN jdbc_table_b b ON a.b_id = b.id
>     WHERE b.should_be_deleted
> )
>
> Two questions I would appreciate help with:
>
> 1. What am I missing in the configuration that's preventing me from
>    submitting multiple DELETE statements?
> 2. Is it possible to submit multiple DELETE statements in parallel, so that
>    the jdbc tables can be reused rather than loaded for every query?