Yennick Trevels created KAFKA-4963:
--------------------------------------

             Summary: Global Store: startup recovery process skipping processor
                 Key: KAFKA-4963
                 URL: https://issues.apache.org/jira/browse/KAFKA-4963
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.2.0
            Reporter: Yennick Trevels


This issue is related to the recovery process of a global store. It might be 
that I'm misunderstanding the design of the global store as it's all quite new 
to me, but I wanted to verify this case.
I'm trying to create a global store with a processor that transforms the 
values from the source and puts them into the state store. I want all these 
transformed values to be available in every streams job (hence the use of a 
global store).
I'll give you an example which I created based on an existing Kafka Streams 
unit test:

{code}
// The store is declared with String keys and Integer values.
final StateStoreSupplier storeSupplier = Stores.create("my-store")
        .withStringKeys()
        .withIntegerValues()
        .inMemory()
        .disableLogging()
        .build();
final String global = "global";
final String topic = "topic";
// The cast matches the store's declared types (String keys, Integer values).
final KeyValueStore<String, Integer> globalStore =
        (KeyValueStore<String, Integer>) storeSupplier.get();
// The source topic carries String values; the processor writes Integers to the store.
final TopologyBuilder topologyBuilder = this.builder
        .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER,
                topic, "processor", define(new ValueToLengthStatefulProcessor("my-store")));

driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
// Integer.valueOf avoids an ambiguous assertEquals overload (int vs. Integer).
assertEquals(Integer.valueOf("value1".length()), globalStore.get("key1"));
assertEquals(Integer.valueOf("value2".length()), globalStore.get("key2"));
{code}

The ValueToLengthStatefulProcessor takes the incoming value, calculates the 
length of the string, and puts the result in the state store. Note the 
difference in types between the source stream (string values) and the state 
store (integer values).

If I understand global stores correctly and based on what I've tried out 
already, the stream of data runs like this:
a source stream named "global" reading values from a Kafka topic called "topic" 
 -> ValueToLengthStatefulProcessor -> "my-store" state store

However, when the streams job starts up it runs the recovery process by reading 
out the source stream again. I've noticed that in this case it seems to skip 
the processor entirely and acts like the source stream is the changelog of the 
state store, making the data flow like this during the recovery process:
source stream -> "my-store" state store

Because it acts like the source stream is the changelog of the state store, it 
also tries to use the deserializer of the state store. This won't work since 
the values of the state store should be integers, while the values in the 
source stream are strings.
So all of this starts up fine as long as the source stream has no values yet. 
However, once the source stream contains (string) values, the startup recovery 
process fails, because it writes the source records directly into the state 
store instead of passing them through the processor.
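
To make the mismatch concrete, here is a minimal plain-Java sketch of the two 
data flows described above. It does not use any Kafka Streams classes: the 
class name GlobalStoreRecoverySketch, the methods process, restore, 
deserializeInt and deserializeString, and the HashMap standing in for the state 
store are all hypothetical names chosen for illustration. It assumes the 
store's value deserializer expects a 4-byte integer encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the two data flows; none of these names are Kafka Streams APIs.
public class GlobalStoreRecoverySketch {

    // Stand-in for the store's value deserializer (assumed: 4-byte big-endian int).
    static int deserializeInt(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException(
                    "expected 4 bytes for an integer, got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    // Stand-in for the source topic's value deserializer.
    static String deserializeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Normal processing: source bytes -> string -> processor (length) -> store.
    static void process(Map<String, Integer> store, String key, byte[] sourceValue) {
        store.put(key, deserializeString(sourceValue).length());
    }

    // Recovery as described in this report: source bytes are treated as changelog
    // records and read with the store's integer deserializer, bypassing the processor.
    static void restore(Map<String, Integer> store, String key, byte[] sourceValue) {
        store.put(key, deserializeInt(sourceValue));
    }

    public static void main(String[] args) {
        byte[] value1 = "value1".getBytes(StandardCharsets.UTF_8);

        Map<String, Integer> store = new HashMap<>();
        process(store, "key1", value1);
        System.out.println("after processing: " + store);

        Map<String, Integer> recovered = new HashMap<>();
        try {
            restore(recovered, "key1", value1);
        } catch (IllegalArgumentException e) {
            System.out.println("recovery failed: " + e.getMessage());
        }
    }
}
```

In this model the normal path stores the length 6 for "key1", while the 
recovery path rejects the same record because the 6-byte string is not a valid 
4-byte integer. A source value that happened to be exactly 4 bytes long would 
not fail in this model but would be restored as an unrelated integer.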

I believe this is caused by the following line of code in 
TopologyBuilder.addGlobalStore, which connects the store directly to the source 
topic.
https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java#L507

Please let me know if I'm totally misunderstanding how global stores should 
work. But I think this might be a crucial bug or design flaw.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
