[ https://issues.apache.org/jira/browse/HIVE-27078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17904015#comment-17904015 ]
Seonggon Namgung commented on HIVE-27078:
-----------------------------------------

My investigation shows that this problem is caused by a bug in Tez task scheduling: Tez erroneously creates InputSpec objects before the final call to reconfigureVertex() is made in the vertex that decreases its parallelism. So this problem should be fixed on the Tez side.

To reproduce the problem, try the attached q file. The execution gets stuck for exactly the reason described in this JIRA.

{code:sql}
create table source_table2(date_col date, string_col string, decimal_col decimal(38,0)) clustered by (decimal_col) into 7 buckets;

insert into table source_table2 values
  ('2022-08-30', 'pipeline', '50000000000000000005905545593'),
  ('2022-08-16', 'pipeline', '50000000000000000005905545593'),
  ('2022-09-01', 'pipeline', '50000000000000000006008686831'),
  ('2022-08-30', 'pipeline', '50000000000000000005992620837'),
  ('2022-09-01', 'pipeline', '50000000000000000005992620837'),
  ('2022-09-01', 'pipeline', '50000000000000000005992621067'),
  ('2022-08-30', 'pipeline', '50000000000000000005992621067');

create table target_table2(date_col date, string_col string, decimal_col decimal(38,0)) clustered by (decimal_col) into 7 buckets;

insert into table target_table2 values
  ('2017-05-17', 'pipeline', '50000000000000000000441610525'),
  ('2018-12-20', 'pipeline', '50000000000000000001048981030'),
  ('2020-06-30', 'pipeline', '50000000000000000002332575516'),
  ('2021-08-16', 'pipeline', '50000000000000000003897973989'),
  ('2017-06-06', 'pipeline', '50000000000000000000449148729'),
  ('2017-09-08', 'pipeline', '50000000000000000000525378314'),
  ('2022-08-30', 'pipeline', '50000000000000000005905545593'),
  ('2022-08-16', 'pipeline', '50000000000000000005905545593'),
  ('2018-05-03', 'pipeline', '50000000000000000000750826355'),
  ('2020-01-10', 'pipeline', '50000000000000000001816579677'),
  ('2021-11-01', 'pipeline', '50000000000000000004269423714'),
  ('2017-11-07', 'pipeline', '50000000000000000000585901787'),
  ('2019-10-15', 'pipeline', '50000000000000000001598843430'),
  ('2020-04-01', 'pipeline', '50000000000000000002035795461'),
  ('2020-02-24', 'pipeline', '50000000000000000001932600185'),
  ('2020-04-27', 'pipeline', '50000000000000000002108160849'),
  ('2016-07-05', 'pipeline', '50000000000000000000054405114'),
  ('2020-06-02', 'pipeline', '50000000000000000002234387967'),
  ('2020-08-21', 'pipeline', '50000000000000000002529168758'),
  ('2021-02-17', 'pipeline', '50000000000000000003158511687');

set hive.auto.convert.join=true;
set hive.optimize.dynamic.partition.hashjoin=false;
set hive.convert.join.bucket.mapjoin.tez=true;
set hive.vectorized.execution.enabled=false;
set hive.tez.auto.reducer.parallelism=true;
set hive.tez.min.partition.factor=12;
set hive.tez.max.partition.factor=50;

select * from target_table2 t
inner join (
    select distinct date_col, 'pipeline' string_col, decimal_col
    from source_table2
    where coalesce(decimal_col,'') = '50000000000000000005905545593') s
  on s.date_col = t.date_col
  AND s.string_col = t.string_col
  AND s.decimal_col = t.decimal_col;
{code}

> Bucket Map Join can hang if the source vertex parallelism is changed by reducer autoparallelism
> -----------------------------------------------------------------------------------------------
>
>             Key: HIVE-27078
>             URL: https://issues.apache.org/jira/browse/HIVE-27078
>         Project: Hive
>      Issue Type: Bug
>        Reporter: László Bodor
>        Priority: Major
>
> Consider this DAG:
> {code}
> | Map 1 <- Reducer 3 (CUSTOM_EDGE) |
> | Map 2 <- Map 4 (CUSTOM_EDGE)     |
> | Map 5 <- Map 1 (CUSTOM_EDGE)     |
> | Reducer 3 <- Map 2 (SIMPLE_EDGE) |
> {code}
> This can be simplified further; it was just picked from a customer query. The problematic vertices and edge are:
> {code}
> | Map 1 <- Reducer 3 (CUSTOM_EDGE) |
> {code}
> Reducer 3 was initially scheduled with 20 tasks, and later auto reducer parallelism decided that only 4 tasks are needed:
> {code}
> 2023-02-07 13:00:36,078 [INFO] [App Shared Pool - #4] |vertexmanager.ShuffleVertexManager|: Reducing auto parallelism for vertex: Reducer 3 from 20 to 4
> {code}
> In this case, Map 1 can hang, as it still expects 20 inputs:
> {code}
> ----------------------------------------------------------------------------------------------
>         VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
> ----------------------------------------------------------------------------------------------
> Map 4 ..........  container     SUCCEEDED     16         16        0        0       0       0
> Map 2 ..........  container     SUCCEEDED     48         48        0        0       0       0
> Reducer 3 ......  container     SUCCEEDED      4          4        0        0       0       0
> Map 1             container       RUNNING    192          0       13      179       0       0
> Map 5             container        INITED    241          0        0      241       0       0
> ----------------------------------------------------------------------------------------------
> VERTICES: 03/05  [===>>-----------------------] 13%   ELAPSED TIME: 901.18 s
> ----------------------------------------------------------------------------------------------
> {code}
> In the logs it looks like this:
> {code}
> 2022-12-08 09:42:26,845 [INFO] [I/O Setup 2 Start: {Reducer 3}] |impl.ShuffleManager|: Reducer_3: numInputs=20, compressionCodec=org.apache.hadoop.io.compress.SnappyCodec, numFetchers=10, ifileBufferSize=4096, ifileReadAheadEnabled=true, ifileReadAheadLength=4194304, localDiskFetchEnabled=true, sharedFetchEnabled=false, keepAlive=true, keepAliveMaxConnections=20, connectionTimeout=180000, readTimeout=180000, bufferSize=8192, bufferSize=8192, maxTaskOutputAtOnce=20, asyncHttp=false
> {code}
> ...it receives the input event:
> {code}
> 2022-12-08 09:42:27,134 [INFO] [TaskHeartbeatThread] |task.TaskReporter|: Routing events from heartbeat response to task, currentTaskAttemptId=attempt_1670331499491_1408_1_03_000039_0, eventCount=1 fromEventId=0 nextFromEventId=0
> {code}
> ...but then it hangs while waiting for further inputs:
> {code}
> "TezChild" #29 daemon prio=5 os_prio=0 tid=0x00007f3fae141000 nid=0x9581 waiting on condition [0x00007f3f737ba000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x000000071ad90a00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:492)
>         at java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:680)
>         at org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.getNextInput(ShuffleManager.java:1033)
>         at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.moveToNextInput(UnorderedKVReader.java:202)
>         at org.apache.tez.runtime.library.common.readers.UnorderedKVReader.next(UnorderedKVReader.java:125)
>         at org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader.load(VectorMapJoinFastHashTableLoader.java:129)
>         at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTableInternal(MapJoinOperator.java:385)
>         at org.apache.hadoop.hive.ql.exec.MapJoinOperator.loadHashTable(MapJoinOperator.java:454)
>         at org.apache.hadoop.hive.ql.exec.MapJoinOperator.initializeOp(MapJoinOperator.java:241)
>         at org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinCommonOperator.initializeOp(VectorMapJoinCommonOperator.java:555)
>         at org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinGenerateResultOperator.initializeOp(VectorMapJoinGenerateResultOperator.java:111)
>         at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:374)
>         at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:571)
>         at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:523)
>         at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:384)
>         at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.init(MapRecordProcessor.java:353)
>         at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:268)
>         at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:252)
>         at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:374)
>         at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:75)
>         at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:62)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>         at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:62)
>         at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:38)
>         at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>         at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>         at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
>         at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:750)
> {code}
> As a quick temporary workaround, we can disable auto reducer parallelism on a vertex if it is the source of a bucket map join (here: Reducer 3).

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
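To make the failure mode concrete, here is a toy model of the mismatch in plain Python; it is not Tez code, and all names in it are illustrative. The consumer's expected input count is frozen by an InputSpec built from the pre-reconfiguration parallelism (20), while the reconfigured vertex only ever produces 4 input-completion events, so a blocking read for the remaining inputs never returns:

```python
# Toy model of the hang -- NOT Tez code; names are illustrative only.
# Map 1's shuffle consumer was built from an InputSpec that still says
# numInputs=20 (created before the final reconfigureVertex() call), but the
# reconfigured Reducer 3 runs only 4 tasks, so only 4 events ever arrive.
import queue

def load_hash_table(completed_inputs, expected_num_inputs, timeout=0.05):
    """Stand-in for the hash-table load loop: block until all expected
    inputs arrive and report how many were actually received. The timeout
    exists only so this demo terminates; the real task blocks indefinitely
    (compare ShuffleManager.getNextInput in the stack trace above)."""
    received = 0
    while received < expected_num_inputs:
        try:
            completed_inputs.get(timeout=timeout)
            received += 1
        except queue.Empty:
            break  # the real task has no timeout here -- it just hangs
    return received

events = queue.Queue()
for task in range(4):                 # Reducer 3 was reduced to 4 tasks...
    events.put(f"Reducer_3_task_{task}")

# ...but the consumer still expects the original 20 inputs.
received = load_hash_table(events, expected_num_inputs=20)
print(received)  # 4 -- it would wait for the other 16 forever
```

A Tez-side fix would have to generate the downstream InputSpec only after the final reconfigureVertex() call. Until then, besides the per-vertex workaround suggested above, setting hive.tez.auto.reducer.parallelism=false for the affected query presumably also avoids the mismatch, at the cost of losing auto parallelism entirely for that query.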