[ 
https://issues.apache.org/jira/browse/HIVE-24626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

László Bodor updated HIVE-24626:
--------------------------------
    Description: 
The root cause is that the readers cannot queue.offer items to full queues, 
which belong to consumers that are blocked on other consumers. 
Scenario is like below:
{code}
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  
FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 2 ......          llap       RUNNING      3          2        1        0    
   0       0
Map 1                 llap       RUNNING    676          0      119      557    
   0       0
Map 3                 llap       RUNNING    108          0       21       87    
   0      21
Reducer 4             llap        INITED      1          0        0        1    
   0       0
Map 5                 llap        INITED    108          0        0      108    
   0       0
Reducer 6             llap        INITED      4          0        0        4    
   0       0
Reducer 7             llap        INITED      1          0        0        1    
   0       0
----------------------------------------------------------------------------------------------
VERTICES: 00/07  [>>--------------------------] 0%    ELAPSED TIME: 3489.83 s
----------------------------------------------------------------------------------------------
{code}

Map2 is MAPJOINed to Map1. In an LLAP daemon, the forever running Map2 task is 
blocked on nextCvb:
{code}
"TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 
nid=0x147 waiting on condition [0x00007f0ce005d000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0de8025e00> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82)
        at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362)
        at 
org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79)
        at 
org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33)
        at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117)
        at 
org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151)
        at 
org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
        at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

while all the elevator threads are blocked here (all of them tries to read for 
Map1):
{code}
"IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 
nid=0x267 waiting on condition [0x00007f0cd7af8000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0e3095c480> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82)
        at 
org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268)
        at 
org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42)
        at 
org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

the problem here is that, as far as I can see, all the elevator threads try to 
offer to LlapRecordReaders that belong to Map 1, however, Map 1 tasks cannot 
progress beyond a certain point, because wait for Map 2's input
this is because the queues of LlapRecordReader instances of Map1 are full, 
otherwise LlapRecordReader.enqueueInternal should be able to offer the item 
from the elevator thread

In my example:
Map2's queue limit is: 50000 (Map2: mapjoin source)
Map1's queue limit is: 6931 (Map1: mapjoin target)
the limits are calculated according to data characteristics in 
LlapRecordReader.determineQueueLimit

in my case, io threads for Map1 reached their queue limit, and cannot offer new 
items, so holding IO elevator threads, which cannot be used for reading for 
more important Map2 tasks, so Map1 tasks will eventually hang here:
{code}
"TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 
nid=0x1ca waiting on condition [0x00007f0cdb6d9000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0de90049a8> (a 
java.util.concurrent.FutureTask)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
        at java.util.concurrent.FutureTask.get(FutureTask.java:204)
        at 
org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572)
        at 
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
        at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}
Operator.asyncInitOperations typically contains a dependency on map join input.

in my example Map2 reads small table, like microstrategy.lu_item, and Map1 
reads a larger table microstrategy.order_detail

this deadlock theoretically can be solved by:
1. IO elevator threads (reading for Map1) opt out as they cannot offer new items
2. the freed elevator threads can be used for Map2
3. Map2 can finish its work, and let Map1 proceed by providing the mapjoin data 
to it

  was:
The root cause is that the readers cannot queue.offer items to full queues, 
which belong to consumers that are blocked on other consumers. 
Scenario is like below:
{code}
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  
FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 2 ......          llap       RUNNING      3          2        1        0    
   0       0
Map 1                 llap       RUNNING    676          0      119      557    
   0       0
Map 3                 llap       RUNNING    108          0       21       87    
   0      21
Reducer 4             llap        INITED      1          0        0        1    
   0       0
Map 5                 llap        INITED    108          0        0      108    
   0       0
Reducer 6             llap        INITED      4          0        0        4    
   0       0
Reducer 7             llap        INITED      1          0        0        1    
   0       0
