This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git.
commit 6493060459a9192b05b95286d800841fcee12108 Author: Yubi Lee <[email protected]> AuthorDate: Thu Aug 22 03:40:43 2024 +0900 IMPALA-13311: Hive3 INSERT failed by ClassNotFoundException: org.apache.tez.runtime.api.Event correct TEZ_HOME when using apache tez use apache tez 0.10.2 and make hive compatible with it Change-Id: Ia278a87f92fedb96ec20608b5872facc55ae0a3c Reviewed-on: http://gerrit.cloudera.org:8080/21706 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- bin/bootstrap_toolchain.py | 9 +- bin/impala-config.sh | 5 +- testdata/bin/run-hive-server.sh | 2 +- testdata/cluster/hive/README | 5 + testdata/cluster/hive/patch4-HIVE-27336.diff | 258 +++++++++++++++++++++++++++ 5 files changed, 269 insertions(+), 10 deletions(-) diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py index 42e5049b6..950156565 100755 --- a/bin/bootstrap_toolchain.py +++ b/bin/bootstrap_toolchain.py @@ -526,13 +526,6 @@ def get_hadoop_downloads(): archive_basename_tmpl="hbase-${version}-bin", unpack_directory_tmpl="hbase-${version}") - use_apache_ozone = os.environ["USE_APACHE_OZONE"] == "true" - if use_apache_ozone: - ozone = ApacheComponent("ozone", component_path_tmpl="ozone/${version}") - else: - ozone = CdpComponent("ozone") - - use_apache_hive = os.environ["USE_APACHE_HIVE"] == "true" if use_apache_hive: hive = ApacheComponent("hive", archive_basename_tmpl="apache-hive-${version}-bin") hive_src = ApacheComponent("hive", archive_basename_tmpl="apache-hive-${version}-src") @@ -564,10 +557,12 @@ def get_hadoop_downloads(): template_subs_in=template_subs) else: ranger = CdpComponent("ranger", archive_basename_tmpl="ranger-${version}-admin") + if use_apache_ozone: ozone = ApacheComponent("ozone", component_path_tmpl="ozone/${version}") else: ozone = CdpComponent("ozone") + use_override_hive = \ "HIVE_VERSION_OVERRIDE" in os.environ and os.environ["HIVE_VERSION_OVERRIDE"] != "" use_override_ranger = \ diff 
--git a/bin/impala-config.sh b/bin/impala-config.sh index eaaad8657..5252f8e45 100755 --- a/bin/impala-config.sh +++ b/bin/impala-config.sh @@ -252,7 +252,7 @@ export APACHE_ICEBERG_VERSION=1.4.3 export APACHE_KNOX_VERSION=1.6.1 export APACHE_PARQUET_VERSION=1.12.3 export APACHE_RANGER_VERSION=2.4.0 -export APACHE_TEZ_VERSION=0.10.3 +export APACHE_TEZ_VERSION=0.10.2 export APACHE_HIVE_VERSION=3.1.3 export APACHE_HIVE_STORAGE_API_VERSION=2.7.0 export APACHE_OZONE_VERSION=1.4.0 @@ -690,7 +690,7 @@ fi export HIVE_METASTORE_THRIFT_DIR=${HIVE_METASTORE_THRIFT_DIR_OVERRIDE:-\ "$HIVE_SRC_DIR/standalone-metastore/src/main/thrift"} if $USE_APACHE_TEZ; then - export TEZ_HOME="$APACHE_COMPONENTS_HOME/tez-${IMPALA_TEZ_VERSION}" + export TEZ_HOME="$APACHE_COMPONENTS_HOME/apache-tez-${IMPALA_TEZ_VERSION}-bin" else export TEZ_HOME="$CDP_COMPONENTS_HOME/tez-${IMPALA_TEZ_VERSION}-minimal" fi @@ -1161,6 +1161,7 @@ echo "IMPALA_HADOOP_VERSION = $IMPALA_HADOOP_VERSION" echo "IMPALA_AVRO_JAVA_VERSION= $IMPALA_AVRO_JAVA_VERSION" echo "IMPALA_PARQUET_VERSION = $IMPALA_PARQUET_VERSION" echo "IMPALA_HIVE_VERSION = $IMPALA_HIVE_VERSION" +echo "IMPALA_TEZ_VERSION = $IMPALA_TEZ_VERSION" echo "IMPALA_HBASE_VERSION = $IMPALA_HBASE_VERSION" echo "IMPALA_OZONE_VERSION = $IMPALA_OZONE_VERSION" echo "IMPALA_HUDI_VERSION = $IMPALA_HUDI_VERSION" diff --git a/testdata/bin/run-hive-server.sh b/testdata/bin/run-hive-server.sh index 9da4fd015..f64d4cdd5 100755 --- a/testdata/bin/run-hive-server.sh +++ b/testdata/bin/run-hive-server.sh @@ -140,7 +140,7 @@ export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:${TEZ_HOME}/* # So, we'll be a bit choosy about what we add to the classpath here. 
for jar in $TEZ_HOME/lib/* ; do case $(basename $jar) in - commons-*|RoaringBitmap*) + commons-*|RoaringBitmap*|protobuf-java*) export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$jar ;; esac diff --git a/testdata/cluster/hive/README b/testdata/cluster/hive/README index 2347bbd77..0f591b35f 100644 --- a/testdata/cluster/hive/README +++ b/testdata/cluster/hive/README @@ -13,3 +13,8 @@ Update queries on non-bucketed + partitioned tables throws NPE patch3-HIVE-20067.diff: Fix failures in firing InsertEvent for on insert-only tables + +patch4-HIVE-27336.diff: +Upgrade Tez to 0.10.2 in Hive-3.X +- https://issues.apache.org/jira/browse/BIGTOP-4022 +- https://issues.apache.org/jira/browse/HIVE-27336 diff --git a/testdata/cluster/hive/patch4-HIVE-27336.diff b/testdata/cluster/hive/patch4-HIVE-27336.diff new file mode 100644 index 000000000..380a053f4 --- /dev/null +++ b/testdata/cluster/hive/patch4-HIVE-27336.diff @@ -0,0 +1,258 @@ +From 5f79f2a059534eaa63c0479a2142a250fa78c1e3 Mon Sep 17 00:00:00 2001 +From: amaruthappan <[email protected]> +Date: Mon, 8 May 2023 17:00:46 -0700 +Subject: [PATCH 1/2] =?UTF-8?q?HIVE-23190:=20LLAP:=20modify=20IndexCache?= + =?UTF-8?q?=20to=20pass=20filesystem=20object=20to=20TezSpillRecord=20(L?= + =?UTF-8?q?=C3=A1szl=C3=B3=20Bodor=20reviewed=20by=20Rajesh=20Balamohan)?= +MIME-Version: 1.0 +Content-Type: text/plain; charset=UTF-8 +Content-Transfer-Encoding: 8bit + +--- + .../hadoop/hive/llap/shufflehandler/IndexCache.java | 11 +++++++++++ + 1 file changed, 11 insertions(+) + +diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java +index 4de03f232d70..c7b986469f4a 100644 +--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java ++++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java +@@ -25,6 +25,7 @@ + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import 
org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.tez.runtime.library.common.Constants; + import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; +@@ -42,11 +43,21 @@ class IndexCache { + + private final LinkedBlockingQueue<String> queue = + new LinkedBlockingQueue<String>(); ++ private FileSystem fs; + + public IndexCache(Configuration conf) { + this.conf = conf; + totalMemoryAllowed = 10 * 1024 * 1024; + LOG.info("IndexCache created with max memory = " + totalMemoryAllowed); ++ initLocalFs(); ++ } ++ ++ private void initLocalFs() { ++ try { ++ this.fs = FileSystem.getLocal(conf).getRaw(); ++ } catch (IOException e) { ++ throw new RuntimeException(e); ++ } + } + + /** + +From 1edbe403ff424f91ed0cd1ae91eb39290b5beb7f Mon Sep 17 00:00:00 2001 +From: amaruthappan <[email protected]> +Date: Mon, 8 May 2023 16:58:51 -0700 +Subject: [PATCH 2/2] HIVE-27336: Upgrade Tez to 0.10.2 in Hive-3.X + +--- + .../hive/llap/daemon/impl/ContainerRunnerImpl.java | 4 ++-- + .../hadoop/hive/llap/daemon/impl/LlapTaskReporter.java | 2 +- + .../hive/llap/daemon/impl/TaskRunnerCallable.java | 2 +- + .../hadoop/hive/llap/shufflehandler/IndexCache.java | 2 +- + .../hive/llap/tezplugins/LlapTaskCommunicator.java | 10 +++++----- + .../hive/llap/tezplugins/LlapTaskSchedulerService.java | 8 ++++---- + .../hive/llap/tezplugins/TestLlapTaskCommunicator.java | 1 + + pom.xml | 2 +- + .../org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 7 +++++++ + 9 files changed, 23 insertions(+), 15 deletions(-) + +diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +index ef5922ef41b6..95d601a8e2f3 100644 +--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ++++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +@@ -215,7 +215,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws + vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber()); + + // This is the start of container-annotated logging. +- final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString(); ++ final String dagId = attemptId.getDAGID().toString(); + final String queryId = vertex.getHiveQueryId(); + final String fragmentId = LlapTezUtils.stripAttemptPrefix(fragmentIdString); + MDC.put("dagId", dagId); +@@ -237,7 +237,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws + env.put(ApplicationConstants.Environment.USER.name(), vertex.getUser()); + + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(fragmentIdString); +- int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId(); ++ int dagIdentifier = taskAttemptId.getDAGID().getId(); + + QueryIdentifier queryIdentifier = new QueryIdentifier( + qIdProto.getApplicationIdString(), dagIdentifier); +diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +index 33ade55ee1f5..cc7879cdecea 100644 +--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java ++++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +@@ -291,7 +291,7 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t + int fromPreRoutedEventId = task.getNextPreRoutedEventId(); + int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle()); + TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId, +- containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents); ++ containerIdStr, task.getTaskAttemptID(), 
fromEventId, maxEvents, 0); + if (LOG.isDebugEnabled()) { + LOG.debug("Sending heartbeat to AM, request=" + request); + } +diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +index 7f436e23264b..66f7c330f786 100644 +--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ++++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +@@ -327,7 +327,7 @@ private String constructThreadNameSuffix(TezTaskAttemptID taskAttemptId) { + StringBuilder sb = new StringBuilder(); + TezTaskID taskId = taskAttemptId.getTaskID(); + TezVertexID vertexId = taskId.getVertexID(); +- TezDAGID dagId = vertexId.getDAGId(); ++ TezDAGID dagId = vertexId.getDAGID(); + ApplicationId appId = dagId.getApplicationId(); + long clusterTs = appId.getClusterTimestamp(); + long clusterTsShort = clusterTs % 1_000_000L; +diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java +index c7b986469f4a..cc5019a64d84 100644 +--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java ++++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java +@@ -129,7 +129,7 @@ private IndexInformation readIndexFileToCache(Path indexFileName, + LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ; + TezSpillRecord tmp = null; + try { +- tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner); ++ tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner); + } catch (Throwable e) { + tmp = new TezSpillRecord(0); + cache.remove(mapId); +diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +index 
5d4ce223d9e9..5eebe10ac9a3 100644 +--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ++++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +@@ -322,7 +322,7 @@ public <T> void startUpdateGuaranteed(TezTaskAttemptID attemptId, NodeInfo assig + UpdateFragmentRequestProto request = UpdateFragmentRequestProto.newBuilder() + .setIsGuaranteed(newState).setFragmentIdentifierString(attemptId.toString()) + .setQueryIdentifier(constructQueryIdentifierProto( +- attemptId.getTaskID().getVertexID().getDAGId().getId())).build(); ++ attemptId.getDAGID().getId())).build(); + + communicator.sendUpdateFragment(request, nodeId.getHostname(), nodeId.getPort(), + new LlapProtocolClientProxy.ExecuteRequestCallback<UpdateFragmentResponseProto>() { +@@ -349,7 +349,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task + int priority) { + super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, + credentialsChanged, priority); +- int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId(); ++ int dagId = taskSpec.getTaskAttemptID().getDAGID().getId(); + if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIndex())) { + // TODO HiveQueryId extraction by parsing the Processor payload is ugly. This can be improved + // once TEZ-2672 is fixed. 
+@@ -505,7 +505,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId, + TerminateFragmentRequestProto request = + TerminateFragmentRequestProto.newBuilder().setQueryIdentifier( + constructQueryIdentifierProto( +- taskAttemptId.getTaskID().getVertexID().getDAGId().getId())) ++ taskAttemptId.getDAGID().getId())) + .setFragmentIdentifierString(taskAttemptId.toString()).build(); + communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(), + new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() { +@@ -649,7 +649,7 @@ private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId co + + private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString, + final boolean isDone, final String nmAddress) { +- String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); ++ String dagId = attemptID.getDAGID().toString(); + String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? 
".done" : ""), + "?nm.id=", nmAddress); + String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers", +@@ -794,7 +794,7 @@ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerI + builder.setAmPort(getAddress().getPort()); + + Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() == +- taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); ++ taskSpec.getTaskAttemptID().getDAGID().getId()); + ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); + if (credentialsBinary == null) { + credentialsBinary = serializeCredentials(getContext().getCurrentDagInfo().getCredentials()); +diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +index 82179645da00..99038cd49542 100644 +--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ++++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +@@ -1075,7 +1075,7 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin + writeLock.lock(); + try { + if (!dagRunning && metrics != null && id != null) { +- metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString()); ++ metrics.setDagId(id.getDAGID().toString()); + } + dagRunning = true; + dagStats.registerTaskRequest(hosts, racks); +@@ -1099,7 +1099,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container + writeLock.lock(); + try { + if (!dagRunning && metrics != null && id != null) { +- metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString()); ++ metrics.setDagId(id.getDAGID().toString()); + } + dagRunning = true; + dagStats.registerTaskRequest(null, null); +@@ -1114,7 +1114,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container + protected 
TezTaskAttemptID getTaskAttemptId(Object task) { + // TODO: why does Tez API use "Object" for this? + if (task instanceof TaskAttempt) { +- return ((TaskAttempt)task).getID(); ++ return ((TaskAttempt)task).getTaskAttemptID(); + } + throw new AssertionError("LLAP plugin can only schedule task attempts"); + } +@@ -2030,7 +2030,7 @@ private List<TaskInfo> preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> r + continue; // Not the right host. + } + Map<Integer,Set<Integer>> depInfo = getDependencyInfo( +- taskInfo.attemptId.getTaskID().getVertexID().getDAGId()); ++ taskInfo.attemptId.getDAGID()); + Set<Integer> vertexDepInfo = null; + if (depInfo != null) { + vertexDepInfo = depInfo.get(forVertex); +diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java +index 5efe7c677ce6..2fa2487a74d7 100644 +--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java ++++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java +@@ -364,6 +364,7 @@ private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID vertexId, int + TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance( + TezTaskID.getInstance(vertexId, taskIdx), 0); + doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID(); ++ doReturn(taskAttemptId.getDAGID()).when(taskSpec).getDAGID(); + doReturn(DAG_NAME).when(taskSpec).getDAGName(); + doReturn(vertexName).when(taskSpec).getVertexName(); + ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("fakeClassName").setUserPayload(userPayload); +diff --git a/pom.xml b/pom.xml +index cb54806ef5ca..053ccc059f3f 100644 +--- a/pom.xml ++++ b/pom.xml +@@ -196,7 +196,7 @@ + <slf4j.version>1.7.10</slf4j.version> + <ST4.version>4.0.4</ST4.version> + <storage-api.version>2.7.0</storage-api.version> +- <tez.version>0.9.1</tez.version> ++ 
<tez.version>0.10.2</tez.version> + <super-csv.version>2.2.0</super-csv.version> + <spark.version>2.3.0</spark.version> + <scala.binary.version>2.11</scala.binary.version> +diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +index a15482f19c43..288341a2b229 100644 +--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ++++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +@@ -761,5 +761,12 @@ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> + return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts); + } + } ++ ++ @Override ++ public String getWebUIAddress() throws IOException, TezException { ++ synchronized (dagClient) { ++ return dagClient.getWebUIAddress(); ++ } ++ } + } + }
