The root of the issue may be in the HTTP status code handling. This code
seems to imply that the only non-fatal error case from Elasticsearch is a
conflict (HTTP 409). That is too narrow a constraint. In one of my use
cases, a mapping/message conflict occurs and results in an HTTP 400. In my
case, it is perfectly reasonable to log the error, skip setting
hasFatalError = true, and continue processing. A way to control this via
properties or some other mechanism would probably solve the issue. Setting
the hasFatalError flag looks to be the source of the unhandled exception
that ultimately fails the job.

I don't think the ListenerCallback will solve the problem; I don't
understand how that would stop the unhandled exception being raised by
flush(). (A rough sketch of that listener idea is included after the
quoted thread at the bottom of this message.)

    if (itemResp.getFailure().getStatus().equals(RestStatus.CONFLICT)) {
      LOGGER.info("Failed to index document in Elasticsearch: "
          + itemResp.getFailureMessage());
    } else {
      hasFatalError = true;
      LOGGER.error("Failed to index document in Elasticsearch: "
          + itemResp.getFailureMessage());
    }

- jeremiah

On Tue, Feb 16, 2016 at 12:18 PM, Roger Hoover <roger.hoo...@gmail.com>
wrote:

> Hi Jeremiah,
>
> There's currently no way to do that.  I think the best way to modify the
> existing ElasticsearchSystemProducer would be to add a config option for a
> callback to let you customize this behavior.  Basically, a pluggable
> listener (
> https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java#L101
> ).
>
> On Mon, Feb 15, 2016 at 2:30 PM, jeremiah adams <jadams...@gmail.com>
> wrote:
>
> > We have a samza job configured to run in a yarn cluster. This job
> > consumes multiple kafka topics and routes the messages to elasticsearch
> > for indexing. When enough batch-updates to elasticsearch fail using the
> > ElasticsearchSystemProducer, the entire samza job dies. Due to
> > checkpointing + yarn, the job starts back up, starts reading where it
> > left off, and dies again. Enter loop.
> >
> > Updates to ES are failing due to invalid data on the part of our
> > consumers, but I can't always control them, so I need to be defensive in
> > the code. I don't see how to handle this in any of the source examples.
> > I would like to just trap this error and, if it is what I expect it to
> > be, squash it. Can someone point me in the right direction?
> >
> > Below is the log where the failure occurs.
> >
> > 2016-02-15 18:55:26 ElasticsearchSystemProducer [ERROR] Unable to send
> > message from TaskName-Partition 5 to system elastic.
> > 2016-02-15 18:55:26 SamzaContainerExceptionHandler [ERROR] Uncaught
> > exception in thread (name=main). Exiting process now.
> > org.apache.samza.SamzaException: Unable to send message from
> > TaskName-Partition 5 to system elastic.
> > at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.flush(ElasticsearchSystemProducer.java:186)
> > at org.apache.samza.system.elasticsearch.ElasticsearchSystemProducer.stop(ElasticsearchSystemProducer.java:92)
> > at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> > at org.apache.samza.system.SystemProducers$$anonfun$stop$2.apply(SystemProducers.scala:47)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> > at org.apache.samza.system.SystemProducers.stop(SystemProducers.scala:47)
> > at org.apache.samza.container.SamzaContainer.shutdownProducers(SamzaContainer.scala:672)
> > at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:564)
> > at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
> > at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
> > at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> > 2016-02-15 18:55:26 RunLoop [INFO] Shutting down, will wait up to 5000 ms
> > 2016-02-15 18:55:31 RunLoop [WARN] Did not shut down within 5000 ms,
> > exiting
> >
> > - jeremiah
> >
>
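
For reference, here is a rough sketch of the pluggable listener Roger
describes above. The interface name and the wiring are assumptions, not
existing Samza API; the only real types are Elasticsearch's
BulkItemResponse/BulkResponse and the existing hasFatalError flag.

    import org.elasticsearch.action.bulk.BulkItemResponse;

    // Hypothetical plug-in point: user code gets each failed bulk item and
    // decides whether it should fail the job or just be logged.
    public interface IndexFailureListener {
      /** Return true if this failure should be treated as fatal. */
      boolean onFailure(BulkItemResponse itemResp);
    }

Inside the producer's BulkProcessor.Listener (the code the link above
points at), the per-item check could then delegate to it, roughly:

    for (BulkItemResponse itemResp : response.getItems()) {
      if (itemResp.isFailed() && failureListener.onFailure(itemResp)) {
        hasFatalError = true;
      }
    }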
