Hi Mans,

> Is there a DataStream api for using the upsert functionality ?

You could try the `JdbcSink#sink` method and pass an upsert query as the first parameter. However, there is no standard syntax for upsert, so you need to check whether the external database supports upsert and, if it does, what its upsert grammar is. The following table describes the database-specific DML that is used [1].

Database     | Upsert Grammar
MySQL        | INSERT .. ON DUPLICATE KEY UPDATE ..
PostgreSQL   | INSERT .. ON CONFLICT .. DO UPDATE SET ..
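For example, here is a rough sketch using MySQL's grammar (the `books` table, the `Book` POJO, and the connection settings below are made-up placeholders, not something from the Flink codebase):

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcUpsertSketch {

    // Hypothetical POJO written to the database.
    public static class Book {
        public long id;
        public String title;
        public double price;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; replace with the real stream.
        DataStream<Book> books = env.fromElements(new Book());

        books.addSink(
                JdbcSink.sink(
                        // MySQL's upsert grammar; use the target database's own syntax here.
                        "INSERT INTO books (id, title, price) VALUES (?, ?, ?)"
                                + " ON DUPLICATE KEY UPDATE title = VALUES(title), price = VALUES(price)",
                        // Statement builder: fills the ? placeholders from each record.
                        (ps, book) -> {
                            ps.setLong(1, book.id);
                            ps.setString(2, book.title);
                            ps.setDouble(3, book.price);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(100)
                                .withBatchIntervalMs(200)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/mydb")
                                .withDriverName("com.mysql.cj.jdbc.Driver")
                                .withUsername("user")
                                .withPassword("password")
                                .build()));

        env.execute("jdbc upsert sketch");
    }
}

The statement builder only sets the columns in the same order as the ? placeholders; batching behaviour is controlled through JdbcExecutionOptions.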
> Also, is there any reason for why the TableJdbcUpsertOutputFormat
> constructors are not public ?

`TableJdbcUpsertOutputFormat` is designed as an internal class of the JDBC table connector. When building a `JdbcOutputFormat`, `JdbcOutputFormat.Builder` decides whether to create a `TableJdbcUpsertOutputFormat` or a plain `JdbcOutputFormat` instance depending on whether key fields are defined in the DDL (see the DDL sketch at the end of this mail).

[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/#idempotent-writes

Best,
JING ZHANG

M Singh <mans2si...@yahoo.com> wrote on Tue, Oct 19, 2021 at 7:00 AM:

> Hi Jing:
>
> Thanks for your response and example.
>
> Is there a DataStream api for using the upsert functionality ?
>
> Also, is there any reason for why the TableJdbcUpsertOutputFormat
> constructors are not public ?
>
> Thanks again for your help.
>
> Mans
>
> On Monday, October 18, 2021, 12:30:36 AM EDT, JING ZHANG <
> beyond1...@gmail.com> wrote:
>
> Hi,
> If you need JDBC upsert functionality, it's easier to implement the app using
> Flink SQL.
> You could use the JDBC Table Connector [1]. You could define a primary key in the
> DDL when writing data to an external database. See CREATE TABLE DDL
> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-table>
> for more details about PRIMARY KEY syntax.
> I found an example in `JdbcUpsertTableSinkITCase` of flink-connector-jdbc,
> hope this helps.
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
> Table t =
>         tEnv.fromDataStream(
>                 get4TupleDataStream(env)
>                         .assignTimestampsAndWatermarks(
>                                 new AscendingTimestampExtractor<
>                                         Tuple4<Integer, Long, String, Timestamp>>() {
>                                     @Override
>                                     public long extractAscendingTimestamp(
>                                             Tuple4<Integer, Long, String, Timestamp> element) {
>                                         return element.f0;
>                                     }
>                                 }),
>                 $("id"),
>                 $("num"),
>                 $("text"),
>                 $("ts"));
>
> tEnv.createTemporaryView("T", t);
> tEnv.executeSql(
>         "CREATE TABLE upsertSink ("
>                 + "  cnt BIGINT,"
>                 + "  lencnt BIGINT,"
>                 + "  cTag INT,"
>                 + "  ts TIMESTAMP(3)"
>                 + ") WITH ("
>                 + "  'connector.type'='jdbc',"
>                 + "  'connector.url'='XXXX',"
>                 + "  'connector.table'='upsertSink'"
>                 + ")");
>
> tEnv.executeSql(
>         "INSERT INTO upsertSink \n"
>                 + "SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS ts\n"
>                 + "FROM (\n"
>                 + "  SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS ts\n"
>                 + "  FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM T)\n"
>                 + "  GROUP BY len, cTag\n"
>                 + ")\n"
>                 + "GROUP BY cnt, cTag")
>         .await();
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#key-handling
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-table
>
> Best,
> JING ZHANG
>
> M Singh <mans2si...@yahoo.com> wrote on Sun, Oct 17, 2021 at 12:59 AM:
>
> Hi Folks:
>
> I am working on a Flink DataStream pipeline and would like to use the JDBC
> upsert functionality. I found a class TableJdbcUpsertOutputFormat but am
> not sure how to use it with the JdbcSink as shown in the document (
> https://nightlies.apache.org/flink/flink-docs-master/api/java//org/apache/flink/connector/jdbc/JdbcSink.html
> ).
>
> I could not find how to pass an OutputFormat argument to the JDBC sink.
>
> Please let me know if there is any documentation or example for using the JDBC
> sink with upsert for DataStreams.
>
> Thanks
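P.S. A minimal sketch of the CREATE TABLE with a primary key, which is what switches the sink to the upsert path. It assumes the upsert key is the final GROUP BY key (cnt, cTag) of the example quoted above, reuses the same tEnv, and uses the 'connector' = 'jdbc' option style from the docs linked as [1] in my earlier mail; the URL is a placeholder:

// Same query as in the quoted example, but the DDL now declares the key columns.
tEnv.executeSql(
        "CREATE TABLE upsertSink ("
                + "  cnt BIGINT,"
                + "  lencnt BIGINT,"
                + "  cTag INT,"
                + "  ts TIMESTAMP(3),"
                + "  PRIMARY KEY (cnt, cTag) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'jdbc',"
                + "  'url' = 'jdbc:mysql://localhost:3306/mydb',"
                + "  'table-name' = 'upsertSink'"
                + ")");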