sjwiesman commented on a change in pull request #14975:
URL: https://github.com/apache/flink/pull/14975#discussion_r580600708
##########
File path: docs/content/docs/connectors/datastream/jdbc.md
##########
@@ -28,40 +28,119 @@ under the License.
 This connector provides a sink that writes data to a JDBC database.
-To use it, add the following dependency to your project (along with your JDBC-driver):
+To use it, add the following dependency to your project (along with your JDBC driver):
 {{< artifact flink-connector-jdbc withScalaVersion >}}
 Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}).
-Created JDBC sink provides at-least-once guarantee.
-Effectively exactly-once can be achieved using upsert statements or idempotent updates.
-Example usage:
+## `JdbcSink.sink`
-```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env
-        .fromElements(...)
-        .addSink(JdbcSink.sink(
-                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
-                (ps, t) -> {
-                    ps.setInt(1, t.id);
-                    ps.setString(2, t.title);
-                    ps.setString(3, t.author);
-                    ps.setDouble(4, t.price);
-                    ps.setInt(5, t.qty);
-                },
-                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
-                        .withUrl(getDbMetadata().getUrl())
-                        .withDriverName(getDbMetadata().getDriverClass())
-                        .build()));
-env.execute();
+The JDBC sink provides at-least-once guarantee.
+Effectively though, exactly-once can be achieved by crafting upsert SQL statements or idempotent SQL updates.
+Configuration goes as follow (see also [JdbcSink javadoc]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html)):
+
+```
+JdbcSink.sink(
+    sqlDmlStatement,          // mandatory
+    jdbcStatementBuilder,     // mandatory
+    jdbcExecutionOptions,     // optional
+    jdbcConnectionOptions     // mandatory
+)
+```
+
+### SQL DML statement and JDBC statement builder
+
+The sink builds one [JDBC prepared statement](https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/PreparedStatement.html) from a user-provider SQL string, e.g.:
+
+```
+INSERT INTO some_table field1, field2 values (?, ?)
+```
+
+It then repeatedly calls a user-provided function to update that prepared statement with each value of the stream, e.g.:
+
+```
+(preparedStatement, someRecord) -> { ... update here the preparedStatement with values from someRecord ... }
 ```
-Please refer to the [API documentation]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html) for more details.
+### JDBC execution options
+
+The SQL DML statements are executed in batches, which can optionally be configured with the following instance (see also [JdbcExecutionOptions javadoc]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.html))
+
+```

Review comment:
nit
```suggestion
```java
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org