Hello,

My apologies to Matthias, since yesterday I posted this issue in the wrong
place on GitHub :(

I'm trying a simple use case of session windowing. TimeWindows works
perfectly; however, when I replace it with SessionWindows, this exception is
thrown:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: java.lang.IndexOutOfBoundsException
	at java.nio.Buffer.checkIndex(Buffer.java:546)
	at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:416)
	at org.apache.kafka.streams.kstream.internals.SessionKeySerde.extractEnd(SessionKeySerde.java:117)
	at org.apache.kafka.streams.state.internals.SessionKeySchema.segmentTimestamp(SessionKeySchema.java:45)
	at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:71)
	at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore$1.restore(RocksDBSegmentedBytesStore.java:104)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
	at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
	at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.init(RocksDBSegmentedBytesStore.java:101)
	at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.init(ChangeLoggingSegmentedBytesStore.java:68)
	at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.init(MeteredSegmentedBytesStore.java:66)
	at org.apache.kafka.streams.state.internals.RocksDBSessionStore.init(RocksDBSessionStore.java:78)
	at org.apache.kafka.streams.state.internals.CachingSessionStore.init(CachingSessionStore.java:97)
	at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
	at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
	at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
	... 1 more

The code is very simple:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> macs = builder.stream(stringSerde, stringSerde, "test01");
macs
        .groupByKey()
        .aggregate(
                () -> "",                                                      // initializer
                (String aggKey, String value, String aggregate) -> aggregate + value,  // aggregator: append value
                (String aggKey, String agg1, String agg2) -> agg1 + agg2,      // session merger
                SessionWindows.with(30 * 1000).until(10 * 60 * 1000),          // TimeWindows.of(1000).until(1000) works fine
                stringSerde, "aggs")
        .toStream()
        .map((Windowed<String> key, String value) -> KeyValue.pair(key.key(), value))
        .print();

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
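For completeness, `stringSerde` and `props` are not shown above; in my setup they are defined roughly as follows (application id and broker address are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

final Serde<String> stringSerde = Serdes.String();

Properties props = new Properties();
// placeholder application id and broker address
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-window-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// default serdes for keys and values
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
```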

My real use case doesn't work either.
While debugging, I've noticed that it doesn't even reach the beginning
of the stream pipeline (groupByKey).

Can you please help investigating this issue?

Best.
Marco
