Hi, vtygoss
You can check out the official demo [1]:

```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings
  .newInstance()
  //.inStreamingMode()
  .inBatchMode() // batch mode is selected here; the streaming alternative is commented out above
  .build()

val tEnv = TableEnvironment.create(settings)
```

Regards

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/common/

> On May 19, 2021, at 18:01, vtygoss <vtyg...@126.com> wrote:
>
> Hi,
>
> I have the use case below.
>
> I insert bounded data into a dynamic table (upsert-kafka) using Flink 1.12 on
> YARN, but the YARN application is still running after the insert job has
> finished, and the YARN container is not released.
>
> I tried to use BatchTableEnvironment, but got “Primary key and unique key are
> not supported yet”; I also tried
> StreamingExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH), but
> it does not work.
>
> Please offer some advice.
>
> Regards
>
> ```
> [test case code]
> val (senv, btenv) = FlinkSession.getOrCreate()
> val table = btenv.fromValues(
>   Row.ofKind(RowKind.INSERT, "1"),
>   Row.ofKind(RowKind.INSERT, "2")).select("f0")
>
> btenv.createTemporaryView("bound", table)
> btenv.executeSql(s"create table if not exists test_result(" +
>   "id string, PRIMARY KEY(id) NOT ENFORCED) WITH(" +
>   s"'connector'='kafka','topic'='test_result','properties.bootstrap.servers'='${KAFKA_SERVER}'," +
>   s"'key.format'='json','value.format'='json')")
> btenv.executeSql("insert into test_result select f0 from bound")
> ```