----------------------------------------------------------------------------------------------
VERTICES: 00/07  [>>--------------------------] 0%    ELAPSED TIME: 3489.83 s
----------------------------------------------------------------------------------------------
{code}

Map2 is MAPJOINed to Map1. In an LLAP daemon, the forever running Map2 task is 
blocked on nextCvb:
{code}
"TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 
nid=0x147 waiting on condition [0x00007f0ce005d000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0de8025e00> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82)
        at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362)
        at 
org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79)
        at 
org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33)
        at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117)
        at 
org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151)
        at 
org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
        at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

while all the elevator threads are blocked here (all of them tries to read for 
Map1):
{code}
"IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 
nid=0x267 waiting on condition [0x00007f0cd7af8000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0e3095c480> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at 
java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591)
        at 
org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82)
        at 
org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268)
        at 
org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42)
        at 
org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276)
        at 
org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88)
        at 
org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}

the problem here is that, as far as I can see, all the elevator threads try to 
offer to LlapRecordReaders that belong to Map 1, however, Map 1 tasks cannot 
progress beyond a certain point, because wait for Map 2's input
this is because the queues of LlapRecordReader instances of Map1 are full, 
otherwise LlapRecordReader.enqueueInternal should be able to offer the item 
from the elevator thread

In my example:
Map2's queue limit is: 50000 (Map2: mapjoin source)
Map1's queue limit is: 6931 (Map1: mapjoin target)
the limits are calculated according to data characteristics in 
LlapRecordReader.determineQueueLimit

in my case, io threads for Map1 reached their queue limit, and cannot offer new 
items, so holding IO elevator threads, which cannot be used for reading for 
more important Map2 tasks, so Map1 tasks will eventually hang here:
{code}
"TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 
nid=0x1ca waiting on condition [0x00007f0cdb6d9000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f0de90049a8> (a 
java.util.concurrent.FutureTask)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
        at java.util.concurrent.FutureTask.get(FutureTask.java:204)
        at 
org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572)
        at 
org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524)
        at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
        at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
        at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
        at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
        at 
org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at 
org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{code}
Operator.asyncInitOperations typically contains a dependency on map join input.

in my example Map2 reads small table, like microstrategy.lu_item, and Map1 
reads a larger table microstrategy.order_detail


