I do have this in init as well... public void init(Config config, TaskContext context) {
store = (KeyValueStore<String, Integer>) context.getStore("store"); } You are right. These are primitive types but I was trying to address this exception: Exception in thread "main" org.apache.samza.SamzaException: Must define a key serde when using key value storage. I have changed it to stores.store.key.serde=org.apache.samza.serializers.ByteSerdeFactory Regardless, exception is persisting. - Shekar