Dear Kafka maintainers:
      When I use a Kafka Streams tumbling window, I found that as long as the
producer sends messages to the source topic continuously, the time window works
as expected. But once I stop the producer, the last window does not close until
the producer sends the next message. Is this normal?
My code is in the attachment.
Please answer when you have time, thanks a lot.
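
I suspect this happens because stream time only advances when new records
arrive, so suppress(untilWindowCloses) keeps the final window buffered while
the topic is idle. A workaround I am considering is a periodic "tick" producer
that keeps stream time moving. Below is a minimal sketch, separate from the
attached test: the TickProducer name is mine, the broker address matches my
test setup, and the aggregation would have to filter out the tick records.

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TickProducer {

  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.10.152:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      while (true) {
        // any new record advances stream time past the open window's end,
        // letting suppress() emit the buffered result
        producer.send(new ProducerRecord<>("sourceTopic", "tick", "{}"));
        TimeUnit.SECONDS.sleep(5);
      }
    }
  }
}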

package com.example.server0.kafka;

import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.example.server0.kafka.serge.JSONDeserializer;
import com.example.server0.kafka.serge.JSONSerializer;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;
import org.junit.jupiter.api.Test;

@Slf4j
public class KafkaStreamTests {

  private static final String SOURCE_TOPIC = "sourceTopic";
  private static final String SINK_TOPIC = "sinkTopic";

  @Test
  void helloWorld() {
    // kafka config
    Properties props = buildConfigProps();
    Serde<String> stringSerde = Serdes.String();
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);

    // disable record caching so every aggregation update is forwarded immediately
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

    Serde<List<String>> jsonSerde = Serdes.serdeFrom(new JSONSerializer<>(),
        new JSONDeserializer<>());
    StreamsBuilder streamsBuilder = new StreamsBuilder();

    KStream<String, String> kStream = streamsBuilder.stream(SOURCE_TOPIC,
        Consumed.with(stringSerde, stringSerde));

//    Materialized<String, List<String>, SessionStore<Bytes, byte[]>> materialized = Materialized
//        .<String, List<String>, SessionStore<Bytes, byte[]>>as("session-window-store")
//        .withRetention(Duration.ofMinutes(2)).withKeySerde(stringSerde)
//        .withValueSerde(jsonSerde);
//
//    KTable<Windowed<String>, List<String>> kTable = kStream.groupByKey()
//        .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(1)))
//        .aggregate(() -> new ArrayList<>(), (k, v, agg) -> {
//          System.out.println("========== aggregate record ==========");
//          log.info("k: {}, v: {}, agg: {}", k, v, JSON.toJSONString(agg));
//          agg.add(v);
//          return agg;
//        }, (k, agg1, agg2) -> agg2, materialized)
//        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));

    Materialized<String, List<String>, WindowStore<Bytes, byte[]>> storeMaterialized =
        Materialized.<String, List<String>, WindowStore<Bytes, byte[]>>as(
                "time-windowed-aggregated-stream-store")
            .withKeySerde(stringSerde).withValueSerde(jsonSerde)
            .withRetention(Duration.ofMinutes(5)).withCachingDisabled();
    KTable<Windowed<String>, List<String>> kTable = kStream.groupBy((k, v) -> "defaultKey")
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(10)))
        .aggregate(() -> new ArrayList<>(), (k, v, agg) -> {
          System.out.println("========== aggregate record ==========");
          log.info("k: {}, v: {}, agg: {}", k, v, JSON.toJSONString(agg));
          agg.add(v);
          return agg;
        }, storeMaterialized)
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));
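    // Note: untilWindowCloses() holds each window's result until stream time
    // passes the window end plus grace (zero here), and stream time advances
    // only when new records arrive; this is why the last window stays open
    // once the producer stops.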

    kTable.toStream().flatMap(
        (k, v) -> {
          List<KeyValue<String, String>> keyValues = new ArrayList<>(v.size());
          System.out.println("========== flatMap record ==========");
          log.info("k: {}, v: {}", k, v);
          v.forEach(str -> {
            JSONObject jsonObject = JSON.parseObject(str);
            keyValues.add(new KeyValue<>(jsonObject.getString("index"), str));
          });
          log.info("keyValues: {}", keyValues);
          return keyValues;
        }).to(SINK_TOPIC, Produced.with(stringSerde, stringSerde));

    KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
    kafkaStreams.start();
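    // block forever so the Streams threads keep processing inside this test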
    while (true) {
      System.out.println("运行中......");
      try {
        TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
  }
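  // Note: with the default timestamp extractor, the 10-second windows above
  // are driven by the record timestamps assigned at produce time (event
  // time), not by wall-clock time on the Streams instance.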

  private static Properties buildConfigProps() {
    Properties props = new Properties();
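    // shared by the Streams test, the plain consumer/producer tests, and the
    // Admin client; Kafka Streams manages offsets itself, so group.id and the
    // auto-commit settings only affect the plain KafkaConsumer tests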
    String applicationId = "test_zhangjiashuai33";
    props.put("bootstrap.servers", "192.168.10.152:9092");
    props.put("group.id", applicationId);
    props.put("application.id", applicationId);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("max.poll.records", 1000);
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    return props;
  }

  @Test
  public void sendSource() {
    createTopic(SOURCE_TOPIC);
    KafkaProducer<String, String> producer = new KafkaProducer<>(buildConfigProps());
    JSONObject jsonObject = new JSONObject();
    for (int i = 3; i < 6; i++) {
      jsonObject.put("index", i);
      producer.send(new ProducerRecord<>(SOURCE_TOPIC, String.valueOf(i),
          jsonObject.toJSONString()));
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
    // close to flush buffered records before the test JVM exits
    producer.close();
  }

  @Test
  public void consumeSource() {
    consume(SOURCE_TOPIC);
  }


  private void consume(String topic) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildConfigProps());
    consumer.subscribe(Collections.singletonList(topic));
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.of(3, ChronoUnit.SECONDS));
      Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
      while (iterator.hasNext()) {
        ConsumerRecord<String, String> record = iterator.next();
        System.out.println("========== consume record ==========");
        System.out.println(record.value());
      }
    }
  }

  public static void createTopic(String topic) {
    Properties props = buildConfigProps();
    try (Admin admin = Admin.create(props)) {
      ListTopicsResult listTopicsResult = admin.listTopics();
      Set<String> topicNames = listTopicsResult.names().get();
      if (topicNames.contains(topic)) {
        return;
      }
      // wait until the topic actually exists before returning
      admin.createTopics(Lists.newArrayList(new NewTopic(topic, 1, (short) 1)))
          .all().get();
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    Properties props = buildConfigProps();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(SOURCE_TOPIC));
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.of(3, ChronoUnit.SECONDS));
      Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
      while (iterator.hasNext()) {
        ConsumerRecord<String, String> record = iterator.next();
        System.out.println("====================");
        System.out.println(record.value());
      }
    }
  }

}
