[ https://issues.apache.org/jira/browse/KAFKA-4963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15946730#comment-15946730 ]
Damian Guy commented on KAFKA-4963:
-----------------------------------

[~mjsax] what you have said is correct. This ticket can be closed as "not a problem".

> Global Store: startup recovery process skipping processor
> ---------------------------------------------------------
>
>                 Key: KAFKA-4963
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4963
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Yennick Trevels
>
> This issue is related to the recovery process of a global store. It might be that I'm misunderstanding the design of the global store, as it's all quite new to me, but I wanted to verify this case.
> I'm trying to create a global store with a processor which transforms the values from the source and puts them into the state store, and I want all these transformed values to be available in every streams job (hence the use of a global store).
> I'll give you an example which I created based on an existing Kafka Streams unit test:
> {code}
> final StateStoreSupplier storeSupplier = Stores.create("my-store")
>     .withStringKeys().withIntegerValues().inMemory().disableLogging().build();
> final String global = "global";
> final String topic = "topic";
> final KeyValueStore<String, Integer> globalStore = (KeyValueStore<String, Integer>) storeSupplier.get();
> final TopologyBuilder topologyBuilder = this.builder
>     .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new ValueToLengthStatefulProcessor("my-store")));
> driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
> driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
> driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
> assertEquals("value1".length(), globalStore.get("key1"));
> assertEquals("value2".length(), globalStore.get("key2"));
> {code}
> The ValueToLengthStatefulProcessor basically takes the incoming value, calculates the length of the string, and puts the result in the state store. Note the difference in types between the source stream (string values) and the state store (integer values).
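> (The ValueToLengthStatefulProcessor itself is not included in the ticket; a minimal sketch of such a processor against the 0.10.2 Processor API might look like the following. The class name and the "my-store" store name come from the test above, everything else is assumed for illustration.)
> {code}
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> // Hypothetical sketch: reads String values and stores their length as an Integer.
> public class ValueToLengthStatefulProcessor implements Processor<String, String> {
>
>     private final String storeName;
>     private KeyValueStore<String, Integer> store;
>
>     public ValueToLengthStatefulProcessor(final String storeName) {
>         this.storeName = storeName;
>     }
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public void init(final ProcessorContext context) {
>         // look up the store registered under storeName ("my-store" in the test above)
>         store = (KeyValueStore<String, Integer>) context.getStateStore(storeName);
>     }
>
>     @Override
>     public void process(final String key, final String value) {
>         // transform the String value into its length and write it to the global store
>         store.put(key, value.length());
>     }
>
>     @Override
>     public void punctuate(final long timestamp) {
>         // no scheduled work
>     }
>
>     @Override
>     public void close() {
>         // nothing to clean up
>     }
> }
> {code}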
> If I understand global stores correctly, and based on what I've tried out already, the data flows like this:
> source stream "global" (reading string values from the Kafka topic "topic") -> ValueToLengthStatefulProcessor -> "my-store" state store
> However, when the streams job starts up it runs the recovery process by reading the source stream again. I've noticed that in this case it seems to skip the processor entirely and acts as if the source stream were the changelog of the state store, so during recovery the data flows like this:
> source stream -> "my-store" state store
> Because it treats the source stream as the changelog of the state store, it also tries to use the deserializer of the state store. This won't work, since the values of the state store should be integers while the values in the source stream are strings.
> So all this will start up nicely as long as the source stream has no values yet. However, once the source stream has (string) values, the startup recovery process will fail, since it sources records directly into the state store instead of passing them through the processor.
> I believe this is caused by the following line of code in TopologyBuilder.addGlobalStore, which connects the store directly to the source topic:
> https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java#L507
> Please let me know if I'm totally misunderstanding how global stores should work, but I think this might be a crucial bug or design flaw.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)