Kenyore created FLINK-24626:
-------------------------------

             Summary: Flink JDBC Sink may lose data in retract stream
                 Key: FLINK-24626
                 URL: https://issues.apache.org/jira/browse/FLINK-24626
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
            Reporter: Kenyore
The JDBC sink will lose some data while using TableBufferReducedStatementExecutor. Here are some snippets.

{code}
@Override
public void addToBatch(RowData record) throws SQLException {
    RowData key = keyExtractor.apply(record);
    if (record.getRowKind() == RowKind.DELETE) {
        //XXX cut delete off because the retract stream would generate
        return;
    }
    boolean flag = changeFlag(record.getRowKind());
    RowData value = valueTransform.apply(record); // copy or not
    reduceBuffer.put(key, Tuple2.of(flag, value));
}

private boolean changeFlag(RowKind rowKind) {
    switch (rowKind) {
        case INSERT:
        case UPDATE_AFTER:
            return true;
        case DELETE:
        case UPDATE_BEFORE:
            return false;
        default:
            throw new UnsupportedOperationException(
                    String.format(
                            "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
                                    + " DELETE, but get: %s.",
                            rowKind));
    }
}
{code}

The code above adds the changeFlag to the Tuple2 as the marker of upsert or delete.

{code}
@Override
public void executeBatch() throws SQLException {
    for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
        if (entry.getValue().f0) {
            upsertExecutor.addToBatch(entry.getValue().f1);
        } else {
            // delete by key
            deleteExecutor.addToBatch(entry.getKey());
        }
    }
    upsertExecutor.executeBatch();
    deleteExecutor.executeBatch();
    reduceBuffer.clear();
}
{code}

executeBatch executes all false-flag (delete) records after the true-flag (upsert) records. This means an UPDATE_BEFORE could be executed after its corresponding UPDATE_AFTER, and we would meet data loss because of this.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
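The reordering described above can be reproduced with a minimal, self-contained simulation (not Flink code; the class name `Flink24626Sketch` and the in-memory maps are hypothetical, and it assumes the UPDATE_BEFORE and UPDATE_AFTER reduce under different buffer keys while targeting the same physical row). It replays one batch the way executeBatch() does: all upserts first, then all deletes, so the delete derived from UPDATE_BEFORE lands after the upsert derived from UPDATE_AFTER and removes the freshly written row.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the batch execution order; not the actual Flink classes. */
public class Flink24626Sketch {

    /** Returns the table contents after replaying one buffered batch. */
    public static Map<Integer, String> simulate() {
        // Physical table keyed by primary key id; one existing row.
        Map<Integer, String> table = new HashMap<>();
        table.put(1, "old");

        // Reduce buffer: sinkKey -> (flag, (rowId, value)), flag=true means
        // upsert and flag=false means delete, mirroring changeFlag().
        // Assumption: the extracted buffer keys differ, but both entries
        // address the same physical row id=1.
        Map<String, SimpleEntry<Boolean, SimpleEntry<Integer, String>>> buffer =
                new LinkedHashMap<>();
        // UPDATE_BEFORE reduces to a delete entry for the old row ...
        buffer.put("kBefore", new SimpleEntry<>(false, new SimpleEntry<>(1, "old")));
        // ... and UPDATE_AFTER reduces to an upsert entry for the new value.
        buffer.put("kAfter", new SimpleEntry<>(true, new SimpleEntry<>(1, "new")));

        // executeBatch() step 1: run every upsert in the buffer.
        for (SimpleEntry<Boolean, SimpleEntry<Integer, String>> e : buffer.values()) {
            if (e.getKey()) {
                table.put(e.getValue().getKey(), e.getValue().getValue());
            }
        }
        // executeBatch() step 2: run every delete AFTER the upserts, so the
        // UPDATE_BEFORE delete removes the row the UPDATE_AFTER just wrote.
        for (SimpleEntry<Boolean, SimpleEntry<Integer, String>> e : buffer.values()) {
            if (!e.getKey()) {
                table.remove(e.getValue().getKey());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // prints {} — row id=1 disappeared even though the stream ended with
        // an UPDATE_AFTER for it: the data loss this issue reports.
        System.out.println(simulate());
    }
}
```

Executing the deletes before the upserts (or preserving the original stream order per key) would avoid losing the row in this scenario.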