[ 
https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565299#comment-16565299
 ] 

ASF GitHub Bot commented on FLINK-7386:
---------------------------------------

asfgit closed pull request #6043: [FLINK-7386] evolve RequestIndexer API to 
make it working with Elastic 5.3+, evolve ElasticsearchApiCallBridge API to 
make it compatible with a possible RestHighLevelClient implementation 
URL: https://github.com/apache/flink/pull/6043
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
index 2ebb97c82e2..33b42cb47f1 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/BulkProcessorIndexer.java
@@ -22,6 +22,9 @@
 
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -45,12 +48,32 @@
        }
 
        @Override
-       public void add(ActionRequest... actionRequests) {
-               for (ActionRequest actionRequest : actionRequests) {
+       public void add(DeleteRequest... deleteRequests) {
+               for (DeleteRequest deleteRequest : deleteRequests) {
                        if (flushOnCheckpoint) {
                                numPendingRequestsRef.getAndIncrement();
                        }
-                       this.bulkProcessor.add(actionRequest);
+                       this.bulkProcessor.add(deleteRequest);
+               }
+       }
+
+       @Override
+       public void add(IndexRequest... indexRequests) {
+               for (IndexRequest indexRequest : indexRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add(indexRequest);
+               }
+       }
+
+       @Override
+       public void add(UpdateRequest... updateRequests) {
+               for (UpdateRequest updateRequest : updateRequests) {
+                       if (flushOnCheckpoint) {
+                               numPendingRequestsRef.getAndIncrement();
+                       }
+                       this.bulkProcessor.add(updateRequest);
                }
        }
 }
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
index 2a7a21659e4..1c501bf4a20 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
@@ -22,7 +22,6 @@
 
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
-import org.elasticsearch.client.Client;
 
 import javax.annotation.Nullable;
 
@@ -39,15 +38,18 @@
  * exactly one instance of the call bridge, and state cleanup is performed 
when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
 
        /**
-        * Creates an Elasticsearch {@link Client}.
+        * Creates an Elasticsearch client implementing {@link AutoCloseable}. 
This can
+        * be a {@link org.elasticsearch.client.Client} or {@link 
org.elasticsearch.client.RestHighLevelClient}
         *
         * @param clientConfig The configuration to use when constructing the 
client.
         * @return The created client.
         */
-       Client createClient(Map<String, String> clientConfig);
+       public abstract AutoCloseable createClient(Map<String, String> 
clientConfig);
+
+       public abstract BulkProcessor.Builder 
createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener 
listener);
 
        /**
         * Extracts the cause of failure of a bulk item action.
@@ -55,7 +57,7 @@
         * @param bulkItemResponse the bulk item response to extract cause of 
failure
         * @return the extracted {@link Throwable} from the response ({@code 
null} is the response is successful).
         */
-       @Nullable Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
+       public abstract @Nullable Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse);
 
        /**
         * Set backoff-related configurations on the provided {@link 
BulkProcessor.Builder}.
@@ -64,13 +66,15 @@
         * @param builder the {@link BulkProcessor.Builder} to configure.
         * @param flushBackoffPolicy user-provided backoff retry settings 
({@code null} if the user disabled backoff retries).
         */
