Hi,

I’m running Elasticsearch as the sink for a batch job that processes a CSV 
file of 6.2 million rows, each row holding 181 numeric values. It quite 
happily processes a small sample of around 2,000 rows, running each column 
through its own parallel pipeline, keyed by column number.
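
(In outline, the keying looks something like this; the names below are 
illustrative rather than my exact code.)

import org.apache.flink.streaming.api.scala._

// Illustrative only: explode each parsed CSV row into (columnNumber, value)
// records and key by column number, so each of the 181 columns is handled by
// its own keyed pipeline instance.
val perColumn = rows // hypothetical DataStream[Array[Double]] of parsed rows
  .flatMap(row => row.zipWithIndex.map { case (value, col) => (col, value) })
  .keyBy(_._1)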

However, once I scale up to the full data set, with parallelism set higher 
than one (typically eight), Elasticsearch fails after a while, as follows:

2019-10-17 09:36:09,550 ERROR org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase  - Failed Elasticsearch bulk request: request retries exceeded max retry timeout [30000]
java.io.IOException: request retries exceeded max retry timeout [30000]
        at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
        at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
        at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
        at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        at java.lang.Thread.run(Thread.java:745)
2019-10-17 09:36:09,576 INFO  org.example.Job$ - Failed ElasticSearch document. Exception rethrown
2019-10-17 09:36:09,624 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:343)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:479)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:380)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: request retries exceeded max retry timeout [30000]
        at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
        at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
        at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
        at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        ... 1 more
2019-10-17 09:36:09,629 INFO  org.apache.flink.runtime.taskmanager.Task - Window(GlobalWindows(), CountTrigger, CountEvictor, ScalaProcessWindowFunctionWrapper) -> Map -> Map -> Map -> Sink: Unnamed (2/8) (cb74c5d1c9323000fae9488d7d256468) switched from RUNNING to FAILED.
java.lang.RuntimeException: An error occurred in ElasticsearchSink.
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:381)
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:386)
        at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at org.example.signalRecognitionWindowFunction.process(signalRecognitionWindowFunction.scala:42)
        at org.example.signalRecognitionWindowFunction.process(signalRecognitionWindowFunction.scala:8)
        at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
        at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
        at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.emitWindowContents(EvictingWindowOperator.java:359)
        at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:218)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: request retries exceeded max retry timeout [30000]
        at org.elasticsearch.client.RestClient$1.retryIfPossible(RestClient.java:411)
        at org.elasticsearch.client.RestClient$1.failed(RestClient.java:398)
        at org.apache.http.concurrent.BasicFuture.failed(BasicFuture.java:137)
        at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.failed(AbstractClientExchangeHandler.java:419)
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:375)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92)
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39)
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        ... 1 more


I’m running Flink 1.8.1 with version 1.9.0 of the Elasticsearch connector, as 
a standalone cluster on a 16 GB MacBook Pro. Activity Monitor shows no sign of 
CPU or memory exhaustion, and data is being written to disk at a fairly low 
rate of around 10-20 MB per second, well under the disk’s maximum write rate 
of 100+ MB per second. The Elasticsearch log shows nothing except the 
occasional INFO message about GC.
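
As far as I can tell, the [30000] in the trace is the default max retry 
timeout of the Elasticsearch low-level RestClient rather than anything I’ve 
configured. On a bare client it could be raised like this (a sketch only, 
untested, assuming the ES 6.x client, where setMaxRetryTimeoutMillis still 
exists), though the sink builds its client internally, so it would have to go 
through the connector (see the end of the code below):

import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient

// Sketch only: the ES 6.x low-level REST client defaults its max retry
// timeout to 30000 ms, matching the [30000] in the trace above.
val restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"))
  .setMaxRetryTimeoutMillis(120000) // hypothetical higher value
  .build()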

The code relevant to the sink is:

// Imports shown for completeness (this assumes the elasticsearch6 connector).
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.flink.util.ExceptionUtils
import org.apache.http.HttpHost
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

val httpHosts = new util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("localhost", 9200, "http"))
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[(Int, Long, Double, String)](
  httpHosts,
  new ElasticsearchSinkFunction[(Int, Long, Double, String)] {

    // Build one index request per (bearing, sampleindex, sample, hashstring) tuple.
    def createIndexRequest(element: (Int, Long, Double, String)): IndexRequest = {
      val json = new util.HashMap[String, Any]
      json.put("arrayinstance", "beamarraytest")
      json.put("bearing", element._1)
      json.put("sampleindex", element._2)
      json.put("sample", element._3)
      json.put("hashstring", element._4)
      Requests.indexRequest()
        .index("flink-index")
        .`type`("flink-type")
        .source(json)
    }

    def process(element: (Int, Long, Double, String), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      indexer.add(createIndexRequest(element))
    }
  }
)

esSinkBuilder.setFailureHandler(
  new ActionRequestFailureHandler() {
    @throws(classOf[Throwable])
    override def onFailure(action: ActionRequest, failure: Throwable, restStatusCode: Int, indexer: RequestIndexer): Unit = {
      if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
        // Elasticsearch's bulk queue was full: re-queue the document.
        LOG.info("ElasticSearch full queue; re-added document for indexing")
        indexer.add(action)
      } else if (ExceptionUtils.findThrowable(failure, classOf[ElasticsearchParseException]).isPresent) {
        LOG.info("Malformed ElasticSearch document. Document dropped")
      } else {
        // Anything else (including the retry timeout above) fails the job.
        LOG.info("Failed ElasticSearch document. Exception rethrown")
        throw failure
      }
    }
  }
)

esSinkBuilder.setBulkFlushMaxActions(10000)

signalFourBuckets.addSink(esSinkBuilder.build)
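
One thing I’ve considered, but not yet tried, is enabling the builder’s 
bulk-flush backoff so that full-queue rejections are retried with a delay, and 
raising the client’s max retry timeout through the connector’s 
RestClientFactory hook. A sketch of what I mean (again assuming the 
elasticsearch6 connector; the values are guesses):

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory
import org.elasticsearch.client.RestClientBuilder

// Retry bulk requests rejected by a full Elasticsearch queue with exponential
// backoff instead of handing them straight to the failure handler.
esSinkBuilder.setBulkFlushBackoff(true)
esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL)
esSinkBuilder.setBulkFlushBackoffRetries(3)
esSinkBuilder.setBulkFlushBackoffDelay(1000) // initial delay in milliseconds

// Raise the 30 s max retry timeout on the REST client the sink builds internally.
esSinkBuilder.setRestClientFactory(new RestClientFactory {
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
    restClientBuilder.setMaxRetryTimeoutMillis(120000) // hypothetical value
  }
})

I don’t know whether either of these addresses the root cause, though.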

Is my code at fault, or is the sink simply not capable of handling this flow 
rate? Advice of any kind would be very gratefully received.

TIA

Nick Walton
