[ https://issues.apache.org/jira/browse/HIVE-25582?focusedWorklogId=683234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-683234 ]
ASF GitHub Bot logged work on HIVE-25582: ----------------------------------------- Author: ASF GitHub Bot Created on: 18/Nov/21 11:15 Start Date: 18/Nov/21 11:15 Worklog Time Spent: 10m Work Description: dengzhhu653 commented on a change in pull request #2693: URL: https://github.com/apache/hive/pull/2693#discussion_r752141014 ########## File path: ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java ########## @@ -18,82 +18,66 @@ package org.apache.hadoop.hive.ql.exec.mr; +import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hive.ql.metadata.HiveException; /** - * ObjectCache. No-op implementation on MR we don't have a means to reuse - * Objects between runs of the same task. + * ObjectCache. Simple implementation on MR we don't have a means to reuse + * Objects between runs of the same task, this acts as a local cache. 
* */ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache { private static final Logger LOG = LoggerFactory.getLogger(ObjectCache.class.getName()); + private final Map<String, Object> cache = new ConcurrentHashMap<>(); + + private static ExecutorService staticPool = Executors.newCachedThreadPool(); + @Override public void release(String key) { // nothing to do LOG.debug("{} no longer needed", key); + cache.remove(key); } @Override public <T> T retrieve(String key) throws HiveException { - return retrieve(key, null); + return (T) cache.get(key); } @Override public <T> T retrieve(String key, Callable<T> fn) throws HiveException { + T value = (T) cache.get(key); + if (value != null || fn == null) { + return value; + } try { LOG.debug("Creating {}", key); - return fn.call(); + value = fn.call(); } catch (Exception e) { throw new HiveException(e); } + T previous = (T) cache.putIfAbsent(key, value); + return previous != null ? previous : value; } @Override public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws HiveException { - final T value = retrieve(key, fn); - - return new Future<T>() { - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return true; - } - - @Override - public T get() throws InterruptedException, ExecutionException { - return value; - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, - TimeoutException { - return value; - } - }; + return staticPool.submit((Callable)() -> retrieve(key, fn)); Review comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 683234) Time Spent: 2h 20m (was: 2h 10m) > Empty result when using offset limit with MR > -------------------------------------------- > > Key: HIVE-25582 > URL: https://issues.apache.org/jira/browse/HIVE-25582 > Project: Hive > Issue Type: Bug > Components: Operators > Affects Versions: 4.0.0 > Reporter: Zhihua Deng > Assignee: Zhihua Deng > Priority: Major > Labels: pull-request-available > Time Spent: 2h 20m > Remaining Estimate: 0h > > The _mr.ObjectCache_ caches nothing, so every time the limit operator [retrieves the > global counter from the > cache|https://github.com/apache/hive/blob/7b3ecf617a6d46f48a3b6f77e0339fd4ad95a420/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java#L150-L161], > a new AtomicInteger is returned. This makes _offset <= > currentCountForAllTasksInt_ always evaluate to false (as _offset > 0_), so > the operator skips all rows. -- This message was sent by Atlassian Jira (v8.20.1#820001)