[
https://issues.apache.org/jira/browse/KAFKA-4828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15893141#comment-15893141
]
ASF GitHub Bot commented on KAFKA-4828:
---------------------------------------
GitHub user hrafzali opened a pull request:
https://github.com/apache/kafka/pull/2629
KAFKA-4828: ProcessorTopologyTestDriver does not work when using through
This resolves the following issues in the ProcessorTopologyTestDriver:
- It should not create an internal changelog topic when using `through()`
- It should forward the produced record back into the topology if it is to
a source topic
Jira ticket: https://issues.apache.org/jira/browse/KAFKA-4828
The contribution is my original work and I license the work to the project
under the project's open source license.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/hrafzali/kafka
KAFKA-4828_ProcessorTopologyTestDriver_through
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/2629.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2629
----
commit 93b6e2f185866fd4ae50624085ded17f9cb4cac2
Author: Hamidreza Afzali <[email protected]>
Date: 2017-03-01T19:01:47Z
KAFKA-4828: ProcessorTopologyTestDriver gets names of stores and changelog
topics from the topology
commit 884c8d40ef1c3a235fd89f863fd84392d9abe8ae
Author: Hamidreza Afzali <[email protected]>
Date: 2017-03-02T21:50:58Z
KAFKA-4828: Added support to ProcessorTopologyTestDriver for processing
records produced to source topics
----
> ProcessorTopologyTestDriver does not work when using .through()
> ---------------------------------------------------------------
>
> Key: KAFKA-4828
> URL: https://issues.apache.org/jira/browse/KAFKA-4828
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Hamidreza Afzali
> Assignee: Hamidreza Afzali
> Labels: unit-test
>
> *Problem:*
> ProcessorTopologyTestDriver does not work when testing a topology that uses
> through().
> {code}
> org.apache.kafka.streams.errors.StreamsException: Store count2's change log
> (count2-topic) does not contain partition 1
> at
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:81)
> {code}
> *Example:*
> {code}
> object Topology1 {
> def main(args: Array[String]): Unit = {
> val inputTopic = "input"
> val stateStore = "count"
> val stateStore2 = "count2"
> val outputTopic2 = "count2-topic"
> val inputs = Seq[(String, Integer)](("A", 1), ("A", 2))
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> val builder = new KStreamBuilder
> builder.stream(Serdes.String, Serdes.Integer, inputTopic)
> .groupByKey(Serdes.String, Serdes.Integer)
> .count(stateStore)
> .through(Serdes.String, Serdes.Long, outputTopic2, stateStore2)
> val driver = new ProcessorTopologyTestDriver(new StreamsConfig(props),
> builder, stateStore, stateStore2)
> inputs.foreach {
> case (key, value) => {
> driver.process(inputTopic, key, value, Serdes.String.serializer,
> Serdes.Integer.serializer)
> val record = driver.readOutput(outputTopic2,
> Serdes.String.deserializer, Serdes.Long.deserializer)
> println(record)
> }
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)