[ https://issues.apache.org/jira/browse/BEAM-10990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Evan Galpin reassigned BEAM-10990:
----------------------------------

    Assignee: Evan Galpin

> Elasticsearch IO infinite loop with write error when the pipeline job is in streaming mode
> -------------------------------------------------------------------------------------------
>
>                 Key: BEAM-10990
>                 URL: https://issues.apache.org/jira/browse/BEAM-10990
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>    Affects Versions: 2.24.0
>            Reporter: Steven Gaunt
>            Assignee: Evan Galpin
>            Priority: P3
>
> When streaming messages from PubsubIO, the pipeline runs in streaming mode. If the Elasticsearch index API returns an error response, ElasticsearchIO.Write()'s WriteFn throws an IOException. Since this exception is thrown inside the Write transform, it becomes an unhandled error, which in turn causes the runner to retry the failing work indefinitely.
> _*Snippet from the Beam website:*_
> The Dataflow service retries failed tasks up to 4 times in batch mode, and an unlimited number of times in streaming mode. In batch mode, your job will fail; in streaming, it may stall indefinitely.
>
> This is the ElasticsearchIO.Write expand method:
> {code:java}
> public PDone expand(PCollection<String> input) {
>   ElasticsearchIO.ConnectionConfiguration connectionConfiguration =
>       this.getConnectionConfiguration();
>   Preconditions.checkState(
>       connectionConfiguration != null, "withConnectionConfiguration() is required");
>   // WriteFn is applied here; any exception it throws is unhandled by the transform.
>   input.apply(ParDo.of(new ElasticsearchIO.Write.WriteFn(this)));
>   return PDone.in(input.getPipeline());
> }
> {code}
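> For context, a minimal sketch of the kind of streaming pipeline that hits this behavior (the Pub/Sub topic, Elasticsearch address, index, and type below are illustrative):
> {code:java}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> public class PubsubToElasticsearch {
>   public static void main(String[] args) {
>     Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>     // The unbounded Pub/Sub source puts the pipeline in streaming mode.
>     p.apply("ReadFromPubsub",
>             PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
>      // Any write error thrown by WriteFn fails the bundle, which the
>      // runner then retries without limit in streaming mode.
>      .apply("WriteToElasticsearch",
>             ElasticsearchIO.write()
>                 .withConnectionConfiguration(
>                     ElasticsearchIO.ConnectionConfiguration.create(
>                         new String[] {"http://localhost:9200"}, "my-index", "_doc")));
>     p.run();
>   }
> }
> {code}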
> The ParDo function (WriteFn) calls the ElasticsearchIO.checkForErrors helper from its finishBundle step; that helper throws an IOException whenever the HTTP response from Elasticsearch reports errors in the JSON body:
> {code:java}
> @FinishBundle
> public void finishBundle(DoFn<String, Void>.FinishBundleContext context)
>     throws IOException, InterruptedException {
>   this.flushBatch();
> }
>
> private void flushBatch() throws IOException, InterruptedException {
>   if (!this.batch.isEmpty()) {
>     // Concatenate the buffered documents into a single bulk request body.
>     StringBuilder bulkRequest = new StringBuilder();
>     for (String json : this.batch) {
>       bulkRequest.append(json);
>     }
>     this.batch.clear();
>     this.currentBatchSizeBytes = 0L;
>     String endPoint = String.format("/%s/%s/_bulk",
>         this.spec.getConnectionConfiguration().getIndex(),
>         this.spec.getConnectionConfiguration().getType());
>     HttpEntity requestBody =
>         new NStringEntity(bulkRequest.toString(), ContentType.APPLICATION_JSON);
>     Request request = new Request("POST", endPoint);
>     request.addParameters(Collections.emptyMap());
>     request.setEntity(requestBody);
>     Response response = this.restClient.performRequest(request);
>     HttpEntity responseEntity = new BufferedHttpEntity(response.getEntity());
>     if (this.spec.getRetryConfiguration() != null
>         && this.spec.getRetryConfiguration().getRetryPredicate().test(responseEntity)) {
>       responseEntity = this.handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
>     }
>     // Throws IOException if the bulk response reports any per-document errors.
>     ElasticsearchIO.checkForErrors(
>         (HttpEntity) responseEntity, this.backendVersion, this.spec.getUsePartialUpdate());
>   }
> }
>
> static void checkForErrors(HttpEntity responseEntity, int backendVersion, boolean partialUpdate)
>     throws IOException {
>   JsonNode searchResult = parseResponse(responseEntity);
>   boolean errors = searchResult.path("errors").asBoolean();
>   if (errors) {
>     StringBuilder errorMessages = new StringBuilder(
>         "Error writing to Elasticsearch, some elements could not be inserted:");
>     JsonNode items = searchResult.path("items");
>     for (JsonNode item : items) {
>       // The name of the per-item result node depends on write mode and backend version.
>       String errorRootName = "";
>       if (partialUpdate) {
>         errorRootName = "update";
>       } else if (backendVersion == 2) {
>         errorRootName = "create";
>       } else if (backendVersion >= 5) {
>         errorRootName = "index";
>       }
>       JsonNode errorRoot = item.path(errorRootName);
>       JsonNode error = errorRoot.get("error");
>       if (error != null) {
>         String type = error.path("type").asText();
>         String reason = error.path("reason").asText();
>         String docId = errorRoot.path("_id").asText();
>         errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
>         JsonNode causedBy = error.get("caused_by");
>         if (causedBy != null) {
>           String cbReason = causedBy.path("reason").asText();
>           String cbType = causedBy.path("type").asText();
>           errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
>         }
>       }
>     }
>     throw new IOException(errorMessages.toString());
>   }
> }
> {code}
>
> As a possible suggestion: rather than throwing the exception, could the failing documents be written to an error-handling TupleTag, which could then be routed to a dead-letter queue?
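> A minimal sketch of that idea (the WriteWithDeadLetterFn class, the tags, and the bulkWrite placeholder below are hypothetical, not part of ElasticsearchIO):
> {code:java}
> import java.io.IOException;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionTuple;
> import org.apache.beam.sdk.values.TupleTag;
> import org.apache.beam.sdk.values.TupleTagList;
>
> public class DeadLetterSketch {
>   // Hypothetical output tags: main (successful) writes and failed documents.
>   static final TupleTag<Void> WRITES = new TupleTag<Void>() {};
>   static final TupleTag<String> DEAD_LETTER = new TupleTag<String>() {};
>
>   static class WriteWithDeadLetterFn extends DoFn<String, Void> {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       try {
>         bulkWrite(c.element()); // stand-in for the flushBatch()/checkForErrors() logic
>       } catch (IOException e) {
>         // Instead of failing the bundle (and triggering unlimited retries in
>         // streaming mode), emit the document plus the error detail.
>         c.output(DEAD_LETTER, c.element() + " | " + e.getMessage());
>       }
>     }
>
>     private void bulkWrite(String document) throws IOException {
>       // Placeholder for the bulk REST call shown above.
>     }
>   }
>
>   static PCollection<String> applyWrite(PCollection<String> input) {
>     PCollectionTuple results = input.apply(
>         ParDo.of(new WriteWithDeadLetterFn())
>             .withOutputTags(WRITES, TupleTagList.of(DEAD_LETTER)));
>     // The failed documents can then be written to a dead-letter sink,
>     // e.g. a Pub/Sub topic or files, instead of stalling the pipeline.
>     return results.get(DEAD_LETTER);
>   }
> }
> {code}
> Note that the real WriteFn batches documents and flushes in finishBundle, so an actual fix would need to collect per-document failures there (FinishBundleContext.output takes a tag, timestamp, and window) rather than in processElement.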