Tom Dearman created KAFKA-3207:
----------------------------------
Summary: StateStore seems to be writing state to one topic but
restoring from another
Key: KAFKA-3207
URL: https://issues.apache.org/jira/browse/KAFKA-3207
Project: Kafka
Issue Type: Bug
Components: kafka streams
Affects Versions: 0.9.1.0
Environment: MacOS El Capitan
Reporter: Tom Dearman
Priority: Blocker
The state store (I am using in-memory state store) writes to a topic call
[store-name] but restores from [job-id]-[store-name]-changelog. You can see in
StoreChangeLogger that it writes to a topic which is the [store-name] passed
through from the store supplier factory, but restores from the above topic
name. My topology is:
TopologyBuilder builder = new TopologyBuilder();
SerializerAdapter<CommonKey> commonKeyAdapter = new
SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
SerializerAdapter<GamePlayValue> gamePlayAdapter = new
SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
builder.addSource("SOURCE", commonKeyAdapter, gamePlayAdapter,
kafkaStreamConfig.getGamePlayTopic());
Duration activityInterval =
kafkaStreamConfig.getActivityInterval();
if (activityInterval.toMinutes() % 5 != 0 || 24 * 60 %
activityInterval.toMinutes() != 0)
{
throw new SystemFaultException(
"The game activity interval must be a multiple
of 5 minutes and divide into 24 hours current value [" +
activityInterval.toMinutes() + "]");
}
builder.addProcessor("PROCESS", new
GameActivitySupplier(kafkaStreamConfig.getStoreName(),
kafkaStreamConfig.getGameActivitySendPeriod(),
activityInterval,
kafkaStreamConfig.getRemoveOldestTime(),
kafkaStreamConfig.getRemoveAbsoluteTime()), "SOURCE");
SerializerAdapter<StoreValue> storeValueAdapter = new
SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
builder.addStateStore(
Stores.create(kafkaStreamConfig.getStoreName()).withKeys(commonKeyAdapter,
commonKeyAdapter).withValues(
storeValueAdapter,
storeValueAdapter).inMemory().build(), "PROCESS");
builder.addSink("SINK",
kafkaStreamConfig.getGameActivityTopic(), commonKeyAdapter,
new
SerializerAdapter<GameActivityTotalMessage>(JDKBinarySerializer.INSTANCE),
"PROCESS");
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)