We have a Samza job configured to run in a YARN cluster. The job consumes multiple Kafka topics and routes the messages to Elasticsearch for indexing. When enough batch updates to Elasticsearch fail in the ElasticsearchSystemProducer, the entire Samza job dies. Because of checkpointing and YARN, the job starts back up, resumes reading where it left off, and dies again. Enter loop.
Updates to ES are failing because of invalid data on the part of our consumers, but I can't always control them, so I need to be defensive in the code. I don't see how to handle this in any of the source examples. I would like to trap this error and, if it is what I expect it to be, squash it. Can someone point me in the right direction? Below is the log where the failure occurs:

    2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send message from TaskName-Partition 5 to system elastic.
    2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught exception in thread (name=main). Exiting process now.
    org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 5 to system elastic.
        at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186)
        at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92)
        at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
        at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47)
        at org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
    2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000 ms
    2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms, exiting

- jeremiah
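A minimal sketch of one defensive alternative, not a fix for the producer itself. The trace shows the exception being thrown from ElasticsearchSystemProducer.flush while the container is stopping its producers (SamzaContainer.shutdownProducers), not from task code, so a try/catch around the send inside a task would likely not intercept it. Filtering out documents that are likely to be rejected before they are sent to the elastic system avoids producing the failing batch in the first place. The class name DocumentFilter, the fields "id" and "timestamp", and the validation rules are all hypothetical; real rules would have to mirror the actual index mapping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-send validation step: documents that look likely to be
// rejected by Elasticsearch are diverted instead of being sent to the
// "elastic" system. Field names and rules are assumptions for illustration.
public class DocumentFilter {

    // Result of splitting a batch into sendable and rejected documents.
    public static final class Split {
        public final List<Map<String, Object>> accepted = new ArrayList<>();
        public final List<Map<String, Object>> rejected = new ArrayList<>();
    }

    // Assumed rule: a document needs a non-empty String "id", and if it has a
    // "timestamp" field, that field must be a Long (epoch millis).
    public static boolean isIndexable(Map<String, Object> doc) {
        Object id = doc.get("id");
        if (!(id instanceof String) || ((String) id).isEmpty()) {
            return false;
        }
        Object ts = doc.get("timestamp");
        return ts == null || ts instanceof Long;
    }

    // Partitions a batch without throwing, so one bad document cannot
    // take down the whole batch.
    public static Split split(List<Map<String, Object>> docs) {
        Split result = new Split();
        for (Map<String, Object> doc : docs) {
            if (isIndexable(doc)) {
                result.accepted.add(doc);
            } else {
                result.rejected.add(doc);
            }
        }
        return result;
    }
}
```

In a task, only the accepted documents would then be sent to the elastic system, while the rejected ones could be logged or written to a separate Kafka topic for later inspection. The catch is that the rules have to be kept in sync with the index mapping; anything they miss can still fail at flush time.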