Valentino Proietti created KAFKA-6742: -----------------------------------------
Summary: TopologyTestDriver error when dealing with stores from GlobalKTable Key: KAFKA-6742 URL: https://issues.apache.org/jira/browse/KAFKA-6742 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Valentino Proietti {color:#FF0000}This junit test simply fails:{color} @Test *public* *void* globalTable() { StreamsBuilder builder = *new* StreamsBuilder(); @SuppressWarnings("unused") *final* KTable<String,String> localTable = builder .table("local", Consumed._with_(Serdes._String_(), Serdes._String_()), Materialized._as_("localStore")) ; @SuppressWarnings("unused") *final* GlobalKTable<String,String> globalTable = builder .globalTable("global", Consumed._with_(Serdes._String_(), Serdes._String_()), Materialized._as_("globalStore")) ; // Properties props = *new* Properties(); props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test"); props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost"); TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), props); // *final* KeyValueStore<String,String> localStore = testDriver.getKeyValueStore("localStore"); Assert._assertNotNull_(localStore); Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore")); // *final* KeyValueStore<String,String> globalStore = testDriver.getKeyValueStore("globalStore"); Assert._assertNotNull_(globalStore); Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore")); // *final* ConsumerRecordFactory<String,String> crf = *new* ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer()); testDriver.pipeInput(crf.create("local", "one", "TheOne")); testDriver.pipeInput(crf.create("global", "one", "TheOne")); // Assert._assertEquals_("TheOne", localStore.get("one")); Assert._assertEquals_("TheOne", globalStore.get("one")); {color:#FF0000}to make it work I had to modify the TopologyTestDriver class as follow:{color} ... *public* Map<String, StateStore> getAllStateStores() { // final Map<String, StateStore> allStores = new HashMap<>(); // for (final String storeName : internalTopologyBuilder.allStateStoreName()) { // allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName)); // } // return allStores; {color:#FF0000}// *FIXME*{color} *final* ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr(); *final* Map<String, StateStore> allStores = *new* HashMap<>(); *for* (*final* String storeName : internalTopologyBuilder.allStateStoreName()) { StateStore res = psm.getStore(storeName); *if* (res == *null*) res = psm.getGlobalStore(storeName); allStores.put(storeName, res); } *return* allStores; } ... *public* StateStore getStateStore(*final* String name) { // return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name); {color:#FF0000}// *FIXME*{color} *final* ProcessorStateManager psm = ((ProcessorContextImpl) task.context()).getStateMgr(); StateStore res = psm.getStore(name); *if* (res == *null*) res = psm.getGlobalStore(name); *return* res; } {color:#FF0000}moreover I think it would be very useful to make the internal MockProducer public for testing cases where a producer is used along side with the "normal" stream processing by adding the method:{color} /** * *@return* records sent with this producer are automatically streamed to the topology. */ *public* *final* Producer<*byte*[], *byte*[]> getProducer() { *return* producer; } {color:#FF0000}unfortunately this introduces another problem that could be verified by adding the following lines to the previous junit test:{color} ... ** // ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); // just to serialize keys and values testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, cr.timestamp(), cr.key(), cr.value())); testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, cr.timestamp(), cr.key(), cr.value())); testDriver.advanceWallClockTime(0); Assert._assertEquals_("TheOne", localStore.get("one")); Assert._assertEquals_("Second", localStore.get("two")); Assert._assertEquals_("TheOne", globalStore.get("one")); Assert._assertEquals_("Second", globalStore.get("two")); } {color:#FF0000}that could be fixed with:{color} *private* *void* captureOutputRecords() { // Capture all the records sent to the producer ... *final* List<ProducerRecord<*byte*[], *byte*[]>> output = producer.history(); producer.clear(); *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) { Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = outputRecordsByTopic.get(record.topic()); *if* (outputRecords == *null*) { outputRecords = *new* LinkedList<>(); outputRecordsByTopic.put(record.topic(), outputRecords); } outputRecords.add(record); // Forward back into the topology if the produced record is to an internal or a source topic ... *final* String outputTopicName = record.topic(); *if* (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName) || globalPartitionsByTopic.containsKey(outputTopicName)) { {color:#FF0000}// *FIXME*{color} *final* *byte*[] serializedKey = record.key(); *final* *byte*[] serializedValue = record.value(); pipeInput(*new* ConsumerRecord<>( outputTopicName, -1, -1L, record.timestamp(), TimestampType.*_CREATE_TIME_*, 0L, serializedKey == *null* ? 0 : serializedKey.length, serializedValue == *null* ? 0 : serializedValue.length, serializedKey, serializedValue)); } } } *Thank you* -- This message was sent by Atlassian JIRA (v7.6.3#76005)