-       void configureBulkProcessorBackoff(
+       public abstract void configureBulkProcessorBackoff(
                BulkProcessor.Builder builder,
                @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy 
flushBackoffPolicy);
 
        /**
         * Perform any necessary state cleanup.
         */
-       void cleanup();
+       public void cleanup() {
+               // nothing to cleanup by default
+       }
 
 }
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
index 9105d9947f2..0305ee3d867 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
@@ -176,7 +176,7 @@ public void setDelayMillis(long delayMillis) {
        private AtomicLong numPendingRequests = new AtomicLong(0);
 
        /** Elasticsearch client created using the call bridge. */
-       private transient Client client;
+       private transient AutoCloseable client;
 
        /** Bulk processor to buffer and send requests to Elasticsearch, 
created using the client. */
        private transient BulkProcessor bulkProcessor;
@@ -341,7 +341,7 @@ public void close() throws Exception {
        protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener 
listener) {
                checkNotNull(listener);
 
-               BulkProcessor.Builder bulkProcessorBuilder = 
BulkProcessor.builder(client, listener);
+               BulkProcessor.Builder bulkProcessorBuilder = 
callBridge.createBulkProcessorBuilder(client, listener);
 
                // This makes flush() blocking
                bulkProcessorBuilder.setConcurrentRequests(0);
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
index 2a1b29736b6..3dc8f879641 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/RequestIndexer.java
@@ -21,9 +21,12 @@
 import org.apache.flink.annotation.PublicEvolving;
 
 import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 
 /**
- * Users add multiple {@link ActionRequest ActionRequests} to a {@link 
RequestIndexer} to prepare
+ * Users add multiple delete, index or update requests to a {@link 
RequestIndexer} to prepare
  * them for sending to an Elasticsearch cluster.
  */
 @PublicEvolving
@@ -33,6 +36,41 @@
         * Add multiple {@link ActionRequest} to the indexer to prepare for 
sending requests to Elasticsearch.
         *
         * @param actionRequests The multiple {@link ActionRequest} to add.
+        * @deprecated use the {@link DeleteRequest}, {@link IndexRequest} or 
{@link UpdateRequest}
         */
-       void add(ActionRequest... actionRequests);
+       @Deprecated
+       default void add(ActionRequest... actionRequests) {
+               for (ActionRequest actionRequest : actionRequests) {
+                       if (actionRequest instanceof IndexRequest) {
+                               add((IndexRequest) actionRequest);
+                       } else if (actionRequest instanceof DeleteRequest) {
+                               add((DeleteRequest) actionRequest);
+                       } else if (actionRequest instanceof UpdateRequest) {
+                               add((UpdateRequest) actionRequest);
+                       } else {
+                               throw new 
IllegalArgumentException("RequestIndexer only supports Index, Delete and Update 
requests");
+                       }
+               }
+       }
+
+       /**
+        * Add multiple {@link DeleteRequest} to the indexer to prepare for 
sending requests to Elasticsearch.
+        *
+        * @param deleteRequests The multiple {@link DeleteRequest} to add.
+        */
+       void add(DeleteRequest... deleteRequests);
+
+       /**
+        * Add multiple {@link IndexRequest} to the indexer to prepare for 
sending requests to Elasticsearch.
+        *
+        * @param indexRequests The multiple {@link IndexRequest} to add.
+        */
+       void add(IndexRequest... indexRequests);
+
+       /**
+        * Add multiple {@link UpdateRequest} to the indexer to prepare for 
sending requests to Elasticsearch.
+        *
+        * @param updateRequests The multiple {@link UpdateRequest} to add.
+        */
+       void add(UpdateRequest... updateRequests);
 }
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
index 09d8806b963..5a161a747c1 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBaseTest.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Requests;
 import org.junit.Assert;
@@ -92,7 +93,7 @@ public void testItemFailureRethrownOnInvoke() throws 
Throwable {
                // setup the next bulk request, and its mock item failures
                
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new
 Exception("artificial failure for record")));
                testHarness.processElement(new StreamRecord<>("msg"));
-               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request
                sink.manualBulkRequestWithAllPendingRequests();
@@ -124,7 +125,7 @@ public void testItemFailureRethrownOnCheckpoint() throws 
Throwable {
                // setup the next bulk request, and its mock item failures
                
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new
 Exception("artificial failure for record")));
                testHarness.processElement(new StreamRecord<>("msg"));
-               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request
                sink.manualBulkRequestWithAllPendingRequests();
@@ -164,7 +165,7 @@ public void testItemFailureRethrownOnCheckpointAfterFlush() 
throws Throwable {
                
sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList);
 
                testHarness.processElement(new StreamRecord<>("msg-1"));
-               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request (1 request only, thus 
should succeed)
                sink.manualBulkRequestWithAllPendingRequests();
