Hi,
If you need JDBC upsert functionality, it's easier to implement app using
Flink SQL.
You could use JDBC Table Connector [1]. You could define primary key in DDL
when writing data to external database. See CREATE TABLE DDL
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-table>
for
more details about PRIMARY KEY syntax.
I find an example in `JdbcUpsertTableSinkITCase` of flink-connector-jdbc,
hope this helps.

> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
> Table t =
>         tEnv.fromDataStream(
>                 get4TupleDataStream(env)
>                         .assignTimestampsAndWatermarks(
>                                 new AscendingTimestampExtractor<
>                                         Tuple4<Integer, Long, String, 
> Timestamp>>() {
>                                     @Override
>                                     public long extractAscendingTimestamp(
>                                             Tuple4<Integer, Long, String, 
> Timestamp>
>                                                     element) {
>                                         return element.f0;
>                                     }
>                                 }),
>                 $("id"),
>                 $("num"),
>                 $("text"),
>                 $("ts"));
>
> tEnv.createTemporaryView("T", t);
> tEnv.executeSql(
>         "CREATE TABLE upsertSink ("
>                 + "  cnt BIGINT,"
>                 + "  lencnt BIGINT,"
>                 + "  cTag INT,"
>                 + "  ts TIMESTAMP(3)"
>                 + ") WITH ("
>                 + "  'connector.type'='jdbc',"
>                 + "  'connector.url'='XXXX',"
>                 + "  'connector.table'='upsertSink'"
>                 + ")");
>
> tEnv.executeSql(
>                 "INSERT INTO upsertSink \n"
>                         + "SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS 
> ts\n"
>                         + "FROM (\n"
>                         + "  SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS 
> ts\n"
>                         + "  FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE 
> WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM T)\n"
>                         + "  GROUP BY len, cTag\n"
>                         + ")\n"
>                         + "GROUP BY cnt, cTag")
>         .await();
>
>

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#key-handling
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-table

Best,
JING ZHANG

M Singh <mans2si...@yahoo.com> 于2021年10月17日周日 上午12:59写道:

> Hi Folks:
>
> I am working on Flink DataStream pipeline and would like to use JDBC
> upsert functionality.  I found a class TableJdbcUpsertOutputFormat but am
> not sure who to use it with the JdbcSink as shown in the document (
> https://nightlies.apache.org/flink/flink-docs-master/api/java//org/apache/flink/connector/jdbc/JdbcSink.html
> ).
>
> I could not find how to pass OutputFormat argument to the JDBC sink.
>
> Please let me know if there is any documentation or example for using JDBC
> sink with upsert for DataStreams.
>
> Thanks
>
>
>
>
>
>

Reply via email to