[ https://issues.apache.org/jira/browse/KAFKA-3207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang reassigned KAFKA-3207:
------------------------------------

    Assignee: Guozhang Wang

> StateStore seems to be writing state to one topic but restoring from another
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-3207
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3207
>             Project: Kafka
>          Issue Type: Bug
>          Components: kafka streams
>    Affects Versions: 0.9.1.0
>         Environment: MacOS El Capitan
>            Reporter: Tom Dearman
>            Assignee: Guozhang Wang
>            Priority: Blocker
>
> The state store (I am using the in-memory state store) writes to a topic called
> [store-name] but restores from [job-id]-[store-name]-changelog. You can see
> in StoreChangeLogger that it writes to a topic whose name is the [store-name]
> passed through from the store supplier factory, but restores from the above
> topic name. My topology is:
>
>     TopologyBuilder builder = new TopologyBuilder();
>     SerializerAdapter<CommonKey> commonKeyAdapter = new SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
>     SerializerAdapter<GamePlayValue> gamePlayAdapter = new SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
>     builder.addSource("SOURCE", commonKeyAdapter, gamePlayAdapter, kafkaStreamConfig.getGamePlayTopic());
>     Duration activityInterval = kafkaStreamConfig.getActivityInterval();
>     if (activityInterval.toMinutes() % 5 != 0 || 24 * 60 % activityInterval.toMinutes() != 0)
>     {
>         throw new SystemFaultException(
>             "The game activity interval must be a multiple of 5 minutes and divide into 24 hours current value [" +
>             activityInterval.toMinutes() + "]");
>     }
>     builder.addProcessor("PROCESS", new GameActivitySupplier(kafkaStreamConfig.getStoreName(),
>                                                              kafkaStreamConfig.getGameActivitySendPeriod(),
>                                                              activityInterval,
>                                                              kafkaStreamConfig.getRemoveOldestTime(),
>                                                              kafkaStreamConfig.getRemoveAbsoluteTime()), "SOURCE");
>     SerializerAdapter<StoreValue> storeValueAdapter = new SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
>     builder.addStateStore(
>         Stores.create(kafkaStreamConfig.getStoreName()).withKeys(commonKeyAdapter, commonKeyAdapter).withValues(
>             storeValueAdapter, storeValueAdapter).inMemory().build(), "PROCESS");
>     builder.addSink("SINK", kafkaStreamConfig.getGameActivityTopic(), commonKeyAdapter,
>         new SerializerAdapter<GameActivityTotalMessage>(JDKBinarySerializer.INSTANCE), "PROCESS");
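
For illustration only, here is a minimal sketch of the two topic names described in the report. The class and method names (ChangelogTopicNames, writeTopic, restoreTopic, namesMatch) and the sample job id and store name are hypothetical, not Kafka Streams APIs. The naming patterns are copied from the issue description and have not been checked against the Kafka source.

```java
public class ChangelogTopicNames {

    // Hypothetical helper: the topic the report says the change logger writes to,
    // i.e. the bare store name passed in from the store supplier.
    static String writeTopic(String storeName) {
        return storeName;
    }

    // Hypothetical helper: the topic the report says restoration reads from,
    // following the pattern [job-id]-[store-name]-changelog.
    static String restoreTopic(String jobId, String storeName) {
        return jobId + "-" + storeName + "-changelog";
    }

    // True only if writes and restores would target the same topic.
    static boolean namesMatch(String jobId, String storeName) {
        return writeTopic(storeName).equals(restoreTopic(jobId, storeName));
    }

    public static void main(String[] args) {
        // Sample values are assumptions chosen for the example.
        String jobId = "my-job";
        String storeName = "game-activity-store";

        System.out.println("write topic:   " + writeTopic(storeName));
        System.out.println("restore topic: " + restoreTopic(jobId, storeName));
        System.out.println("names match:   " + namesMatch(jobId, storeName));
    }
}
```

If the two names differ as reported, a restarted task would read from a changelog topic that was never written to, so the store would come back empty. That would be consistent with the symptom in the summary.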