@@ -172,7 +173,7 @@ public void testItemFailureRethrownOnCheckpointAfterFlush() 
throws Throwable {
                // setup the requests to be flushed in the snapshot
                testHarness.processElement(new StreamRecord<>("msg-2"));
                testHarness.processElement(new StreamRecord<>("msg-3"));
-               verify(sink.getMockBulkProcessor(), 
times(3)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), 
times(3)).add(any(IndexRequest.class));
 
                CheckedThread snapshotThread = new CheckedThread() {
                        @Override
@@ -217,7 +218,7 @@ public void testBulkFailureRethrownOnInvoke() throws 
Throwable {
                // setup the next bulk request, and let the whole bulk request 
fail
                sink.setFailNextBulkRequestCompletely(new Exception("artificial 
failure for bulk request"));
                testHarness.processElement(new StreamRecord<>("msg"));
-               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request
                sink.manualBulkRequestWithAllPendingRequests();
@@ -249,7 +250,7 @@ public void testBulkFailureRethrownOnCheckpoint() throws 
Throwable {
                // setup the next bulk request, and let the whole bulk request 
fail
                sink.setFailNextBulkRequestCompletely(new Exception("artificial 
failure for bulk request"));
                testHarness.processElement(new StreamRecord<>("msg"));
-               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request
                sink.manualBulkRequestWithAllPendingRequests();
@@ -284,7 +285,7 @@ public void 
testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
                // setup the next bulk request, and let bulk request succeed
                
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList((Exception)
 null));
                testHarness.processElement(new StreamRecord<>("msg-1"));
-               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(IndexRequest.class));
 
                // manually execute the next bulk request
                sink.manualBulkRequestWithAllPendingRequests();
@@ -292,7 +293,7 @@ public void 
testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
                // setup the requests to be flushed in the snapshot
                testHarness.processElement(new StreamRecord<>("msg-2"));
                testHarness.processElement(new StreamRecord<>("msg-3"));
-               verify(sink.getMockBulkProcessor(), 
times(3)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), 
times(3)).add(any(IndexRequest.class));
 
                CheckedThread snapshotThread = new CheckedThread() {
                        @Override
@@ -346,7 +347,7 @@ public void testAtLeastOnceSink() throws Throwable {
                // it contains 1 request, which will fail and re-added to the 
next bulk request
                
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new
 Exception("artificial failure for record")));
                testHarness.processElement(new StreamRecord<>("msg"));
-               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(IndexRequest.class));
 
                CheckedThread snapshotThread = new CheckedThread() {
                        @Override
@@ -402,7 +403,7 @@ public void 
testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Excepti
                // setup the next bulk request, and let bulk request succeed
                
sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new
 Exception("artificial failure for record")));
                testHarness.processElement(new StreamRecord<>("msg-1"));
-               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(ActionRequest.class));
+               verify(sink.getMockBulkProcessor(), 
times(1)).add(any(IndexRequest.class));
 
                // the snapshot should not block even though we haven't flushed 
the bulk request
                testHarness.snapshot(1L, 1000L);
@@ -478,11 +479,11 @@ public BulkProcessor getMockBulkProcessor() {
                protected BulkProcessor buildBulkProcessor(final 
BulkProcessor.Listener listener) {
                        this.mockBulkProcessor = mock(BulkProcessor.class);
 
-                       
when(mockBulkProcessor.add(any(ActionRequest.class))).thenAnswer(new 
Answer<Object>() {
+                       
when(mockBulkProcessor.add(any(IndexRequest.class))).thenAnswer(new 
Answer<Object>() {
                                @Override
                                public Object answer(InvocationOnMock 
invocationOnMock) throws Throwable {
                                        // intercept the request and add it to 
our mock bulk request
-                                       
nextBulkRequest.add(invocationOnMock.getArgumentAt(0, ActionRequest.class));
+                                       
nextBulkRequest.add(invocationOnMock.getArgumentAt(0, IndexRequest.class));
 
                                        return null;
                                }
@@ -530,12 +531,12 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                }
        }
 
-       private static class DummyElasticsearchApiCallBridge implements 
ElasticsearchApiCallBridge {
+       private static class DummyElasticsearchApiCallBridge extends 
ElasticsearchApiCallBridge {
 
                private static final long serialVersionUID = 
-4272760730959041699L;
 
                @Override
-               public Client createClient(Map<String, String> clientConfig) {
+               public AutoCloseable createClient(Map<String, String> 
clientConfig) {
                        return mock(Client.class);
                }
 
@@ -550,13 +551,13 @@ public Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkIt
                }
 
                @Override
-               public void configureBulkProcessorBackoff(BulkProcessor.Builder 
builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy 
flushBackoffPolicy) {
-                       // no need for this in the test cases here
+               public BulkProcessor.Builder 
createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener 
listener) {
+                       return null;
                }
 
                @Override
-               public void cleanup() {
-                       // nothing to cleanup
+               public void configureBulkProcessorBackoff(BulkProcessor.Builder 
builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy 
flushBackoffPolicy) {
+                       // no need for this in the test cases here
                }
        }
 
diff --git 
a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
index 2a3c2a06460..6f492064a02 100644
--- 
a/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/Elasticsearch1ApiCallBridge.java
@@ -42,7 +42,7 @@
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 1.x.
  */
 @Internal
-public class Elasticsearch1ApiCallBridge implements ElasticsearchApiCallBridge 
{
+public class Elasticsearch1ApiCallBridge extends ElasticsearchApiCallBridge {
 
        private static final long serialVersionUID = -2632363720584123682L;
 
@@ -70,7 +70,7 @@
        }
 
        @Override
-       public Client createClient(Map<String, String> clientConfig) {
+       public AutoCloseable createClient(Map<String, String> clientConfig) {
                if (transportAddresses == null) {
 
                        // Make sure that we disable http access to our 
embedded node
@@ -115,6 +115,11 @@ public Client createClient(Map<String, String> 
clientConfig) {
                }
        }
 
+       @Override
+       public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable 
client, BulkProcessor.Listener listener) {
+               return BulkProcessor.builder((Client) client, listener);
+       }
+
        @Override
        public Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
                if (!bulkItemResponse.isFailed()) {
diff --git 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
index 390a4078e2b..80c1b3acf22 100644
--- 
a/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/Elasticsearch2ApiCallBridge.java
@@ -44,7 +44,7 @@
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 2.x.
  */
 @Internal
-public class Elasticsearch2ApiCallBridge implements ElasticsearchApiCallBridge 
{
+public class Elasticsearch2ApiCallBridge extends ElasticsearchApiCallBridge {
 
        private static final long serialVersionUID = 2638252694744361079L;
 
@@ -63,7 +63,7 @@
        }
 
        @Override
-       public Client createClient(Map<String, String> clientConfig) {
+       public AutoCloseable createClient(Map<String, String> clientConfig) {
                Settings settings = 
Settings.settingsBuilder().put(clientConfig).build();
 
                TransportClient transportClient = 
TransportClient.builder().settings(settings).build();
@@ -83,6 +83,11 @@ public Client createClient(Map<String, String> clientConfig) 
{
                return transportClient;
        }
 
+       @Override
+       public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable 
client, BulkProcessor.Listener listener) {
+               return BulkProcessor.builder((Client) client, listener);
+       }
+
        @Override
        public Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
                if (!bulkItemResponse.isFailed()) {
@@ -117,10 +122,4 @@ public void configureBulkProcessorBackoff(
 
                builder.setBackoffPolicy(backoffPolicy);
        }
-
-       @Override
-       public void cleanup() {
-               // nothing to cleanup
-       }
-
 }
diff --git 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
index 7c4ba7a97f1..1e73feb9e43 100644
--- 
a/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
+++ 
b/flink-connectors/flink-connector-elasticsearch5/src/main/java/org/apache/flink/streaming/connectors/elasticsearch5/Elasticsearch5ApiCallBridge.java
@@ -47,7 +47,7 @@
  * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 5.x.
  */
 @Internal
-public class Elasticsearch5ApiCallBridge implements ElasticsearchApiCallBridge 
{
+public class Elasticsearch5ApiCallBridge extends ElasticsearchApiCallBridge {
 
        private static final long serialVersionUID = -5222683870097809633L;
 
@@ -66,7 +66,7 @@
        }
 
        @Override
-       public Client createClient(Map<String, String> clientConfig) {
+       public AutoCloseable createClient(Map<String, String> clientConfig) {
                Settings settings = Settings.builder().put(clientConfig)
                        .put(NetworkModule.HTTP_TYPE_KEY, 
Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
                        .put(NetworkModule.TRANSPORT_TYPE_KEY, 
Netty3Plugin.NETTY_TRANSPORT_NAME)
@@ -89,6 +89,11 @@ public Client createClient(Map<String, String> clientConfig) 
{
                return transportClient;
        }
 
+       @Override
+       public BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable 
client, BulkProcessor.Listener listener) {
+               return BulkProcessor.builder((Client) client, listener);
+       }
+
        @Override
        public Throwable 
extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
                if (!bulkItemResponse.isFailed()) {
@@ -123,10 +128,4 @@ public void configureBulkProcessorBackoff(
 
                builder.setBackoffPolicy(backoffPolicy);
        }
-
-       @Override
-       public void cleanup() {
-               // nothing to cleanup
-       }
-
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ 
> client
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-7386
>                 URL: https://issues.apache.org/jira/browse/FLINK-7386
>             Project: Flink
>          Issue Type: Improvement
>          Components: ElasticSearch Connector
>            Reporter: Dawid Wysakowicz
>            Assignee: Fang Yong
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored 
> and no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to