[
https://issues.apache.org/jira/browse/KAFKA-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
j yeargers updated KAFKA-4960:
------------------------------
Attachment: text.html
This is essentially all the app does:
    ObjectMapper objectMapper = new ObjectMapper();
    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, String> sourceStream =
        kStreamBuilder.stream(yamlConfigRunner.getString("topic_sourcestream"));
    RowReducer rowReducer = new RowReducer();
    KTable<Windowed<String>, String> outTable =
        sourceStream.groupByKey().reduce(rowReducer,
            TimeWindows.of(65 * 60 * 1000L).advanceBy(5 * 60 * 1000).until(70 * 60 * 1000L),
            "HourAggStore");
    outTable.foreach((k, v) -> {
        DDog.getDDog().increment("valueCount");
    });
    KGroupedStream<String, String> kGroupedStream =
        sourceStream.groupByKey();
    kGroupedStream.count("countstore");
    kGroupedStream.count(TimeWindows.of(60000), "windowed-store-count");
    kafkaStreams = new KafkaStreams(kStreamBuilder, config);
    kafkaStreams.start();
The 'RowReducer' takes the JSON from the topic, converts it back to an Object,
and combines two List<Object> objects.
Mostly this is my attempt at a distributed query app. It uses
com.sun.net.httpserver.* as a 'contact point'.
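For reference, here is a rough sketch of which hopping windows a single record lands in under the window definition above (65 minute size, 5 minute advance). It assumes windows are aligned to the epoch with an inclusive start and an exclusive end, which is my understanding of how TimeWindows behaves. The class and method names are made up for illustration, and nothing here calls Kafka Streams itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams: lists the start times of
// all hopping windows [start, start + sizeMs) that contain a timestamp.
public class HoppingWindows {

    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest epoch-aligned start whose window still covers the timestamp.
        long start = (Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 65 * 60 * 1000L;     // same size as TimeWindows.of(...) above
        long advanceMs = 5 * 60 * 1000L;   // same hop as advanceBy(...) above
        long timestampMs = 2 * 60 * 60 * 1000L; // arbitrary example: 2h after the epoch

        List<Long> starts = windowStartsFor(timestampMs, sizeMs, advanceMs);
        System.out.println(starts.size() + " windows, first start " + starts.get(0)
                + ", last start " + starts.get(starts.size() - 1));
        // prints "13 windows, first start 3600000, last start 7200000"
    }
}
```

Under these assumptions each incoming record touches 13 overlapping windows, so every call to the reducer's processor reads and updates 13 entries in "HourAggStore".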
On Mon, Mar 27, 2017 at 5:56 PM, Matthias J. Sax (JIRA) <[email protected]> wrote:
[
https://issues.apache.org/jira/browse/KAFKA-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15944329#comment-15944329
]
Matthias J. Sax commented on KAFKA-4960:
----------------------------------------
Thanks for reporting this. Can you give some more context information? How can
we reproduce this?
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
> Invalid state store exception
> -----------------------------
>
> Key: KAFKA-4960
> URL: https://issues.apache.org/jira/browse/KAFKA-4960
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: j yeargers
> Attachments: text.html
>
>
> Attempting to run a windowed aggregation throws this exception:
> 2017-03-27 20:14:28,776 [StreamThread-1] WARN o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> 2017-03-27 20:14:28,776 [StreamThread-1] WARN o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
> Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=vi_preproc, partition=1, offset=243574962
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException: store %s has closed
>     at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.hasNext(RocksDBStore.java:398)
>     at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBRangeIterator.hasNext(RocksDBStore.java:457)
>     at org.apache.kafka.streams.state.internals.WindowStoreKeySchema$1.hasNext(WindowStoreKeySchema.java:30)
>     at org.apache.kafka.streams.state.internals.SegmentIterator.hasNext(SegmentIterator.java:69)
>     at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore$MeteredSegmentedBytesStoreIterator.hasNext(MeteredSegmentedBytesStore.java:131)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$TheWindowStoreIterator.hasNext(RocksDBWindowStore.java:131)
>     at org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.next(AbstractMergedSortedCacheStoreIterator.java:84)
>     at org.apache.kafka.streams.state.internals.AbstractMergedSortedCacheStoreIterator.next(AbstractMergedSortedCacheStoreIterator.java:35)
>     at org.apache.kafka.streams.kstream.internals.KStreamWindowReduce$KStreamWindowReduceProcessor.process(KStreamWindowReduce.java:94)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>     ... 2 more