The following code

for(KeyValueIterator<String, MyObject> itor = myStore.all();
itor.hasNext(); ) {     ...
}

​

Throws the exception

*org.apache.samza.SamzaException: Unable to send message from
TaskName-Partition 8 to system kafka.*
        at 
org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:152)
        at 
org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
        at 
org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
        at 
org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at 
org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
        at 
org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:136)
        at 
org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$stop$1.apply(KafkaSystemProducer.scala:56)
        at 
org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$stop$1.apply(KafkaSystemProducer.scala:56)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
        at 
org.apache.samza.system.kafka.KafkaSystemProducer.stop(KafkaSystemProducer.scala:56)
        at 
org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:120)
        at 
org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:116)
        at 
org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81)
        at 
org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:91)
        at 
org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
        at 
org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
        at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:77)
        at 
org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:73)
        at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:193)
        at org.apache.samza.storage.kv.CachedStore.all(CachedStore.scala:134)
        at 
org.apache.samza.storage.kv.NullSafeKeyValueStore.all(NullSafeKeyValueStore.scala:78)*
       at
org.apache.samza.storage.kv.KeyValueStorageEngine.all(KeyValueStorageEngine.scala:79)
        at MyTask.window(SessionizeTask.java:192)*
        at 
org.apache.samza.container.TaskInstance$$anonfun$window$1.apply$mcV$sp(TaskInstance.scala:166)
        at 
org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
        at 
org.apache.samza.container.TaskInstance.window(TaskInstance.scala:165)
        at 
org.apache.samza.container.RunLoop$$anonfun$window$1$$anonfun$apply$mcVJ$sp$5.apply(RunLoop.scala:146)
        at 
org.apache.samza.container.RunLoop$$anonfun$window$1$$anonfun$apply$mcVJ$sp$5.apply(RunLoop.scala:143)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at 
org.apache.samza.container.RunLoop$$anonfun$window$1.apply$mcVJ$sp(RunLoop.scala:143)
        at 
org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
        at 
org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:35)
        at org.apache.samza.container.RunLoop.window(RunLoop.scala:137)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:75)
        at 
org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
        at 
org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
        at 
org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
        at 
org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)*Caused
by: org.apache.kafka.common.errors.RecordTooLargeException: The
request included a message larger than the max message size the server
will accept.*

Here is the config for myStore:

serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.serializable.class=org.apache.samza.serializers.SerializableSerdeFactory
stores.myStore.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.myStore.changelog=kafka.__samza_kv_MyTask_MyStore
stores.myStore.key.serde=string
stores.myStore.msg.serde=serializable

​

The size of MyObject is unbounded, so I am not surprise to get
RecordTooLargeException. However, I get the exception when I am attempting
to create the KeyValueIterator with KeyValueStore.all(), which means I
cannot even recover from this error by deleting the too-large record. Can
anyone explain how I can recover from this exception?


Regards,


Jack

Reply via email to