Hi QiShu,

1. I see the exception occurring in your *process* method. It seems that the message you are trying to send (881729 bytes when serialized, according to your log) is larger than the maximum request size configured for the Kafka producer via max.request.size (the Kafka producer default is 1 MB). You can choose to catch the exception in your process method and move on. Wouldn't that work for you?
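If you would rather not swallow every exception, one option is to drop the message only when the failure was caused by an oversized record. In your stack trace, the SamzaException thrown from collector.send has org.apache.kafka.common.errors.RecordTooLargeException as its cause, so a small helper can walk the cause chain and match on that class name. This is only a sketch: OversizedRecords and its methods are hypothetical names, not part of the Samza or Kafka APIs, and it assumes the send failure surfaces synchronously inside process, as it does in your trace.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/**
 * Hypothetical helper (not part of Samza or Kafka): checks whether an exception,
 * or anything in its cause chain, has a given fully-qualified class name.
 */
final class OversizedRecords {
  /** Fully-qualified name of Kafka's producer-side "record too large" error. */
  static final String RECORD_TOO_LARGE =
      "org.apache.kafka.common.errors.RecordTooLargeException";

  private OversizedRecords() {}

  /** Returns true if t or any exception in its cause chain has the given class name. */
  static boolean isCausedBy(Throwable t, String className) {
    // Track visited throwables by identity so a cyclic cause chain cannot loop forever.
    Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
      if (cur.getClass().getName().equals(className)) {
        return true;
      }
    }
    return false;
  }

  /** Convenience check for the oversized-record failure shown in the stack trace. */
  static boolean isRecordTooLarge(Throwable t) {
    return isCausedBy(t, RECORD_TOO_LARGE);
  }
}
```

Inside the catch block of process, you could then skip the message when OversizedRecords.isRecordTooLarge(e) returns true and rethrow anything else (process would then need to declare throws Exception, which the StreamTask interface allows). Matching by name keeps the helper free of a compile-time dependency on kafka-clients; with that jar on your classpath, an instanceof check would work equally well.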
{code}
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class DocumentTagTask implements StreamTask {
  // Placeholder output stream; use your job's actual system and stream names.
  private static final SystemStream OUTPUT = new SystemStream("kafka", "output-topic");

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // custom logic
    try {
      // logic that could potentially throw an exception, e.g. sending an oversized message
      collector.send(new OutgoingMessageEnvelope(OUTPUT, envelope.getMessage()));
    } catch (Exception e) {
      // handle the exception (e.g. log it) and move on to the next message
    }
  }
}
{code}

2. Alternatively, if you want a config-driven approach, you may want to look at the following properties in the Samza configuration table <https://samza.apache.org/learn/documentation/0.12/jobs/configuration-table.html>:

task.ignored.exceptions
This property specifies which exceptions should be ignored if thrown in a task's process or window methods. The exceptions to be ignored should be a comma-separated list of fully-qualified class names of the exceptions, or * to ignore all exceptions.

task.drop.deserialization.errors
This property defines how the system handles deserialization failures. If set to true, the system will skip the messages that fail and keep running. If set to false, the system will throw exceptions and fail the container. Default is false.

task.drop.serialization.errors
This property defines how the system handles serialization failures. If set to true, the system will drop the messages that fail and keep running. If set to false, the system will throw exceptions and fail the container. Default is false.

Thanks,
Jagadish

On Thu, Feb 23, 2017 at 12:27 AM, 舒琦 <sh...@eefung.com> wrote:
> Hi,
>
> Sometimes very large records, around 2MB, show up in our flow. Right now Samza catches the exception and shuts down, as shown below. What I want is to handle this specific exception, discard such data, and let the flow continue.
>
> 2017-02-23 16:17:01.949 [main] SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
> org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 0 to system kafka.
>     at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:133)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:661)
>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:115)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:89)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 0 to system kafka.
>     at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:177)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:350)
>     at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:162)
>     at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
>     at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:60)
>     at com.antfact.datacenter.canal.task.tags.DocumentTagTask.process(DocumentTagTask.java:127)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:157)
>     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:155)
>     at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:356)
>     at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:325)
>     at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:283)
>     at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:199)
>     at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:144)
>     ... 4 more
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 881729 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
> 2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down.
> 2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down consumer multiplexer.
> 2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] Shutting down BrokerProxy for 172.19.105.20:9096
> 2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] closing simple consumer...
> 2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.20:9096 for client samza_consumer-canal_doc_tag-1] BrokerProxy [INFO] Got interrupt exception in broker proxy thread.
> 2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] Shutting down BrokerProxy for 172.19.105.22:9096
> 2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] closing simple consumer...
> 2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at 172.19.105.22:9096 for client samza_consumer-canal_doc_tag-1] BrokerProxy [INFO] Got interrupt exception in broker proxy thread.
> 2017-02-23 16:17:01.941 [main] SamzaContainer [INFO] Shutting down task instance stream tasks.
> 2017-02-23 16:17:01.942 [main] SamzaContainer [INFO] Shutting down task instance stores.
> 2017-02-23 16:17:01.943 [main] SamzaContainer [INFO] Shutting down host statistics monitor.
> 2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down producer multiplexer.
> 2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down locality manager.
> 2017-02-23 16:17:01.944 [main] CoordinatorStreamSystemProducer [INFO] Stopping coordinator stream producer.
> 2017-02-23 16:17:01.945 [main] SamzaContainer [INFO] Shutting down offset manager.
> 2017-02-23 16:17:01.946 [main] SamzaContainer [INFO] Shutting down metrics reporters.
> 2017-02-23 16:17:01.946 [main] MetricsSnapshotReporter [INFO] Stopping producer.
> 2017-02-23 16:17:01.947 [main] MetricsSnapshotReporter [INFO] Stopping reporter timer.
> 2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutting down JVM metrics.
> 2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutdown complete.
>
> Thanks!
>
> ————————
> QiShu
>

-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University