[ 
https://issues.apache.org/jira/browse/KAFKA-6742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentino Proietti updated KAFKA-6742:
--------------------------------------
    Description: 
{color:#ff0000}This junit test simply fails:{color}

@Test

*public* *void* globalTable() {

StreamsBuilder builder = *new* StreamsBuilder();

@SuppressWarnings("unused")

*final* KTable<String,String> localTable = builder

.table("local", 

Consumed._with_(Serdes._String_(), Serdes._String_()),

Materialized._as_("localStore"))

;

@SuppressWarnings("unused")

*final* GlobalKTable<String,String> globalTable = builder

.globalTable("global", 

Consumed._with_(Serdes._String_(), Serdes._String_()),

        Materialized._as_("globalStore"))

;

//

Properties props = *new* Properties();

props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test");

props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost");

TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), 
props);

//

*final* KeyValueStore<String,String> localStore = 
testDriver.getKeyValueStore("localStore");

Assert._assertNotNull_(localStore);

Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore"));

//

*final* KeyValueStore<String,String> globalStore = 
testDriver.getKeyValueStore("globalStore");

Assert._assertNotNull_(globalStore);

Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore"));

//

    *final* ConsumerRecordFactory<String,String> crf = *new* 
ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer());

testDriver.pipeInput(crf.create("local", "one", "TheOne"));

testDriver.pipeInput(crf.create("global", "one", "TheOne"));

//

Assert._assertEquals_("TheOne", localStore.get("one"));

Assert._assertEquals_("TheOne", globalStore.get("one"));

 

 

{color:#ff0000}to make it work I had to modify the TopologyTestDriver class as 
follow:{color}

...

    *public* Map<String, StateStore> getAllStateStores() {

//        final Map<String, StateStore> allStores = new HashMap<>();

//        for (final String storeName : 
internalTopologyBuilder.allStateStoreName())

{ //            allStores.put(storeName, ((ProcessorContextImpl) 
task.context()).getStateMgr().getStore(storeName)); //        }

//        return allStores;

    {color:#ff0000}// *FIXME*{color}

    *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
task.context()).getStateMgr();

        *final* Map<String, StateStore> allStores = *new* HashMap<>();

        *for* (*final* String storeName : 
internalTopologyBuilder.allStateStoreName()) {            

StateStore res = psm.getStore(storeName);            

if (res == null)            

  res = psm.getGlobalStore(storeName);            

allStores.put(storeName, res);        

}

        *return* allStores;

    }

...

    *public* StateStore getStateStore(*final* String name) {

//        return ((ProcessorContextImpl) 
task.context()).getStateMgr().getStore(name);

        {color:#ff0000}// *FIXME*{color}

    *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
task.context()).getStateMgr();

        StateStore res = psm.getStore(name);

        *if* (res == *null*)

        res = psm.getGlobalStore(name);

        *return* res;

    }

 

{color:#ff0000}moreover I think it would be very useful to make the internal 
MockProducer public for testing cases where a producer is used along side with 
the "normal" stream processing by adding the method:{color}

    /**

     * *@return* records sent with this producer are automatically streamed to 
the topology.

     */

    *public* *final* Producer<*byte*[], *byte*[]> getProducer() {     

return producer;    

}

 

{color:#ff0000}unfortunately this introduces another problem that could be 
verified by adding the following lines to the previous junit test:{color}

...

**

//

ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); // 
just to serialize keys and values

testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, 
cr.timestamp(), cr.key(), cr.value()));

testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, 
cr.timestamp(), cr.key(), cr.value()));

testDriver.advanceWallClockTime(0);

Assert._assertEquals_("TheOne", localStore.get("one"));

Assert._assertEquals_("Second", localStore.get("two"));

Assert._assertEquals_("TheOne", globalStore.get("one"));

Assert._assertEquals_("Second", globalStore.get("two"));

}

 

