[ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
luoguohao updated FLINK-11046:
------------------------------
    Description: 
When I use the ES6 sink to index into Elasticsearch and a bulk request fails with an exception, I try to re-index the document by calling `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` method, but this goes wrong. The calling thread gets stuck there, and the thread dump shows that the `bulkprocessor` object is locked by another thread.
{code:java}
public interface ActionRequestFailureHandler extends Serializable {
    void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable;
}
{code}
After reading the code behind `indexer.add(action)`, I found that every add operation has to acquire the `synchronized` lock:
{code:java}
private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
    ensureOpen();
    bulkRequest.add(request, payload);
    executeIfNeeded();
}
{code}
I also noticed that the `bulkprocessor` object is locked by the bulk process thread as well. The bulk process operation is in the following code:
{code:java}
public void execute(BulkRequest bulkRequest, long executionId) {
    Runnable toRelease = () -> {};
    boolean bulkRequestSetupSuccessful = false;
    try {
        listener.beforeBulk(executionId, bulkRequest);
        semaphore.acquire();
        toRelease = semaphore::release;
        CountDownLatch latch = new CountDownLatch(1);
        retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse response) {
                try {
                    listener.afterBulk(executionId, bulkRequest, response);
                } finally {
                    semaphore.release();
                    latch.countDown();
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    listener.afterBulk(executionId, bulkRequest, e);
                } finally {
                    semaphore.release();
                    latch.countDown();
                }
            }
        }, Settings.EMPTY);
        bulkRequestSetupSuccessful = true;
        if (concurrentRequests == 0) {
            latch.await();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
        listener.afterBulk(executionId, bulkRequest, e);
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
        listener.afterBulk(executionId, bulkRequest, e);
    } finally {
        if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore
            toRelease.run();
        }
    }
}
{code}
I think the `latch.await()` call under `if (concurrentRequests == 0)` above is the reason the retry thread is blocked: the bulk process thread waits there while still holding the lock on `bulkprocessor`, so it never releases it. I also tried to figure out why the field `concurrentRequests` is set to zero, and found the initialization of the bulk processor in class `ElasticsearchSinkBase`:
{code:java}
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
    ...
    BulkProcessor.Builder bulkProcessorBuilder = callBridge.createBulkProcessorBuilder(client, listener);

    // This makes flush() blocking
    bulkProcessorBuilder.setConcurrentRequests(0);

    ...
    return bulkProcessorBuilder.build();
}
{code}
The field is set to zero explicitly. So everything seems to make sense, but I still wondered why the retry operation does not run in the same thread as the bulk process execution. After reading the code, the `bulkAsync` method might be the last piece of the puzzle:
{code:java}
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
    return BulkProcessor.builder(client::bulkAsync, listener);
}
{code}
I hope someone can help fix this problem or give some suggestions. I can also try to take it on myself.

Thanks a lot!

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -------------------------------------------------------------------------
>
>                 Key: FLINK-11046
>                 URL: https://issues.apache.org/jira/browse/FLINK-11046
>             Project: Flink
>          Issue Type: Bug
>          Components: ElasticSearch Connector
>    Affects Versions: 1.6.2
>            Reporter: luoguohao
>            Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
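
For anyone trying to reproduce the locking pattern described in this report without an Elasticsearch cluster, here is a minimal sketch. It is not Flink or Elasticsearch code: the class `BulkLockSketch` and its methods `addAndFlush`, `retry` and `flushCompletesInTime` are hypothetical stand-ins for `internalAdd()`, the `indexer.add(action)` call in the failure handler, and the `latch.await()` in `execute()`. The sketch also assumes a timed `await` so that it terminates; the code quoted in the report waits without a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the reported locking pattern; not Elasticsearch code. */
class BulkLockSketch {
    // Stands in for the client thread that delivers the bulkAsync response.
    private final ExecutorService callbackThread = Executors.newSingleThreadExecutor();

    /** Models internalAdd() + execute(): holds the monitor while waiting for the response. */
    synchronized boolean addAndFlush(String doc, long waitMillis) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        callbackThread.execute(() -> {
            try {
                retry(doc);        // failure handler re-adds the document
            } finally {
                latch.countDown(); // only reached after retry() returns
            }
        });
        // Models `if (concurrentRequests == 0) latch.await()`, but with a
        // timeout (an assumption of this sketch) so the demo can finish.
        return latch.await(waitMillis, TimeUnit.MILLISECONDS);
    }

    /** Models indexer.add(action): needs the monitor that addAndFlush() is holding. */
    synchronized void retry(String doc) {
        // Nothing to do here; acquiring the monitor is the point.
    }

    /** Returns true if the flush saw its response within waitMillis. */
    static boolean flushCompletesInTime(long waitMillis) {
        BulkLockSketch processor = new BulkLockSketch();
        try {
            return processor.addAndFlush("doc-1", waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            processor.callbackThread.shutdown(); // lets the pending callback finish
        }
    }

    public static void main(String[] args) {
        // Prints "flush completed: false": the callback cannot enter retry()
        // until addAndFlush() gives up waiting and releases the monitor.
        System.out.println("flush completed: " + flushCompletesInTime(500));
    }
}
```

In this model the callback thread only gets into `retry()` after the timed wait expires and the monitor is released. With an untimed `await`, as in the quoted `execute()` method, both threads would wait on each other indefinitely, which matches the stuck thread seen in the thread dump.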