[
https://issues.apache.org/jira/browse/KAFKA-4828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15891151#comment-15891151
]
Matthias J. Sax commented on KAFKA-4828:
----------------------------------------
I am not sure if the JIRA title and error message align... Do you know about
this FAQ:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whatdoesexception"Store<someStoreName>'schangelog(<someStoreName>-changelog)doesnotcontainpartition<someNumber>"mean?
Does this help?
> ProcessorTopologyTestDriver does not work when using .through()
> ---------------------------------------------------------------
>
> Key: KAFKA-4828
> URL: https://issues.apache.org/jira/browse/KAFKA-4828
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Hamidreza Afzali
> Assignee: Hamidreza Afzali
> Labels: unit-test
>
> *Problem:*
> ProcessorTopologyTestDriver does not work when testing a topology that uses
> through().
> {code}
> org.apache.kafka.streams.errors.StreamsException: Store count2's change log
> (count2-topic) does not contain partition 1
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
> {code}
> *Example:*
> {code}
> object Topology1 {
> def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val stateStore = "count"
> val stateStore2 = "count2"
> val outputTopic2 = "count2-topic"
> val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
> .groupByKey(Serdes.String, Serdes.Integer)
> .count(stateStore)
> .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props),
> builder, stateStore, stateStore2)
> inputs.foreach {
> case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer,
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic2,
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
> }
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)