Byron Nikolaidis created KAFKA-4408:
---------------------------------------

             Summary: KTable doesn't work with ProcessorTopologyTestDriver in 
Kafka 0.10.1.0
                 Key: KAFKA-4408
                 URL: https://issues.apache.org/jira/browse/KAFKA-4408
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.1.0
         Environment: Linux
            Reporter: Byron Nikolaidis


In Kafka 0.10.1.0, the ProcessorTopologyTestDriver no longer works with 
KTables.  The below test code worked fine under Kafka 0.10.0.1 but now produces 
this error:

Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
task [0_0] Could not find partition info for topic: alertInputTopic
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at 
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
        at 
org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:174)
        at 
mil.navy.icap.kafka.streams.processor.track.ProcessorDriverTest2.main(ProcessorDriverTest2.java:41)

{code}
package mil.navy.icap.kafka.streams.processor.track;


import java.io.IOException;
import java.util.Properties;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.ProcessorTopologyTestDriver;


public class ProcessorDriverTest2 {
 
 public static void main(String[] args) throws IOException, 
InterruptedException {


 System.out.println("ProcessorDriverTest2");
 
 Properties props = new Properties();
 props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "ProcessorDriverTest2");
 props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
 props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
 props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
 props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
 
 StreamsConfig streamsConfig = new StreamsConfig(props);
 
 // topology
 KStreamBuilder kstreamBuilder = new KStreamBuilder();
 StringSerde stringSerde = new StringSerde();
 KTable<String, String> table = kstreamBuilder.table(stringSerde,
 stringSerde, "alertInputTopic");
 table.to(stringSerde, stringSerde, "alertOutputTopic");
 
 // create test driver
 ProcessorTopologyTestDriver testDriver = new ProcessorTopologyTestDriver(
 streamsConfig, 
 kstreamBuilder, 
 "alertStore");


 StringSerializer serializer = new StringSerializer();
 StringDeserializer deserializer = new StringDeserializer();


 // send data to input topic
 testDriver.process("alertInputTopic", 
 "the Key", "the Value", serializer, serializer);
 
 // read data from output topic
 ProducerRecord<String, String> rec = testDriver.readOutput("alertOutputTopic", 
 deserializer, deserializer);
 
 System.out.println("rec: " + rec);
 }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to