This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c14359ccd3b55704f4168b70cc9e8df89732b751 Author: xingjian <[email protected]> AuthorDate: Thu Nov 10 10:08:27 2022 +0800 [DIRTY] fix partition snapshot build on spark serverless --- .../engine/spark/builder/SnapshotBuilder.scala | 31 ++++++++++++---------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala index ce5a080601..b077112498 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala @@ -18,29 +18,28 @@ package org.apache.kylin.engine.spark.builder -import java.io.IOException -import java.util -import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Executors} -import java.util.{Objects, UUID} import com.google.common.collect.Maps -import org.apache.kylin.engine.spark.NSparkCubingEngine -import org.apache.kylin.engine.spark.job.{DFChooser, KylinBuildEnv} -import org.apache.kylin.engine.spark.utils.{FileNames, LogUtils} -import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager} -import org.apache.kylin.metadata.project.NProjectManager import org.apache.commons.codec.digest.DigestUtils import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig import org.apache.kylin.common.persistence.transaction.UnitOfWork -import org.apache.kylin.common.{KapConfig, KylinConfig} import org.apache.kylin.common.util.HadoopUtil -import org.apache.kylin.metadata.model.{TableDesc, TableExtDesc} +import org.apache.kylin.common.{KapConfig, KylinConfig} +import org.apache.kylin.engine.spark.NSparkCubingEngine +import org.apache.kylin.engine.spark.job.{DFChooser, KylinBuildEnv} +import org.apache.kylin.engine.spark.utils.{FileNames, LogUtils} +import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager, TableDesc, TableExtDesc} +import org.apache.kylin.metadata.project.NProjectManager import org.apache.kylin.source.SourceFactory import org.apache.spark.internal.Logging import org.apache.spark.sql.hive.utils.ResourceDetectUtils -import org.apache.spark.sql.{Dataset, Encoders, Row, SparderEnv, SparkSession} +import org.apache.spark.sql._ import org.apache.spark.utils.ProxyThreadUtils +import java.io.IOException +import java.util +import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, Executors} +import java.util.{Objects, UUID} import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} @@ -384,7 +383,7 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable { val resourcePath = baseDir + "/" + snapshotTablePath var hadoopConf = SparderEnv.getHadoopConfiguration() if (kylinConfig.getClusterManagerClassName.contains("AWSServerless")) { - hadoopConf = ss.sparkContext.hadoopConfiguration + hadoopConf = ss.sparkContext.hadoopConfiguration } val (repartitionNum, sizeMB) = try { val sizeInMB = ResourceDetectUtils.getPaths(sourceData.queryExecution.sparkPlan) @@ -464,8 +463,12 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable { hadoopConf = sourceData.sparkSession.sparkContext.hadoopConfiguration } try { + var hadoopConf = SparderEnv.getHadoopConfiguration() + if (kylinConfig.getClusterManagerClassName.contains("AWSServerless")) { + hadoopConf = sourceData.sparkSession.sparkContext.hadoopConfiguration + } val sizeInMB = ResourceDetectUtils.getPaths(sourceData.queryExecution.sparkPlan) - .map(path => HadoopUtil.getContentSummary(path.getFileSystem(SparderEnv.getHadoopConfiguration()), path).getLength) + .map(path => HadoopUtil.getContentSummary(path.getFileSystem(hadoopConf), path).getLength) .sum * 1.0 / MB val num = Math.ceil(sizeInMB / KylinBuildEnv.get().kylinConfig.getSnapshotShardSizeMB).intValue() (num, sizeInMB)
