[ 
https://issues.apache.org/jira/browse/KAFKA-3207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-3207:
------------------------------------

    Assignee: Guozhang Wang

> StateStore seems to be writing state to one topic but restoring from another
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-3207
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3207
>             Project: Kafka
>          Issue Type: Bug
>          Components: kafka streams
>    Affects Versions: 0.9.1.0
>         Environment: MacOS El Capitan
>            Reporter: Tom Dearman
>            Assignee: Guozhang Wang
>            Priority: Blocker
>
> The state store (I am using in-memory state store) writes to a topic call 
> [store-name] but restores from [job-id]-[store-name]-changelog.  You can see 
> in StoreChangeLogger that it writes to a topic which is the [store-name] 
> passed through from the store supplier factory, but restores from the above 
> topic name. My topology is:
>               TopologyBuilder builder = new TopologyBuilder();
>               SerializerAdapter<CommonKey> commonKeyAdapter = new 
> SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
>               SerializerAdapter<GamePlayValue> gamePlayAdapter = new 
> SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
>               builder.addSource("SOURCE", commonKeyAdapter, gamePlayAdapter, 
> kafkaStreamConfig.getGamePlayTopic());
>               Duration activityInterval = 
> kafkaStreamConfig.getActivityInterval();
>               if (activityInterval.toMinutes() % 5 != 0 || 24 * 60 % 
> activityInterval.toMinutes() != 0)
>               {
>                       throw new SystemFaultException(
>                               "The game activity interval must be a multiple 
> of 5 minutes and divide into 24 hours current value [" +
>                                       activityInterval.toMinutes() + "]");
>               }
>               builder.addProcessor("PROCESS", new 
> GameActivitySupplier(kafkaStreamConfig.getStoreName(),
>                                                                        
> kafkaStreamConfig.getGameActivitySendPeriod(),
>                                                                        
> activityInterval,
>                                                                        
> kafkaStreamConfig.getRemoveOldestTime(),
>                                                                        
> kafkaStreamConfig.getRemoveAbsoluteTime()), "SOURCE");
>               SerializerAdapter<StoreValue> storeValueAdapter = new 
> SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
>               builder.addStateStore(
>                       
> Stores.create(kafkaStreamConfig.getStoreName()).withKeys(commonKeyAdapter, 
> commonKeyAdapter).withValues(
>                               storeValueAdapter, 
> storeValueAdapter).inMemory().build(), "PROCESS");
>               builder.addSink("SINK", 
> kafkaStreamConfig.getGameActivityTopic(), commonKeyAdapter,
>                               new 
> SerializerAdapter<GameActivityTotalMessage>(JDKBinarySerializer.INSTANCE), 
> "PROCESS");



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to