Hi QiShu,

1.  I see the exception occurring in your *process* method. The message you
are trying to send is larger than the producer's max.request.size limit
(1 MB by default in Kafka). You can catch the exception in your process
method and move on. Wouldn't that work for you?

{code}

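import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class DocumentTagTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // custom logic
    try {
      // logic that could potentially throw an exception
      // collector.send(msg)
    } catch (Exception e) {
      // handle exception and move on.
    }
  }
}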

{code}
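If you only want to drop the oversized messages and still fail on anything
else, note that in your trace the RecordTooLargeException is wrapped inside a
SamzaException, so the catch block would have to look at the cause chain
rather than only at the top-level exception. Here is a minimal sketch of such
a check using only the JDK. CauseChain and hasCause are hypothetical names I
made up for illustration; they are not part of Samza or Kafka.

```java
final class CauseChain {
    private CauseChain() {}

    /**
     * Returns true if t, or any exception in its cause chain, has the given
     * fully-qualified class name. Comparing by name avoids a compile-time
     * dependency on the exception class itself.
     */
    static boolean hasCause(Throwable t, String className) {
        // Bound the walk so a cyclic cause chain cannot loop forever.
        int depth = 0;
        for (Throwable cur = t; cur != null && depth < 32; cur = cur.getCause(), depth++) {
            if (cur.getClass().getName().equals(className)) {
                return true;
            }
        }
        return false;
    }
}
```

In the catch block you could then call
hasCause(e, "org.apache.kafka.common.errors.RecordTooLargeException"), log and
skip the message when it returns true, and rethrow e otherwise.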

2. Alternatively, if you want a config-driven approach, you may want to look
at the following properties in the Samza config table
<https://samza.apache.org/learn/documentation/0.12/jobs/configuration-table.html>:

task.ignored.exceptions: Specifies which exceptions should be ignored if
thrown in a task's process or window methods. The value is a comma-separated
list of fully-qualified exception class names, or * to ignore all exceptions.

task.drop.deserialization.errors: Defines how the system handles
deserialization failures. If set to true, the system skips the offending
messages and keeps running. If set to false, the system throws an exception
and fails the container. Default is false.

task.drop.serialization.errors: Defines how the system handles serialization
failures. If set to true, the system drops the offending messages and keeps
running. If set to false, the system throws an exception and fails the
container. Default is false.
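
For illustration, these would go in the job's properties file roughly as
below. This is only a sketch: the value for task.ignored.exceptions assumes
that the exception seen by the task is the SamzaException wrapper from your
trace, and I have not verified whether the match is made against the wrapper
or the root cause, so please test it. Keep in mind that ignoring
SamzaException is broad and would also hide unrelated failures.

```
# Assumption: the wrapper exception from the trace is what gets matched.
task.ignored.exceptions=org.apache.samza.SamzaException

# These apply to serde failures only, not to the send failure in the trace.
task.drop.deserialization.errors=true
task.drop.serialization.errors=true
```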

Thanks,
Jagadish

On Thu, Feb 23, 2017 at 12:27 AM, 舒琦 <sh...@eefung.com> wrote:

> Hi,
>
>         Sometimes very large records, around 2 MB, appear in our flow.
> Currently Samza catches the exception and shuts down, as shown below. What
> I want is to handle this specific exception, discard the offending data,
> and let the flow continue.
>
> 2017-02-23 16:17:01.949 [main] SamzaContainerExceptionHandler [ERROR]
> Uncaught exception in thread (name=main). Exiting process now.
> org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable
> to send message from TaskName-Partition 0 to system kafka.
>         at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:133)
>         at org.apache.samza.container.SamzaContainer.run(
> SamzaContainer.scala:661)
>         at org.apache.samza.container.SamzaContainer$.safeMain(
> SamzaContainer.scala:115)
>         at org.apache.samza.container.SamzaContainer$.main(
> SamzaContainer.scala:89)
>         at org.apache.samza.container.SamzaContainer.main(
> SamzaContainer.scala)
> Caused by: org.apache.samza.SamzaException: Unable to send message from
> TaskName-Partition 0 to system kafka.
>         at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.
> onCompletion(KafkaSystemProducer.scala:177)
>         at org.apache.kafka.clients.producer.KafkaProducer.send(
> KafkaProducer.java:350)
>         at org.apache.samza.system.kafka.KafkaSystemProducer.send(
> KafkaSystemProducer.scala:162)
>         at org.apache.samza.system.SystemProducers.send(
> SystemProducers.scala:87)
>         at org.apache.samza.task.TaskInstanceCollector.send(
> TaskInstanceCollector.scala:60)
>         at com.antfact.datacenter.canal.task.tags.DocumentTagTask.
> process(DocumentTagTask.java:127)
>         at org.apache.samza.task.AsyncStreamTaskAdapter.process(
> AsyncStreamTaskAdapter.java:72)
>         at org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(
> AsyncStreamTaskAdapter.java:63)
>         at org.apache.samza.container.TaskInstance$$anonfun$process$
> 1.apply$mcV$sp(TaskInstance.scala:157)
>         at org.apache.samza.container.TaskInstanceExceptionHandler.
> maybeHandle(TaskInstanceExceptionHandler.scala:54)
>         at org.apache.samza.container.TaskInstance.process(
> TaskInstance.scala:155)
>         at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.
> process(AsyncRunLoop.java:356)
>         at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.
> run(AsyncRunLoop.java:325)
>         at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.
> access$300(AsyncRunLoop.java:283)
>         at org.apache.samza.task.AsyncRunLoop.runTasks(
> AsyncRunLoop.java:199)
>         at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:144)
>         ... 4 more
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The
> message is 881729 bytes when serialized which is larger than the maximum
> request size you have configured with the max.request.size configuration.
> 2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down.
> 2017-02-23 16:17:01.937 [main] SamzaContainer [INFO] Shutting down
> consumer multiplexer.
> 2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] Shutting down
> BrokerProxy for 172.19.105.20:9096
> 2017-02-23 16:17:01.940 [main] BrokerProxy [INFO] closing simple
> consumer...
> 2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> 172.19.105.20:9096 for client samza_consumer-canal_doc_tag-1] BrokerProxy
> [INFO] Got interrupt exception in broker proxy thread.
> 2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] Shutting down
> BrokerProxy for 172.19.105.22:9096
> 2017-02-23 16:17:01.941 [main] BrokerProxy [INFO] closing simple
> consumer...
> 2017-02-23 16:17:01.941 [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at
> 172.19.105.22:9096 for client samza_consumer-canal_doc_tag-1] BrokerProxy
> [INFO] Got interrupt exception in broker proxy thread.
> 2017-02-23 16:17:01.941 [main] SamzaContainer [INFO] Shutting down task
> instance stream tasks.
> 2017-02-23 16:17:01.942 [main] SamzaContainer [INFO] Shutting down task
> instance stores.
> 2017-02-23 16:17:01.943 [main] SamzaContainer [INFO] Shutting down host
> statistics monitor.
> 2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down
> producer multiplexer.
> 2017-02-23 16:17:01.944 [main] SamzaContainer [INFO] Shutting down
> locality manager.
> 2017-02-23 16:17:01.944 [main] CoordinatorStreamSystemProducer [INFO]
> Stopping coordinator stream producer.
> 2017-02-23 16:17:01.945 [main] SamzaContainer [INFO] Shutting down offset
> manager.
> 2017-02-23 16:17:01.946 [main] SamzaContainer [INFO] Shutting down metrics
> reporters.
> 2017-02-23 16:17:01.946 [main] MetricsSnapshotReporter [INFO] Stopping
> producer.
> 2017-02-23 16:17:01.947 [main] MetricsSnapshotReporter [INFO] Stopping
> reporter timer.
> 2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutting down JVM
> metrics.
> 2017-02-23 16:17:01.947 [main] SamzaContainer [INFO] Shutdown complete.
>
> Thanks!
>
> ————————
> QiShu
>
>


-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
