Hi, Jack,
Unfortunately, this would happen for all stores that have a changelog
configured, even if you tried to iterate and remove the large records
*before* they are flushed. The reason you saw this in CachedStore.all()
is that we call flush() in CachedStore when creating the iterator, which
The following code:
for (KeyValueIterator itor = myStore.all(); itor.hasNext(); ) {
  ...
}
throws the exception:
*org.apache.samza.SamzaException: Unable to send message from
TaskName-Partition 8 to system kafka.*
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.