Dear Kafka maintainers: when I use a Kafka Streams tumbling window, I found that as long as the producer keeps sending messages to the source topic, the time window works as expected. But once I stop the producer, the last window never closes until the producer sends the next message. Is this normal? My code is attached below. Please answer when you have time, thanks a lot.
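The relevant part of the topology, trimmed from the attached test, is a 10-second tumbling window with no grace period, suppressed until the window closes:

    KTable<Windowed<String>, List<String>> kTable = kStream.groupBy((k, v) -> "defaultKey")
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
        .aggregate(() -> new ArrayList<>(), (k, v, agg) -> { agg.add(v); return agg; }, storeMaterialized)
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));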
package com.example.server0.kafka;

import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.example.server0.kafka.serge.JSONDeserializer;
import com.example.server0.kafka.serge.JSONSerializer;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.jupiter.api.Test;

@Slf4j
public class KafkaStreamTests {

    private static final String SOURCE_TOPIC = "sourceTopic";
    private static final String SINK_TOPIC = "sinkTopic";

    @Test
    void helloWorld() {
        // Streams config
        Properties props = buildConfigProps();
        Serde<String> stringSerde = Serdes.String();
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        Serde<List<String>> jsonSerde = Serdes.serdeFrom(new JSONSerializer<>(), new JSONDeserializer<>());
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<String, String> kStream = streamsBuilder.stream(SOURCE_TOPIC, Consumed.with(stringSerde, stringSerde));

        // Earlier session-window attempt, kept for reference:
        // Materialized<String, List<String>, SessionStore<Bytes, byte[]>> materialized = Materialized
        //     .<String, List<String>, SessionStore<Bytes, byte[]>>as("session-window-store")
        //     .withRetention(Duration.ofMinutes(2))
        //     .withKeySerde(stringSerde)
        //     .withValueSerde(jsonSerde);
        //
        // KTable<Windowed<String>, List<String>> kTable = kStream.groupByKey()
        //     .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(1)))
        //     .aggregate(() -> new ArrayList<>(), (k, v, agg) -> {
        //         System.out.println("========== aggregate record ==========");
        //         log.info("k: {}, v: {}, agg: {}", k, v, JSON.toJSONString(agg));
        //         agg.add(v);
        //         return agg;
        //     }, (k, agg1, agg2) -> agg2, materialized)
        //     .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));

        Materialized<String, List<String>, WindowStore<Bytes, byte[]>> storeMaterialized =
            Materialized.<String, List<String>, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store")
                .withKeySerde(stringSerde)
                .withValueSerde(jsonSerde)
                .withRetention(Duration.ofMinutes(5))
                .withCachingDisabled();

        // 10-second tumbling window, no grace period; suppress holds each window
        // back until it closes.
        KTable<Windowed<String>, List<String>> kTable = kStream.groupBy((k, v) -> "defaultKey")
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
            .aggregate(() -> new ArrayList<>(), (k, v, agg) -> {
                System.out.println("========== aggregate record ==========");
                log.info("k: {}, v: {}, agg: {}", k, v, JSON.toJSONString(agg));
                agg.add(v);
                return agg;
            }, storeMaterialized)
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));

        // Note: the two mapValues calls are no-ops left in from experimentation.
        kTable.mapValues(list -> list).mapValues(list -> list).toStream().flatMap((k, v) -> {
            List<KeyValue<String, String>> keyValues = new ArrayList<>(v.size());
            System.out.println("========== flatMap record ==========");
            log.info("k: {}, v: {}", k, v);
            v.stream().forEach(str -> {
                JSONObject jsonObject = JSON.parseObject(str);
                keyValues.add(new KeyValue<>(jsonObject.getString("index"), str));
            });
            log.info("keyValues: {}", keyValues);
            return keyValues;
        }).to(SINK_TOPIC, Produced.with(stringSerde, stringSerde));

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        kafkaStreams.start();

        while (true) {
            System.out.println("running......");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static Properties buildConfigProps() {
        Properties props = new Properties();
        String applicationId = "test_zhangjiashuai33";
        props.put("bootstrap.servers", "192.168.10.152:9092");
        props.put("group.id", applicationId);
        props.put("application.id", applicationId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", 1000);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return props;
    }

    @Test
    public void sendSource() {
        createTopic(SOURCE_TOPIC);
        KafkaProducer<String, String> producer = new KafkaProducer<>(buildConfigProps());
        JSONObject jsonObject = new JSONObject();
        for (int i = 3; i < 6; i++) {
            jsonObject.put("index", i);
            producer.send(new ProducerRecord<>(SOURCE_TOPIC, String.valueOf(i), jsonObject.toJSONString()));
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        producer.close(); // flush pending sends before the test JVM exits
    }

    @Test
    public void consumeSource() {
        consume(SOURCE_TOPIC);
    }

    private void consume(String topic) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildConfigProps());
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.of(3, ChronoUnit.SECONDS));
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            while (iterator.hasNext()) {
                ConsumerRecord<String, String> record = iterator.next();
                System.out.println("========== consume record ==========");
                System.out.println(record.value());
            }
        }
    }

    public static void createTopic(String topic) {
        Properties props = buildConfigProps();
        Admin admin = Admin.create(props);
        ListTopicsResult listTopicsResult = admin.listTopics();
        try {
            Set<String> topicNames = listTopicsResult.names().get();
            if (topicNames.contains(topic)) {
                return;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
        admin.createTopics(Lists.newArrayList(new NewTopic(topic, 1, (short) 1)));
    }

    public static void main(String[] args) {
        Properties props = buildConfigProps();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(SOURCE_TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.of(3, ChronoUnit.SECONDS));
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            while (iterator.hasNext()) {
                ConsumerRecord<String, String> record = iterator.next();
                System.out.println("====================");
                System.out.println(record.value());
            }
        }
    }
}
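In case it clarifies what I am asking: my current understanding is that suppress(untilWindowCloses) only emits a window after stream time passes the window end, and that stream time advances only from the timestamps of incoming records, so nothing can be flushed while the topic sits idle. If that is right, the only workaround I can think of is to produce a periodic dummy record that keeps stream time moving. A rough sketch follows; the "heartbeat" key and empty JSON value are placeholders of my own, not anything from the Streams API, and the topology would need to filter such records out (e.g. with a filter() before groupBy()) so they don't pollute the aggregate:

    // Sketch of a possible workaround (my own idea, please correct me if wrong):
    // produce a dummy record periodically so stream time keeps advancing and the
    // suppress buffer can flush closed windows even after the real producer stops.
    void sendHeartbeats() throws InterruptedException {
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(buildConfigProps())) {
            while (true) {
                // "heartbeat" / "{}" are placeholder key and value; the aggregation
                // side would have to drop these records before windowing
                producer.send(new ProducerRecord<>(SOURCE_TOPIC, "heartbeat", "{}"));
                TimeUnit.SECONDS.sleep(5); // shorter than the 10-second window size
            }
        }
    }

Is that the expected behavior, or is there a built-in way to close the last window based on wall-clock time?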