[ https://issues.apache.org/jira/browse/HIVE-23746?focusedWorklogId=463757&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-463757 ]
ASF GitHub Bot logged work on HIVE-23746: ----------------------------------------- Author: ASF GitHub Bot Created on: 29/Jul/20 07:29 Start Date: 29/Jul/20 07:29 Worklog Time Spent: 10m Work Description: rbalamohan commented on a change in pull request #1291: URL: https://github.com/apache/hive/pull/1291#discussion_r461277637 ########## File path: llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java ########## @@ -351,9 +403,81 @@ protected CallableRequest(REQUEST request, ExecuteRequestCallback<RESPONSE> call return callback; } + /** + * Override this method to make a synchronous request and wait for response. + * @return + * @throws Exception + */ public abstract RESPONSE call() throws Exception; } + /** + * Asynchronous request to a node. The request must override {@link #callInternal()} + * @param <REQUEST> + * @param <RESPONSE> + */ + protected static abstract class AsyncCallableRequest<REQUEST extends Message, RESPONSE extends Message> + extends NodeCallableRequest<REQUEST, RESPONSE> { + + private final long TIMEOUT = 60000; + private final long EXPONENTIAL_BACKOFF_START = 10; + private final int FAST_RETRIES = 5; + private AsyncGet<Message, Exception> getFuture; + + protected AsyncCallableRequest(LlapNodeId nodeId, REQUEST request, + ExecuteRequestCallback<RESPONSE> callback) { + super(nodeId, request, callback); + } + + @Override + public RESPONSE call() throws Exception { + boolean asyncMode = Client.isAsynchronousMode(); + long deadline = System.currentTimeMillis() + TIMEOUT; + int numRetries = 0; + long nextBackoffMs = EXPONENTIAL_BACKOFF_START; + try { + Client.setAsynchronousMode(true); + boolean sent = false; + while (!sent) { + try { + callInternal(); + sent = true; + } catch (Exception ex) { + if (ex instanceof ServiceException && ex.getCause() != null + && ex.getCause() instanceof AsyncCallLimitExceededException) { + numRetries++; + if (numRetries >= FAST_RETRIES) { + Thread.sleep(nextBackoffMs); + if (System.currentTimeMillis() > 
deadline) { + throw new HiveException("Async request timed out in " + TIMEOUT + " ms.", ex.getCause()); + } + numRetries = 0; + nextBackoffMs = nextBackoffMs * 2; + } + if (LOG.isTraceEnabled()) { + LOG.trace("Async call limit exceeded.", ex.getCause()); + } + } else { + throw ex; + } + } + } + getFuture = ProtobufRpcEngine.getAsyncReturnMessage(); Review comment: Rename getFuture? ########## File path: llap-common/src/java/org/apache/hadoop/hive/llap/AsyncResponseHandler.java ########## @@ -0,0 +1,126 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap; + +import com.google.protobuf.Message; +import org.apache.hadoop.util.concurrent.AsyncGet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AsyncResponseHandler { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncResponseHandler.class); + + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + + private final AsyncPbRpcProxy.RequestManager requestManager; + private final ExecutorService responseWaitingService = Executors.newSingleThreadExecutor(); + private final ExecutorService responseExecutorService = Executors.newSingleThreadExecutor(); + private final Queue<AsyncPbRpcProxy.AsyncCallableRequest<Message, Message>> asyncResponseFutureQueue = new ConcurrentLinkedDeque<>(); + + public AsyncResponseHandler(AsyncPbRpcProxy.RequestManager requestManager) { + this.requestManager = requestManager; + } + + public void start() { + responseWaitingService.submit(new AsyncResponseHandlerRunnable()); + } + + public void addToAsyncResponseFutureQueue(AsyncPbRpcProxy.AsyncCallableRequest<Message, Message> request) { + asyncResponseFutureQueue.add(request); + } + + public void shutdown() { + isShutdown.set(true); + responseExecutorService.shutdown(); + responseWaitingService.shutdown(); + } + + private final class AsyncResponseHandlerRunnable implements Runnable { + + private final List<AsyncPbRpcProxy.AsyncCallableRequest<Message, Message>> asyncResponseFutures = new ArrayList<>(); + + @Override + public void run() { + while (!isShutdown.get()) { Review comment: Isn't it a tight while loop? 
As in, it will consume one CPU core at 100% all the time if the queue is empty. Can you please fix that? ########## File path: llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java ########## @@ -163,6 +173,7 @@ public void requestFinished(LlapNodeId nodeId) { public void shutdown() { if (!isShutdown.getAndSet(true)) { + asyncResponseHandler.shutdown(); Review comment: Use shutdownNow to ensure immediate shutdown? ########## File path: llap-common/src/java/org/apache/hadoop/hive/llap/AsyncResponseHandler.java ########## @@ -0,0 +1,126 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap; + +import com.google.protobuf.Message; +import org.apache.hadoop.util.concurrent.AsyncGet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AsyncResponseHandler { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncResponseHandler.class); + + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + + private final AsyncPbRpcProxy.RequestManager requestManager; + private final ExecutorService responseWaitingService = Executors.newSingleThreadExecutor(); + private final ExecutorService responseExecutorService = Executors.newSingleThreadExecutor(); Review comment: Is responseExecutorService needed, since the callback is lightweight? ########## File path: llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java ########## @@ -351,9 +403,81 @@ protected CallableRequest(REQUEST request, ExecuteRequestCallback<RESPONSE> call return callback; } + /** + * Override this method to make a synchronous request and wait for response. + * @return + * @throws Exception + */ public abstract RESPONSE call() throws Exception; } + /** + * Asynchronous request to a node. 
The request must override {@link #callInternal()} + * @param <REQUEST> + * @param <RESPONSE> + */ + protected static abstract class AsyncCallableRequest<REQUEST extends Message, RESPONSE extends Message> + extends NodeCallableRequest<REQUEST, RESPONSE> { + + private final long TIMEOUT = 60000; + private final long EXPONENTIAL_BACKOFF_START = 10; + private final int FAST_RETRIES = 5; + private AsyncGet<Message, Exception> getFuture; + + protected AsyncCallableRequest(LlapNodeId nodeId, REQUEST request, + ExecuteRequestCallback<RESPONSE> callback) { + super(nodeId, request, callback); + } + + @Override + public RESPONSE call() throws Exception { + boolean asyncMode = Client.isAsynchronousMode(); + long deadline = System.currentTimeMillis() + TIMEOUT; + int numRetries = 0; + long nextBackoffMs = EXPONENTIAL_BACKOFF_START; Review comment: Would it be good to tweak the AsyncCallableRequest backoff strategy? Maybe start with 1 or 2? Also, rename to BACKOFF_START if it is not exponential. If the async callbacks have reached 100 requests, the server would start backing off. As per the current strategy, it would do 5 fast retries and then back off for 10, 20, 40, 80, 160 ms (the value is passed to Thread.sleep, so the unit is milliseconds). This can cause significant delays for later requests. ########## File path: llap-common/src/java/org/apache/hadoop/hive/llap/AsyncResponseHandler.java ########## @@ -0,0 +1,114 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap; + +import com.google.protobuf.Message; +import org.apache.hadoop.hive.llap.AsyncPbRpcProxy.AsyncCallableRequest; +import org.apache.hadoop.util.concurrent.AsyncGet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AsyncResponseHandler { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncResponseHandler.class); + + private final AtomicBoolean isShutdown = new AtomicBoolean(false); + + private final AsyncPbRpcProxy.RequestManager requestManager; + private final ExecutorService responseWaitingService = Executors.newSingleThreadExecutor(); + private final LinkedBlockingDeque<AsyncCallableRequest<Message, Message>> + incomingResponseFutures = new LinkedBlockingDeque<>(); + + public AsyncResponseHandler(AsyncPbRpcProxy.RequestManager requestManager) { + this.requestManager = requestManager; + } + + public void start() { + responseWaitingService.submit(new AsyncResponseHandlerRunnable()); + } + + public void addToAsyncResponseFutureQueue(AsyncCallableRequest<Message, Message> request) { + incomingResponseFutures.add(request); + } + + public void shutdownNow() { + isShutdown.set(true); + responseWaitingService.shutdownNow(); + } + + private final class AsyncResponseHandlerRunnable implements Runnable { + + private final List<AsyncCallableRequest<Message, Message>> responseFuturesQueue = new ArrayList<>(); + + @Override + public void run() { + while (!isShutdown.get()) { + Iterator<AsyncCallableRequest<Message, Message>> iterator = responseFuturesQueue.iterator(); + while (iterator.hasNext()) { + AsyncCallableRequest<Message, Message> request = iterator.next(); + AsyncGet<Message, 
Exception> responseFuture = request.getResponseFuture(); + if (responseFuture != null && responseFuture.isDone()) { + try { + iterator.remove(); + LlapNodeId nodeId = request.getNodeId(); + // since isDone is true, getFuture.get should return immediately + try { + Message remoteValue = responseFuture.get(-1, TimeUnit.MILLISECONDS); + if (remoteValue instanceof Throwable) { + request.getCallback().indicateError((Throwable) remoteValue); + } else { + request.getCallback().setResponse(remoteValue); + } + } catch (Exception e) { + request.getCallback().indicateError(e); + } finally { + requestManager.requestFinished(nodeId); + } + } catch (Throwable e) { + LOG.warn("ResponseDispatcher caught", e); + } + } + } + try { + if (responseFuturesQueue.isEmpty()) { + // there are no more futures to hear from, just block on incoming futures + AsyncCallableRequest<Message, Message> request = incomingResponseFutures.poll(200, TimeUnit.MILLISECONDS); Review comment: Instead of 200 ms polling, can you check with wait/notify or blocking DS? (LinkedBlockingDeque can be tried out with sync). ########## File path: llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java ########## @@ -351,9 +403,81 @@ protected CallableRequest(REQUEST request, ExecuteRequestCallback<RESPONSE> call return callback; } + /** + * Override this method to make a synchronous request and wait for response. + * @return + * @throws Exception + */ public abstract RESPONSE call() throws Exception; } + /** + * Asynchronous request to a node. 
The request must override {@link #callInternal()} + * @param <REQUEST> + * @param <RESPONSE> + */ + protected static abstract class AsyncCallableRequest<REQUEST extends Message, RESPONSE extends Message> + extends NodeCallableRequest<REQUEST, RESPONSE> { + + private final long TIMEOUT = 60000; + private final long EXPONENTIAL_BACKOFF_START = 10; + private final int FAST_RETRIES = 5; + private AsyncGet<Message, Exception> getFuture; + + protected AsyncCallableRequest(LlapNodeId nodeId, REQUEST request, + ExecuteRequestCallback<RESPONSE> callback) { + super(nodeId, request, callback); + } + + @Override + public RESPONSE call() throws Exception { + boolean asyncMode = Client.isAsynchronousMode(); + long deadline = System.currentTimeMillis() + TIMEOUT; + int numRetries = 0; + long nextBackoffMs = EXPONENTIAL_BACKOFF_START; Review comment: Sure. milliseconds is fine. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 463757) Time Spent: 0.5h (was: 20m) > Send task attempts async from AM to daemons > ------------------------------------------- > > Key: HIVE-23746 > URL: https://issues.apache.org/jira/browse/HIVE-23746 > Project: Hive > Issue Type: Sub-task > Components: llap > Reporter: Mustafa Iman > Assignee: Mustafa Iman > Priority: Major > Labels: pull-request-available > Time Spent: 0.5h > Remaining Estimate: 0h > > LlapTaskCommunicator uses sync client to send task attempts. There is a fixed > number of communication threads (10 by default). This causes unnecessary > delays when there are enough free execution slots in daemons but they do not > receive all the tasks because of this bottleneck. 
LlapTaskCommunicator can > use an async client to pass these tasks to daemons. -- This message was sent by Atlassian Jira (v8.3.4#803005)