xiongkun created FLINK-12551: -------------------------------- Summary: elasticsearch6 connector print log error Key: FLINK-12551 URL: https://issues.apache.org/jira/browse/FLINK-12551 Project: Flink Issue Type: Bug Components: Connectors / ElasticSearch Affects Versions: 1.6.3 Reporter: xiongkun
when i use elasticsearch connector ,when my project is running,i find some data does not insert elasticsearch ,so i want to read log help me ,but the log does contain importance message,so i read source code (org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase),i find a error on write ERROR log. {code:java} @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { BulkItemResponse itemResponse; Throwable failure; RestStatus restStatus; try { for (int i = 0; i < response.getItems().length; i++) { itemResponse = response.getItems()[i]; failure = callBridge.extractFailureCauseFromBulkItemResponse(itemResponse); if (failure != null) { LOG.error("Failed Elasticsearch item request: {}", itemResponse.getFailureMessage(), failure); restStatus = itemResponse.getFailure().getStatus(); if (restStatus == null) { failureHandler.onFailure(request.requests().get(i), failure, -1, requestIndexer); } else { failureHandler.onFailure(request.requests().get(i), failure, restStatus.getStatus(), requestIndexer); } } } } catch (Throwable t) { // fail the sink and skip the rest of the items // if the failure handler decides to throw an exception failureThrowable.compareAndSet(null, t); } } if (flushOnCheckpoint) { numPendingRequests.getAndAdd(-request.numberOfActions()); } } {code} {code:java} @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { LOG.error("Failed Elasticsearch bulk request: {}", failure.getMessage(), failure.getCause()); try { for (ActionRequest action : request.requests()) { failureHandler.onFailure(action, failure, -1, requestIndexer); } } catch (Throwable t) { // fail the sink and skip the rest of the items // if the failure handler decides to throw an exception failureThrowable.compareAndSet(null, t); } if (flushOnCheckpoint) { numPendingRequests.getAndAdd(-request.numberOfActions()); } } } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)