Hi group,

We ran into a RecordTooLargeException while sending messages to an outgoing Kafka system. We had solved this on Samza 0.10.0, but it's back in 0.10.1... Here are the details.
We are analyzing crawled web pages, and some messages grow beyond max.request.size after new features are added to them. Our current strategy is simply to throw those messages away. We have several configurable serialization strategies, which makes it hard to compute the exact size of a message before it is finally serialized to a byte array.

What we did on 0.10.0 was catch the processing exception and check whether it was caused by a RecordTooLargeException; if so, our task hid the exception and continued processing. This no longer works on 0.10.1. Our task can still catch the exception and ignore it in process(), but it seems that the task commit re-flushes the large message and causes a container failure. TaskInstance.commit appears to be beyond the control of our task, as nothing from our CppTask is listed in that stack trace.

Is there a better way to handle RecordTooLargeException? Your help is much appreciated.

The stack trace from first catching the exception in our CppTask:

2016-09-07 23:03:50.283 [main] CppTask [ERROR] error processing message at partition : SystemStreamPartition [message, documents_nlp, 3], offset: 3166714
2016-09-07 23:03:50.283 [main] CppTask [ERROR] CAUSE: Unable to send message from TaskName-Partition 3 to system message.
org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 3 to system message.
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:165)
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:149)
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:149)
	at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
	at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
	at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:149)
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$stop$1.apply(KafkaSystemProducer.scala:64)
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$stop$1.apply(KafkaSystemProducer.scala:64)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
	at org.apache.samza.system.kafka.KafkaSystemProducer.stop(KafkaSystemProducer.scala:64)
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:130)
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:125)
	at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81)
	at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:100)
	at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
	at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
	*at com.xxx.cpp.samza.task.CppTask.process(CppTask.java:155)*
	at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:150)
	at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
	*at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:149)*
	at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$process$1$$anonfun$apply$mcVJ$sp$2.apply(RunLoop.scala:133)
	at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$process$1$$anonfun$apply$mcVJ$sp$2.apply(RunLoop.scala:130)
	at scala.collection.immutable.List.foreach(List.scala:318)
	at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$process$1.apply$mcVJ$sp(RunLoop.scala:129)
	at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
	at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:37)
	at org.apache.samza.container.RunLoop.org$apache$samza$container$RunLoop$$process(RunLoop.scala:121)
	at org.apache.samza.container.RunLoop$$anon$2.run(RunLoop.scala:78)
	at org.apache.samza.util.ThrottlingExecutor.execute(ThrottlingExecutor.java:64)
	at org.apache.samza.container.RunLoop.run(RunLoop.scala:88)
	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:594)
	at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:82)
	at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:56)
	at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1214147 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

The stack trace that actually caused the container failure:

2016-09-07 23:03:50.287 [main] SamzaContainer [ERROR] Caught exception in process loop.
org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 3 to system message.
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:165)
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:149)
	at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:149)
	at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
	at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
	at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:149)
	at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
	at org.apache.samza.system.SystemProducers$$anonfun$flush$2.apply(SystemProducers.scala:64)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
	at org.apache.samza.system.SystemProducers.flush(SystemProducers.scala:64)
	at org.apache.samza.task.TaskInstanceCollector.flush(TaskInstanceCollector.scala:70)
	*at org.apache.samza.container.TaskInstance.commit(TaskInstance.scala:182)*
	at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:173)
	at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$commit$1$$anonfun$apply$mcVJ$sp$7.apply(RunLoop.scala:173)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
	at org.apache.samza.container.RunLoop$$anonfun$org$apache$samza$container$RunLoop$$commit$1.apply$mcVJ$sp(RunLoop.scala:173)
	at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
	at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:37)
	at org.apache.samza.container.RunLoop.org$apache$samza$container$RunLoop$$commit(RunLoop.scala:168)
	at org.apache.samza.container.RunLoop$$anon$2.run(RunLoop.scala:80)
	at org.apache.samza.util.ThrottlingExecutor.execute(ThrottlingExecutor.java:64)
	at org.apache.samza.container.RunLoop.run(RunLoop.scala:88)
	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:594)
	at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:82)
	at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:56)
	at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1214147 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

2016-09-07 23:03:50.288 [main] SamzaContainer [INFO] Shutting down.
2016-09-07 23:03:50.289 [main] SamzaContainer [INFO] Shutting down consumer multiplexer.

--
All the best
Liu Bo
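P.S. For reference, the cause-chain check we do in process() looks roughly like the sketch below. This is a simplified, self-contained stand-in, not our actual CppTask code: RecordTooLargeException here is a local dummy class standing in for org.apache.kafka.common.errors.RecordTooLargeException, and the wrapping RuntimeException stands in for the SamzaException thrown from the collector's send.

```java
// Sketch: decide whether a send failure was caused by an oversized record
// by walking the exception's cause chain, so the task can skip that message
// instead of failing the container.
public class SkipLargeRecordSketch {

    // Stand-in for org.apache.kafka.common.errors.RecordTooLargeException,
    // so this sketch compiles without the Kafka client on the classpath.
    static class RecordTooLargeException extends RuntimeException {
        RecordTooLargeException(String msg) { super(msg); }
    }

    // Returns true if any throwable in the cause chain is a RecordTooLargeException.
    static boolean causedByRecordTooLarge(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof RecordTooLargeException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulate the wrapped exception: a SamzaException-like wrapper whose
        // root cause is the Kafka RecordTooLargeException.
        RuntimeException wrapped = new RuntimeException(
            "Unable to send message from TaskName-Partition 3 to system message.",
            new RecordTooLargeException("The message is 1214147 bytes when serialized ..."));

        System.out.println(causedByRecordTooLarge(wrapped));                       // prints true
        System.out.println(causedByRecordTooLarge(new RuntimeException("other"))); // prints false
    }
}
```

On 0.10.0 we called this from the catch block in process() and simply returned when it evaluated to true; the problem described above is that the later commit flush is outside our catch block.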