[ https://issues.apache.org/jira/browse/KAFKA-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
j yeargers updated KAFKA-4960:
------------------------------
    Attachment: text.html

This is essentially all the app does:

    ObjectMapper objectMapper = new ObjectMapper();
    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, String> sourceStream =
        kStreamBuilder.stream(yamlConfigRunner.getString("topic_sourcestream"));
    RowReducer rowReducer = new RowReducer();
    KTable<Windowed<String>, String> outTable = sourceStream.groupByKey().reduce(
        rowReducer,
        TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000).until(70 * 60 * 1000L),
        "HourAggStore");
    outTable.foreach((k, v) -> {
        DDog.getDDog().increment("valueCount");
    });
    KGroupedStream<String, String> kGroupedStream = sourceStream.groupByKey();
    kGroupedStream.count("countstore");
    kGroupedStream.count(TimeWindows.of(60000), "windowed-store-count");
    kafkaStreams = new KafkaStreams(kStreamBuilder, config);
    kafkaStreams.start();

The 'RowReducer' takes the JSON from the topic, converts it back to an Object and combines two List<Object> objects.

Mostly this is for me to attempt a distributed query app. It uses com.sun.net.httpserver.* as a 'contact point'

On Mon, Mar 27, 2017 at 5:56 PM, Matthias J. Sax (JIRA) <j...@apache.org> wrote:

    [ https://issues.apache.org/jira/browse/KAFKA-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944329#comment-15944329 ]

Matthias J. Sax commented on KAFKA-4960:
----------------------------------------
Thanks for reporting this. Can you give some more context information? How can we reproduce this?
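For context on the windowed reduce in the snippet above: TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000) defines hopping windows, so a single record contributes to several overlapping windows. The following is a minimal, dependency-free sketch of that arithmetic, assuming windows are half-open intervals [start, start + size) whose starts are multiples of the advance interval. The class name HoppingWindowSketch, the method windowStartsFor, and the sample timestamp are hypothetical names chosen for illustration; none of them are part of the Kafka Streams API or of the reporter's app.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: mirrors hopping-window arithmetic with plain longs.
// It does not call Kafka Streams and is not the reporter's code.
final class HoppingWindowSketch {

    // Returns the start times (ms since epoch) of every window containing timestampMs,
    // assuming windows are [start, start + sizeMs) with starts at multiples of advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        if (sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("need 0 < advanceMs <= sizeMs");
        }
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long start = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 65 * 60 * 1000L;    // matches TimeWindows.of(65 * 60 * 1000L)
        long advanceMs = 5 * 60 * 1000L;  // matches .advanceBy(5 * 60 * 1000)
        long timestampMs = 10_000_000L;   // hypothetical record timestamp
        List<Long> starts = windowStartsFor(timestampMs, sizeMs, advanceMs);
        System.out.println(starts.size() + " windows, first start = " + starts.get(0));
    }
}
```

With a 65-minute size and a 5-minute advance, a record far enough from the epoch falls into 65 / 5 = 13 windows, so each incoming record triggers 13 reads and updates against "HourAggStore".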
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

> Invalid state store exception
> -----------------------------
>
>                 Key: KAFKA-4960
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4960
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: j yeargers
>         Attachments: text.html
>
>
> Attempts to run a windowed aggregation return this exception:
> 2017-03-27 20:14:28,776 [StreamThread-1] WARN o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> 2017-03-27 20:14:28,776 [StreamThread-1] WARN o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=vi_preproc, partition=1, offset=243574962
>       at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: store %s has closed
>       at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398)
>       at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
>       at org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30)
>       at org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69)
>       at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131)
>       at org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
>       at org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.next(AbstractMergedSortedCacheStoreIterator.java:84)
>       at org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.next(AbstractMergedSortedCacheStoreIterator.java:35)
>       at org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamWindowReduceProcessor.process(KStreamWindowReduce.java:94)
>       at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>       at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>       at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>       at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>       at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>       at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>       ... 2 more