[ 
https://issues.apache.org/jira/browse/KAFKA-4963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947496#comment-15947496
 ] 

Matthias J. Sax commented on KAFKA-4963:
----------------------------------------

I think this could be added to the API. If you want this feature, 
please create a JIRA for it (and add the label "needs-kip"). Personally, I 
think the API should be somewhat different, but that would be part of the KIP 
discussion that is required for any user-facing API change: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

> Global Store: startup recovery process skipping processor
> ---------------------------------------------------------
>
>                 Key: KAFKA-4963
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4963
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Yennick Trevels
>
> This issue is related to the recovery process of a global store. It might be 
> that I'm misunderstanding the design of the global store as it's all quite 
> new to me, but I wanted to verify this case.
> I'm trying to create a global store with a processor that transforms the 
> values from the source and puts them into the state store, and I want all 
> these transformed values to be available in every streams job (hence the 
> use of a global store).
> I'll give you an example which I created based on an existing Kafka Streams 
> unit test:
> {code}
> // The store holds Integer values, while the source topic holds String values.
> final StateStoreSupplier storeSupplier = Stores.create("my-store")
>         .withStringKeys().withIntegerValues().inMemory().disableLogging().build();
> final String global = "global";
> final String topic = "topic";
> final KeyValueStore<String, Integer> globalStore =
>         (KeyValueStore<String, Integer>) storeSupplier.get();
> final TopologyBuilder topologyBuilder = this.builder
>         .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER,
>                 topic, "processor", define(new ValueToLengthStatefulProcessor("my-store")));
> driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
> driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
> driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
> assertEquals(Integer.valueOf("value1".length()), globalStore.get("key1"));
> assertEquals(Integer.valueOf("value2".length()), globalStore.get("key2"));
> {code}
> The ValueToLengthStatefulProcessor takes the incoming value, calculates the 
> length of the string, and puts the result in the state store. 
> Note the difference in types between the source stream (string values) and 
> the state store (integer values).
> If I understand global stores correctly and based on what I've tried out 
> already, the stream of data runs like this:
> a source stream named "global" reading values from a Kafka topic called 
> "topic"  -> ValueToLengthStatefulProcessor -> "my-store" state store
> However, when the streams job starts up it runs the recovery process by 
> reading out the source stream again. I've noticed that in this case it seems 
> to skip the processor entirely and acts like the source stream is the 
> changelog of the state store, making the data flow like this during the 
> recovery process:
> source stream -> "my-store" state store
> Because it acts like the source stream is the changelog of the state store, 
> it also tries to use the deserializer of the state store. This won't work 
> since the values of the state store should be integers, while the values in 
> the source stream are strings.
> So all this will start up nicely as long as the source stream has no values 
> yet. However, once the source stream has (string) values, the startup 
> recovery process will fail, since it writes the source records directly into 
> the state store instead of passing the source values to the processor.
> I believe this is caused by the following line of code in 
> TopologyBuilder.addGlobalStore, which connects the store directly to the 
> source topic.
> https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java#L507
> Please let me know if I'm totally misunderstanding how global stores should 
> work. But I think this might be a crucial bug or design flaw.
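
To make the mismatch described in the quoted report concrete, here is a minimal plain-Java sketch. It does not use any Kafka classes: the class name GlobalStoreRestoreSketch, the helper methods, and the strict 4-byte check are all assumptions chosen for illustration, modelling a fixed-width integer deserializer being applied to string bytes. It only contrasts the two data flows from the report (through the processor versus straight into the store) and is not the actual Kafka Streams restore code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class GlobalStoreRestoreSketch {

    // Hypothetical stand-in for a string serializer: the source topic holds UTF-8 bytes.
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical stand-in for an integer deserializer: accepts exactly 4 bytes.
    static int deserializeInt(byte[] data) {
        if (data.length != Integer.BYTES) {
            throw new IllegalArgumentException(
                    "expected 4 bytes for an int, got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    // Normal processing path: source record -> processor (value length) -> store.
    static void process(Map<String, Integer> store, String key, byte[] rawValue) {
        String value = new String(rawValue, StandardCharsets.UTF_8);
        store.put(key, value.length());
    }

    // Restore path as described in the report: the processor is skipped and the
    // raw source bytes are read with the store's (integer) deserializer.
    static void restore(Map<String, Integer> store, String key, byte[] rawValue) {
        store.put(key, deserializeInt(rawValue));
    }

    public static void main(String[] args) {
        byte[] raw = serializeString("value1");

        Map<String, Integer> store = new HashMap<>();
        process(store, "key1", raw);
        System.out.println("processed: " + store.get("key1"));

        try {
            restore(new HashMap<>(), "key1", raw);
        } catch (IllegalArgumentException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

In this sketch the restore path fails because "value1" serializes to 6 bytes. A source value that happened to be exactly 4 bytes long would not fail here but would be decoded into an unrelated integer, which would be a quieter form of the same problem.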



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
