SHWETA SINHA created KAFKA-12542: ------------------------------------ Summary: TopologyTestDriver Key: KAFKA-12542 URL: https://issues.apache.org/jira/browse/KAFKA-12542 Project: Kafka Issue Type: Bug Components: streams-test-utils Affects Versions: 2.6.0 Reporter: SHWETA SINHA
I am using Kafka Streams DSL to create topology. StreamsBuilder streamsBuilder=new StreamsBuilder(); valueSerde.configure(getConfigForSpecificAvro(appId),false); KStream<String, AvroDTO> stream = streamsBuilder.stream(inputTopic, Consumed.with(new Serdes.StringSerde(), valueSerde)); KStream<String, AvroDTO> filtered = stream .filter((key, value) -> ServiceConsumer.filter(key,value)); filtered .map((KeyValueMapper<String, AvroDTO, KeyValue<String, SpecificRecordBase>>) (key, value) -> ServiceConsumer.process(key,value)) .to((k,v,recordContext) -> v instanceof AvroDTO? dlqTopic:outputTopic,Produced.with(new Serdes.StringSerde(), valueSerde)); Topology topology=streamsBuilder.build(); KafkaStreams kafkaStreams=new KafkaStreams(topology,getKafkaStreamsConfig(appId)); To Test the Topology, I am using TopologyTestDriver. when(getKafkaStreamsConfig(any())).thenReturn(kafkaConfig); when(ServiceConsumer.filter(any(),any())).thenReturn(false); // when(ServiceConsumer.process(any(),any())).thenReturn(new KeyValue<>(statusDto.getTaskId().toString(),statusDto)); topologyTestDriver=new TopologyTestDriver(getTopologyAndStartKafkaStreams(),kafkaConfig); StreamInput = topologyTestDriver.createInputTopic("INPUT_TOPIC", new StringSerializer(), new KafkaAvroSerializer(schemaRegistryClient)); UpdateOutput =topologyTestDriver.createOutputTopic("OUTPUT_TOPIC",new StringDeserializer(),new KafkaAvroDeserializer(schemaRegistryClient,config)); StreamInput.pipeInput("Hi); assertThat(UpdateOutput .isEmpty()).isTrue(); I am checking if there are no filtered messages then my output topic is empty. Getting Error while Unit Testing java.lang.IllegalArgumentException: Unknown topic: OUTPUT_TOPIC Changing when(ServiceConsumer.filter(any(),any())).thenReturn(false); to true doesnt gives any error. -- This message was sent by Atlassian Jira (v8.3.4#803005)