This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new 35e1487e57 KYLIN-5285 Fix the number of filePartitions of FileScanRDD when shardBy is used
35e1487e57 is described below
commit 35e1487e57cc43bba5e592dfcb4c41a2fb1e8a8b
Author: zhaoliu4 <[email protected]>
AuthorDate: Mon Oct 31 11:04:08 2022 +0800
KYLIN-5285 Fix the number of filePartitions of FileScanRDD when shardBy is used
KYLIN-5285 Fix the number of filePartitions of FileScanRDD when shardBy is used
---
.../apache/spark/sql/execution/KylinFileSourceScanExec.scala | 9 ++++++---
.../main/scala/org/apache/spark/application/JobWorkSpace.scala | 10 +---------
2 files changed, 7 insertions(+), 12 deletions(-)
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 61fef995fe..534bb5663b 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -162,9 +162,12 @@ class KylinFileSourceScanExec(
f => FilePruner.getPartitionId(new Path(f.filePath))
}
- val filePartitions = Seq.tabulate(shardSpec.numShards) { shardId =>
- FilePartition(shardId, filesToPartitionId.getOrElse(shardId,
Nil).toArray)
- }
+ var shardId = 0
+ val filePartitions = new ArrayBuffer[FilePartition]()
+ filesToPartitionId.foreach(t => {
+ filePartitions += FilePartition(shardId, t._2.toArray)
+ shardId += 1
+ })
if (SoftAffinityManager.usingSoftAffinity) {
val start = System.currentTimeMillis()
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index 8c2c30a53e..03029efef4 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -35,15 +35,7 @@ object JobWorkSpace extends Logging {
val worker = new JobWorker(application, appArgs, eventLoop)
val monitor = new JobMonitor(eventLoop)
val workspace = new JobWorkSpace(eventLoop, monitor, worker)
-
- if (System.getProperty("spark.master").equals("yarn") &&
System.getProperty("spark.submit.deployMode").equals("cluster")) {
- val res = workspace.run()
- if (res != 0) {
- System.exit(res)
- }
- } else {
- System.exit(workspace.run())
- }
+ workspace.run()
} catch {
case throwable: Throwable =>
logError("Error occurred when init job workspace.", throwable)