Hi, you can take a look at [1]. When a record is written to the JDBC sink, it is first buffered together with the previous records as a batch. Once the number of buffered records reaches the batch size limit, the batch is flushed and the code you mentioned runs. It first executes the upsert records (INSERT or UPDATE_AFTER) and then executes the delete records.
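To make the order concrete, here is a minimal Java sketch of that buffering behaviour. It is not the connector's actual code: the class name, the `write`/`flush` methods, and the string "statements" are all made up for illustration, and it assumes only the latest change per primary key is kept in the buffer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sink's buffering logic, for illustration only.
class BufferReducedSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final int batchSize;
    // Latest change per primary key: a non-null value means upsert, null means delete.
    private final Map<String, String> buffer = new LinkedHashMap<>();
    private int pending = 0;
    // Statements "sent" to the database, recorded so their order can be inspected.
    final List<String> statements = new ArrayList<>();

    BufferReducedSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    void write(Kind kind, String key, String value) {
        // Assumption in this sketch: anything that is not INSERT/UPDATE_AFTER is a delete.
        boolean upsert = kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
        buffer.put(key, upsert ? value : null);
        pending++;
        if (pending >= batchSize) {
            flush();
        }
    }

    void flush() {
        List<String> deletes = new ArrayList<>();
        for (Map.Entry<String, String> e : buffer.entrySet()) {
            if (e.getValue() != null) {
                statements.add("UPSERT " + e.getKey() + "=" + e.getValue());
            } else {
                deletes.add("DELETE " + e.getKey());
            }
        }
        // Deletes are executed only after all upserts of the same batch.
        statements.addAll(deletes);
        buffer.clear();
        pending = 0;
    }

    public static void main(String[] args) {
        BufferReducedSketch sink = new BufferReducedSketch(3);
        sink.write(Kind.INSERT, "1", "a");
        sink.write(Kind.DELETE, "2", null);
        sink.write(Kind.UPDATE_AFTER, "3", "c");
        System.out.println(sink.statements); // [UPSERT 1=a, UPSERT 3=c, DELETE 2]
    }
}
```

So in this sketch a DELETE statement is issued only when a delete record is still the latest change for its key at flush time. That would explain why most batches go through without needing the DELETE privilege.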
[1] https://github.com/apache/flink/blob/fe392645421d10923c75cd5438b91d9ed55900d3/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java#L195

--
Best!
Xuyang

At 2022-08-25 20:14:39, "Dhavan Vaidya" <dhavan.vai...@kofluence.com> wrote:

Hello,

I have Postgres as the source and MySQL as the sink. The user authenticating with MySQL does _not_ have DELETE privileges. In some cases, Flink throws an error because it is trying to _delete_ records and the user does not have that privilege. In most cases (of the same job), upsert works as expected and no exception is thrown.

My question is: when does Flink execute DELETE statements if it is running in upsert mode?

I have found https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java#L98 which might be useful, but I don't understand the logic there (I don't know Java anyway).

Can someone help me understand when the DELETE statements will be executed?

Thanks!