Hello, my apologies to Matthias: yesterday I posted this issue in the wrong place on GitHub :(
I'm trying a simple use case of session windowing. TimeWindows works perfectly; however, as soon as I replace it with SessionWindows, this exception is thrown:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: java.lang.IndexOutOfBoundsException
    at java.nio.Buffer.checkIndex(Buffer.java:546)
    at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416)
    at org.apache.kafka.streams.kstream.internals.SessionKeySerde.extractEnd(SessionKeySerde.java:117)
    at org.apache.kafka.streams.state.internals.SessionKeySchema.segmentTimestamp(SessionKeySchema.java:45)
    at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:71)
    at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore$1.restore(RocksDBSegmentedBytesStore.java:104)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
    at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101)
    at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68)
    at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.RocksDBSessionStore.init(RocksDBSessionStore.java:78)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.init(CachingSessionStore.java:97)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
    ... 1 more

The code is very simple:

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> macs = builder.stream(stringSerde, stringSerde, "test01");
    macs
        .groupByKey()
        .aggregate(
            // initializer: start each session with an empty string
            () -> new String(),
            // aggregator: append each new value to the session aggregate
            (String aggKey, String value, String aggregate) -> { return aggregate + value; },
            // session merger: concatenate the aggregates of two merged sessions
            (String arg0, String arg1, String arg2) -> { return arg1 + arg2; },
            SessionWindows.with(30 * 1000).until(10 * 60 * 1000), //TimeWindows.of(1000).until(1000),
            stringSerde, "aggs")
        .toStream()
        .map((Windowed<String> key, String value) -> { return KeyValue.pair(key.key(), value); })
        .print();
    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

It doesn't work with my real use case either. While debugging, I noticed that execution doesn't even reach the beginning of the stream pipeline (the groupByKey step).

Can you please help investigate this issue?

Best,
Marco