[ https://issues.apache.org/jira/browse/KAFKA-4408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hamidreza Afzali reassigned KAFKA-4408: --------------------------------------- Assignee: Hamidreza Afzali > KTable doesn't work with ProcessorTopologyTestDriver in Kafka 0.10.1.0 > ---------------------------------------------------------------------- > > Key: KAFKA-4408 > URL: https://issues.apache.org/jira/browse/KAFKA-4408 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.1.0 > Environment: Linux > Reporter: Byron Nikolaidis > Assignee: Hamidreza Afzali > Labels: newbie, unit-test > > In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with > KTables. The below test code worked fine under Kafka 0.10.0.1 but now > produces this error: > Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: > task [0_0] Could not find partition info for topic: alertInputTopic > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123) > at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85) > at > org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62) > at > org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) > at > org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120) > at > org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:174) > at > mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41) > {code} > package mil.navy.icap.kafka.streams.processor.track; > import java.io.IOException; > import java.util.Properties; > import org.apache.kafka.clients.consumer.ConsumerConfig; > import org.apache.kafka.clients.producer.ProducerConfig; > import org.apache.kafka.clients.producer.ProducerRecord; > import org.apache.kafka.common.serialization.Serdes.StringSerde; > import org.apache.kafka.common.serialization.StringDeserializer; > import org.apache.kafka.common.serialization.StringSerializer; > import org.apache.kafka.streams.StreamsConfig; > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KTable; > import org.apache.kafka.test.ProcessorTopologyTestDriver; > public class ProcessorDriverTest2 { > > public static void main(String[] args) throws IOException, > InterruptedException { > System.out.println("ProcessorDriverTest2"); > > Properties props = new Properties(); > props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, > "ProcessorDriverTest2"); > props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class.getName()); > props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class.getName()); > > StreamsConfig streamsConfig = new StreamsConfig(props); > > // topology > KStreamBuilder kstreamBuilder = new KStreamBuilder(); > StringSerde stringSerde = new StringSerde(); > KTable<String, String> table = kstreamBuilder.table(stringSerde, > stringSerde, "alertInputTopic"); > table.to(stringSerde, stringSerde, "alertOutputTopic"); > > // create test driver > ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver( > streamsConfig, > kstreamBuilder, > "alertStore"); > StringSerializer serializer = new StringSerializer(); > StringDeserializer deserializer = new StringDeserializer(); > // send data to input topic > testDriver.process("alertInputTopic", > "the Key", "the Value", serializer, serializer); > > // read data from output topic > ProducerRecord<String, String> rec = > testDriver.readOutput("alertOutputTopic", > deserializer, deserializer); > > System.out.println("rec: " + rec); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)