Hi Flink Community, I need some help with the JDBC sink in the DataStream API. I can produce records and sink them to the database correctly. However, if I wait 5 minutes between insertions, I run into a broken pipeline issue. After that, the Flink application restarts, recovers from the checkpoint, and re-executes the failed SQL query. I have searched hard for resources to understand why such a broken pipeline happens, but I still can't understand it.
The interesting thing is that if the idle time is around 3 minutes, everything seems to be fine. It looks like a timeout-related issue, but I don't know what I should do to fix it. I have shared the sink code below. Could anyone share some ideas? Thank you so much!

My environment settings:

Flink version: 1.12.1
Scala version: 2.11
Java version: 1.11
Flink System parallelism: 1
JDBC Driver: Oracle ojdbc10
Database: Oracle Autonomous Database on Oracle Cloud Infrastructure (you can regard this as a cloud-based Oracle Database)

The code for the sink:

    boDataStream
        .addSink(
            JdbcSink.sink(
                "INSERT INTO TEST_ADW (invoice_id, json_doc) values(?, ?)",
                (preparedStatement, testInvoiceBo) -> {
                    try {
                        Gson gson = new GsonBuilder()
                            .excludeFieldsWithoutExposeAnnotation()
                            .create();
                        String invoiceId = testInvoiceBo.getINVOICE_ID();
                        String json = gson.toJson(testInvoiceBo);
                        log.info("insertion information: {}", json);
                        preparedStatement.setString(1, invoiceId);
                        preparedStatement.setString(2, json);
                    } catch (JsonIOException e) {
                        log.error("Failed to parse JSON", e);
                    }
                },
                new JdbcExecutionOptions.Builder()
                    .withBatchIntervalMs(0)
                    .withBatchSize(1)
                    .withMaxRetries(3)
                    .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl(DB_URL)
                    .withDriverName("oracle.jdbc.driver.OracleDriver")
                    .withUsername("admin")
                    .withPassword("password")
                    .build()))
        .name("adwSink")
        .uid("adwSink")
        .setParallelism(1);

The JDBC broken pipeline log: