The following code for(KeyValueIterator<String, MyObject> itor = myStore.all(); itor.hasNext(); ) { ... }
Throws the exception *org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 8 to system kafka.* at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:152) at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136) at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136) at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37) at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39) at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:136) at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$stop$1.apply(KafkaSystemProducer.scala:56) at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$stop$1.apply(KafkaSystemProducer.scala:56) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174) at org.apache.samza.system.kafka.KafkaSystemProducer.stop(KafkaSystemProducer.scala:56) at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:120) at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:116) at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81) at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:91) at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87) at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:77) at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:73) at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:193) at org.apache.samza.storage.kv.CachedStore.all(CachedStore.scala:134) at org.apache.samza.storage.kv.NullSafeKeyValueStore.all(NullSafeKeyValueStore.scala:78)* at org.apache.samza.storage.kv.KeyValueStorageEngine.all(KeyValueStorageEngine.scala:79) at MyTask.window(SessionizeTask.java:192)* at org.apache.samza.container.TaskInstance$$anonfun$window$1.apply$mcV$sp(TaskInstance.scala:166) at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54) at org.apache.samza.container.TaskInstance.window(TaskInstance.scala:165) at org.apache.samza.container.RunLoop$$anonfun$window$1$$anonfun$apply$mcVJ$sp$5.apply(RunLoop.scala:146) at org.apache.samza.container.RunLoop$$anonfun$window$1$$anonfun$apply$mcVJ$sp$5.apply(RunLoop.scala:143) at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) at org.apache.samza.container.RunLoop$$anonfun$window$1.apply$mcVJ$sp(RunLoop.scala:143) at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51) at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:35) at org.apache.samza.container.RunLoop.window(RunLoop.scala:137) at org.apache.samza.container.RunLoop.run(RunLoop.scala:75) at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553) at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92) at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66) at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)*Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.* Here is the config for myStore: serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory serializers.registry.serializable.class=org.apache.samza.serializers.SerializableSerdeFactory stores.myStore.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory stores.myStore.changelog=kafka.__samza_kv_MyTask_MyStore stores.myStore.key.serde=string stores.myStore.msg.serde=serializable The size of MyObject is unbounded, so I am not surprise to get RecordTooLargeException. However, I get the exception when I am attempting to create the KeyValueIterator with KeyValueStore.all(), which means I cannot even recover from this error by deleting the too-large record. Can anyone explain how I can recover from this exception? Regards, Jack