Hi,

You are using a special Kafka connector. From the definition on the Flink website:

"as a sink, the upsert-kafka connector can consume a changelog stream. It will 
write INSERT/UPDATE_AFTER data as normal Kafka messages value, and write DELETE 
data as Kafka messages with null values (indicate tombstone for the key).”
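
For example, here is a minimal sketch of that behavior (note: the tables "orders" and "order_totals" and all of their columns are made up for illustration, they are not from your job):

CREATE TABLE order_totals (
    customer_id STRING,
    total DOUBLE,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'topic' = 'order_totals',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- The GROUP BY query emits a changelog: every new SUM for a
-- customer_id is an UPDATE_AFTER row, which the sink writes as a
-- normal Kafka message keyed by customer_id; a DELETE would become
-- a tombstone for that key.
INSERT INTO order_totals
SELECT customer_id, SUM(amount) AS total
FROM orders
GROUP BY customer_id;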

But in your SQL statement, the only field in your sink table "bla_sink" is 
"total", and it is also the primary key. Thus, I suspect that no INSERT/UPDATE 
records are generated, since you never have any modification to that key field. 
You can try adding one more field to your table and see if any messages appear; 
a sketch follows below.
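
For instance, the sink could look like this (the key column "id" and the constant key value below are hypothetical, use whatever identifies a row in your data):

CREATE TABLE bla_sink (
    id STRING,
    total DOUBLE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'topic' = 'total',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- With a separate key, each new "total" value is an upsert for an
-- existing key instead of a message under a brand-new key. Here a
-- constant id keeps one running total under a single Kafka key.
INSERT INTO bla_sink
SELECT 'grand_total' AS id, total FROM bla;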

Hope this reply helps. Best,
Yuan

> On 12 Oct 2023, at 2:37 AM, Ralph Matthias Debusmann 
> <matthias.debusm...@gmail.com> wrote:
> 
> Hi,
> 
> I am stumbling on the next Flink SQL problem - but I am sure you can help me 
> :)
> 
> I have an extremely simple table called "bla" which just has one column of 
> type double. Now I want to sink that table into a Kafka topic. This is how I 
> do it:
> 
> 
> CREATE TABLE bla_sink (
>     total DOUBLE,
>     PRIMARY KEY (total) NOT ENFORCED
> ) WITH (
>     'connector' = 'upsert-kafka',
>     'property-version' = 'universal',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'topic' = 'total',
>     'key.format' = 'json',
>     'value.format' = 'json',
>     'properties.group.id' = 'bla_sink'
> );
> This does go through but the topic doesn't receive any messages... what could 
> be the problem? The primary key?
> 
> Best,
> Ralph
> 
> 
