[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021520#comment-16021520 ]
Matthias J. Sax commented on KAFKA-5253:
----------------------------------------

As you can see from the code, {{KStreamTestDriver#process()}} looks up the source node to start processing via {{sourceNodeByTopicName()}}. I did not run the example code, but I guess it does not find the source node and you end up with an NPE. Thus, {{sourceNodeByTopicName()}} should be able to handle patterns, too.

> KStreamTestDriver must handle streams created with patterns
> -----------------------------------------------------------
>
>                 Key: KAFKA-5253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5253
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>   Affects Versions: 0.10.2.1
>            Reporter: Wim Van Leuven
>
> *Context*
> KStreamTestDriver is being used to unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
> The unit test of the topology fails because KStreamTestDriver fails to deal with Patterns properly.
> *Example*
> Below is a unit test showing what I understand should happen, but it fails.
> Explicitly adding a source topic that matches the topic pattern generates an exception, because the topology builder explicitly checks for overlapping topic names and patterns, regardless of the order in which the pattern and the topic are added. So that is intended behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
>     // -- setup stream pattern
>     final KStream<String, String> source = builder.stream(Pattern.compile("topic-source-\\d"));
>     source.to("topic-sink");
>
>     // -- setup processor to capture results
>     final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
>     source.process(processorSupplier);
>
>     // -- add source to stream data from
>     //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), "topic-source-3");
>
>     // -- build test driver
>     driver = new KStreamTestDriver(builder);
>     driver.setTime(0L);
>
>     // -- test
>     driver.process("topic-source-3", "A", "aa");
>
>     // -- validate
>     // no exception was thrown
>     assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
> If anybody can help in defining the solution, I can create a pull request for this change.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
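
To illustrate the suggestion in the comment that {{sourceNodeByTopicName()}} should handle patterns too, here is a minimal, self-contained sketch of a lookup that tries an exact topic name first and then falls back to regex matching. It is not the actual KStreamTestDriver code: the class {{SourceLookupSketch}}, its fields, and both {{addSource}} overloads are hypothetical names chosen for illustration, and node names are plain strings rather than real source nodes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical stand-in for the driver's topic-to-source lookup; not Kafka code.
public class SourceLookupSketch {
    // Sources registered with a fixed topic name.
    private final Map<String, String> sourcesByTopic = new LinkedHashMap<>();
    // Sources registered with a topic pattern (Pattern keys compare by identity).
    private final Map<Pattern, String> sourcesByPattern = new LinkedHashMap<>();

    public void addSource(final String nodeName, final String topic) {
        sourcesByTopic.put(topic, nodeName);
    }

    public void addSource(final String nodeName, final Pattern topicPattern) {
        sourcesByPattern.put(topicPattern, nodeName);
    }

    // Returns the source node name for a topic, or empty if nothing matches,
    // so the caller can fail with a clear message instead of an NPE.
    public Optional<String> sourceNodeByTopicName(final String topic) {
        final String exact = sourcesByTopic.get(topic);
        if (exact != null) {
            return Optional.of(exact);
        }
        for (final Map.Entry<Pattern, String> entry : sourcesByPattern.entrySet()) {
            // matches() requires the whole topic name to match the pattern.
            if (entry.getKey().matcher(topic).matches()) {
                return Optional.of(entry.getValue());
            }
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final SourceLookupSketch lookup = new SourceLookupSketch();
        lookup.addSource("SOURCE-1", Pattern.compile("topic-source-\\d"));
        System.out.println(lookup.sourceNodeByTopicName("topic-source-3").isPresent());
        System.out.println(lookup.sourceNodeByTopicName("topic-sink").isPresent());
    }
}
```

Returning an {{Optional}} rather than {{null}} is a design choice of this sketch only; it makes the "no source found" case explicit for the caller.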