Hi Carlos,

Currently, the changelog output might not always be optimal. We are continuously improving this. For the upsert Kafka connector, we have added a reducing buffer to avoid those tombstone messages.


Unfortunately, this is only available in 1.13. But maybe you can port these changes to 1.12. It should not be too difficult to add a custom connector.
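
To make the idea concrete, here is a minimal sketch of what a reducing buffer does. It is not the actual 1.13 code: the class and method names (ReducingBufferSketch, KafkaMessage, add, flush) are made up for illustration, and a real connector would also need to decide when to flush (for example on a row-count or time limit). The buffer keeps only the latest change per key, so a tombstone that is immediately followed by a new value for the same key never reaches Kafka.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a per-key reducing buffer; not Flink's implementation.
public class ReducingBufferSketch {

    /** A key/value pair destined for Kafka; a null value represents a tombstone. */
    public static final class KafkaMessage {
        public final String key;
        public final String value;

        public KafkaMessage(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Latest pending value per key (null = pending tombstone).
    private final Map<String, String> buffer = new LinkedHashMap<>();

    /** Buffers a change; a later change for the same key overwrites the earlier one. */
    public void add(String key, String value) {
        buffer.put(key, value);
    }

    /** Emits one message per buffered key and clears the buffer. */
    public List<KafkaMessage> flush() {
        List<KafkaMessage> out = new ArrayList<>();
        for (Map.Entry<String, String> entry : buffer.entrySet()) {
            out.add(new KafkaMessage(entry.getKey(), entry.getValue()));
        }
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        ReducingBufferSketch sink = new ReducingBufferSketch();
        // The update arrives as a tombstone followed by the new value.
        sink.add("directorId1:movieId1", null);
        sink.add("directorId1:movieId1", "{\"director\": \"Steven Spielberg\"}");
        for (KafkaMessage m : sink.flush()) {
            System.out.println(m.key + " -> " + m.value);
        }
    }
}
```

With the two changes above buffered in the same flush interval, only the message carrying the new value is written. A tombstone is still emitted when it is the last buffered change for a key.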


On 14.07.21 08:21, Sanabria, Carlos wrote:
A little reminder, in case anyone knows why this happens.



*From:* Sanabria, Carlos
*Sent:* Monday, June 14, 2021 17:49
*To:* user@flink.apache.org
*Subject:* Upsert Kafka SQL Connector used as a sink does not generate an upsert stream


I have a question related to the *Upsert Kafka SQL Connector, when used as a sink.*

From what I have tested, *it does not generate an upsert stream in Kafka*, but the Flink 1.12 documentation states that it does:

“As a sink, the upsert-kafka connector can consume a changelog stream. *It will write INSERT/UPDATE_AFTER data as normal Kafka messages value*, and write DELETE data as Kafka messages with null values (indicate tombstone for the key). Flink will guarantee the message ordering on the primary key by partition data on the values of the primary key columns, so the update/deletion messages on the same key will fall into the same partition.”
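
The mapping described in that paragraph can be sketched roughly as follows. This is only an illustration of the documented behavior, not the connector's code: ChangeKind and toKafkaValue are hypothetical names (the enum merely mirrors the constants of Flink's RowKind), and since the quoted text does not say what happens to UPDATE_BEFORE rows, the sketch simply rejects them.

```java
// Hypothetical illustration of the documented sink behavior; not Flink code.
public class UpsertEncodingSketch {

    /** Mirrors the constant names of Flink's RowKind for this sketch. */
    public enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Returns the Kafka message value for a changelog row:
     * the serialized row for INSERT/UPDATE_AFTER, null (tombstone) for DELETE.
     */
    public static String toKafkaValue(ChangeKind kind, String serializedRow) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return serializedRow;
            case DELETE:
                return null;
            default:
                // Assumption: not covered by the quoted documentation.
                throw new IllegalArgumentException("Unhandled change kind: " + kind);
        }
    }

    public static void main(String[] args) {
        String row = "{\"id\": \"directorId1:movieId1\"}";
        System.out.println(toKafkaValue(ChangeKind.UPDATE_AFTER, row));
        System.out.println(toKafkaValue(ChangeKind.DELETE, row));
    }
}
```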

I have implemented a simple proof of concept of this connector, and in my Kafka output topic each UPSERT event is encoded as 2 Kafka events:

 1. DELETE event (tombstone)
 2. Normal event with the new value associated with that key

I expected it to write only a single message (a normal event with the new value associated with that key) and no tombstone message, which is unnecessary.

I just wanted to know if I am doing something wrong, or if the documentation is mistaken, and this connector always generates a tombstone message + a normal event, for each UPSERT event.

Below is all the info related to the proof of concept I did with the connector:

    Source code

// Define env

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSettings);

// * Input tables

//      1. Movies Table


//      2. Directors Table


// * Output tables

//      1. DirectorsMovies Table


// Run SQL Query

Table queryTable = tableEnv.sqlQuery(Utils.getResourcesFileContent("dml/join-query.sql"));


// Execute Flink job

env.execute("Flink SQL Job");

    Entity relationship model

    Table definitions



   name STRING,

   nominatedToOscar BOOLEAN,

   directorId STRING,










CREATE TABLE Directors (


   name STRING,










CREATE TABLE DirectorsMovies (


   director STRING,

   movie STRING,

   nominatedToOscar BOOLEAN,










    SQL query


-- The id has the following format: "directorId:movieId"


-- Other fields

     Directors.name AS director,

     Movies.name AS movie,

     Movies.nominatedToOscar AS nominatedToOscar





ON Movies.directorId = Directors.id


     Movies.nominatedToOscar = TRUE

    Input events

Events are inserted in the same order as shown here.

*TOPIC: *movies

*KEY: *movieId1

*VALUE: *{

   "id": "movieId1",

   "name": "Inception",

   "nominatedToOscar": true,

   "directorId": "directorId1"

}


*TOPIC: *directors

*KEY: *directorId1

*VALUE: *{

   "id": "directorId1",

   "name": "Christopher Nolan"

}


*TOPIC: *directors

*KEY: *directorId1

*VALUE: *{

   "id": "directorId1",

   "name": "Steven Spielberg"

}


    Output events

Events appear in the output Kafka topic in the same order as shown here.

*TOPIC: *directors-movies

*KEY: *directorId1:movieId1

*VALUE: *{

   "id": "directorId1:movieId1",

   "director": "Christopher Nolan",

   "movie": "Inception",

   "nominatedToOscar": true

}


*TOPIC: *directors-movies

*KEY: *directorId1:movieId1


*INFO: *This is a DELETE event (tombstone), because the value is null

*TOPIC: *directors-movies

*KEY: *directorId1:movieId1

*VALUE: *{

   "id": "directorId1:movieId1",

   "director": "Steven Spielberg",

   "movie": "Inception",

   "nominatedToOscar": true

}


As you can see, the job generates 3 output events, whereas my expectation is that the tombstone event is not generated.

    Expected output events

*TOPIC: *directors-movies

*KEY: *directorId1:movieId1

*VALUE: *{

   "id": "directorId1:movieId1",

   "director": "Christopher Nolan",

   "movie": "Inception",

   "nominatedToOscar": true

}


*TOPIC: *directors-movies

*KEY: *directorId1:movieId1

*VALUE: *{

   "id": "directorId1:movieId1",

   "director": "Steven Spielberg",

   "movie": "Inception",

   "nominatedToOscar": true

}


Thanks in advance, and best regards.

Carlos Sanabria


