Hi, vtygoss

You can check out the official demo [1]:

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings
    .newInstance()
    //.inStreamingMode()
    .inBatchMode()
    .build()

val tEnv = TableEnvironment.create(settings)
```
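
Applied to your test case, a minimal sketch could look like the following. This assumes your `test_result` table has already been created with the DDL from your mail; the column name `f0` is the default name `fromValues` derives for a single untyped column.

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// A TableEnvironment in batch mode: the job finishes once the
// bounded input is fully consumed, so the YARN application can
// terminate and release its containers.
val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tEnv = TableEnvironment.create(settings)

val table = tEnv.fromValues("1", "2")
tEnv.createTemporaryView("bound", table)

// Assumes test_result was created with your DDL beforehand.
// await() blocks until the bounded insert job completes.
tEnv.executeSql("INSERT INTO test_result SELECT f0 FROM bound").await()
```

Note that in batch mode there are no RowKind changelog semantics; a bounded insert simply writes the final rows to the sink.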

Regards


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/common/
  


> On May 19, 2021, at 18:01, vtygoss <vtyg...@126.com> wrote:
> 
> 
> Hi,
> 
> I have the below use case:
> 
> I insert bounded data into a dynamic table (upsert-kafka) using Flink 1.12 
> on YARN, but the YARN application is still running after the insert job 
> finishes, and the YARN containers are not released.
> 
> I tried to use BatchTableEnvironment, but got "Primary key and unique key 
> are not supported yet"; I tried 
> StreamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH), but 
> it does not work.
> 
> Please offer some advice. 
> 
> Regards
> 
> 
> ```scala
> // [test case code]
> val (senv, btenv) = FlinkSession.getOrCreate()
> val table = btenv.fromValues(
>   Row.ofKind(RowKind.INSERT, "1"),
>   Row.ofKind(RowKind.INSERT, "2")).select("f0")
> 
> btenv.createTemporaryView("bound", table)
> btenv.executeSql(s"create table if not exists test_result(" +
>   "id string, PRIMARY KEY(id) NOT ENFORCED) WITH(" +
>   s"'connector'='kafka','topic'='test_result','properties.bootstrap.servers'='${KAFKA_SERVER}'," +
>   s"'key.format'='json','value.format'='json')")
> btenv.executeSql("insert into test_result select f0 from bound")
> ```