> LLAP: reader threads could be starvated if all IO elevator threads are busy 
> to enqueue to another readers with full queue
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-24626
>                 URL: https://issues.apache.org/jira/browse/HIVE-24626
>             Project: Hive
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>         Attachments: executor_stack_cache_none_12_io_threads.log
>
>
> The root cause is that the readers cannot queue.offer items to full queues, 
> which belong to consumers that are blocked on other consumers. 
> Scenario is like below:
> {code}
> ----------------------------------------------------------------------------------------------
>         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  
> FAILED  KILLED
> ----------------------------------------------------------------------------------------------
> Map 2 ......          llap       RUNNING      3          2        1        0  
>      0       0
> Map 1                 llap       RUNNING    676          0      119      557  
>      0       0
> Map 3                 llap       RUNNING    108          0       21       87  
>      0      21
> Reducer 4             llap        INITED      1          0        0        1  
>      0       0
> Map 5                 llap        INITED    108          0        0      108  
>      0       0
> Reducer 6             llap        INITED      4          0        0        4  
>      0       0
> Reducer 7             llap        INITED      1          0        0        1  
>      0       0
> ----------------------------------------------------------------------------------------------
> VERTICES: 00/07  [>>--------------------------] 0%    ELAPSED TIME: 3489.83 s
> ----------------------------------------------------------------------------------------------
> {code}
> Map2 is MAPJOINed to Map1. In an LLAP daemon, the forever running Map2 task 
> is blocked on nextCvb:
> {code}
> "TezTR-886270_0_1_0_1_0" #154 daemon prio=5 os_prio=0 tid=0x00007f1b88348000 
> nid=0x147 waiting on condition [0x00007f0ce005d000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00007f0de8025e00> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at 
> java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.nextCvb(LlapRecordReader.java:517)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:372)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.next(LlapRecordReader.java:82)
>       at 
> org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:362)
>       at 
> org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:79)
>       at 
> org.apache.hadoop.hive.ql.io.HiveRecordReader.doNext(HiveRecordReader.java:33)
>       at 
> org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:117)
>       at 
> org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.next(TezGroupedSplitsInputFormat.java:151)
>       at 
> org.apache.tez.mapreduce.lib.MRReaderMapred.next(MRReaderMapred.java:115)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:437)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:267)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
>       at 
> org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
>       at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>       at 
> org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> while all the elevator threads are blocked here (all of them tries to read 
> for Map1):
> {code}
> "IO-Elevator-Thread-11" #408 daemon prio=5 os_prio=0 tid=0x00007f0cddc48800 
> nid=0x267 waiting on condition [0x00007f0cd7af8000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00007f0e3095c480> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>       at 
> java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:379)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.enqueueInternal(LlapRecordReader.java:607)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:591)
>       at 
> org.apache.hadoop.hive.llap.io.api.impl.LlapRecordReader.consumeData(LlapRecordReader.java:82)
>       at 
> org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:268)
>       at 
> org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer.decodeBatch(OrcEncodedDataConsumer.java:79)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:122)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer.consumeData(EncodedDataConsumer.java:42)
>       at 
> org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReaderImpl.readEncodedColumns(EncodedReaderImpl.java:535)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.performDataRead(OrcEncodedDataReader.java:430)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:279)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader$4.run(OrcEncodedDataReader.java:276)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:276)
>       at 
> org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.callInternal(OrcEncodedDataReader.java:117)
>       at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:88)
>       at 
> org.apache.hadoop.hive.llap.io.decode.EncodedDataConsumer$CpuRecordingCallable.call(EncodedDataConsumer.java:73)
>       at 
> org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> the problem here is that, as far as I can see, all the elevator threads try 
> to offer to LlapRecordReaders that belong to Map 1, however, Map 1 tasks 
> cannot progress beyond a certain point, because wait for Map 2's input
> this is because the queues of LlapRecordReader instances of Map1 are full, 
> otherwise LlapRecordReader.enqueueInternal should be able to offer the item 
> from the elevator thread
> In my example:
> Map2's queue limit is: 50000 (Map2: mapjoin source)
> Map1's queue limit is: 6931 (Map1: mapjoin target)
> the limits are calculated according to data characteristics in 
> LlapRecordReader.determineQueueLimit
> in my case, io threads for Map1 reached their queue limit, and cannot offer 
> new items, so holding IO elevator threads, which cannot be used for reading 
> for more important Map2 tasks, so Map1 tasks will eventually hang here:
> {code}
> "TezTR-886270_0_1_1_1_0" #281 daemon prio=5 os_prio=0 tid=0x00007f1b80366800 
> nid=0x1ca waiting on condition [0x00007f0cdb6d9000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00007f0de90049a8> (a 
> java.util.concurrent.FutureTask)
>       at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:426)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:204)
>       at 
> org.apache.hadoop.hive.ql.exec.Operator.completeInitialization(Operator.java:436)
>       at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:399)
>       at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:572)
>       at 
> org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:524)
>       at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:385)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:266)
>       at 
> org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:250)
>       at 
> org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
>       at 
> org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
>       at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>       at 
> org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool$WrappedCallable.call(StatsRecordingThreadPool.java:118)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> Operator.asyncInitOperations typically contains a dependency on map join 
> input.
> in my example Map2 reads small table, like microstrategy.lu_item, and Map1 
> reads a larger table microstrategy.order_detail
> this deadlock theoretically can be solved by:
> 1. IO elevator threads (reading for Map1) opt out as they cannot offer new 
> items
> 2. the freed elevator threads can be used for Map2
> 3. Map2 can finish its work, and let Map1 proceed by providing the mapjoin 
> data to it



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to