[ 
https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441136#comment-16441136
 ] 

ASF GitHub Bot commented on KAFKA-6742:
---------------------------------------

guozhangwang closed pull request #4823: KAFKA-6742: TopologyTestDriver error 
when dealing with stores from GlobalKTable
URL: https://github.com/apache/kafka/pull/4823
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index abcc99d362f..7eca27b826b 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -41,6 +41,7 @@
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.GlobalStateManager;
 import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
 import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
@@ -59,6 +60,7 @@
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -162,18 +164,20 @@
  * @see OutputVerifier
  */
 @InterfaceStability.Evolving
-public class TopologyTestDriver {
+public class TopologyTestDriver implements Closeable {
 
     private final Time mockTime;
     private final InternalTopologyBuilder internalTopologyBuilder;
 
     private final static int PARTITION_ID = 0;
     private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
-    private StreamTask task;
-    private GlobalStateUpdateTask globalStateTask;
+    private final StreamTask task;
+    private final GlobalStateUpdateTask globalStateTask;
+    private final GlobalStateManager globalStateManager;
 
     private final StateDirectory stateDirectory;
     private final ProcessorTopology processorTopology;
+    
     private final MockProducer<byte[], byte[]> producer;
 
     private final Set<String> internalTopics = new HashSet<>();
@@ -264,7 +268,7 @@ public void onRestoreEnd(TopicPartition topicPartition, 
String storeName, long t
                 consumer.updateEndOffsets(Collections.singletonMap(partition, 
0L));
             }
 
-            final GlobalStateManagerImpl stateManager = new 
GlobalStateManagerImpl(
+            globalStateManager = new GlobalStateManagerImpl(
                 new LogContext("mock "),
                 globalTopology,
                 consumer,
@@ -273,16 +277,19 @@ public void onRestoreEnd(TopicPartition topicPartition, 
String storeName, long t
                 streamsConfig);
 
             final GlobalProcessorContextImpl globalProcessorContext
-                = new GlobalProcessorContextImpl(streamsConfig, stateManager, 
streamsMetrics, cache);
-            stateManager.setGlobalProcessorContext(globalProcessorContext);
+                = new GlobalProcessorContextImpl(streamsConfig, 
globalStateManager, streamsMetrics, cache);
+            
globalStateManager.setGlobalProcessorContext(globalProcessorContext);
 
             globalStateTask = new GlobalStateUpdateTask(
                 globalTopology,
                 globalProcessorContext,
-                stateManager,
+                globalStateManager,
                 new LogAndContinueExceptionHandler(),
                 new LogContext());
             globalStateTask.initialize();
+        } else {
+            globalStateManager = null;
+            globalStateTask = null;
         }
 
         if (!partitionsByTopic.isEmpty()) {
@@ -303,6 +310,8 @@ public void onRestoreEnd(TopicPartition topicPartition, 
String storeName, long t
                 producer);
             task.initializeStateStores();
             task.initializeTopology();
+        } else {
+            task = null;
         }
     }
 
@@ -372,7 +381,8 @@ private void captureOutputRecords() {
 
             // Forward back into the topology if the produced record is to an 
internal or a source topic ...
             final String outputTopicName = record.topic();
-            if (internalTopics.contains(outputTopicName) || 
processorTopology.sourceTopics().contains(outputTopicName)) {
+            if (internalTopics.contains(outputTopicName) || 
processorTopology.sourceTopics().contains(outputTopicName)
+                    || globalPartitionsByTopic.containsKey(outputTopicName)) {
                 final byte[] serializedKey = record.key();
                 final byte[] serializedValue = record.value();
 
@@ -410,8 +420,10 @@ public void pipeInput(final List<ConsumerRecord<byte[], 
byte[]>> records) {
      */
     public void advanceWallClockTime(final long advanceMs) {
         mockTime.sleep(advanceMs);
-        task.maybePunctuateSystemTime();
-        task.commit();
+        if (task != null) {
+            task.maybePunctuateSystemTime();
+            task.commit();
+        }
         captureOutputRecords();
     }
 
@@ -450,7 +462,7 @@ public void advanceWallClockTime(final long advanceMs) {
         final V value = valueDeserializer.deserialize(record.topic(), 
record.value());
         return new ProducerRecord<>(record.topic(), record.partition(), 
record.timestamp(), key, value);
     }
-
+    
     /**
      * Get all {@link StateStore StateStores} from the topology.
      * The stores can be a "regular" or global stores.
@@ -467,7 +479,7 @@ public void advanceWallClockTime(final long advanceMs) {
     public Map<String, StateStore> getAllStateStores() {
         final Map<String, StateStore> allStores = new HashMap<>();
         for (final String storeName : 
internalTopologyBuilder.allStateStoreName()) {
-            allStores.put(storeName, ((ProcessorContextImpl) 
task.context()).getStateMgr().getStore(storeName));
+            allStores.put(storeName, getStateStore(storeName));
         }
         return allStores;
     }
@@ -487,7 +499,12 @@ public void advanceWallClockTime(final long advanceMs) {
      * @see #getSessionStore(String)
      */
     public StateStore getStateStore(final String name) {
-        return ((ProcessorContextImpl) 
task.context()).getStateMgr().getStore(name);
+        StateStore res = task == null ? null : 
+            ((ProcessorContextImpl) 
task.context()).getStateMgr().getStore(name);
+        if (res == null && globalStateManager != null) {
+            res = globalStateManager.getGlobalStore(name);
+        }
+        return res;
     }
 
     /**
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index b74a754a4d6..d757f3384b9 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -584,6 +585,10 @@ public void 
shouldForwardRecordsFromSubtopologyToSubtopology() {
     public void shouldPopulateGlobalStore() {
         testDriver = new 
TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1), config);
 
+        final KeyValueStore<byte[], byte[]> globalStore = 
testDriver.getKeyValueStore(SOURCE_TOPIC_1 + "-globalStore");
+        Assert.assertNotNull(globalStore);
+        Assert.assertNotNull(testDriver.getAllStateStores().get(SOURCE_TOPIC_1 
+ "-globalStore"));
+
         testDriver.pipeInput(consumerRecord1);
 
         final List<Record> processedRecords = 
mockProcessors.get(0).processedRecords;
@@ -897,4 +902,21 @@ public void close() {}
             );
         }
     }
+    
+    @Test
+    public void shouldFeedStoreFromGlobalKTable() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("topic",  
+            Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("globalStore"));
+        try (final TopologyTestDriver testDriver = new 
TopologyTestDriver(builder.build(), config)) {
+            final KeyValueStore<String, String> globalStore = 
testDriver.getKeyValueStore("globalStore");
+            Assert.assertNotNull(globalStore);
+            
Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore"));
+            final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+            testDriver.pipeInput(recordFactory.create("topic", "k1", 
"value1"));
+            // we expect to have both in the global store, the one from 
pipeInput and the one from the producer
+            Assert.assertEquals("value1", globalStore.get("k1"));
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TopologyTestDriver error when dealing with stores from GlobalKTable
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6742
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6742
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Valentino Proietti
>            Assignee: Valentino Proietti
>            Priority: Minor
>             Fix For: 1.2.0, 1.1.1
>
>
> {color:#ff0000}This junit test simply fails:{color}
> @Test
> *public* *void* globalTable() {
> StreamsBuilder builder = *new* StreamsBuilder();
> @SuppressWarnings("unused")
> *final* KTable<String,String> localTable = builder
> .table("local", 
> Consumed._with_(Serdes._String_(), Serdes._String_()),
> Materialized._as_("localStore"))
> ;
> @SuppressWarnings("unused")
> *final* GlobalKTable<String,String> globalTable = builder
> .globalTable("global", 
> Consumed._with_(Serdes._String_(), Serdes._String_()),
>         Materialized._as_("globalStore"))
> ;
> //
> Properties props = *new* Properties();
> props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test");
> props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost");
> TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), 
> props);
> //
> *final* KeyValueStore<String,String> localStore = 
> testDriver.getKeyValueStore("localStore");
> Assert._assertNotNull_(localStore);
> Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore"));
> //
> *final* KeyValueStore<String,String> globalStore = 
> testDriver.getKeyValueStore("globalStore");
> Assert._assertNotNull_(globalStore);
> Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore"));
> //
>     *final* ConsumerRecordFactory<String,String> crf = *new* 
> ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer());
> testDriver.pipeInput(crf.create("local", "one", "TheOne"));
> testDriver.pipeInput(crf.create("global", "one", "TheOne"));
> //
> Assert._assertEquals_("TheOne", localStore.get("one"));
> Assert._assertEquals_("TheOne", globalStore.get("one"));
>  
>  
> {color:#ff0000}to make it work I had to modify the TopologyTestDriver class 
> as follow:{color}
> ...
>     *public* Map<String, StateStore> getAllStateStores() {
> //        final Map<String, StateStore> allStores = new HashMap<>();
> //        for (final String storeName : 
> internalTopologyBuilder.allStateStoreName())
> { //            allStores.put(storeName, ((ProcessorContextImpl) 
> task.context()).getStateMgr().getStore(storeName)); //        }
> //        return allStores;
>     {color:#ff0000}// *FIXME*{color}
>     *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
> task.context()).getStateMgr();
>         *final* Map<String, StateStore> allStores = *new* HashMap<>();
>         *for* (*final* String storeName : 
> internalTopologyBuilder.allStateStoreName()) {            
> StateStore res = psm.getStore(storeName);            
> if (res == null)            
>   res = psm.getGlobalStore(storeName);            
> allStores.put(storeName, res);        
> }
>         *return* allStores;
>     }
> ...
>     *public* StateStore getStateStore(*final* String name) {
> //        return ((ProcessorContextImpl) 
> task.context()).getStateMgr().getStore(name);
>         {color:#ff0000}// *FIXME*{color}
>     *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
> task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         *if* (res == *null*)
>         res = psm.getGlobalStore(name);
>         *return* res;
>     }
>  
> {color:#ff0000}moreover I think it would be very useful to make the internal 
> MockProducer public for testing cases where a producer is used along side 
> with the "normal" stream processing by adding the method:{color}
>     /**
>      * *@return* records sent with this producer are automatically streamed 
> to the topology.
>      */
>     *public* *final* Producer<*byte*[], *byte*[]> getProducer() {     
> return producer;    
> }
>  
> {color:#ff0000}unfortunately this introduces another problem that could be 
> verified by adding the following lines to the previous junit test:{color}
> ...
> **
> //
> ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); 
> // just to serialize keys and values
> testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.advanceWallClockTime(0);
> Assert._assertEquals_("TheOne", localStore.get("one"));
> Assert._assertEquals_("Second", localStore.get("two"));
> Assert._assertEquals_("TheOne", globalStore.get("one"));
> Assert._assertEquals_("Second", globalStore.get("two"));
> }
>  
> {color:#ff0000}that could be fixed with:{color}
>  
>     *private* *void* captureOutputRecords() {
>         // Capture all the records sent to the producer ...
>         *final* List<ProducerRecord<*byte*[], *byte*[]>> output = 
> producer.history();
>         producer.clear();
>         *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) {
>             Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = 
> outputRecordsByTopic.get(record.topic());
>             *if* (outputRecords == *null*)
> {                 outputRecords = *new* LinkedList<>();                 
> outputRecordsByTopic.put(record.topic(), outputRecords);             }
>             outputRecords.add(record);
>  
>             // Forward back into the topology if the produced record is to an 
> internal or a source topic ...
>             *final* String outputTopicName = record.topic();
>             *if* (internalTopics.contains(outputTopicName) || 
> processorTopology.sourceTopics().contains(outputTopicName)
>             || globalPartitionsByTopic.containsKey(outputTopicName)) {  
> {color:#ff0000}// *FIXME*{color}
>                 *final* *byte*[] serializedKey = record.key();
>                 *final* *byte*[] serializedValue = record.value();
>  
>                 pipeInput(*new* ConsumerRecord<>(
>                     outputTopicName,
>                     -1,
>                     -1L,
>                     record.timestamp(),
>                     TimestampType.*_CREATE_TIME_*,
>                     0L,
>                     serializedKey == *null* ? 0 : serializedKey.length,
>                     serializedValue == *null* ? 0 : serializedValue.length,
>                     serializedKey,
>                     serializedValue));
>             }
>         }
>     }
>  
>  
>  
> *Thank you*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to