The code you showed below is part of the BulkProcessor.Listener implementation, so if that listener were pluggable, you could override the default behavior (which is to ignore only version conflicts).
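
To make the idea concrete, here is a minimal sketch of what a pluggable failure policy could look like. It is not Samza's or Elasticsearch's actual API: the class PluggableFailurePolicy, the ItemFailure type, and the ignoring() helper are all hypothetical stand-ins, and statuses are modeled as plain HTTP codes (409 for conflict, 400 for the mapping error) rather than RestStatus values. The only part taken from the thread is the decision itself: log and continue for ignorable statuses, set the fatal flag for everything else.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.IntPredicate;

// Hypothetical sketch: none of these names exist in Samza or Elasticsearch.
public class PluggableFailurePolicy {

    /** Stand-in for one failed bulk item: HTTP status code plus failure message. */
    public static final class ItemFailure {
        final int status;
        final String message;

        public ItemFailure(int status, String message) {
            this.status = status;
            this.message = message;
        }
    }

    /** Mirrors the behavior quoted in the thread: only 409 (conflict) is ignored. */
    public static final IntPredicate DEFAULT_POLICY = status -> status == 409;

    /** Builds a policy that ignores every status in the given set (assumed to come from config). */
    public static IntPredicate ignoring(Set<Integer> statuses) {
        return status -> statuses.contains(status);
    }

    /** Returns true if at least one failure is not ignorable under the policy. */
    public static boolean hasFatalError(List<ItemFailure> failures, IntPredicate ignorable) {
        boolean fatal = false;
        for (ItemFailure failure : failures) {
            if (ignorable.test(failure.status)) {
                // Ignorable: log and keep processing.
                System.out.println("Failed to index document (ignored): " + failure.message);
            } else {
                // Anything else marks the batch as fatally failed.
                fatal = true;
                System.err.println("Failed to index document: " + failure.message);
            }
        }
        return fatal;
    }

    public static void main(String[] args) {
        List<ItemFailure> failures =
            Arrays.asList(new ItemFailure(400, "mapping conflict"));
        IntPredicate lenient = ignoring(new HashSet<>(Arrays.asList(400, 409)));

        System.out.println("default policy fatal: " + hasFatalError(failures, DEFAULT_POLICY));
        System.out.println("lenient policy fatal: " + hasFatalError(failures, lenient));
    }
}
```

In the real producer, the equivalent check would sit inside the listener's handling of each failed item, with the predicate supplied through configuration instead of being hard-coded.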

On Tue, Feb 16, 2016 at 12:27 PM, jeremiah adams <jadams...@gmail.com> wrote:

> The root of the issue may be in the HTTP status code handling. This code
> seems to imply that the only valid error case from Elasticsearch is
> conflict. This is too narrow a constraint. In one of my use cases, a
> mapping/message conflict occurs resulting in an HTTP 400. In my case, it is
> perfectly reasonable to log the error, not raise hasFatalError = true and
> continue processing. A way to control this via properties or some other
> mechanism would probably solve the issue. Setting the hasFatalError flag
> looks to be the source of the unhandled exception that ultimately fails the
> job.
>
> I don't think the ListenerCallback will solve the problem. I don't
> understand how that might stop the unhandled exception being raised by
> flush().
>
> if (itemResp.getFailure().getStatus().equals(RestStatus.CONFLICT)) {
>   LOGGER.info("Failed to index document in Elasticsearch: " +
>       itemResp.getFailureMessage());
> } else {
>   hasFatalError = true;
>   LOGGER.error("Failed to index document in Elasticsearch: " +
>       itemResp.getFailureMessage());
> }
>
> - jeremiah
>
> On Tue, Feb 16, 2016 at 12:18 PM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Hi Jeremiah,
> >
> > There's currently no way to do that. I think the best way to modify the
> > existing ElasticsearchSystemProducer would be to add a config option for a
> > callback to let you customize this behavior. Basically, a pluggable
> > listener (
> > https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java#L101
> > ).
> >
> > On Mon, Feb 15, 2016 at 2:30 PM, jeremiah adams <jadams...@gmail.com>
> > wrote:
> >
> > > We have a samza job configured to run in a yarn cluster. This job consumes
> > > multiple kafka topics and routes the messages to elasticsearch for
> > > indexing. When enough batch-updates to elasticsearch fail using the
> > > ElasticsearchSystemProducer, the entire samza job dies. Due to
> > > checkpointing + yarn, the job starts back up, starts reading where it left
> > > off and dies again. Enter loop.
> > >
> > > Updates to ES are failing due to invalid data on the part of our consumers
> > > but I can't always control them so need to be defensive about the code. I
> > > don't see how to handle this in any of the source examples. I would like to
> > > just trap this error and if it is what I expect it to be - squash it. Can
> > > someone point me in the right direction?
> > >
> > > Below is the log where the failure occurs.
> > >
> > > 2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send
> > > message from TaskName-Partition 5 to system elastic.
> > > 2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught
> > > exception in thread (name=main). Exiting process now.
> > > org.apache.samza.SamzaException: Unable to send message from
> > > TaskName-Partition 5 to system elastic.
> > > at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186)
> > > at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92)
> > > at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> > > at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> > > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > > at org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47)
> > > at org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672)
> > > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564)
> > > at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
> > > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
> > > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > > 2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000 ms
> > > 2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms,
> > > exiting
> > >
> > > - jeremiah