Hi, vtygoss
You can check out the official example [1], which creates a unified TableEnvironment in batch mode:
```
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Build settings for the unified TableEnvironment in batch mode.
val settings = EnvironmentSettings
  .newInstance()
  //.inStreamingMode()
  .inBatchMode()
  .build()

val tEnv = TableEnvironment.create(settings)
```
Regards
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/common/
> On May 19, 2021, at 18:01, vtygoss <[email protected]> wrote:
>
>
> Hi,
>
> I have the following use case:
>
> I insert bounded data into a dynamic table (upsert-kafka) using Flink 1.12 on
> YARN, but the YARN application is still running after the insert job has
> finished, and the YARN container is not released.
>
> I tried to use BatchTableEnvironment, but got “Primary key and unique key are
> not supported yet”. I also tried
> StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH), but
> it does not work.
>
> Please offer some advice.
>
> Regards
>
>
> ```
> // test case code
> val (senv, btenv) = FlinkSession.getOrCreate()
> val table = btenv.fromValues(
>   Row.ofKind(RowKind.INSERT, "1"),
>   Row.ofKind(RowKind.INSERT, "2")).select("f0")
>
> btenv.createTemporaryView("bound", table)
> btenv.executeSql(s"create table if not exists test_result(" +
>   "id string, PRIMARY KEY(id) NOT ENFORCED) WITH(" +
>   s"'connector'='kafka','topic'='test_result','properties.bootstrap.servers'='${KAFKA_SERVER}'," +
>   s"'key.format'='json','value.format'='json')")
> btenv.executeSql("insert into test_result select f0 from bound")
>
> ```