Dear Kafka maintainers:
When I use a Kafka Streams tumbling window, I found that if the producer
sends messages to the source topic continuously, the stream time window works as
expected. But once I stop the producer, the last window doesn't close until
the producer sends the next message. Is this normal?
My code is in the attachment.
Please answer me when you have time — thanks a lot.
package com.example.server0.kafka;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
import static
org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.EMIT;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.example.server0.kafka.serge.JSONDeserializer;
import com.example.server0.kafka.serge.JSONSerializer;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import
org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.jupiter.api.Test;
@Slf4j
public class KafkaStreamTests {
private static final String SOURCE_TOPIC = "sourceTopic";
private static final String SINK_TOPIC = "sinkTopic";
@Test
void helloWorld() {
// kafka config
Properties props = buildConfigProps();
Serde<String> stringSerde = Serdes.String();
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
stringSerde.getClass().getName());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
stringSerde.getClass().getName());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
Serde<List<String>> jsonSerde = Serdes.serdeFrom(new JSONSerializer<>(),
new JSONDeserializer<>());
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> kStream = streamsBuilder.stream(SOURCE_TOPIC,
Consumed.with(stringSerde, stringSerde));
// Materialized<String, List<String>, SessionStore<Bytes, byte[]>>
materialized = Materialized
// .<String, List<String>, SessionStore<Bytes, byte[]>>as(
//
"session-window-store").withRetention(Duration.ofMinutes(2)).withKeySerde(stringSerde)
// .withValueSerde(jsonSerde);
//
// KTable<Windowed<String>, List<String>> kTable = kStream.groupByKey()
//
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(1)))
// .aggregate(() -> new ArrayList<>(), (k, v, agg) -> {
// System.out.println("========== aggregate record ==========");
// log.info("k: {}, v: {}, agg: {}", k, v, JSON.toJSONString(agg));
// agg.add(v);
// return agg;
// }, (k, agg1, agg2) -> agg2,
materialized).suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));
Materialized<String, List<String>, WindowStore<Bytes, byte[]>>
storeMaterialized = Materialized.<String, List<String>, WindowStore<Bytes,
byte[]>>as(
"time-windowed-aggregated-stream-store").withKeySerde(stringSerde).withValueSerde(jsonSerde)
.withRetention(Duration.ofMinutes(5)).withCachingDisabled();
KTable<Windowed<String>, List<String>> kTable = kStream.groupBy((k, v) ->
"defaultKey")
.windowedBy(
TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
.aggregate(() -> new ArrayList<>(), (k, v, agg) -> {
System.out.println("========== aggregate record ==========");
log.info("k: {}, v: {}, agg: {}", k, v, JSON.toJSONString(agg));
agg.add(v);
return agg;
},
storeMaterialized).suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));
kTable.mapValues(list -> list).mapValues(list -> list).toStream().flatMap(
(k, v) -> {
List<KeyValue<String, String>> keyValues = new ArrayList<>(v.size());
System.out.println("========== flatMap record ==========");
log.info("k: {}, v: {}", k, v);
v.stream().forEach(str -> {
JSONObject jsonObject = JSON.parseObject(str);
keyValues.add(new KeyValue<>(jsonObject.getString("index"), str));
});
log.info("keyValues: {}", keyValues);
return keyValues;
}).to(SINK_TOPIC, Produced.with(stringSerde, stringSerde));
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
kafkaStreams.start();
while (true) {
System.out.println("è¿è¡ä¸......");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private static Properties buildConfigProps() {
Properties props = new Properties();
String applicationId = "test_zhangjiashuai33";
props.put("bootstrap.servers", "192.168.10.152:9092");
props.put("group.id", applicationId);
props.put("application.id", applicationId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", 1000);
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
return props;
}
@Test
public void sendSource() {
createTopic(SOURCE_TOPIC);
KafkaProducer<String, String> producer = new
KafkaProducer<>(buildConfigProps());
JSONObject jsonObject = new JSONObject();
for (int i = 3; i < 6 ; i++) {
jsonObject.put("index", i);
producer.send(
new ProducerRecord<>(SOURCE_TOPIC, String.valueOf(i),
jsonObject.toJSONString()));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void consumeSource() {
consume(SOURCE_TOPIC);
}
private void consume(String topic) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(buildConfigProps());
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(3,
ChronoUnit.SECONDS));
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
System.out.println("========== consume record ==========");
System.out.println(record.value());
}
}
}
public static void createTopic(String topic) {
Properties props = buildConfigProps();
Admin admin = Admin.create(props);
ListTopicsResult listTopicsResult = admin.listTopics();
try {
Set<String> topicNames = listTopicsResult.names().get();
if (topicNames.contains(topic)) {
return;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
admin.createTopics(
Lists.newArrayList(new NewTopic(topic, 1, (short) 1)));
}
public static void main(String[] args) {
Properties props = buildConfigProps();
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);
consumer.subscribe(Collections.singletonList(SOURCE_TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(3,
ChronoUnit.SECONDS));
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
System.out.println("====================");
System.out.println(record.value());
}
}
}
}