Hi Marco,

Did you previously run this example with the same store name using TimeWindows? It looks to me like it is trying to restore state from a changelog topic that was written while using TimeWindows. The data in that topic is incompatible with SessionWindows because the keys are stored in a different format.
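To illustrate why a key written in one format can fail when read in the other, here is a minimal sketch using only `java.nio.ByteBuffer`. The two byte layouts below (key bytes followed by an 8-byte timestamp and a 4-byte sequence number for the window store; key bytes followed by an 8-byte end and an 8-byte start for the session store) are my assumption about the internal formats, not something taken from the Kafka source in this thread. The class and method names are hypothetical, apart from `extractEnd`, which borrows its name from the frame in the stack trace.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: the layouts are assumptions, not the real Kafka Streams serdes.
class KeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Assumed window-store layout: [key bytes][8-byte window start][4-byte seqnum]
    static byte[] windowStoreKey(String key, long windowStart, int seqnum) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(k.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(k).putLong(windowStart).putInt(seqnum).array();
    }

    // Assumed session-store layout: [key bytes][8-byte session end][8-byte session start]
    static byte[] sessionStoreKey(String key, long start, long end) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(k.length + 2 * TIMESTAMP_SIZE)
                .put(k).putLong(end).putLong(start).array();
    }

    // Reads the session end from where the assumed session layout puts it:
    // 16 bytes before the end of the binary key.
    static long extractEnd(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
    }

    public static void main(String[] args) {
        // A key written in the session layout decodes as expected.
        byte[] sessionKey = sessionStoreKey("a", 1000L, 31000L);
        System.out.println("session end = " + extractEnd(sessionKey));

        // A key written in the window layout is only 1 + 12 = 13 bytes long,
        // so the read offset becomes 13 - 16 = -3 and the buffer rejects it.
        byte[] windowKey = windowStoreKey("a", 1000L, 0);
        try {
            extractEnd(windowKey);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("window-format key failed: " + e.getClass().getName());
        }
    }
}
```

Under these assumptions, a record key shorter than 4 bytes makes the read offset negative and throws. A longer key would not throw, but the 8 bytes read would straddle the key and the timestamp and so would not be a meaningful session end.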
You'll either need to use a different store name, i.e., change "aggs", or use the streams reset tool to reset the topics:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool

Thanks,
Damian

On Wed, 22 Feb 2017 at 09:35 Marco Abitabile <marco.abitab...@gmail.com> wrote:

> Hello,
>
> I apologize to Matthias, since I posted this issue yesterday in the wrong
> place on GitHub :(
>
> I'm trying a simple use case of session windowing. TimeWindows works
> perfectly; however, as soon as I replace it with SessionWindows, this
> exception is thrown:
>
> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: java.lang.IndexOutOfBoundsException
>     at java.nio.Buffer.checkIndex(Buffer.java:546)
>     at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416)
>     at org.apache.kafka.streams.kstream.internals.SessionKeySerde.extractEnd(SessionKeySerde.java:117)
>     at org.apache.kafka.streams.state.internals.SessionKeySchema.segmentTimestamp(SessionKeySchema.java:45)
>     at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:71)
>     at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore$1.restore(RocksDBSegmentedBytesStore.java:104)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>     at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68)
>     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66)
>     at org.apache.kafka.streams.state.internals.RocksDBSessionStore.init(RocksDBSessionStore.java:78)
>     at org.apache.kafka.streams.state.internals.CachingSessionStore.init(CachingSessionStore.java:97)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>     ... 1 more
>
> The code is very simple:
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream<String, String> macs = builder.stream(stringSerde, stringSerde, "test01");
> macs
>     .groupByKey()
>     .aggregate(() -> new String(),
>         (String aggKey, String value, String aggregate) -> {
>             return aggregate += value;
>         },
>         (String arg0, String arg1, String arg2) -> {
>             return arg1 += arg2;
>         },
>         SessionWindows.with(30 * 1000).until(10 * 60 * 1000), //TimeWindows.of(1000).until(1000),
>         stringSerde, "aggs")
>     .toStream().map((Windowed<String> key, String value) -> {
>         return KeyValue.pair(key.key(), value);
>     }).print();
>
> KafkaStreams streams = new KafkaStreams(builder, props);
> streams.start();
>
> It also doesn't work with my real use case.
> While debugging, I noticed that it doesn't even reach the beginning
> of the stream pipeline (groupByKey).
>
> Can you please help investigate this issue?
>
> Best,
> Marco