We have a Samza job configured to run in a YARN cluster. This job consumes
multiple Kafka topics and routes the messages to Elasticsearch for
indexing. When enough batch updates to Elasticsearch fail using the
ElasticsearchSystemProducer, the entire Samza job dies. Because of
checkpointing and YARN, the job starts back up, resumes reading where it left
off, and dies again. We end up in a loop.

Updates to ES are failing because of invalid data from our consumers,
but I can't always control them, so I need to be defensive in the code. I
don't see how to handle this in any of the source examples. I would like to
just trap this error and, if it is what I expect it to be, squash it. Can
someone point me in the right direction?

Below is the log where the failure occurs.

2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send message from TaskName-Partition 5 to system elastic.
2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 5 to system elastic.
    at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186)
    at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92)
    at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
    at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47)
    at org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000 ms
2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms, exiting

- jeremiah