Tom Dearman created KAFKA-3207:
----------------------------------

             Summary: StateStore seems to be writing state to one topic but 
restoring from another
                 Key: KAFKA-3207
                 URL: https://issues.apache.org/jira/browse/KAFKA-3207
             Project: Kafka
          Issue Type: Bug
          Components: kafka streams
    Affects Versions: 0.9.1.0
         Environment: MacOS El Capitan
            Reporter: Tom Dearman
            Priority: Blocker


The state store (I am using in-memory state store) writes to a topic call 
[store-name] but restores from [job-id]-[store-name]-changelog.  You can see in 
StoreChangeLogger that it writes to a topic which is the [store-name] passed 
through from the store supplier factory, but restores from the above topic 
name. My topology is:
                TopologyBuilder builder = new TopologyBuilder();

                SerializerAdapter<CommonKey> commonKeyAdapter = new 
SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
                SerializerAdapter<GamePlayValue> gamePlayAdapter = new 
SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
                builder.addSource("SOURCE", commonKeyAdapter, gamePlayAdapter, 
kafkaStreamConfig.getGamePlayTopic());

                Duration activityInterval = 
kafkaStreamConfig.getActivityInterval();
                if (activityInterval.toMinutes() % 5 != 0 || 24 * 60 % 
activityInterval.toMinutes() != 0)
                {
                        throw new SystemFaultException(
                                "The game activity interval must be a multiple 
of 5 minutes and divide into 24 hours current value [" +
                                        activityInterval.toMinutes() + "]");
                }
                builder.addProcessor("PROCESS", new 
GameActivitySupplier(kafkaStreamConfig.getStoreName(),
                                                                         
kafkaStreamConfig.getGameActivitySendPeriod(),
                                                                         
activityInterval,
                                                                         
kafkaStreamConfig.getRemoveOldestTime(),
                                                                         
kafkaStreamConfig.getRemoveAbsoluteTime()), "SOURCE");

                SerializerAdapter<StoreValue> storeValueAdapter = new 
SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
                builder.addStateStore(
                        
Stores.create(kafkaStreamConfig.getStoreName()).withKeys(commonKeyAdapter, 
commonKeyAdapter).withValues(
                                storeValueAdapter, 
storeValueAdapter).inMemory().build(), "PROCESS");

                builder.addSink("SINK", 
kafkaStreamConfig.getGameActivityTopic(), commonKeyAdapter,
                                new 
SerializerAdapter<GameActivityTotalMessage>(JDKBinarySerializer.INSTANCE), 
"PROCESS");




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to