Vik Gamov created KAFKA-7595:
--------------------------------

             Summary: Kafka Streams: KTable to KTable join introduces duplicates in downstream KTable
                 Key: KAFKA-7595
                 URL: https://issues.apache.org/jira/browse/KAFKA-7595
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.0.0
            Reporter: Vik Gamov
When performing a KTable-to-KTable join after an aggregation, there are duplicates in the resulting KTable.

1. Caching disabled, not materialized => duplicates

{code:java}
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

KTable<Long, Long> ratingCounts = ratingsById.count();
KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
    (sum, count) -> sum / count.doubleValue());
{code}

2. Caching disabled, materialized => duplicates

{code:java}
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

KTable<Long, Long> ratingCounts = ratingsById.count();
KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
    (sum, count) -> sum / count.doubleValue(),
    Materialized.as("average-ratings"));
{code}

3. Caching enabled, materialized => all good

{code:java}
// Enable record cache of size 10 MB.
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// Set commit interval to 1 second.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

KTable<Long, Long> ratingCounts = ratingsById.count();
KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
    (sum, count) -> sum / count.doubleValue(),
    Materialized.as("average-ratings"));
{code}

Demo app: [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
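For completeness: the snippets above use {{ratingsById}} without defining it. A minimal self-contained sketch of the topology under discussion, assuming ratings arrive keyed by movie id as {{Long}}/{{Double}} pairs; the topic names ("ratings", "average-ratings-out") and serdes are assumptions, not from the report, and {{Serialized}} is used to match the 2.0.0 API:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;

public class AverageRatings {

    // Builds the topology from the report; "ratings" and
    // "average-ratings-out" are hypothetical topic names.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Ratings keyed by movie id, grouped for aggregation.
        KGroupedStream<Long, Double> ratingsById = builder
                .stream("ratings", Consumed.with(Serdes.Long(), Serdes.Double()))
                .groupByKey(Serialized.with(Serdes.Long(), Serdes.Double()));

        KTable<Long, Long> ratingCounts = ratingsById.count();
        KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);

        // KTable-to-KTable join where the duplicates are observed.
        KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
                (sum, count) -> sum / count.doubleValue());

        ratingAverage.toStream().to("average-ratings-out",
                Produced.with(Serdes.Long(), Serdes.Double()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Print the wiring so the topology can be inspected without a broker.
        System.out.println(build().describe());
    }
}
```

Printing {{Topology#describe()}} makes it easy to confirm that both aggregations feed the same join node before running against a broker.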