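Hello everyone,

Answering my own question: it turns out that Flink Table Store removes the normalization node when reading from an external log system only if both 'log.changelog-mode' = 'all' and 'log.consistency' = 'transactional' are set [1]. The table in my original message used 'log.consistency' = 'eventual', which is why ChangelogNormalize was still in the topology.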
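For illustration, here is a sketch of the same table definition from my original message with only 'log.consistency' changed. It assumes the check in [1] is the only condition that matters. I have not included any plan output, so please treat it as something to try rather than a confirmed result. The bucket, broker, and topic names are the placeholders from the original message.

```sql
-- Same table as in the original message; only 'log.consistency' differs.
CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
) WITH (
    'connector' = 'table-store',
    'path' = 's3://my-bucket/table-store',
    'log.system' = 'kafka',
    'kafka.bootstrap.servers' = 'broker:9092',
    'kafka.topic' = 'word_count_log',
    'auto-create' = 'true',
    'log.changelog-mode' = 'all',
    -- Changed from 'eventual'; per [1], both options must be set
    -- for the normalization node to be skipped.
    'log.consistency' = 'transactional'
);

-- Inspect the plan and look for a ChangelogNormalize node.
EXPLAIN PLAN FOR SELECT * FROM word_count;
```

If the assumption holds, the plan should no longer contain ChangelogNormalize.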
1. https://github.com/apache/flink-table-store/blob/7e0d55ff3dc9fd48455b17d9a439647b0554d020/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java#L136-L141

Best,
Alex

On Fri, Dec 16, 2022 at 5:28 PM Alexander Sorokoumov <asorokou...@confluent.io> wrote:

> Hello community,
>
> I want to ask about streaming queries with Flink Table Store. After
> reading the documentation on Streaming Queries [1], I was under the
> impression that only tables with LogStore-over-TableStore and No Changelog
> Producer need the normalization step since the Kafka log has the `before`
> values.
>
> However, when I created the following table:
>
> CREATE TABLE word_count (
>     word STRING PRIMARY KEY NOT ENFORCED,
>     cnt BIGINT
> ) WITH (
>     'connector' = 'table-store',
>     'path' = 's3://my-bucket/table-store',
>     'log.system' = 'kafka',
>     'kafka.bootstrap.servers' = 'broker:9092',
>     'kafka.topic' = 'word_count_log',
>     'auto-create' = 'true',
>     'log.changelog-mode' = 'all',
>     'log.consistency' = 'eventual'
> );
>
> And ran a streaming query against it:
>
> SELECT * FROM word_count;
>
> The topology for this query had the normalization task
> (ChangelogNormalize).
>
> Is this a bug or expected behavior? If it is the latter, can you please
> clarify why this is the case?
>
> 1.
> https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/streaming-query/
>
> Thank you,
> Alex
>