Hi, you may want to look at the code here [1].
When a record is written to the JDBC sink, it is first buffered together with 
the previous records as a batch. Once the number of buffered records reaches 
the batch size limit, the batch is flushed and the code you mentioned runs. It 
first emits the upsert records (insert or update_after), and then emits the 
delete records.
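
To make the ordering concrete, here is a minimal sketch of that flush logic in 
Java. It is not the actual Flink code: the class UpsertBufferSketch, its 
methods, and the string "statements" are hypothetical, and it assumes that 
only the latest change per primary key is kept in the buffer and that any row 
kind other than insert or update_after is treated as a delete.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a buffering upsert sink (not Flink's classes).
class UpsertBufferSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Latest change per primary key: a non-null row means upsert, null means delete.
    private final Map<String, String> buffer = new LinkedHashMap<>();

    // Buffer a change; a later change for the same key replaces the earlier one.
    void add(Kind kind, String key, String row) {
        // Assumption: only INSERT and UPDATE_AFTER count as upserts.
        boolean upsert = kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
        buffer.put(key, upsert ? row : null);
    }

    // Flush the batch: all upserts are emitted first, then all deletes.
    List<String> flush() {
        List<String> upserts = new ArrayList<>();
        List<String> deletes = new ArrayList<>();
        for (Map.Entry<String, String> e : buffer.entrySet()) {
            if (e.getValue() != null) {
                upserts.add("UPSERT " + e.getKey() + "=" + e.getValue());
            } else {
                deletes.add("DELETE " + e.getKey());
            }
        }
        buffer.clear();
        List<String> statements = new ArrayList<>(upserts);
        statements.addAll(deletes);
        return statements;
    }
}
```

Under these assumptions, a DELETE statement is only produced for a key whose 
last buffered change in the batch is not an upsert. A delete that is followed 
by an insert for the same key within one batch produces no DELETE at all.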


[1] 
https://github.com/apache/flink/blob/fe392645421d10923c75cd5438b91d9ed55900d3/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java#L195




--

    Best!
    Xuyang




At 2022-08-25 20:14:39, "Dhavan Vaidya" <dhavan.vai...@kofluence.com> wrote:

Hello,


I have Postgres as the source and MySQL as the sink. The user authenticating 
with MySQL does _not_ have DELETE privileges.


In some cases, Flink throws an error because it is trying to _delete_ records 
and the user does not have that privilege. In most cases (of the same job), 
upsert works as expected and no exception is thrown.



My question is: when does Flink execute DELETE statements if it is running in 
upsert mode?


I have found 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/executor/TableBufferReducedStatementExecutor.java#L98
 that might be useful, but I don't understand the logic here (I don't know Java 
anyway).


Can someone help me understand when the delete statements will be executed?


Thanks!
