tim yu created FLINK-27215:
------------------------------

Summary: JDBC sink transiently deleted a record because of -u message of that record
Key: FLINK-27215
URL: https://issues.apache.org/jira/browse/FLINK-27215
Project: Flink
Issue Type: Improvement
Components: Connectors / JDBC
Affects Versions: 1.12.0
Reporter: tim yu

A record is transiently deleted when the JDBC sink runs in upsert mode, because the class TableBufferReducedStatementExecutor processes a -U (UPDATE_BEFORE) message as a delete operation. The following code shows how the -U message is handled:

{code:java}
    /**
     * Returns true if the row kind is INSERT or UPDATE_AFTER, returns false if the row kind is
     * DELETE or UPDATE_BEFORE.
     */
    private boolean changeFlag(RowKind rowKind) {
        switch (rowKind) {
            case INSERT:
            case UPDATE_AFTER:
                return true;
            case DELETE:
            case UPDATE_BEFORE:
                return false;
            default:
                throw new UnsupportedOperationException(
                        String.format(
                                "Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER,"
                                        + " DELETE, but get: %s.",
                                rowKind));
        }
    }

    @Override
    public void executeBatch() throws SQLException {
        for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : reduceBuffer.entrySet()) {
            if (entry.getValue().f0) {
                upsertExecutor.addToBatch(entry.getValue().f1);
            } else {
                // delete by key
                deleteExecutor.addToBatch(entry.getKey());
            }
        }
        upsertExecutor.executeBatch();
        deleteExecutor.executeBatch();
        reduceBuffer.clear();
    }
{code}

If the -U and +U messages of one record are executed in different JDBC batches, the record is transiently deleted from the external database and the updated record is then inserted again. The record should instead be updated exactly once in the external database.
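
The batch-boundary effect described above can be reproduced with a small stand-alone sketch. This is not Flink code: `TransientDeleteSketch`, `Change`, `ReducingBuffer`, and `run` are hypothetical names, and an in-memory map stands in for the external database. It only assumes the behavior stated in this issue, namely that -U is mapped to "delete by key" and that the buffer keeps the last change per key until it is flushed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a key-reduced upsert buffer. All names here are hypothetical, not Flink classes. */
class TransientDeleteSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Last buffered change for one key: upsert (true) or delete (false), plus the row value. */
    static final class Change {
        final boolean upsert;
        final String value;

        Change(boolean upsert, String value) {
            this.upsert = upsert;
            this.value = value;
        }
    }

    /** Keeps only the latest change per key and applies it to an in-memory table on flush. */
    static final class ReducingBuffer {
        private final Map<String, Change> buffer = new LinkedHashMap<>();
        private final Map<String, String> table;
        private final List<String> log;

        ReducingBuffer(Map<String, String> table, List<String> log) {
            this.table = table;
            this.log = log;
        }

        void add(Kind kind, String key, String value) {
            // Same mapping as changeFlag in the issue: -U and DELETE both become "delete by key".
            boolean upsert = kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
            buffer.put(key, new Change(upsert, value));
        }

        void flush() {
            // Upserts are applied first, then deletes, as in executeBatch in the issue.
            for (Map.Entry<String, Change> e : buffer.entrySet()) {
                if (e.getValue().upsert) {
                    table.put(e.getKey(), e.getValue().value);
                    log.add("UPSERT " + e.getKey() + "=" + e.getValue().value);
                }
            }
            for (Map.Entry<String, Change> e : buffer.entrySet()) {
                if (!e.getValue().upsert) {
                    table.remove(e.getKey());
                    log.add("DELETE " + e.getKey());
                }
            }
            buffer.clear();
        }
    }

    /** Replays -U then +U for key "1", optionally flushing between the two messages. */
    static List<String> run(boolean flushBetween) {
        Map<String, String> table = new LinkedHashMap<>();
        table.put("1", "old"); // the row already exists before the update arrives
        List<String> log = new ArrayList<>();
        ReducingBuffer buf = new ReducingBuffer(table, log);

        buf.add(Kind.UPDATE_BEFORE, "1", "old");
        if (flushBetween) {
            buf.flush(); // batch boundary falls between -U and +U
        }
        buf.add(Kind.UPDATE_AFTER, "1", "new");
        buf.flush();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // [DELETE 1, UPSERT 1=new]
        System.out.println(run(false)); // [UPSERT 1=new]
    }
}
```

When both messages land in the same batch, the +U overwrites the buffered -U and only one upsert reaches the table. When a flush falls between them, the table sees a delete followed by an upsert, which is the transient deletion reported here.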