{color:#ff0000}that could be fixed with:{color}

 

    *private* *void* captureOutputRecords() {

        // Capture all the records sent to the producer ...

        *final* List<ProducerRecord<*byte*[], *byte*[]>> output = 
producer.history();

        producer.clear();

        *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) {

            Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = 
outputRecordsByTopic.get(record.topic());

            *if* (outputRecords == *null*)

{                 outputRecords = *new* LinkedList<>();                 
outputRecordsByTopic.put(record.topic(), outputRecords);             }

            outputRecords.add(record);

 

            // Forward back into the topology if the produced record is to an 
internal or a source topic ...

            *final* String outputTopicName = record.topic();

            *if* (internalTopics.contains(outputTopicName) || 
processorTopology.sourceTopics().contains(outputTopicName)

            || globalPartitionsByTopic.containsKey(outputTopicName)) {  
{color:#ff0000}// *FIXME*{color}

                *final* *byte*[] serializedKey = record.key();

                *final* *byte*[] serializedValue = record.value();

 

                pipeInput(*new* ConsumerRecord<>(

                    outputTopicName,

                    -1,

                    -1L,

                    record.timestamp(),

                    TimestampType.*_CREATE_TIME_*,

                    0L,

                    serializedKey == *null* ? 0 : serializedKey.length,

                    serializedValue == *null* ? 0 : serializedValue.length,

                    serializedKey,

                    serializedValue));

            }

        }

    }

 

 

 

*Thank you*

  was:
{color:#FF0000}This junit test simply fails:{color}

@Test

*public* *void* globalTable() {

StreamsBuilder builder = *new* StreamsBuilder();

@SuppressWarnings("unused")

*final* KTable<String,String> localTable = builder

.table("local", 

Consumed._with_(Serdes._String_(), Serdes._String_()),

Materialized._as_("localStore"))

;

@SuppressWarnings("unused")

*final* GlobalKTable<String,String> globalTable = builder

.globalTable("global", 

Consumed._with_(Serdes._String_(), Serdes._String_()),

        Materialized._as_("globalStore"))

;

//

Properties props = *new* Properties();

props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test");

props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost");

TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), 
props);

//

*final* KeyValueStore<String,String> localStore = 
testDriver.getKeyValueStore("localStore");

Assert._assertNotNull_(localStore);

Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore"));

//

*final* KeyValueStore<String,String> globalStore = 
testDriver.getKeyValueStore("globalStore");

Assert._assertNotNull_(globalStore);

Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore"));

//

    *final* ConsumerRecordFactory<String,String> crf = *new* 
ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer());

testDriver.pipeInput(crf.create("local", "one", "TheOne"));

testDriver.pipeInput(crf.create("global", "one", "TheOne"));

//

Assert._assertEquals_("TheOne", localStore.get("one"));

Assert._assertEquals_("TheOne", globalStore.get("one"));

 

 

{color:#FF0000}to make it work I had to modify the TopologyTestDriver class as 
follow:{color}

...

    *public* Map<String, StateStore> getAllStateStores() {

//        final Map<String, StateStore> allStores = new HashMap<>();

//        for (final String storeName : 
internalTopologyBuilder.allStateStoreName()) {

//            allStores.put(storeName, ((ProcessorContextImpl) 
task.context()).getStateMgr().getStore(storeName));

//        }

//        return allStores;

    {color:#FF0000}// *FIXME*{color}

    *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
task.context()).getStateMgr();

        *final* Map<String, StateStore> allStores = *new* HashMap<>();

        *for* (*final* String storeName : 
internalTopologyBuilder.allStateStoreName()) {

            StateStore res = psm.getStore(storeName);

            *if* (res == *null*)

            res = psm.getGlobalStore(storeName);

            allStores.put(storeName, res);

        }

        *return* allStores;

    }

...

    *public* StateStore getStateStore(*final* String name) {

//        return ((ProcessorContextImpl) 
task.context()).getStateMgr().getStore(name);

        {color:#FF0000}// *FIXME*{color}

    *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
task.context()).getStateMgr();

        StateStore res = psm.getStore(name);

        *if* (res == *null*)

        res = psm.getGlobalStore(name);

        *return* res;

    }

 

{color:#FF0000}moreover I think it would be very useful to make the internal 
MockProducer public for testing cases where a producer is used along side with 
the "normal" stream processing by adding the method:{color}

    /**

     * *@return* records sent with this producer are automatically streamed to 
the topology.

     */

    *public* *final* Producer<*byte*[], *byte*[]> getProducer() {

    *return* producer;

    }

 

{color:#FF0000}unfortunately this introduces another problem that could be 
verified by adding the following lines to the previous junit test:{color}

...

**

//

ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); // 
just to serialize keys and values

testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, 
cr.timestamp(), cr.key(), cr.value()));

testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, 
cr.timestamp(), cr.key(), cr.value()));

testDriver.advanceWallClockTime(0);

Assert._assertEquals_("TheOne", localStore.get("one"));

Assert._assertEquals_("Second", localStore.get("two"));

Assert._assertEquals_("TheOne", globalStore.get("one"));

Assert._assertEquals_("Second", globalStore.get("two"));

}

 

{color:#FF0000}that could be fixed with:{color}

 

    *private* *void* captureOutputRecords() {

        // Capture all the records sent to the producer ...

        *final* List<ProducerRecord<*byte*[], *byte*[]>> output = 
producer.history();

        producer.clear();

        *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) {

            Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = 
outputRecordsByTopic.get(record.topic());

            *if* (outputRecords == *null*) {

                outputRecords = *new* LinkedList<>();

                outputRecordsByTopic.put(record.topic(), outputRecords);

            }

            outputRecords.add(record);

 

            // Forward back into the topology if the produced record is to an 
internal or a source topic ...

            *final* String outputTopicName = record.topic();

            *if* (internalTopics.contains(outputTopicName) || 
processorTopology.sourceTopics().contains(outputTopicName)

            || globalPartitionsByTopic.containsKey(outputTopicName)) {  
{color:#FF0000}// *FIXME*{color}

                *final* *byte*[] serializedKey = record.key();

                *final* *byte*[] serializedValue = record.value();

 

                pipeInput(*new* ConsumerRecord<>(

                    outputTopicName,

                    -1,

                    -1L,

                    record.timestamp(),

                    TimestampType.*_CREATE_TIME_*,

                    0L,

                    serializedKey == *null* ? 0 : serializedKey.length,

                    serializedValue == *null* ? 0 : serializedValue.length,

                    serializedKey,

                    serializedValue));

            }

        }

    }

 

 

 

*Thank you*


> TopologyTestDriver error when dealing with stores from GlobalKTable
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6742
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6742
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Valentino Proietti
>            Priority: Minor
>
> {color:#ff0000}This junit test simply fails:{color}
> @Test
> *public* *void* globalTable() {
> StreamsBuilder builder = *new* StreamsBuilder();
> @SuppressWarnings("unused")
> *final* KTable<String,String> localTable = builder
> .table("local", 
> Consumed._with_(Serdes._String_(), Serdes._String_()),
> Materialized._as_("localStore"))
> ;
> @SuppressWarnings("unused")
> *final* GlobalKTable<String,String> globalTable = builder
> .globalTable("global", 
> Consumed._with_(Serdes._String_(), Serdes._String_()),
>         Materialized._as_("globalStore"))
> ;
> //
> Properties props = *new* Properties();
> props.setProperty(StreamsConfig.*_APPLICATION_ID_CONFIG_*, "test");
> props.setProperty(StreamsConfig.*_BOOTSTRAP_SERVERS_CONFIG_*, "localhost");
> TopologyTestDriver testDriver = *new* TopologyTestDriver(builder.build(), 
> props);
> //
> *final* KeyValueStore<String,String> localStore = 
> testDriver.getKeyValueStore("localStore");
> Assert._assertNotNull_(localStore);
> Assert._assertNotNull_(testDriver.getAllStateStores().get("localStore"));
> //
> *final* KeyValueStore<String,String> globalStore = 
> testDriver.getKeyValueStore("globalStore");
> Assert._assertNotNull_(globalStore);
> Assert._assertNotNull_(testDriver.getAllStateStores().get("globalStore"));
> //
>     *final* ConsumerRecordFactory<String,String> crf = *new* 
> ConsumerRecordFactory<>(*new* StringSerializer(), *new* StringSerializer());
> testDriver.pipeInput(crf.create("local", "one", "TheOne"));
> testDriver.pipeInput(crf.create("global", "one", "TheOne"));
> //
> Assert._assertEquals_("TheOne", localStore.get("one"));
> Assert._assertEquals_("TheOne", globalStore.get("one"));
>  
>  
> {color:#ff0000}to make it work I had to modify the TopologyTestDriver class 
> as follow:{color}
> ...
>     *public* Map<String, StateStore> getAllStateStores() {
> //        final Map<String, StateStore> allStores = new HashMap<>();
> //        for (final String storeName : 
> internalTopologyBuilder.allStateStoreName())
> { //            allStores.put(storeName, ((ProcessorContextImpl) 
> task.context()).getStateMgr().getStore(storeName)); //        }
> //        return allStores;
>     {color:#ff0000}// *FIXME*{color}
>     *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
> task.context()).getStateMgr();
>         *final* Map<String, StateStore> allStores = *new* HashMap<>();
>         *for* (*final* String storeName : 
> internalTopologyBuilder.allStateStoreName()) {            
> StateStore res = psm.getStore(storeName);            
> if (res == null)            
>   res = psm.getGlobalStore(storeName);            
> allStores.put(storeName, res);        
> }
>         *return* allStores;
>     }
> ...
>     *public* StateStore getStateStore(*final* String name) {
> //        return ((ProcessorContextImpl) 
> task.context()).getStateMgr().getStore(name);
>         {color:#ff0000}// *FIXME*{color}
>     *final* ProcessorStateManager psm = ((ProcessorContextImpl) 
> task.context()).getStateMgr();
>         StateStore res = psm.getStore(name);
>         *if* (res == *null*)
>         res = psm.getGlobalStore(name);
>         *return* res;
>     }
>  
> {color:#ff0000}moreover I think it would be very useful to make the internal 
> MockProducer public for testing cases where a producer is used along side 
> with the "normal" stream processing by adding the method:{color}
>     /**
>      * *@return* records sent with this producer are automatically streamed 
> to the topology.
>      */
>     *public* *final* Producer<*byte*[], *byte*[]> getProducer() {     
> return producer;    
> }
>  
> {color:#ff0000}unfortunately this introduces another problem that could be 
> verified by adding the following lines to the previous junit test:{color}
> ...
> **
> //
> ConsumerRecord<*byte*[],*byte*[]> cr = crf.create("dummy", "two", "Second"); 
> // just to serialize keys and values
> testDriver.getProducer().send(*new* ProducerRecord<>("local", *null*, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.getProducer().send(*new* ProducerRecord<>("global", *null*, 
> cr.timestamp(), cr.key(), cr.value()));
> testDriver.advanceWallClockTime(0);
> Assert._assertEquals_("TheOne", localStore.get("one"));
> Assert._assertEquals_("Second", localStore.get("two"));
> Assert._assertEquals_("TheOne", globalStore.get("one"));
> Assert._assertEquals_("Second", globalStore.get("two"));
> }
>  
> {color:#ff0000}that could be fixed with:{color}
>  
>     *private* *void* captureOutputRecords() {
>         // Capture all the records sent to the producer ...
>         *final* List<ProducerRecord<*byte*[], *byte*[]>> output = 
> producer.history();
>         producer.clear();
>         *for* (*final* ProducerRecord<*byte*[], *byte*[]> record : output) {
>             Queue<ProducerRecord<*byte*[], *byte*[]>> outputRecords = 
> outputRecordsByTopic.get(record.topic());
>             *if* (outputRecords == *null*)
> {                 outputRecords = *new* LinkedList<>();                 
> outputRecordsByTopic.put(record.topic(), outputRecords);             }
>             outputRecords.add(record);
>  
>             // Forward back into the topology if the produced record is to an 
> internal or a source topic ...
>             *final* String outputTopicName = record.topic();
>             *if* (internalTopics.contains(outputTopicName) || 
> processorTopology.sourceTopics().contains(outputTopicName)
>             || globalPartitionsByTopic.containsKey(outputTopicName)) {  
> {color:#ff0000}// *FIXME*{color}
>                 *final* *byte*[] serializedKey = record.key();
>                 *final* *byte*[] serializedValue = record.value();
>  
>                 pipeInput(*new* ConsumerRecord<>(
>                     outputTopicName,
>                     -1,
>                     -1L,
>                     record.timestamp(),
>                     TimestampType.*_CREATE_TIME_*,
>                     0L,
>                     serializedKey == *null* ? 0 : serializedKey.length,
>                     serializedValue == *null* ? 0 : serializedValue.length,
>                     serializedKey,
>                     serializedValue));
>             }
>         }
>     }
>  
>  
>  
> *Thank you*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to