Hi,

If you need JDBC upsert functionality, it is easier to implement the application with Flink SQL. You can use the JDBC Table Connector [1]: when a primary key is defined in the DDL of the sink table, the connector uses that key to write to the external database in upsert mode; without one it writes in append mode. See the CREATE TABLE DDL [2] for more details about the PRIMARY KEY syntax.

I found an example in `JdbcUpsertTableSinkITCase` of flink-connector-jdbc; I hope it helps.
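To make the PRIMARY KEY point concrete, here is a minimal sketch of a sink DDL that declares a key. It is not taken from the Flink test suite. The key columns (cnt, cTag), the MySQL URL, and the helper name buildUpsertSinkDdl are assumptions chosen for illustration, and the option names ('connector', 'url', 'table-name') are the ones listed on the 1.13 JDBC connector page [1] rather than the legacy 'connector.type' keys. The class only builds the statement text, so it runs without Flink on the classpath.

```java
// Sketch only: builds a CREATE TABLE statement for a JDBC sink with a primary key.
// Table name, columns, key choice and URL are hypothetical placeholders.
class UpsertSinkDdl {

    /** Returns DDL text for a JDBC sink table whose primary key is (cnt, cTag). */
    public static String buildUpsertSinkDdl(String tableName, String jdbcUrl) {
        return String.join(
                "\n",
                "CREATE TABLE " + tableName + " (",
                "  cnt BIGINT,",
                "  lencnt BIGINT,",
                "  cTag INT,",
                "  ts TIMESTAMP(3),",
                // Flink does not validate the key itself, hence NOT ENFORCED.
                "  PRIMARY KEY (cnt, cTag) NOT ENFORCED",
                ") WITH (",
                "  'connector' = 'jdbc',",
                "  'url' = '" + jdbcUrl + "',",
                "  'table-name' = '" + tableName + "'",
                ")");
    }

    public static void main(String[] args) {
        // Assumed URL; replace with the address of your own database.
        String ddl = buildUpsertSinkDdl("upsertSink", "jdbc:mysql://localhost:3306/mydb");
        System.out.println(ddl);
        // In a Flink job the text would be passed to the table environment,
        // e.g. tEnv.executeSql(ddl), before running the INSERT INTO statement.
    }
}
```

The key here mirrors the GROUP BY columns of the query in the test, so each aggregate row maps to one row in the database. The example from `JdbcUpsertTableSinkITCase` mentioned above follows.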
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
> Table t =
>         tEnv.fromDataStream(
>                 get4TupleDataStream(env)
>                         .assignTimestampsAndWatermarks(
>                                 new AscendingTimestampExtractor<
>                                         Tuple4<Integer, Long, String, Timestamp>>() {
>                                     @Override
>                                     public long extractAscendingTimestamp(
>                                             Tuple4<Integer, Long, String, Timestamp> element) {
>                                         return element.f0;
>                                     }
>                                 }),
>                 $("id"),
>                 $("num"),
>                 $("text"),
>                 $("ts"));
>
> tEnv.createTemporaryView("T", t);
> tEnv.executeSql(
>         "CREATE TABLE upsertSink ("
>                 + " cnt BIGINT,"
>                 + " lencnt BIGINT,"
>                 + " cTag INT,"
>                 + " ts TIMESTAMP(3)"
>                 + ") WITH ("
>                 + " 'connector.type'='jdbc',"
>                 + " 'connector.url'='XXXX',"
>                 + " 'connector.table'='upsertSink'"
>                 + ")");
>
> tEnv.executeSql(
>                 "INSERT INTO upsertSink \n"
>                         + "SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS ts\n"
>                         + "FROM (\n"
>                         + " SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS ts\n"
>                         + " FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM T)\n"
>                         + " GROUP BY len, cTag\n"
>                         + ")\n"
>                         + "GROUP BY cnt, cTag")
>         .await();

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#key-handling
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-table

Best,
JING ZHANG

M Singh <mans2si...@yahoo.com> wrote on Sunday, October 17, 2021 at 12:59 AM:

> Hi Folks:
>
> I am working on a Flink DataStream pipeline and would like to use JDBC
> upsert functionality. I found a class TableJdbcUpsertOutputFormat but am
> not sure how to use it with the JdbcSink as shown in the documentation (
> https://nightlies.apache.org/flink/flink-docs-master/api/java//org/apache/flink/connector/jdbc/JdbcSink.html
> ).
>
> I could not find how to pass an OutputFormat argument to the JDBC sink.
>
> Please let me know if there is any documentation or example for using the JDBC
> sink with upsert for DataStreams.
>
> Thanks