[ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177139#comment-17177139
 ] 

John Roesler commented on KAFKA-10395:
--------------------------------------

Thanks, [~ableegoldman], I agree with your analysis.

I think TestOutputTopic is supposed to be the "only" way to read data from 
TopologyTestDriver, so having it auto-register its topic argument is not 
practically different from just skipping that check.

Because TopicNameExtractor is only a one-way function from record to topic 
name, there's no way to check if a topic name "matches" a TopicNameExtractor. 
It seems like if we still really want to have some kind of check, we could add 
a `TopicNameExtractor#matches(String topicName)` method. But that interface may 
be unsatisfiable for some implementations.
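
To illustrate the last point, here is a rough sketch of where such a method 
works and where it does not. Everything in it is hypothetical: 
`MatchableExtractor` is a simplified stand-in for TopicNameExtractor (the real 
`extract` also takes a RecordContext), and `matches` is not part of any Kafka 
API.

```java
import java.util.Set;

// Hypothetical, simplified stand-in for TopicNameExtractor so the sketch
// compiles without Kafka on the classpath.
interface MatchableExtractor<K, V> {
    String extract(K key, V value);

    // Hypothetical method: could this extractor ever return topicName?
    boolean matches(String topicName);
}

// Routes to a fixed, known set of topics, so matches() has a precise answer.
class RegionExtractor implements MatchableExtractor<String, String> {
    private static final Set<String> TOPICS =
        Set.of("orders-eu", "orders-us", "orders-other");

    @Override
    public String extract(final String key, final String value) {
        final String candidate = "orders-" + key;
        return TOPICS.contains(candidate) ? candidate : "orders-other";
    }

    @Override
    public boolean matches(final String topicName) {
        return TOPICS.contains(topicName);
    }
}

// Uses the record value itself as the topic name. Any string is a possible
// output, so matches() can only say "yes" and the check catches nothing.
class ValueAsTopicExtractor implements MatchableExtractor<String, String> {
    @Override
    public String extract(final String key, final String value) {
        return value;
    }

    @Override
    public boolean matches(final String topicName) {
        return true;
    }
}
```

With the first extractor a mistyped topic name can be rejected; with the 
second, every name "matches", so the check gives no protection.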

I did notice that the javadoc on TopicNameExtractor says, "The topic name must 
already exist, since the Kafka Streams library will not try to automatically 
create the topic with the extracted name." So, maybe it makes more sense to 
make the TestOutputTopic register the output topics, and then throw an 
exception when the application produces to a topic that hasn't been 
"registered".

Even that might break some tests, since right now people can register the 
output topic after they produce data, so it would need a KIP.

Realistically, I don't think it's too bad just to remove that check. We already 
guarantee that processing is synchronous, so if you use the wrong output topic 
name, it should be pretty obvious that something is wrong when you get no 
output. I guess we could split the difference by still enforcing that check 
when there are only StaticTopicNameExtractors in the topology.
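
As a rough sketch of that split-the-difference option (this is not the actual 
TopologyTestDriver code; `ReadCheckSketch`, `checkReadable`, and its parameters 
are made up for illustration), the check could stay strict only when the 
topology has no dynamic sinks:

```java
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams.
final class ReadCheckSketch {
    private ReadCheckSketch() { }

    // producedTopics: topics that have already received output
    // staticSinkTopics: sink topic names known to the topology up front
    // hasDynamicSink: true if any sink routes through a non-static extractor
    static void checkReadable(final String topicName,
                              final Set<String> producedTopics,
                              final Set<String> staticSinkTopics,
                              final boolean hasDynamicSink) {
        if (producedTopics.contains(topicName) || staticSinkTopics.contains(topicName)) {
            return; // topic is known, nothing to complain about
        }
        if (hasDynamicSink) {
            return; // a dynamic sink might still produce to it, so we cannot rule it out
        }
        // Only static sinks exist, so an unknown name is almost certainly a typo.
        throw new IllegalArgumentException("Unknown topic: " + topicName);
    }
}
```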

> TopologyTestDriver does not work with dynamic topic routing
> -----------------------------------------------------------
>
>                 Key: KAFKA-10395
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10395
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Assignee: Sophie Blee-Goldman
>            Priority: Major
>              Labels: test-framework
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = 
> outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName); 
>     } 
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` check is supposed to 
> account for that by making sure the topic is actually registered in the 
> topology, and throwing an exception otherwise, in case the user supplied the 
> wrong topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> a test tries to read from one of these topics before it has received any 
> output, the driver throws the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble.
>  


