This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new 5187aff KYLIN-5069 Refactor hive and hadoop dependency of kylin4
5187aff is described below
commit 5187affe180fc88a8422cc193097d1ef9614ce89
Author: yaqian.zhang <[email protected]>
AuthorDate: Thu Oct 14 11:37:41 2021 +0800
KYLIN-5069 Refactor hive and hadoop dependency of kylin4
---
build/bin/check-hadoop-env.sh | 88 ++++++
build/bin/find-hive-dependency.sh | 214 ---------------
build/bin/kylin.sh | 80 +-----
build/bin/prepare_hadoop_dependency.sh | 192 +++++++++++++
build/bin/replace-jars-under-spark.sh | 149 ----------
.../org/apache/kylin/common/KylinConfigBase.java | 2 +-
.../kylin/common/util/HiveCmdBuilderTest.java | 118 --------
.../kylin/spark/classloader/ClassLoaderUtils.java | 20 --
.../kylin/spark/classloader/SparkClassLoader.java | 212 ---------------
.../kylin/spark/classloader/TomcatClassLoader.java | 9 -
.../org/apache/spark/sql/SparderContext.scala | 26 +-
source-hive/pom.xml | 11 +-
.../kylin/source/hive/BeelineHiveClient.java | 300 ---------------------
.../apache/kylin/source/hive/CLIHiveClient.java | 191 -------------
.../kylin/source/hive/HiveClientFactory.java | 6 +-
.../apache/kylin/source/hive/HiveTableMeta.java | 4 +-
.../kylin/source/hive/HiveTableMetaBuilder.java | 4 +-
.../apache/kylin/source/hive/SparkHiveClient.java | 131 +++++++++
.../kylin/source/hive/BeelineHIveClientTest.java | 51 ----
19 files changed, 437 insertions(+), 1371 deletions(-)
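In outline, the refactored startup path is (a minimal sketch assembled from the scripts in this diff, not itself part of the commit):

    # bin/kylin.sh, retrieveDependency():
    source ${KYLIN_HOME}/bin/prepare_hadoop_dependency.sh  # replaces find-hive-dependency.sh
                                                           # and replace-jars-under-spark.sh
    # bin/prepare_hadoop_dependency.sh:
    source ${KYLIN_HOME}/bin/check-hadoop-env.sh           # CDH/EMR/HDI platform probes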
diff --git a/build/bin/check-hadoop-env.sh b/build/bin/check-hadoop-env.sh
new file mode 100644
index 0000000..5c88501
--- /dev/null
+++ b/build/bin/check-hadoop-env.sh
@@ -0,0 +1,88 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cdh_mapreduce_path="/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce"
+hadoop_lib_path="/usr/lib/hadoop"
+emr_spark_lib_path="/usr/lib/spark"
+hdi3_flag_path="/usr/hdp/current/hdinsight-zookeeper"
+
+cdh_version=`hadoop version | head -1 | awk -F '-' '{print $2}'`
+
+function is_cdh_6_x() {
+ if [ -d ${cdh_mapreduce_path}/../hadoop/ ]; then
+ hadoop_common_file=`find ${cdh_mapreduce_path}/../hadoop/ -maxdepth 1 -name "hadoop-common-*.jar" -not -name "*test*" | tail -1`
+ cdh_version=${hadoop_common_file##*-}
+
+ if [[ "${cdh_version}" == cdh6.* ]]; then
+ echo 1
+ return 1
+ fi
+ fi
+ echo 0
+ return 0
+}
+
+hdp_hadoop_path=$HDP_HADOOP_HOME
+if [[ -z ${hdp_hadoop_path} ]]
+then
+ if [[ -d "/usr/hdp/current/hadoop-client" ]]; then
+ hdp_hadoop_path="/usr/hdp/current/hadoop-client"
+ fi
+fi
+
+if [ -d $hadoop_lib_path ]; then
+ # hadoop-common-3.2.1-amzn-0.jar
+ hadoop_common_file=$(find $hadoop_lib_path -maxdepth 1 -name "hadoop-common-*.jar" -not -name "*test*" | tail -1)
+ emr_version_1=${hadoop_common_file##*common-}
+ arrVersion=(${emr_version_1//-/ })
+fi
+
+function is_aws_emr() {
+ if [[ "${arrVersion[1]}" == *amzn* ]]; then
+ echo 1
+ return 1
+ fi
+ echo 0
+ return 0
+}
+
+function is_aws_emr_6() {
+ if [[ "${arrVersion[0]}" == 3.* && "${arrVersion[1]}" == *amzn* ]]; then
+ echo 1
+ return 1
+ fi
+ echo 0
+ return 0
+}
+
+function is_hdi_3_x() {
+ # get hdp_version
+ if [ -z "${hdp_version}" ]; then
+ hdp_version=`/bin/bash -x hadoop 2>&1 | sed -n "s/\(.*\)export HDP_VERSION=\(.*\)/\2/"p`
+ verbose "hdp_version is ${hdp_version}"
+ fi
+
+ if [[ -d "/usr/hdp/current/hdinsight-zookeeper" && $hdp_version == "2"* ]];then
+ echo 1
+ return 1
+ fi
+
+ echo 0
+ return 0
+}
\ No newline at end of file
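The probes above echo "1" or "0" so that callers can branch on command substitution rather than on return codes; a minimal consumer sketch (mirroring how prepare_hadoop_dependency.sh below uses them):

    source ${KYLIN_HOME}/bin/check-hadoop-env.sh
    if [[ $(is_cdh_6_x) == 1 ]]; then
        echo "CDH 6.x detected: ${cdh_version}"
    elif [[ $(is_aws_emr) == 1 ]]; then
        echo "AWS EMR detected: ${arrVersion[0]}"
    elif [[ $(is_hdi_3_x) == 1 ]]; then
        echo "HDI 3.x detected: hdp_version=${hdp_version}"
    fi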
diff --git a/build/bin/find-hive-dependency.sh b/build/bin/find-hive-dependency.sh
deleted file mode 100755
index 31530e5..0000000
--- a/build/bin/find-hive-dependency.sh
+++ /dev/null
@@ -1,214 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source ${KYLIN_HOME:-"$(cd -P -- "$(dirname -- "$0")" && pwd -P)/../"}/bin/header.sh
-
-## ${dir} assigned to $KYLIN_HOME/bin in header.sh
-source ${dir}/load-hive-conf.sh
-
-echo Retrieving hive dependency...
-
-client_mode=`bash ${KYLIN_HOME}/bin/get-properties.sh kylin.source.hive.client`
-hive_env=
-
-if [ "${client_mode}" == "beeline" ]
-then
- beeline_shell=`$KYLIN_HOME/bin/get-properties.sh kylin.source.hive.beeline-shell`
- beeline_params=`bash ${KYLIN_HOME}/bin/get-properties.sh kylin.source.hive.beeline-params`
- hive_env=`${beeline_shell} ${hive_conf_properties} ${beeline_params} --outputformat=dsv -e "set;" 2>&1 | grep --text 'env:CLASSPATH' `
-else
- source ${dir}/check-hive-usability.sh
- hive_env=`hive ${hive_conf_properties} -e set 2>&1 | grep 'env:CLASSPATH'`
-fi
-
-if [ -z $hive_env ]
-then
- hive_permission=`hive ${hive_conf_properties} -e set 2>&1 | grep 'No valid credentials provided'`
- if [ -n "$hive_permission" ]
- then
- quit "No valid credentials provided for Hive CLI, please check permission of hive. (e.g. check if Kerberos is expired or not)"
- else
- quit "Something wrong with Hive CLI or Beeline, please execute Hive CLI or Beeline CLI in terminal to find the root cause."
- fi
-fi
-
-hive_classpath=`echo $hive_env | grep 'env:CLASSPATH' | awk -F '=' '{print $2}'`
-arr=(`echo $hive_classpath | cut -d ":" -f 1- | sed 's/:/ /g'`)
-hive_conf_path=
-hive_exec_path=
-
-if [ -n "$HIVE_CONF" ]
-then
- verbose "HIVE_CONF is set to: $HIVE_CONF, use it to locate hive configurations."
- hive_conf_path=$HIVE_CONF
-fi
-
-for data in ${arr[@]}
-do
- result=`echo $data | grep -e 'hive-exec[a-z0-9A-Z\.-]*.jar' | grep -v 'auxlib'`
- # In some cases there are more than one lib dirs, only the first one will be applied.
- if [ $result ] && [ -z "$hive_exec_path" ]
- then
- hive_exec_path=$data
- fi
-
- # in some versions of hive config is not in hive's classpath, find it separately
- if [ -z "$hive_conf_path" ]
- then
- result=`echo $data | grep -e 'hive[^/]*/conf'`
- if [ $result ]
- then
- hive_conf_path=$data
- fi
- fi
-done
-
-if [ -z "$hive_conf_path" ]
-then
- quit "Couldn't find hive configuration directory. Please set HIVE_CONF to the path which contains hive-site.xml."
-fi
-
-if [ -z "$hive_exec_path" ]
-then
- quit "Couldn't find hive executable jar. Please check if hive executable jar exists in HIVE_LIB folder."
-fi
-
-# in some versions of hive hcatalog is not in hive's classpath, find it separately
-if [ -z "$HCAT_HOME" ]
-then
- verbose "HCAT_HOME not found, try to find hcatalog path from hadoop home"
- hadoop_home=`echo $hive_exec_path | awk -F '/hive.*/lib/hive-exec[a-z0-9A-Z.-]*.jar' '{print $1}'`
- hive_home=`echo $hive_exec_path | awk -F '/lib/hive-exec[a-z0-9A-Z.-]*.jar' '{print $1}'`
- is_aws=`uname -r | grep amzn`
- if [ -d "${hadoop_home}/hive-hcatalog" ]; then
- hcatalog_home=${hadoop_home}/hive-hcatalog
- elif [ -d "${hadoop_home}/hive/hcatalog" ]; then
- hcatalog_home=${hadoop_home}/hive/hcatalog
- elif [ -d "${hive_home}/hcatalog" ]; then
- hcatalog_home=${hive_home}/hcatalog
- elif [ -n is_aws ] && [ -d "/usr/lib/hive-hcatalog" ]; then
- # special handling for Amazon EMR
- hcatalog_home=/usr/lib/hive-hcatalog
- else
- quit "Couldn't locate hcatalog installation, please make sure it is installed and set HCAT_HOME to the path."
- fi
-else
- verbose "HCAT_HOME is set to: $HCAT_HOME, use it to find hcatalog path:"
- hcatalog_home=${HCAT_HOME}
-fi
-
-hcatalog=`find -L ${hcatalog_home} -name "hive-hcatalog-core[0-9\.-]*.jar" 2>&1 | grep -m 1 -v 'Permission denied'`
-
-if [ -z "$hcatalog" ]
-then
- quit "hcatalog lib not found"
-fi
-
-function checkFileExist()
-{
- msg_hint=""
- if [ "$1" == "hive_lib" ]
- then
- msg_hint=", please check jar files in current HIVE_LIB or export HIVE_LIB='YOUR_LOCAL_HIVE_LIB'"
- elif [ "$1" == "hcatalog" ]
- then
- msg_hint=", please check jar files in current HCAT_HOME or export HCAT_HOME='YOUR_LOCAL_HCAT_HOME'"
- fi
-
- if [ -z "$2" ]
- then
- if [ "$1" == "hive_lib" ]
- then
- quit "Current HIVE_LIB is not valid, please export HIVE_LIB='YOUR_LOCAL_HIVE_LIB'"
- elif [ "$1" == "hcatalog" ]
- then
- quit "Current HCAT_HOME is not valid, please export HCAT_HOME='YOUR_LOCAL_HCAT_HOME'"
- fi
- fi
-
- files=(`echo $2 | cut -d ":" -f 1- | sed 's/:/ /g'`)
- misFiles=0
- outputMissFiles=
- for file in ${files}
- do
- let allFiles++
- if [ ! -f "${file}" ]; then
- outputMissFiles=${outputMissFiles}${file}", "
- let misFiles++
- fi
- done
- if [ 0 != ${misFiles} ]; then
- times=`expr ${allFiles} / ${misFiles}`
- [[ ${times} -gt 10 ]] || quit "A couple of hive jars can't be found: ${outputMisFiles}${msg_hint}"
- fi
-}
-
-function validateDirectory()
-{
- conf_path=$1
- [[ -d "${conf_path}" ]] || quit "${conf_path} doesn't exist!"
- unit=${conf_path: -1}
- [[ "${unit}" == "/" ]] || conf_path=${conf_path}"/"
-
- find="false"
- filelist=`ls ${conf_path}`
- for file in $filelist
- do
- if [ "${file}" == "hive-site.xml" ]
- then
- find="true"
- break
- fi
- done
- [[ "${find}" == "true" ]] || quit "ERROR, no hive-site.xml found under dir: ${conf_path}!"
-}
-
-if [ -z "$HIVE_LIB" ]
-then
- verbose "HIVE_LIB is not set, try to retrieve hive lib from hive_exec_path"
- if [[ $hive_exec_path =~ ^\/.*hive.*\/lib\/hive-exec[a-z0-9A-Z\.-]*.jar ]]
- then
- hive_lib_dir="$(dirname $hive_exec_path)"
- else
- quit "HIVE_LIB not found, please check hive installation or export HIVE_LIB='YOUR_LOCAL_HIVE_LIB'."
- fi
-else
- if [[ $HIVE_LIB =~ ^\/.*hive.*\/lib[\/]* ]]
- then
- verbose "HIVE_LIB is set to ${HIVE_LIB}"
- else
- echo "WARNING: HIVE_LIB is set to ${HIVE_LIB}, it's advised to set it to the lib dir under hive's installation directory"
- fi
- hive_lib_dir="$HIVE_LIB"
-fi
-
-hive_lib=`find -L ${hive_lib_dir} -name '*.jar' ! -name '*druid*' ! -name '*slf4j*' ! -name '*avatica*' ! -name '*calcite*' \
- ! -name '*jackson-datatype-joda*' ! -name '*derby*' ! -name "*jetty*" ! -name "*jsp*" ! -name "*servlet*" ! -name "*hbase*" ! -name "*websocket*" \
- -printf '%p:' | sed 's/:$//'`
-
-validateDirectory ${hive_conf_path}
-checkFileExist hive_lib ${hive_lib}
-checkFileExist hcatalog ${hcatalog}
-
-hive_dependency=${hive_conf_path}:${hive_lib}:${hcatalog}
-verbose "hive dependency is $hive_dependency"
-export hive_dependency
-export hive_conf_path
-echo "export hive_dependency=$hive_dependency
-export hive_conf_path=$hive_conf_path" > ${dir}/cached-hive-dependency.sh
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index a110c8c..a20df10 100755
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -33,11 +33,9 @@ mkdir -p ${KYLIN_HOME}/ext
source ${dir}/set-java-home.sh
function retrieveDependency() {
- #retrive $hive_dependency and $hbase_dependency
if [[ -z $reload_dependency && `ls -1 ${dir}/cached-* 2>/dev/null | wc -l` -eq 6 ]]
then
echo "Using cached dependency..."
- source ${dir}/cached-hive-dependency.sh
#retrive $hbase_dependency
metadataUrl=`${dir}/get-properties.sh kylin.metadata.url`
if [[ "${metadataUrl##*@}" == "hbase" ]]
@@ -45,11 +43,8 @@ function retrieveDependency() {
source ${dir}/cached-hbase-dependency.sh
fi
source ${dir}/cached-hadoop-conf-dir.sh
- # source ${dir}/cached-kafka-dependency.sh
source ${dir}/cached-spark-dependency.sh
- # source ${dir}/cached-flink-dependency.sh
else
- source ${dir}/find-hive-dependency.sh
#retrive $hbase_dependency
metadataUrl=`${dir}/get-properties.sh kylin.metadata.url`
if [[ "${metadataUrl##*@}" == "hbase" ]]
@@ -57,72 +52,16 @@ function retrieveDependency() {
source ${dir}/find-hbase-dependency.sh
fi
source ${dir}/find-hadoop-conf-dir.sh
- # source ${dir}/find-kafka-dependency.sh
source ${dir}/find-spark-dependency.sh
- # source ${dir}/find-flink-dependency.sh
fi
- # Replace jars for different hadoop dist
- bash ${dir}/replace-jars-under-spark.sh
-
# get hdp_version
if [ -z "${hdp_version}" ]; then
hdp_version=`/bin/bash -x hadoop 2>&1 | sed -n "s/\(.*\)export HDP_VERSION=\(.*\)/\2/"p`
verbose "hdp_version is ${hdp_version}"
fi
- # Replace jars for HDI
- KYLIN_SPARK_JARS_HOME="${KYLIN_HOME}/spark/jars"
- if [[ -d "/usr/hdp/current/hdinsight-zookeeper" && $hdp_version == "2"* ]]
- then
- echo "The current Hadoop environment is HDI3, will replace some jars package for ${KYLIN_HOME}/spark/jars"
- if [[ -f ${KYLIN_HOME}/tomcat/webapps/kylin.war ]]
- then
- if [[ ! -d ${KYLIN_HOME}/tomcat/webapps/kylin ]]
- then
- mkdir ${KYLIN_HOME}/tomcat/webapps/kylin
- fi
- mv ${KYLIN_HOME}/tomcat/webapps/kylin.war ${KYLIN_HOME}/tomcat/webapps/kylin
- cd ${KYLIN_HOME}/tomcat/webapps/kylin
- jar -xf ${KYLIN_HOME}/tomcat/webapps/kylin/kylin.war
- if [[ -f ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar ]]
- then
- echo "Remove ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar to avoid version conflicts"
- rm -rf ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar
- rm -rf ${KYLIN_HOME}/tomcat/webapps/kylin/kylin.war
- cd ${KYLIN_HOME}/
- fi
- fi
-
- if [[ -d "${KYLIN_SPARK_JARS_HOME}" ]]
- then
- if [[ -f ${KYLIN_HOME}/hdi3_spark_jars_flag ]]
- then
- echo "Required jars have been added to ${KYLIN_HOME}/spark/jars, skip this step."
- else
- rm -rf ${KYLIN_HOME}/spark/jars/hadoop-*
- cp /usr/hdp/current/spark2-client/jars/hadoop-* $KYLIN_SPARK_JARS_HOME
- cp /usr/hdp/current/spark2-client/jars/azure-* $KYLIN_SPARK_JARS_HOME
- cp /usr/hdp/current/hadoop-client/lib/microsoft-log4j-etwappender-1.0.jar $KYLIN_SPARK_JARS_HOME
- cp /usr/hdp/current/hadoop-client/lib/hadoop-lzo-0.6.0.${hdp_version}.jar $KYLIN_SPARK_JARS_HOME
-
- rm -rf $KYLIN_HOME/spark/jars/guava-14.0.1.jar
- cp /usr/hdp/current/spark2-client/jars/guava-24.1.1-jre.jar $KYLIN_SPARK_JARS_HOME
-
- echo "Upload spark jars to HDFS"
- hdfs dfs -test -d /spark2_jars
- if [ $? -eq 1 ]
- then
- hdfs dfs -mkdir /spark2_jars
- fi
- hdfs dfs -put $KYLIN_SPARK_JARS_HOME/* /spark2_jars
-
- touch ${KYLIN_HOME}/hdi3_spark_jars_flag
- fi
- else
- echo "${KYLIN_HOME}/spark/jars dose not exist. You can run ${KYLIN_HOME}/download-spark.sh to download spark."
- fi
- fi
+ source ${KYLIN_HOME}/bin/prepare_hadoop_dependency.sh
tomcat_root=${dir}/../tomcat
export tomcat_root
@@ -141,26 +80,15 @@ function retrieveDependency() {
spring_profile="${spring_profile},${additional_security_profiles}"
fi
- # compose hadoop_dependencies
- hadoop_dependencies=${hadoop_dependencies}:`hadoop classpath`
- if [ -n "${hive_dependency}" ]; then
- hadoop_dependencies=${hive_dependency}:${hadoop_dependencies}
- fi
- if [ -n "${kafka_dependency}" ]; then
- hadoop_dependencies=${hadoop_dependencies}:${kafka_dependency}
- fi
-
# compose KYLIN_TOMCAT_CLASSPATH
tomcat_classpath=${tomcat_root}/bin/bootstrap.jar:${tomcat_root}/bin/tomcat-juli.jar:${tomcat_root}/lib/*
- export KYLIN_TOMCAT_CLASSPATH=${tomcat_classpath}:${KYLIN_HOME}/conf:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/ext/*:${hadoop_dependencies}:${flink_dependency}
+ export KYLIN_TOMCAT_CLASSPATH=${tomcat_classpath}:${KYLIN_HOME}/conf:${KYLIN_HOME}/hadoop_conf:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/ext/*:${SPARK_HOME}/jars/*
# compose KYLIN_TOOL_CLASSPATH
- export KYLIN_TOOL_CLASSPATH=${KYLIN_HOME}/conf:${KYLIN_HOME}/tool/*:${KYLIN_HOME}/ext/*:${hadoop_dependencies}
+ export KYLIN_TOOL_CLASSPATH=${KYLIN_HOME}/conf:${KYLIN_HOME}/tool/*:${KYLIN_HOME}/ext/*:${SPARK_HOME}/jars/*
# compose kylin_common_opts
- kylin_common_opts="-Dkylin.hive.dependency=${hive_dependency} \
- -Dkylin.kafka.dependency=${kafka_dependency} \
- -Dkylin.hadoop.conf.dir=${kylin_hadoop_conf_dir} \
+ kylin_common_opts="-Dkylin.hadoop.conf.dir=${kylin_hadoop_conf_dir} \
-Dkylin.server.host-address=${KYLIN_REST_ADDRESS} \
-Dspring.profiles.active=${spring_profile} \
-Dhdp.version=${hdp_version}"
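Both classpaths are now composed from ${SPARK_HOME}/jars/* instead of hive_dependency and `hadoop classpath`; a debugging line one could drop into retrieveDependency() to confirm the new composition (hypothetical, not part of the commit):

    echo "${KYLIN_TOMCAT_CLASSPATH}" | tr ':' '\n' | grep "${SPARK_HOME}/jars"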
diff --git a/build/bin/prepare_hadoop_dependency.sh b/build/bin/prepare_hadoop_dependency.sh
new file mode 100644
index 0000000..4ef1f30
--- /dev/null
+++ b/build/bin/prepare_hadoop_dependency.sh
@@ -0,0 +1,192 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source ${KYLIN_HOME}/bin/check-hadoop-env.sh
+
+
+BYPASS=${SPARK_HOME}/jars/replace-jars-bypass
+
+if [[ -f ${BYPASS} ]]
+then
+ return
+fi
+
+if [ ! -d "$KYLIN_HOME/spark" ]; then
+ echo "Skip spark which is not owned by kylin. SPARK_HOME is $SPARK_HOME and KYLIN_HOME is $KYLIN_HOME."
+ exit 0
+fi
+
+echo "Start replacing hadoop jars under ${KYLIN_HOME}/spark/jars."
+
+hadoop_lib=${KYLIN_HOME}/spark/jars
+
+common_jars=
+hdfs_jars=
+mr_jars=
+yarn_jars=
+other_jars=
+
+function cdh_replace_jars() {
+ common_jars=$(find $cdh_mapreduce_path/../hadoop -maxdepth 2 \
+ -name "hadoop-annotations-*.jar" -not -name "*test*" \
+ -o -name "hadoop-auth-*.jar" -not -name "*test*" \
+ -o -name "hadoop-common-*.jar" -not -name "*test*")
+
+ hdfs_jars=$(find $cdh_mapreduce_path/../hadoop-hdfs -maxdepth 1 -name "hadoop-hdfs-*" -not -name "*test*" -not -name "*nfs*")
+
+ mr_jars=$(find $cdh_mapreduce_path -maxdepth 1 \
+ -name "hadoop-mapreduce-client-app-*.jar" -not -name "*test*" \
+ -o -name "hadoop-mapreduce-client-common-*.jar" -not -name "*test*" \
+ -o -name "hadoop-mapreduce-client-jobclient-*.jar" -not -name "*test*" \
+ -o -name "hadoop-mapreduce-client-shuffle-*.jar" -not -name "*test*" \
+ -o -name "hadoop-mapreduce-client-core-*.jar" -not -name "*test*")
+
+ yarn_jars=$(find $cdh_mapreduce_path/../hadoop-yarn -maxdepth 1 \
+ -name "hadoop-yarn-api-*.jar" -not -name "*test*" \
+ -o -name "hadoop-yarn-client-*.jar" -not -name "*test*" \
+ -o -name "hadoop-yarn-common-*.jar" -not -name "*test*" \
+ -o -name "hadoop-yarn-server-common-*.jar" -not -name "*test*" \
+ -o -name "hadoop-yarn-server-web-proxy-*.jar" -not -name "*test*")
+
+ other_jars=$(find $cdh_mapreduce_path/../../jars -maxdepth 1 -name "htrace-core4*" || find $cdh_mapreduce_path/../hadoop -maxdepth 2 -name "htrace-core4*")
+
+ if [[ $(is_cdh_6_x) == 1 ]]; then
+ cdh6_jars=$(find ${cdh_mapreduce_path}/../../jars -maxdepth 1 \
+ -name "woodstox-core-*.jar" -o -name "stax2-*.jar" -o -name "commons-configuration2-*.jar" -o -name "re2j-*.jar" )
+ fi
+}
+
+function emr_replace_jars() {
+ common_jars=$(find ${emr_spark_lib_path}/jars/ -maxdepth 1 \
+ -name "hadoop-*.jar" -not -name "*test*" \
+ -o -name "htrace-core4*" \
+ -o -name "emr-spark-goodies*")
+
+ other_jars=$(find ${hadoop_lib_path}/lib/ -maxdepth 1 \
+ -name "woodstox-core-*.jar" \
+ -o -name "stax2-api-3*.jar")
+
+ lzo_jars=$(find ${hadoop_lib_path}/../hadoop-lzo/lib/ -maxdepth 1 \
+ -name "hadoop-lzo-*.jar" )
+
+ if [[ $(is_aws_emr_6) == 1 ]]; then
+ emr6_jars=$(find ${emr_spark_lib_path}/jars/ -maxdepth 1 \
+ -name "re2j-*.jar" -not -name "*test*" \
+ -o -name "commons-configuration2-*" )
+ fi
+}
+
+function hdi_replace_jars() {
+ common_jars=$(find ${hdi3_flag_path}/../spark2-client/ -maxdepth 2 \
+ -name "hadoop-*.jar" -not -name "*test*" \
+ -o -name "azure-*.jar" -not -name "*test*" \
+ -o -name "guava-*.jar")
+
+ other_jars=$(find ${hdi3_flag_path}/../hadoop-client/ -maxdepth 2 \
+ -name "microsoft-log4j-etwappender-*.jar")
+
+ lzo_jars=$(find ${hdi3_flag_path}/../hadoop-client/ -maxdepth 2 \
+ -name "hadoop-lzo-*.jar" )
+}
+
+if [ -d "$cdh_mapreduce_path" ]
+then
+ cdh_replace_jars
+elif [[ $(is_aws_emr) == 1 ]]
+then
+ emr_replace_jars
+elif [[ $(is_hdi_3_x) == 1 ]]
+then
+ hdi_replace_jars
+else
+ touch "${BYPASS}"
+fi
+
+jar_list="${common_jars} ${hdfs_jars} ${mr_jars} ${yarn_jars} ${other_jars} ${cdh6_jars} ${emr6_jars} ${lzo_jars}"
+
+echo "Found platform-specific jars: ${jar_list}; these will replace the matching jars under ${SPARK_HOME}/jars."
+
+if [[ $(is_aws_emr_6) == 1 ]]; then
+ find ${SPARK_HOME}/jars -name "hive-exec-*.jar" -exec rm -f {} \;
+ hive_jars=$(find ${emr_spark_lib_path}/jars/ -maxdepth 1 -name "hive-exec-*.jar")
+ cp ${hive_jars} ${SPARK_HOME}/jars
+ configuration_jars=$(find ${emr_spark_lib_path}/../ -name "commons-configuration-1.10*.jar")
+ cp ${configuration_jars} ${KYLIN_HOME}/lib
+fi
+
+if [ $(is_cdh_6_x) == 1 ]; then
+ if [ -d "${KYLIN_HOME}/bin/hadoop3_jars/cdh6" ]; then
+ find ${SPARK_HOME}/jars -name "hive-exec-*.jar" -exec rm -f {} \;
+ echo "Copy jars from ${KYLIN_HOME}/bin/hadoop3_jars/cdh6"
+ cp ${KYLIN_HOME}/bin/hadoop3_jars/cdh6/*.jar ${SPARK_HOME}/jars
+ fi
+fi
+
+if [ ! -f ${BYPASS} ]; then
+ find ${SPARK_HOME}/jars -name "htrace-core-*" -exec rm -rf {} \;
+ find ${SPARK_HOME}/jars -name "hadoop-*.jar" -exec rm -f {} \;
+fi
+
+for jar_file in ${jar_list}
+do
+ cp ${jar_file} ${SPARK_HOME}/jars
+done
+
+if [[ ($(is_aws_emr) == 1) || ($(is_cdh_6_x) == 1) ]]; then
+ log4j_jars=$(find ${SPARK_HOME}/jars/ -maxdepth 2 -name "slf4j-*.jar")
+ cp ${log4j_jars} ${KYLIN_HOME}/ext
+fi
+
+if [ $(is_hdi_3_x) == 1 ]; then
+ if [[ -f ${KYLIN_HOME}/tomcat/webapps/kylin.war ]]; then
+ if [[ ! -d ${KYLIN_HOME}/tomcat/webapps/kylin ]]
+ then
+ mkdir ${KYLIN_HOME}/tomcat/webapps/kylin
+ fi
+ mv ${KYLIN_HOME}/tomcat/webapps/kylin.war ${KYLIN_HOME}/tomcat/webapps/kylin
+ cd ${KYLIN_HOME}/tomcat/webapps/kylin
+ jar -xf ${KYLIN_HOME}/tomcat/webapps/kylin/kylin.war
+ if [[ -f ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar ]]
+ then
+ echo "Remove ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar to avoid version conflicts"
+ rm -rf ${KYLIN_HOME}/tomcat/webapps/kylin/WEB-INF/lib/guava-14.0.jar
+ rm -rf ${KYLIN_HOME}/tomcat/webapps/kylin/kylin.war
+ cd ${KYLIN_HOME}/
+ fi
+ fi
+ find ${SPARK_HOME}/jars -name "guava-14*.jar" -exec rm -f {} \;
+ echo "Upload spark jars to HDFS"
+ hdfs dfs -test -d /spark2_jars
+ if [ $? -eq 1 ]; then
+ hdfs dfs -mkdir /spark2_jars
+ fi
+ hdfs dfs -put ${SPARK_HOME}/jars/* /spark2_jars
+fi
+
+# Remove all spaces
+jar_list=${jar_list// /}
+
+if [[ (-z "${jar_list}") && (! -f ${BYPASS}) ]]
+then
+ echo "The automatic hadoop jar replacement could not run for this platform; please confirm manually that the corresponding hadoop jars have been replaced."
+else
+ touch "${BYPASS}"
+fi
+
+echo "Done hadoop jars replacement under ${SPARK_HOME}/jars."
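Because the script drops the replace-jars-bypass marker and returns early on later runs, the replacement is effectively run-once; to force a re-run after a cluster change, an operator would remove the marker first (a sketch assuming default paths):

    rm -f "${SPARK_HOME}/jars/replace-jars-bypass"
    source "${KYLIN_HOME}/bin/prepare_hadoop_dependency.sh"  # re-detects the platform and re-copies jars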
diff --git a/build/bin/replace-jars-under-spark.sh b/build/bin/replace-jars-under-spark.sh
deleted file mode 100644
index 01f9854..0000000
--- a/build/bin/replace-jars-under-spark.sh
+++ /dev/null
@@ -1,149 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# check https://cwiki.apache.org/confluence/display/KYLIN/Deploy+Kylin+4+on+CDH+6
-
-BYPASS=${KYLIN_HOME}/spark/jars/replace-jars-bypass
-cdh_mapreduce_path="/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce"
-hadoop_lib_path="/usr/lib/hadoop"
-
-if [ -f ${BYPASS} ]; then
- exit 0
-fi
-
-if [ ! -d "$KYLIN_HOME/spark" ]; then
- echo "Skip spark which not owned by kylin. SPARK_HOME is $SPARK_HOME and KYLIN_HOME is $KYLIN_HOME ."
- exit 0
-fi
-
-echo "Start replacing hadoop jars under ${SPARK_HOME}/jars."
-
-function check_cdh_hadoop() {
- # hadoop-common-3.0.0-cdh6.2.0.jar
- hadoop_common_file=$(find ${cdh_mapreduce_path}/../hadoop/ -maxdepth 1 -name "hadoop-common-*.jar" -not -name "*test*" | tail -1)
- cdh_version=${hadoop_common_file##*-}
- if [[ "${cdh_version}" == cdh6.* ]]; then
- export is_cdh6=1
- else
- export is_cdh6=0
- fi
- if [[ "${cdh_version}" == cdh5.* ]]; then
- export is_cdh5=1
- else
- export is_cdh5=0
- fi
-}
-
-function check_aws_emr() {
- if [ ! -d $hadoop_lib_path ]; then
- return 0
- fi
-
- # hadoop-common-3.2.1-amzn-0.jar
- hadoop_common_file=$(find $hadoop_lib_path -maxdepth 1 -name "hadoop-common-*.jar" -not -name "*test*" | tail -1)
- emr_version_1=${hadoop_common_file##*common-}
- echo $emr_version_1
- arrVersion=(${emr_version_1//-/ })
-
- if [[ "${arrVersion[0]}" == 3.* && "${arrVersion[1]}" == *amzn* ]]; then
- export is_emr6=1
- else
- export is_emr6=0
- fi
-
- if [[ "${arrVersion[0]}" == 2.* && "${arrVersion[1]}" == *amzn* ]]; then
- export is_emr5=1
- else
- export is_emr5=0
- fi
-}
-
-check_cdh_hadoop
-check_aws_emr
-
-common_jars=
-hdfs_jars=
-mr_jars=
-yarn_jars=
-other_jars=
-
-if [ $is_cdh6 == 1 ]; then
- common_jars=$(find $cdh_mapreduce_path/../hadoop -maxdepth 2 \
- -name "hadoop-annotations-*.jar" -not -name "*test*" \
- -o -name "hadoop-auth-*.jar" -not -name "*test*" \
- -o -name "hadoop-common-*.jar" -not -name "*test*")
-
- hdfs_jars=$(find $cdh_mapreduce_path/../hadoop-hdfs -maxdepth 1 -name "hadoop-hdfs-*" -not -name "*test*" -not -name "*nfs*")
-
- mr_jars=$(find $cdh_mapreduce_path -maxdepth 1 \
- -name "hadoop-mapreduce-client-app-*.jar" -not -name "*test*" \
- -o -name "hadoop-mapreduce-client-common-*.jar" -not -name "*test*" \
- -o -name "hadoop-mapreduce-client-jobclient-*.jar" -not -name "*test*" \
- -o -name "hadoop-mapreduce-client-shuffle-*.jar" -not -name "*test*" \
- -o -name "hadoop-mapreduce-client-core-*.jar" -not -name "*test*")
-
- yarn_jars=$(find $cdh_mapreduce_path/../hadoop-yarn -maxdepth 1 \
- -name "hadoop-yarn-api-*.jar" -not -name "*test*" \
- -o -name "hadoop-yarn-client-*.jar" -not -name "*test*" \
- -o -name "hadoop-yarn-common-*.jar" -not -name "*test*" \
- -o -name "hadoop-yarn-server-common-*.jar" -not -name "*test*" \
- -o -name "hadoop-yarn-server-web-proxy-*.jar" -not -name "*test*")
-
- other_jars=$(find $cdh_mapreduce_path/../../jars -maxdepth 1 -name "htrace-core4*" || find $cdh_mapreduce_path/../hadoop -maxdepth 2 -name "htrace-core4*")
-
- if [[ $is_cdh6 == 1 ]]; then
- cdh6_jars=$(find ${cdh_mapreduce_path}/../../jars -maxdepth 1 \
- -name "woodstox-core-*.jar" -o -name "commons-configuration2-*.jar" -o -name "re2j-*.jar")
- fi
-fi
-
-jar_list="${common_jars} ${hdfs_jars} ${mr_jars} ${yarn_jars} ${other_jars} ${cdh6_jars}"
-
-echo "Find platform specific jars:${jar_list}, will replace with these jars under ${SPARK_HOME}/jars."
-
-if [ $is_cdh6 == 1 ]; then
- find ${KYLIN_HOME}/spark/jars -name "hadoop-hdfs-*.jar" -exec rm -f {} \;
- find ${KYLIN_HOME}/spark/jars -name "hadoop-yarn-*.jar" -exec rm -f {} \;
- find ${KYLIN_HOME}/spark/jars -name "hadoop-mapreduce-*.jar" -exec rm -f {} \;
- find ${KYLIN_HOME}/spark/jars -name "hadoop-annotations-*.jar" -exec rm -f {} \;
- find ${KYLIN_HOME}/spark/jars -name "hadoop-auth-*.jar" -exec rm -f {} \;
- find ${KYLIN_HOME}/spark/jars -name "hadoop-client-*.jar" -exec rm -f {} \;
- find ${KYLIN_HOME}/spark/jars -name "hadoop-common-*.jar" -exec rm -f {} \;
- find ${KYLIN_HOME}/spark/jars -name "hive-exec-*.jar" -exec rm -f {} \;
- if [ -d "${KYLIN_HOME}/bin/hadoop3_jars/cdh6" ]; then
- echo "Copy jars from ${KYLIN_HOME}/bin/hadoop3_jars/cdh6"
- cp ${KYLIN_HOME}/bin/hadoop3_jars/cdh6/*.jar ${SPARK_HOME}/jars
- fi
-fi
-
-for jar_file in ${jar_list}; do
- $(cp ${jar_file} ${KYLIN_HOME}/spark/jars)
-done
-
-# Remove all spaces
-jar_list=${jar_list// /}
-
-if [ -z "${jar_list}" ]; then
- echo "Please confirm that the corresponding hadoop jars have been replaced. The automatic replacement program cannot be executed correctly."
-else
- echo "Replace jars under SPARK_HOME/jars finished."
- touch ${BYPASS}
-fi
-
-echo "Done hadoop jars replacement under ${SPARK_HOME}/jars."
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6246b94..19f84a2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1129,7 +1129,7 @@ public abstract class KylinConfigBase implements Serializable {
}
public String getHiveClientMode() {
- return getOptional("kylin.source.hive.client", "cli");
+ return getOptional("kylin.source.hive.client", "spark_catalog");
}
public String getHiveBeelineShell() {
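The default hive client thus moves from "cli" to "spark_catalog", served by the new SparkHiveClient added in this commit; the effective value can be checked the same way the shell scripts read properties (assuming a deployed KYLIN_HOME):

    bash ${KYLIN_HOME}/bin/get-properties.sh kylin.source.hive.client
    # prints "spark_catalog" unless overridden in kylin.properties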
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java b/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java
deleted file mode 100644
index a1b78db..0000000
--- a/core-common/src/test/java/org/apache/kylin/common/util/HiveCmdBuilderTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class HiveCmdBuilderTest {
-
- @Before
- public void setup() {
- System.setProperty("log4j.configuration", "file:../build/conf/kylin-tools-log4j.properties");
- System.setProperty("KYLIN_CONF", LocalFileMetadataTestCase.LOCALMETA_TEST_DATA);
- }
-
- @After
- public void after() throws Exception {
- System.clearProperty("kylin.source.hive.client");
- System.clearProperty("kylin.source.hive.beeline-shell");
- System.clearProperty("kylin.source.hive.beeline-params");
-
- System.clearProperty("kylin.source.hive.enable-sparksql-for-table-ops");
- System.clearProperty("kylin.source.hive.sparksql-beeline-shell");
- System.clearProperty("kylin.source.hive.sparksql-beeline-params");
- }
-
- @Test
- public void testHiveCLI() {
- System.setProperty("kylin.source.hive.client", "cli");
-
- Map<String, String> hiveProps = new HashMap<>();
- hiveProps.put("hive.execution.engine", "mr");
- Map<String, String> hivePropsOverwrite = new HashMap<>();
- hivePropsOverwrite.put("hive.execution.engine", "tez");
- HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder("test HiveCLI");
- hiveCmdBuilder.addStatement("USE default;");
- hiveCmdBuilder.addStatement("DROP TABLE `test`;");
- hiveCmdBuilder.addStatement("SHOW\n TABLES;");
- hiveCmdBuilder.setHiveConfProps(hiveProps);
- hiveCmdBuilder.overwriteHiveProps(hivePropsOverwrite);
- assertEquals(
- "hive -e \"set mapred.job.name='test HiveCLI';\nUSE default;\nDROP TABLE \\`test\\`;\nSHOW\n TABLES;\n\" --hiveconf hive.execution.engine=tez",
- hiveCmdBuilder.build());
- }
-
- @Test
- public void testBeeline() throws IOException {
- String lineSeparator = java.security.AccessController
- .doPrivileged(new sun.security.action.GetPropertyAction("line.separator"));
- System.setProperty("kylin.source.hive.client", "beeline");
- System.setProperty("kylin.source.hive.beeline-shell", "/spark-client/bin/beeline");
- System.setProperty("kylin.source.hive.beeline-params", "-u jdbc_url");
-
- HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- hiveCmdBuilder.addStatement("USE default;");
- hiveCmdBuilder.addStatement("DROP TABLE `test`;");
- hiveCmdBuilder.addStatement("SHOW TABLES;");
-
- String cmd = hiveCmdBuilder.build();
- String hqlFile = cmd.substring(cmd.lastIndexOf("-f ") + 3).trim();
- hqlFile = hqlFile.substring(0, hqlFile.length() - ";exit $ret_code".length());
- String createFileCmd = cmd.substring(0, cmd.indexOf("EOL\n", cmd.indexOf("EOL\n") + 1) + 3);
- CliCommandExecutor cliCommandExecutor = new CliCommandExecutor();
- Pair<Integer, String> execute = cliCommandExecutor.execute(createFileCmd);
- String hqlStatement = FileUtils.readFileToString(new File(hqlFile), Charset.defaultCharset());
- assertEquals(
- "USE default;" + lineSeparator + "DROP TABLE `test`;" + lineSeparator + "SHOW TABLES;" + lineSeparator,
- hqlStatement);
- assertBeelineCmd(cmd);
- FileUtils.forceDelete(new File(hqlFile));
- }
-
- @Test
- public void testSparkSqlForTableOps() throws IOException {
- System.setProperty("kylin.source.hive.enable-sparksql-for-table-ops", "true");
- System.setProperty("kylin.source.hive.sparksql-beeline-shell", "/spark-client/bin/beeline");
- System.setProperty("kylin.source.hive.sparksql-beeline-params", "-u jdbc_url");
-
- HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- hiveCmdBuilder.addStatement("USE default;");
- hiveCmdBuilder.addStatement("DROP TABLE `test`;");
- hiveCmdBuilder.addStatement("SHOW TABLES;");
- String cmd = hiveCmdBuilder.build();
- assertBeelineCmd(cmd);
- }
-
- private void assertBeelineCmd(String cmd) {
- String beelineCmd = cmd.substring(cmd.indexOf("EOL\n", cmd.indexOf("EOL\n") + 1) + 4);
- assertTrue(beelineCmd.startsWith("/spark-client/bin/beeline -u jdbc_url"));
- }
-}
diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java
index d544059..8f5bb9b 100644
--- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java
+++ b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/ClassLoaderUtils.java
@@ -24,7 +24,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class ClassLoaderUtils {
- static URLClassLoader sparkClassLoader = null;
static URLClassLoader originClassLoader = null;
private static Logger logger = LoggerFactory.getLogger(ClassLoaderUtils.class);
@@ -40,25 +39,6 @@ public final class ClassLoaderUtils {
return null;
}
- public static ClassLoader getSparkClassLoader() {
- if (sparkClassLoader == null) {
- return Thread.currentThread().getContextClassLoader();
- } else {
- return sparkClassLoader;
- }
- }
-
- public static void setSparkClassLoader(URLClassLoader classLoader) {
- if (sparkClassLoader != null) {
- logger.error("sparkClassLoader already initialized");
- }
- logger.info("set sparkClassLoader :" + classLoader);
- if (System.getenv("DEBUG_SPARK_CLASSLOADER") != null) {
- return;
- }
- sparkClassLoader = classLoader;
- }
-
public static ClassLoader getOriginClassLoader() {
if (originClassLoader == null) {
logger.error("originClassLoader not init");
diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/SparkClassLoader.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/SparkClassLoader.java
deleted file mode 100644
index 2dc0292..0000000
--- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/SparkClassLoader.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.spark.classloader;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.HashSet;
-import java.util.Set;
-
-public class SparkClassLoader extends URLClassLoader {
- //preempt these classes from parent
- private static String[] SPARK_CL_PREEMPT_CLASSES = new String[] {"org.apache.spark", "scala.",
- "org.spark_project", "com.esotericsoftware.kryo"};
-
- //preempt these files from parent
- private static String[] SPARK_CL_PREEMPT_FILES = new String[] {"spark-version-info.properties", "HiveClientImpl",
- "org/apache/spark"};
-
- //when loading class (indirectly used by SPARK_CL_PREEMPT_CLASSES), some of them should NOT use parent's first
- private static String[] THIS_CL_PRECEDENT_CLASSES = new String[] {"javax.ws.rs", "org.apache.hadoop.hive"};
-
- //when loading class (indirectly used by SPARK_CL_PREEMPT_CLASSES), some of them should use parent's first
- private static String[] PARENT_CL_PRECEDENT_CLASSES = new String[] {
- // Java standard library:
- "com.sun.", "launcher.", "java.", "javax.", "org.ietf", "org.omg", "org.w3c", "org.xml", "sunw.", "sun.",
- // logging
- "org.apache.commons.logging", "org.apache.log4j", "org.slf4j", "org.apache.hadoop",
- // Hadoop/ZK:
- "org.apache.kylin", "com.intellij", "org.apache.calcite"};
-
- private static final Set<String> classNotFoundCache = new HashSet<>();
- private static Logger logger = LoggerFactory.getLogger(SparkClassLoader.class);
-
- static {
- String sparkClassLoaderSparkClPreemptClasses = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES");
- if (!StringUtils.isEmpty(sparkClassLoaderSparkClPreemptClasses)) {
- SPARK_CL_PREEMPT_CLASSES = StringUtils.split(sparkClassLoaderSparkClPreemptClasses, ",");
- }
-
- String sparkClassLoaderSparkClPreemptFiles = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_FILES");
- if (!StringUtils.isEmpty(sparkClassLoaderSparkClPreemptFiles)) {
- SPARK_CL_PREEMPT_FILES = StringUtils.split(sparkClassLoaderSparkClPreemptFiles, ",");
- }
-
- String sparkClassLoaderThisClPrecedentClasses = System.getenv("SPARKCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
- if (!StringUtils.isEmpty(sparkClassLoaderThisClPrecedentClasses)) {
- THIS_CL_PRECEDENT_CLASSES = StringUtils.split(sparkClassLoaderThisClPrecedentClasses, ",");
- }
-
- String sparkClassLoaderParentClPrecedentClasses = System
- .getenv("SPARKCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES");
- if (!StringUtils.isEmpty(sparkClassLoaderParentClPrecedentClasses)) {
- PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(sparkClassLoaderParentClPrecedentClasses, ",");
- }
-
- try {
- final Method registerParallel = ClassLoader.class.getDeclaredMethod("registerAsParallelCapable");
- AccessController.doPrivileged(new PrivilegedAction<Object>() {
- public Object run() {
- registerParallel.setAccessible(true);
- return null;
- }
- });
- Boolean result = (Boolean) registerParallel.invoke(null);
- if (!result) {
- logger.warn("registrationFailed");
- }
- } catch (Exception ignore) {
-
- }
- }
-
- /**
- * Creates a DynamicClassLoader that can load classes dynamically
- * from jar files under a specific folder.
- *
- * @param parent the parent ClassLoader to set.
- */
- protected SparkClassLoader(ClassLoader parent) throws IOException {
- super(new URL[] {}, parent);
- init();
- }
-
- public void init() throws MalformedURLException {
- String sparkHome = System.getenv("SPARK_HOME");
- if (sparkHome == null) {
- sparkHome = System.getProperty("SPARK_HOME");
- if (sparkHome == null) {
- throw new RuntimeException(
- "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
- }
- }
- File file = new File(sparkHome + "/jars");
- File[] jars = file.listFiles();
- for (File jar : jars) {
- addURL(jar.toURI().toURL());
- }
- }
-
- @Override
- public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
-
- if (needToUseGlobal(name)) {
- logger.debug("delegate " + name + " directly to parent");
- return super.loadClass(name, resolve);
- }
- return doLoadclass(name);
- }
-
- private Class<?> doLoadclass(String name) throws ClassNotFoundException {
- synchronized (getClassLoadingLock(name)) {
- // Check whether the class has already been loaded:
- Class<?> clasz = findLoadedClass(name);
- if (clasz != null) {
- logger.debug("Class " + name + " already loaded");
- } else {
- try {
- // Try to find this class using the URLs passed to this ClassLoader
- logger.debug("Finding class: " + name);
- clasz = super.findClass(name);
- if (clasz == null) {
- logger.debug("cannot find class" + name);
- }
- } catch (ClassNotFoundException e) {
- classNotFoundCache.add(name);
- // Class not found using this ClassLoader, so delegate to parent
- logger.debug("Class " + name + " not found - delegating to parent");
- try {
- // sparder and query module has some class start with org.apache.spark,
- // We need to use some lib that does not exist in spark/jars
- clasz = getParent().loadClass(name);
- } catch (ClassNotFoundException e2) {
- // Class not found in this ClassLoader or in the parent ClassLoader
- // Log some debug output before re-throwing ClassNotFoundException
- logger.debug("Class " + name + " not found in parent loader");
- throw e2;
- }
- }
- }
- return clasz;
- }
- }
-
- private boolean isThisCLPrecedent(String name) {
- for (String exemptPrefix : THIS_CL_PRECEDENT_CLASSES) {
- if (name.startsWith(exemptPrefix)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean isParentCLPrecedent(String name) {
- for (String exemptPrefix : PARENT_CL_PRECEDENT_CLASSES) {
- if (name.startsWith(exemptPrefix)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean needToUseGlobal(String name) {
- return !isThisCLPrecedent(name) && !classNeedPreempt(name) && isParentCLPrecedent(name);
- }
-
- boolean classNeedPreempt(String name) {
- if (classNotFoundCache.contains(name)) {
- return false;
- }
- for (String exemptPrefix : SPARK_CL_PREEMPT_CLASSES) {
- if (name.startsWith(exemptPrefix)) {
- return true;
- }
- }
- return false;
- }
-
- boolean fileNeedPreempt(String name) {
- for (String exemptPrefix : SPARK_CL_PREEMPT_FILES) {
- if (name.contains(exemptPrefix)) {
- return true;
- }
- }
- return false;
- }
-}
diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java
index f403dbb..024d628 100644
--- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java
+++ b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java
@@ -58,7 +58,6 @@ public class TomcatClassLoader extends ParallelWebappClassLoader {
}
private static Logger logger = LoggerFactory.getLogger(TomcatClassLoader.class);
- private SparkClassLoader sparkClassLoader;
/**
* Creates a DynamicClassLoader that can load classes dynamically
@@ -68,8 +67,6 @@ public class TomcatClassLoader extends ParallelWebappClassLoader {
*/
public TomcatClassLoader(ClassLoader parent) throws IOException {
super(parent);
- sparkClassLoader = new SparkClassLoader(this);
- ClassLoaderUtils.setSparkClassLoader(sparkClassLoader);
ClassLoaderUtils.setOriginClassLoader(this);
init();
}
@@ -104,9 +101,6 @@ public class TomcatClassLoader extends ParallelWebappClassLoader {
if (name.startsWith("org.apache.kylin.spark.classloader")) {
return parent.loadClass(name);
}
- if (sparkClassLoader.classNeedPreempt(name)) {
- return sparkClassLoader.loadClass(name);
- }
if (isParentCLPrecedent(name)) {
logger.debug("Skipping exempt class " + name + " - delegating directly to parent");
return parent.loadClass(name);
@@ -116,9 +110,6 @@ public class TomcatClassLoader extends ParallelWebappClassLoader {
@Override
public InputStream getResourceAsStream(String name) {
- if (sparkClassLoader.fileNeedPreempt(name)) {
- return sparkClassLoader.getResourceAsStream(name);
- }
return super.getResourceAsStream(name);
}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 47d0c9b..2061257 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -54,7 +54,7 @@ object SparderContext extends Logging {
@volatile
var master_app_url: String = _
- def getOriginalSparkSession: SparkSession = withClassLoad {
+ def getOriginalSparkSession: SparkSession = {
if (spark == null || spark.sparkContext.isStopped) {
logInfo("Init spark.")
initSpark()
@@ -92,7 +92,7 @@ object SparderContext extends Logging {
spark != null && !spark.sparkContext.isStopped
}
- def restartSpark(): Unit = withClassLoad {
+ def restartSpark(): Unit = {
this.synchronized {
if (spark != null && !spark.sparkContext.isStopped) {
Utils.tryWithSafeFinally {
@@ -107,7 +107,7 @@ object SparderContext extends Logging {
}
}
- def stopSpark(): Unit = withClassLoad {
+ def stopSpark(): Unit = {
this.synchronized {
if (spark != null && !spark.sparkContext.isStopped) {
Utils.tryWithSafeFinally {
@@ -119,7 +119,7 @@ object SparderContext extends Logging {
}
}
- def init(): Unit = withClassLoad {
+ def init(): Unit = {
getOriginalSparkSession
}
@@ -127,7 +127,7 @@ object SparderContext extends Logging {
getSparkSession.sparkContext.conf.get(key)
}
- def initSpark(): Unit = withClassLoad {
+ def initSpark(): Unit =
this.synchronized {
if (initializingThread == null && (spark == null || spark.sparkContext.isStopped)) {
initializingThread = new Thread(new Runnable {
@@ -219,7 +219,6 @@ object SparderContext extends Logging {
// init FileStatusCache
ShardFileStatusCache.getFileStatusCache(getOriginalSparkSession)
}
- }
def registerListener(sc: SparkContext): Unit = {
val sparkListener = new SparkListener {
@@ -250,21 +249,6 @@ object SparderContext extends Logging {
logicalPlan
}
- /**
- * To avoid spark being affected by the environment, we use spark classloader load spark.
- *
- * @param body Somewhere if you use spark
- * @tparam T Action function
- * @return The body return
- */
- def withClassLoad[T](body: => T): T = {
- // val originClassLoad = Thread.currentThread().getContextClassLoader
- Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader)
- val t = body
- // Thread.currentThread().setContextClassLoader(originClassLoad)
- t
- }
-
val _isAsyncQuery = new ThreadLocal[JBoolean]
val _separator = new ThreadLocal[JString]
val _df = new ThreadLocal[Dataset[Row]]
diff --git a/source-hive/pom.xml b/source-hive/pom.xml
index 8430796..895db29 100644
--- a/source-hive/pom.xml
+++ b/source-hive/pom.xml
@@ -37,7 +37,11 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-build-engine</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-spark-query</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Env & Test -->
<dependency>
<groupId>com.h2database</groupId>
@@ -66,6 +70,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<classifier>hadoop2</classifier>
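With kylin-spark-query on the module path and spark-sql in provided scope, source-hive compiles against Spark's catalog API but resolves it from ${SPARK_HOME}/jars at runtime; a build sanity check (standard Maven flags, assuming a full checkout):

    mvn -pl source-hive -am -DskipTests package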
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
deleted file mode 100644
index 669d66e..0000000
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.hive;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.DBUtils;
-import org.apache.kylin.common.util.SourceConfigurationUtil;
-
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-
-public class BeelineHiveClient implements IHiveClient {
-
- private static final String HIVE_AUTH_USER = "user";
- private static final String HIVE_AUTH_PASSWD = "password";
- private Connection cnct;
- private Statement stmt;
- private DatabaseMetaData metaData;
-
- public BeelineHiveClient(String beelineParams) {
- if (StringUtils.isEmpty(beelineParams)) {
- throw new IllegalArgumentException("BeelineParames cannot be empty");
- }
- String[] splits = StringUtils.split(beelineParams);
- String url = "", username = "", password = "";
- for (int i = 0; i < splits.length - 1; i++) {
- if ("-u".equals(splits[i])) {
- url = stripQuotes(splits[i + 1]);
- }
- if ("-n".equals(splits[i])) {
- username = stripQuotes(splits[i + 1]);
- }
- if ("-p".equals(splits[i])) {
- password = stripQuotes(splits[i + 1]);
- }
- if ("-w".equals(splits[i])) {
- File file = new File(splits[i + 1]);
- BufferedReader br = null;
- try {
- br = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
- try {
- password = br.readLine();
- } finally {
- if (null != br) {
- br.close();
- }
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- Properties jdbcProperties = SourceConfigurationUtil.loadHiveJDBCProperties();
- jdbcProperties.put(HIVE_AUTH_PASSWD, password);
- jdbcProperties.put(HIVE_AUTH_USER, username);
- this.init(url, jdbcProperties);
- }
-
- private void init(String url, Properties hiveProperties) {
- try {
- Class.forName("org.apache.hive.jdbc.HiveDriver");
- cnct = DriverManager.getConnection(url, hiveProperties);
- stmt = cnct.createStatement();
- metaData = cnct.getMetaData();
- } catch (SQLException | ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
-
- private String stripQuotes(String input) {
- if (input.startsWith("'") && input.endsWith("'")) {
- return StringUtils.strip(input, "'");
- } else if (input.startsWith("\"") && input.endsWith("\"")) {
- return StringUtils.strip(input, "\"");
- } else {
- return input;
- }
- }
-
- public List<String> getHiveDbNames() throws Exception {
- List<String> ret = Lists.newArrayList();
- ResultSet schemas = metaData.getSchemas();
- while (schemas.next()) {
- ret.add(String.valueOf(schemas.getObject(1)));
- }
- DBUtils.closeQuietly(schemas);
- return ret;
- }
-
- public List<String> getHiveTableNames(String database) throws Exception {
- List<String> ret = Lists.newArrayList();
- ResultSet tables = metaData.getTables(null, database, null, null);
- while (tables.next()) {
- ret.add(String.valueOf(tables.getObject(3)));
- }
- DBUtils.closeQuietly(tables);
- return ret;
- }
-
- @Override
- public long getHiveTableRows(String database, String tableName) throws Exception {
- ResultSet resultSet = null;
- long count = 0;
- try {
- String query = "select count(*) from ";
- resultSet = stmt.executeQuery(query.concat(database + "." + tableName));
- if (resultSet.next()) {
- count = resultSet.getLong(1);
- }
- } finally {
- DBUtils.closeQuietly(resultSet);
- }
- return count;
- }
-
- @Override
- public List<Object[]> getHiveResult(String hql) throws Exception {
- ResultSet resultSet = null;
- List<Object[]> datas = new ArrayList<>();
- try {
- resultSet = stmt.executeQuery(hql);
- int columnCtn = resultSet.getMetaData().getColumnCount();
- while (resultSet.next()) {
- Object[] data = new Object[columnCtn];
- for (int i = 0; i < columnCtn; i++) {
- data[i] = resultSet.getObject(i + 1);
- }
- datas.add(data);
- }
- } finally {
- DBUtils.closeQuietly(resultSet);
- }
- return datas;
- }
-
- @Override
- public void executeHQL(String hql) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void executeHQL(String[] hqls) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- public HiveTableMeta getHiveTableMeta(String database, String tableName) throws SQLException {
- ResultSet columns = metaData.getColumns(null, database, tableName, null);
- HiveTableMetaBuilder builder = new HiveTableMetaBuilder();
- builder.setTableName(tableName);
-
- List<HiveTableMeta.HiveTableColumnMeta> allColumns = Lists.newArrayList();
- while (columns.next()) {
- String columnName = columns.getString(4);
- String dataType = columns.getString(6);
- String comment = columns.getString(12);
- dataType = considerDataTypePrecision(dataType, columns.getString(7), columns.getString(9));
- allColumns.add(new HiveTableMeta.HiveTableColumnMeta(columnName, dataType, comment));
- }
- builder.setAllColumns(allColumns);
- DBUtils.closeQuietly(columns);
- String exe = "use ";
- stmt.execute(exe.concat(database));
- String des = "describe formatted ";
- ResultSet resultSet = stmt.executeQuery(des.concat(tableName));
- extractHiveTableMeta(resultSet, builder);
- DBUtils.closeQuietly(resultSet);
- return builder.createHiveTableMeta();
- }
-
- public static String considerDataTypePrecision(String dataType, String precision, String scale) {
- if ("VARCHAR".equalsIgnoreCase(dataType) || "CHAR".equalsIgnoreCase(dataType)) {
- if (null != precision)
- dataType = new StringBuilder(dataType).append("(").append(precision).append(")").toString();
- }
- if ("DECIMAL".equalsIgnoreCase(dataType) || "NUMERIC".equalsIgnoreCase(dataType)) {
- if (precision != null && scale != null)
- dataType = new StringBuilder(dataType).append("(").append(precision).append(",").append(scale)
- .append(")").toString();
- }
- return dataType;
- }
-
- private void extractHiveTableMeta(ResultSet resultSet, HiveTableMetaBuilder builder) throws SQLException {
- while (resultSet.next()) {
- parseResultEntry(resultSet, builder);
- }
- }
-
- private void parseResultEntry(ResultSet resultSet, HiveTableMetaBuilder builder) throws SQLException {
- List<HiveTableMeta.HiveTableColumnMeta> partitionColumns = Lists.newArrayList();
- if ("# Partition Information".equals(resultSet.getString(1).trim())) {
- resultSet.next();
- Preconditions.checkArgument("# col_name".equals(resultSet.getString(1).trim()));
- resultSet.next();
- Preconditions.checkArgument("".equals(resultSet.getString(1).trim()));
- while (resultSet.next()) {
- if ("".equals(resultSet.getString(1).trim())) {
- break;
- }
- partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(resultSet.getString(1).trim(),
- resultSet.getString(2).trim(), resultSet.getString(3).trim()));
- }
- builder.setPartitionColumns(partitionColumns);
- }
-
- if ("Owner:".equals(resultSet.getString(1).trim())) {
- builder.setOwner(resultSet.getString(2).trim());
- }
- if ("LastAccessTime:".equals(resultSet.getString(1).trim())) {
- try {
- int i = Integer.parseInt(resultSet.getString(2).trim());
- builder.setLastAccessTime(i);
- } catch (NumberFormatException e) {
- builder.setLastAccessTime(0);
- }
- }
- if ("Location:".equals(resultSet.getString(1).trim())) {
- builder.setSdLocation(resultSet.getString(2).trim());
- }
- if ("Table Type:".equals(resultSet.getString(1).trim())) {
- builder.setTableType(resultSet.getString(2).trim());
- }
- if ("Table Parameters:".equals(resultSet.getString(1).trim())) {
- extractTableParam(resultSet, builder);
- }
- if ("InputFormat:".equals(resultSet.getString(1).trim())) {
- builder.setSdInputFormat(resultSet.getString(2).trim());
- }
- if ("OutputFormat:".equals(resultSet.getString(1).trim())) {
- builder.setSdOutputFormat(resultSet.getString(2).trim());
- }
- }
-
- private void extractTableParam(ResultSet resultSet, HiveTableMetaBuilder builder) throws SQLException {
- while (resultSet.next()) {
- if (resultSet.getString(2) == null) {
- break;
- }
- if ("storage_handler".equals(resultSet.getString(2).trim())) {
- builder.setIsNative(false);//default is true
- }
- if ("totalSize".equals(resultSet.getString(2).trim())) {
- builder.setFileSize(Long.parseLong(resultSet.getString(3).trim()));//default is false
- }
- if ("numFiles".equals(resultSet.getString(2).trim())) {
- builder.setFileNum(Long.parseLong(resultSet.getString(3).trim()));
- }
- if ("skip.header.line.count".equals(resultSet.getString(2).trim())) {
- builder.setSkipHeaderLineCount(resultSet.getString(3).trim());
- }
- }
- }
-
- public void close() {
- DBUtils.closeQuietly(stmt);
- DBUtils.closeQuietly(cnct);
- }
-
- public static void main(String[] args) throws SQLException {
-
- BeelineHiveClient loader = new BeelineHiveClient(
- "-n root --hiveconf hive.security.authorization.sqlstd.confwhitelist.append='mapreduce.job.*|dfs.*' -u 'jdbc:hive2://sandbox:10000'");
- //BeelineHiveClient loader = new BeelineHiveClient(StringUtils.join(args, " "));
- HiveTableMeta hiveTableMeta = loader.getHiveTableMeta("default", "test_kylin_fact_part");
- System.out.println(hiveTableMeta);
- loader.close();
- }
-}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
deleted file mode 100644
index 361a3c6..0000000
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.hive;
-
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HiveCmdBuilder;
-import org.apache.kylin.common.util.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Hive meta API client for Kylin
- * @author shaoshi
- *
- */
-public class CLIHiveClient implements IHiveClient {
- protected HiveConf hiveConf = null;
- protected IMetaStoreClient metaStoreClient = null;
-
- public CLIHiveClient() {
- hiveConf = new HiveConf(CLIHiveClient.class);
- }
-
- /**
- * only used by Deploy Util
- * @throws IOException
- */
- @Override
- public void executeHQL(String hql) throws IOException {
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- hiveCmdBuilder.addStatement(hql);
- Pair<Integer, String> response = KylinConfig.getInstanceFromEnv().getCliCommandExecutor()
- .execute(hiveCmdBuilder.toString());
- if (response.getFirst() != 0) {
- throw new IllegalArgumentException("Failed to execute hql [" + hql + "], error message is: " + response.getSecond());
- }
-
- }
-
- /**
- * only used by Deploy Util
- */
- @Override
- public void executeHQL(String[] hqls) throws IOException {
- for (String sql : hqls)
- executeHQL(sql);
- }
-
- @Override
- public HiveTableMeta getHiveTableMeta(String database, String tableName) throws Exception {
- HiveTableMetaBuilder builder = new HiveTableMetaBuilder();
- Table table = getMetaStoreClient().getTable(database, tableName);
-
- List<FieldSchema> allFields = getMetaStoreClient().getFields(database, tableName);
- List<FieldSchema> partitionFields = table.getPartitionKeys();
- if (allFields == null) {
- allFields = Lists.newArrayList();
- }
- if (partitionFields != null && partitionFields.size() > 0) {
- allFields.addAll(partitionFields);
- }
- List<HiveTableMeta.HiveTableColumnMeta> allColumns = Lists.newArrayList();
- List<HiveTableMeta.HiveTableColumnMeta> partitionColumns = Lists.newArrayList();
- for (FieldSchema fieldSchema : allFields) {
- allColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), fieldSchema.getComment()));
- }
- if (partitionFields != null && partitionFields.size() > 0) {
- for (FieldSchema fieldSchema : partitionFields) {
- partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(fieldSchema.getName(), fieldSchema.getType(), fieldSchema.getComment()));
- }
- }
- builder.setAllColumns(allColumns);
- builder.setPartitionColumns(partitionColumns);
-
- builder.setSdLocation(table.getSd().getLocation());
- builder.setFileSize(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.TOTAL_SIZE));
- builder.setFileNum(getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.NUM_FILES));
- builder.setIsNative(!MetaStoreUtils.isNonNativeTable(table));
- builder.setTableName(tableName);
- builder.setSdInputFormat(table.getSd().getInputFormat());
- builder.setSdOutputFormat(table.getSd().getOutputFormat());
- builder.setOwner(table.getOwner());
- builder.setLastAccessTime(table.getLastAccessTime());
- builder.setTableType(table.getTableType());
- builder.setSkipHeaderLineCount(table.getParameters().get("skip.header.line.count"));
-
- return builder.createHiveTableMeta();
- }
-
- @Override
- public List<String> getHiveDbNames() throws Exception {
- return getMetaStoreClient().getAllDatabases();
- }
-
- @Override
- public List<String> getHiveTableNames(String database) throws Exception {
- return getMetaStoreClient().getAllTables(database);
- }
-
- @Override
- public long getHiveTableRows(String database, String tableName) throws Exception {
- Table table = getMetaStoreClient().getTable(database, tableName);
- return getBasicStatForTable(new org.apache.hadoop.hive.ql.metadata.Table(table), StatsSetupConst.ROW_COUNT);
- }
-
- @Override
- public List<Object[]> getHiveResult(String hql) throws Exception {
- List<Object[]> data = new ArrayList<>();
-
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
- hiveCmdBuilder.addStatement(hql);
- Pair<Integer, String> response = KylinConfig.getInstanceFromEnv().getCliCommandExecutor().execute(hiveCmdBuilder.toString());
-
- String[] respData = response.getSecond().split("\n");
-
- boolean isData = false;
-
- for (String item : respData) {
- if (item.trim().equalsIgnoreCase("OK")) {
- isData = true;
- continue;
- }
- if (item.trim().startsWith("Time taken")) {
- isData = false;
- }
- if (isData) {
- Object[] arr = item.split("\t");
- data.add(arr);
- }
-
- }
-
- return data;
- }
-
- private IMetaStoreClient getMetaStoreClient() throws Exception {
- if (metaStoreClient == null) {
- metaStoreClient = HiveMetaStoreClientFactory.getHiveMetaStoreClient(hiveConf);
- }
- return metaStoreClient;
- }
-
- /**
- * COPIED FROM org.apache.hadoop.hive.ql.stats.StatsUtil for backward compatibility
- * <p>
- * Get basic stats of table
- *
- * @param table - table
- * @param statType - type of stats
- * @return value of stats
- */
- private long getBasicStatForTable(org.apache.hadoop.hive.ql.metadata.Table table, String statType) {
- Map<String, String> params = table.getParameters();
- long result = 0;
-
- if (params != null) {
- try {
- result = Long.parseLong(params.get(statType));
- } catch (NumberFormatException e) {
- result = 0;
- }
- }
- return result;
- }
-}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
index 4687973..74e1e8a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
@@ -23,10 +23,8 @@ import org.apache.kylin.common.KylinConfig;
public class HiveClientFactory {
public static IHiveClient getHiveClient() {
- if ("cli".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) {
- return new CLIHiveClient();
- } else if ("beeline".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) {
- return new BeelineHiveClient(KylinConfig.getInstanceFromEnv().getHiveBeelineParams());
+ if ("spark_catalog".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) {
+ return new SparkHiveClient();
} else {
throw new RuntimeException("cannot recognize hive client mode");
}
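
With CLIHiveClient and BeelineHiveClient removed, "spark_catalog" is the only
client mode HiveClientFactory still accepts. Assuming getHiveClientMode() reads
the kylin.source.hive.client property from KylinConfigBase (that one-line change
is not shown in this section), the matching kylin.properties entry would be:

    kylin.source.hive.client=spark_catalog
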
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
index 9a26c14..4327b90 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
@@ -45,14 +45,14 @@ public class HiveTableMeta {
String owner;
String tableType;
int skipHeaderLineCount;
- int lastAccessTime;
+ long lastAccessTime;
long fileSize;
long fileNum;
boolean isNative;
List<HiveTableColumnMeta> allColumns;
List<HiveTableColumnMeta> partitionColumns;
- public HiveTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, String owner, String tableType, int lastAccessTime, long fileSize, long fileNum, int skipHeaderLineCount, boolean isNative, List<HiveTableColumnMeta> allColumns, List<HiveTableColumnMeta> partitionColumns) {
+ public HiveTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat, String owner, String tableType, long lastAccessTime, long fileSize, long fileNum, int skipHeaderLineCount, boolean isNative, List<HiveTableColumnMeta> allColumns, List<HiveTableColumnMeta> partitionColumns) {
this.tableName = tableName;
this.sdLocation = sdLocation;
this.sdInputFormat = sdInputFormat;
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
index 0f34224..5238fd3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMetaBuilder.java
@@ -29,7 +29,7 @@ public class HiveTableMetaBuilder {
private String sdOutputFormat;
private String owner;
private String tableType;
- private int lastAccessTime;
+ private long lastAccessTime;
private long fileSize;
private long fileNum;
private int skipHeaderLineCount;
@@ -67,7 +67,7 @@ public class HiveTableMetaBuilder {
return this;
}
- public HiveTableMetaBuilder setLastAccessTime(int lastAccessTime) {
+ public HiveTableMetaBuilder setLastAccessTime(long lastAccessTime) {
this.lastAccessTime = lastAccessTime;
return this;
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/SparkHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/SparkHiveClient.java
new file mode 100644
index 0000000..26b97d7
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/SparkHiveClient.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.spark.sql.SparderContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import scala.Option;
+import scala.collection.Iterator;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SparkHiveClient implements IHiveClient {
+ // keys in the Hive metadata map
+ private static final String CHAR_VARCHAR_TYPE_STRING = "__CHAR_VARCHAR_TYPE_STRING";
+ private static final String HIVE_COMMENT = "comment";
+ private static final String HIVE_TABLE_ROWS = "numRows";
+ private static final String TABLE_TOTAL_SIZE = "totalSize";
+ private static final String TABLE_FILE_NUM = "numFiles";
+
+ protected SparkSession ss;
+ protected SessionCatalog catalog;
+
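+ // Reuse the SparkSession already created by SparderContext; its SessionCatalog
+ // fronts the Hive metastore, replacing the removed JDBC/metastore-based clients.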
+ public SparkHiveClient() {
+ ss = SparderContext.getOriginalSparkSession();
+ catalog = ss.sessionState().catalog();
+ }
+
+
+ @Override
+ public void executeHQL(String hql) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void executeHQL(String[] hqls) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
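+ // Read the table schema from Spark's catalog. Spark surfaces char/varchar
+ // columns as plain strings and keeps the original Hive type under the
+ // __CHAR_VARCHAR_TYPE_STRING metadata key, which is restored below.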
+ @Override
+ public HiveTableMeta getHiveTableMeta(String database, String tableName) throws Exception {
+ HiveTableMetaBuilder builder = new HiveTableMetaBuilder();
+ CatalogTable catalogTable = catalog
+ .getTempViewOrPermanentTableMetadata(new TableIdentifier(tableName, Option.apply(database)));
+ scala.collection.immutable.List<StructField> structFieldList = catalogTable.schema().toList();
+ Iterator<StructField> structFieldIterator = structFieldList.iterator();
+
+ List<HiveTableMeta.HiveTableColumnMeta> allColumns = Lists.newArrayList();
+ List<HiveTableMeta.HiveTableColumnMeta> partitionColumns = Lists.newArrayList();
+ while (structFieldIterator.hasNext()) {
+ StructField structField = structFieldIterator.next();
+ String name = structField.name();
+ String hiveDataType = structField.dataType().simpleString();
+ Metadata metadata = structField.metadata();
+ String description = metadata.contains(HIVE_COMMENT) ? metadata.getString(HIVE_COMMENT) : "";
+ String datatype = metadata.contains(CHAR_VARCHAR_TYPE_STRING) ? metadata.getString(CHAR_VARCHAR_TYPE_STRING) : hiveDataType;
+
+ allColumns.add(new HiveTableMeta.HiveTableColumnMeta(name, datatype, description));
+ if (catalogTable.partitionColumnNames().contains(name)) {
+ partitionColumns.add(new HiveTableMeta.HiveTableColumnMeta(name, datatype, description));
+ }
+ }
+
+ builder.setAllColumns(allColumns);
+ builder.setPartitionColumns(partitionColumns);
+ builder.setSdLocation(catalogTable.location().getPath());
+ builder.setFileSize(Long.parseLong(catalogTable.ignoredProperties().apply(TABLE_TOTAL_SIZE)));
+ builder.setFileNum(Long.parseLong(catalogTable.ignoredProperties().apply(TABLE_FILE_NUM)));
+ builder.setIsNative(catalogTable.tableType().equals(CatalogTableType.MANAGED()));
+ builder.setTableName(tableName);
+ builder.setSdInputFormat(catalogTable.storage().inputFormat().toString());
+ builder.setSdOutputFormat(catalogTable.storage().outputFormat().toString());
+ builder.setOwner(catalogTable.owner());
+ builder.setLastAccessTime(catalogTable.lastAccessTime());
+ builder.setTableType(catalogTable.tableType().name());
+
+ return builder.createHiveTableMeta();
+ }
+
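+ // scala.collection.JavaConversions bridges the Scala Seq returned by the
+ // catalog into a java.util.List.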
+ @Override
+ public List<String> getHiveDbNames() throws Exception {
+ return scala.collection.JavaConversions.seqAsJavaList(catalog.listDatabases());
+ }
+
+ @Override
+ public List<String> getHiveTableNames(String database) throws Exception {
+ List<TableIdentifier> tableIdentifiers = scala.collection.JavaConversions.seqAsJavaList(catalog.listTables(database));
+ List<String> tableNames = tableIdentifiers.stream().map(table -> table.table()).collect(Collectors.toList());
+ return tableNames;
+ }
+
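+ // The row count comes from the "numRows" table parameter maintained by Hive
+ // statistics (exposed via CatalogTable.ignoredProperties()); it can be stale
+ // or absent if the table has never been analyzed.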
+ @Override
+ public long getHiveTableRows(String database, String tableName) throws Exception {
+ return Long.parseLong(catalog.getTempViewOrPermanentTableMetadata(new TableIdentifier(tableName, Option.apply(database)))
+ .ignoredProperties().apply(HIVE_TABLE_ROWS));
+ }
+
+ /*
+ This method was originally used for pushdown queries.
+ In kylin4, pushdown queries are handled by PushDownRunnerSparkImpl.executeQuery, so getHiveResult is not implemented here.
+ */
+ @Override
+ public List<Object[]> getHiveResult(String sql) throws Exception {
+ return null;
+ }
+}
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/BeelineHIveClientTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/BeelineHIveClientTest.java
deleted file mode 100644
index 00a25e5..0000000
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/BeelineHIveClientTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.hive;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class BeelineHIveClientTest {
- @Test
- public void testBasics() {
- String dataType = "varchar";
- String precision = "60";
- String scale = null;
- dataType = BeelineHiveClient.considerDataTypePrecision(dataType, precision, scale);
- Assert.assertEquals("varchar(60)", dataType);
-
- dataType = "char";
- precision = "50";
- scale = null;
- dataType = BeelineHiveClient.considerDataTypePrecision(dataType, precision, scale);
- Assert.assertEquals("char(50)", dataType);
-
- dataType = "decimal";
- precision = "8";
- scale = "4";
- dataType = BeelineHiveClient.considerDataTypePrecision(dataType, precision, scale);
- Assert.assertEquals("decimal(8,4)", dataType);
-
- dataType = "numeric";
- precision = "7";
- scale = "3";
- dataType = BeelineHiveClient.considerDataTypePrecision(dataType, precision, scale);
- Assert.assertEquals("numeric(7,3)", dataType);
- }
-}