This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new d34f085 KYLIN-5057 CubeBuildJob in Kylin4.0 run failed when open
Spark3.1 AQE
d34f085 is described below
commit d34f08594db97d0f175144f0190e782e11f2b9b5
Author: Zhichao Zhang <[email protected]>
AuthorDate: Wed Aug 11 22:28:28 2021 +0800
KYLIN-5057 CubeBuildJob in Kylin4.0 run failed when open Spark3.1 AQE
Root cause:
With KylinJoinSelection strategy, if AQE is true, this strategy will be
applied before LogicalQueryStageStrategy,
and then it will be applied JoinSelection strategy again, leading to change
the 'Build Type' of BroadcastHashJoin in some cases.
---
.../spark/sql/execution/KylinJoinSelection.scala | 282 ---------------------
.../spark/sql/execution/KylinJoinSelection.scala | 249 ------------------
.../engine/spark/application/SparkApplication.java | 21 +-
3 files changed, 4 insertions(+), 548 deletions(-)
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala
deleted file mode 100644
index 200dd24..0000000
---
a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.spark.sql.execution
-
-import javax.annotation.concurrent.GuardedBy
-import org.apache.kylin.common.KylinConfig
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering}
-import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.{SparkSession, Strategy}
-
-/**
- * Select the proper physical plan for join based on joining keys and size of
logical plan.
- *
- * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at
least some of the
- * predicates can be evaluated by matching join keys. If found, join
implementations are chosen
- * with the following precedence:
- *
- * - Broadcast hash join (BHJ):
- * BHJ is not supported for full outer join. For right outer join, we only can
broadcast the
- * left side. For left outer, left semi, left anti and the internal join type
ExistenceJoin,
- * we only can broadcast the right side. For inner like join, we can broadcast
both sides.
- * Normally, BHJ can perform faster than the other join algorithms when the
broadcast side is
- * small. However, broadcasting tables is a network-intensive operation.
It could cause OOM
- * or perform worse than the other join algorithms, especially when the
build/broadcast side
- * is big.
- *
- * For the supported cases, users can specify the broadcast hint (e.g. the
user applied the
- * [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame) and
session-based
- * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to adjust whether BHJ is
used and
- * which join side is broadcast.
- *
- * 1) Broadcast the join side with the broadcast hint, even if the size is
larger than
- * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (only
when the type
- * is inner like join), the side with a smaller estimated physical size will
be broadcast.
- * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and
broadcast the side
- * whose estimated physical size is smaller than the threshold. If both sides
are below the
- * threshold, broadcast the smaller side. If neither is smaller, BHJ is not
used.
- *
- * - Shuffle hash join: if the average size of a single partition is small
enough to build a hash
- * table.
- *
- * - Sort merge: if the matching join keys are sortable.
- *
- * If there is no joining keys, Join implementations are chosen with the
following precedence:
- * - BroadcastNestedLoopJoin (BNLJ):
- * BNLJ supports all the join types but the impl is OPTIMIZED for the
following scenarios:
- * For right outer join, the left side is broadcast. For left outer, left
semi, left anti
- * and the internal join type ExistenceJoin, the right side is broadcast. For
inner like
- * joins, either side is broadcast.
- *
- * Like BHJ, users still can specify the broadcast hint and session-based
- * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to impact which side is
broadcast.
- *
- * 1) Broadcast the join side with the broadcast hint, even if the size is
larger than
- * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint
(i.e., just for
- * inner-like join), the side with a smaller estimated physical size will be
broadcast.
- * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and
broadcast the side
- * whose estimated physical size is smaller than the threshold. If both sides
are below the
- * threshold, broadcast the smaller side. If neither is smaller, BNLJ is not
used.
- *
- * - CartesianProduct: for inner like join, CartesianProduct is the fallback
option.
- *
- * - BroadcastNestedLoopJoin (BNLJ):
- * For the other join types, BNLJ is the fallback option. Here, we just pick
the broadcast
- * side with the broadcast hint. If neither side has a hint, we broadcast the
side with
- * the smaller estimated physical size.
- */
-case class KylinJoinSelection(session: SparkSession) extends Strategy with
PredicateHelper with Logging {
-
- val conf: SQLConf = session.sessionState.conf
-
- /**
- * Matches a plan whose output should be small enough to be used in
broadcast join.
- */
- private def canBroadcast(plan: LogicalPlan): Boolean = {
- val sizeInBytes = plan.stats.sizeInBytes
- sizeInBytes >= 0 && sizeInBytes <= conf.autoBroadcastJoinThreshold &&
JoinMemoryManager.acquireMemory(sizeInBytes.toLong)
- }
-
- /**
- * Matches a plan whose single partition should be small enough to build a
hash table.
- *
- * Note: this assume that the number of partition is fixed, requires
additional work if it's
- * dynamic.
- */
- private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
- plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold *
conf.numShufflePartitions
- }
-
- /**
- * Returns whether plan a is much smaller (3X) than plan b.
- *
- * The cost to build hash map is higher than sorting, we should only build
hash map on a table
- * that is much smaller than other one. Since we does not have the statistic
for number of rows,
- * use the size of bytes here as estimation.
- */
- private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
- a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
- }
-
- private def canBuildRight(joinType: JoinType): Boolean = joinType match {
- case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin =>
true
- case _ => false
- }
-
- private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
- case _: InnerLike | RightOuter => true
- case _ => false
- }
-
- private def broadcastSide(
- canBuildLeft: Boolean,
- canBuildRight: Boolean,
- left: LogicalPlan,
- right: LogicalPlan): BuildSide = {
-
- def smallerSide =
- if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else
BuildLeft
-
- if (canBuildRight && canBuildLeft) {
- // Broadcast smaller side base on its estimated physical size
- // if both sides have broadcast hint
- smallerSide
- } else if (canBuildRight) {
- BuildRight
- } else if (canBuildLeft) {
- BuildLeft
- } else {
- // for the last default broadcast nested loop join
- smallerSide
- }
- }
-
- private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan,
right: LogicalPlan)
- : Boolean = {
- val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
- val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
- buildLeft || buildRight
- }
-
- private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan,
right: LogicalPlan)
- : BuildSide = {
- val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
- val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
- broadcastSide(buildLeft, buildRight, left, right)
- }
-
- private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan,
right: LogicalPlan)
- : Boolean = {
- val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
- val buildRight = canBuildRight(joinType) && canBroadcast(right)
- buildLeft || buildRight
- }
-
- private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan,
right: LogicalPlan)
- : BuildSide = {
- val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
- val buildRight = canBuildRight(joinType) && canBroadcast(right)
- broadcastSide(buildLeft, buildRight, left, right)
- }
-
- def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-
- // --- BroadcastHashJoin
--------------------------------------------------------------------
-
- // broadcast hints were specified
- case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left,
right)
- if canBroadcastByHints(joinType, left, right) =>
- val buildSide = broadcastSideByHints(joinType, left, right)
- Seq(joins.BroadcastHashJoinExec(
- leftKeys, rightKeys, joinType, buildSide, condition, planLater(left),
planLater(right)))
-
- // broadcast hints were not specified, so need to infer it from size and
configuration.
- case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left,
right)
- if canBroadcastBySizes(joinType, left, right) =>
- val buildSide = broadcastSideBySizes(joinType, left, right)
- Seq(joins.BroadcastHashJoinExec(
- leftKeys, rightKeys, joinType, buildSide, condition, planLater(left),
planLater(right)))
-
- // --- ShuffledHashJoin
---------------------------------------------------------------------
-
- case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left,
right)
- if !conf.preferSortMergeJoin && canBuildRight(joinType) &&
canBuildLocalHashMap(right)
- && muchSmaller(right, left) ||
- !RowOrdering.isOrderable(leftKeys) =>
- Seq(joins.ShuffledHashJoinExec(
- leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left),
planLater(right)))
-
- case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left,
right)
- if !conf.preferSortMergeJoin && canBuildLeft(joinType) &&
canBuildLocalHashMap(left)
- && muchSmaller(left, right) ||
- !RowOrdering.isOrderable(leftKeys) =>
- Seq(joins.ShuffledHashJoinExec(
- leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left),
planLater(right)))
-
- // --- SortMergeJoin
------------------------------------------------------------
-
- case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left,
right)
- if RowOrdering.isOrderable(leftKeys) =>
- joins.SortMergeJoinExec(
- leftKeys, rightKeys, joinType, condition, planLater(left),
planLater(right)) :: Nil
-
- // --- Without joining keys
------------------------------------------------------------
-
- // Pick BroadcastNestedLoopJoin if one side could be broadcast
- case [email protected](left, right, joinType, condition)
- if canBroadcastByHints(joinType, left, right) =>
- val buildSide = broadcastSideByHints(joinType, left, right)
- joins.BroadcastNestedLoopJoinExec(
- planLater(left), planLater(right), buildSide, joinType, condition) ::
Nil
-
- case [email protected](left, right, joinType, condition)
- if canBroadcastBySizes(joinType, left, right) =>
- val buildSide = broadcastSideBySizes(joinType, left, right)
- joins.BroadcastNestedLoopJoinExec(
- planLater(left), planLater(right), buildSide, joinType, condition) ::
Nil
-
- // Pick CartesianProduct for InnerJoin
- case logical.Join(left, right, _: InnerLike, condition) =>
- joins.CartesianProductExec(planLater(left), planLater(right), condition)
:: Nil
-
- case logical.Join(left, right, joinType, condition) =>
- val buildSide = broadcastSide(
- left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
- // This join could be very slow or OOM
- joins.BroadcastNestedLoopJoinExec(
- planLater(left), planLater(right), buildSide, joinType, condition) ::
Nil
-
- // --- Cases where this strategy does not apply
---------------------------------------------
-
- case _ => Nil
- }
-}
-
-object JoinMemoryManager extends Logging {
-
- @GuardedBy("this")
- private[this] var memoryUsed: Long = 0
-
- def acquireMemory(numBytesToAcquire: Long): Boolean = synchronized {
- assert(numBytesToAcquire >= 0)
- val enoughMemory = numBytesToAcquire <= (maxMemoryJoinCanUse - memoryUsed)
- if (enoughMemory) {
- memoryUsed += numBytesToAcquire
- logInfo(s"Acquire $numBytesToAcquire bytes for BHJ, memory used
$memoryUsed, max memory BHJ can use $maxMemoryJoinCanUse.")
- } else {
- logInfo("Driver memory is not enough for BHJ.")
- }
- enoughMemory
- }
-
- private def maxMemoryJoinCanUse: Long = {
- val joinMemoryFraction =
KylinConfig.getInstanceFromEnv.getJoinMemoryFraction
- (Runtime.getRuntime.maxMemory() * joinMemoryFraction).toLong
- }
-
- def releaseAllMemory(): Unit = synchronized {
- memoryUsed = 0
- }
-
-}
\ No newline at end of file
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
deleted file mode 100644
index 243ffd6..0000000
---
a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.spark.sql.execution
-
-import org.apache.kylin.common.KylinConfig
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering}
-import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight,
JoinSelectionHelper}
-import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys,
ExtractSingleColumnNullAwareAntiJoin}
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.{SparkSession, Strategy}
-
-import javax.annotation.concurrent.GuardedBy
-
-/**
- * .
- */
-case class KylinJoinSelection(session: SparkSession) extends Strategy
- with JoinSelectionHelper
- with PredicateHelper
- with Logging {
-
- val conf: SQLConf = session.sessionState.conf
-
- def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-
- // If it is an equi-join, we first look at the join hints w.r.t. the
following order:
- // 1. broadcast hint: pick broadcast hash join if the join type is
supported. If both sides
- // have the broadcast hints, choose the smaller side (based on stats)
to broadcast.
- // 2. sort merge hint: pick sort merge join if join keys are sortable.
- // 3. shuffle hash hint: We pick shuffle hash join if the join type is
supported. If both
- // sides have the shuffle hash hints, choose the smaller side (based
on stats) as the
- // build side.
- // 4. shuffle replicate NL hint: pick cartesian product if join type is
inner like.
- //
- // If there is no hint or the hints are not applicable, we follow these
rules one by one:
- // 1. Pick broadcast hash join if one side is small enough to broadcast,
and the join type
- // is supported. If both sides are small, choose the smaller side
(based on stats)
- // to broadcast.
- // 2. Pick shuffle hash join if one side is small enough to build local
hash map, and is
- // much smaller than the other side, and
`spark.sql.join.preferSortMergeJoin` is false.
- // 3. Pick sort merge join if the join keys are sortable.
- // 4. Pick cartesian product if join type is inner like.
- // 5. Pick broadcast nested loop join as the final solution. It may OOM
but we don't have
- // other choice.
- case j@ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond,
left, right, hint) =>
- def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
- getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint,
conf).map {
- buildSide =>
- Seq(joins.BroadcastHashJoinExec(
- leftKeys,
- rightKeys,
- joinType,
- buildSide,
- nonEquiCond,
- planLater(left),
- planLater(right)))
- }
- }
-
- def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
- getShuffleHashJoinBuildSide(left, right, joinType, hint,
onlyLookingAtHint, conf).map {
- buildSide =>
- Seq(joins.ShuffledHashJoinExec(
- leftKeys,
- rightKeys,
- joinType,
- buildSide,
- nonEquiCond,
- planLater(left),
- planLater(right)))
- }
- }
-
- def createSortMergeJoin() = {
- if (RowOrdering.isOrderable(leftKeys)) {
- Some(Seq(joins.SortMergeJoinExec(
- leftKeys, rightKeys, joinType, nonEquiCond, planLater(left),
planLater(right))))
- } else {
- None
- }
- }
-
- def createCartesianProduct() = {
- if (joinType.isInstanceOf[InnerLike]) {
- // `CartesianProductExec` can't implicitly evaluate equal join
condition, here we should
- // pass the original condition which includes both equal and
non-equal conditions.
- Some(Seq(joins.CartesianProductExec(planLater(left),
planLater(right), j.condition)))
- } else {
- None
- }
- }
-
- def createJoinWithoutHint() = {
- createBroadcastHashJoin(false)
- .orElse {
- if (!conf.preferSortMergeJoin) {
- createShuffleHashJoin(false)
- } else {
- None
- }
- }
- .orElse(createSortMergeJoin())
- .orElse(createCartesianProduct())
- .getOrElse {
- // This join could be very slow or OOM
- val buildSide = getSmallerSide(left, right)
- Seq(joins.BroadcastNestedLoopJoinExec(
- planLater(left), planLater(right), buildSide, joinType,
nonEquiCond))
- }
- }
-
- createBroadcastHashJoin(true)
- .orElse {
- if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None
- }
- .orElse(createShuffleHashJoin(true))
- .orElse {
- if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else
None
- }
- .getOrElse(createJoinWithoutHint())
-
- case j@ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) =>
- Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti,
BuildRight,
- None, planLater(j.left), planLater(j.right), isNullAwareAntiJoin =
true))
-
- // If it is not an equi-join, we first look at the join hints w.r.t. the
following order:
- // 1. broadcast hint: pick broadcast nested loop join. If both sides
have the broadcast
- // hints, choose the smaller side (based on stats) to broadcast for
inner and full joins,
- // choose the left side for right join, and choose right side for
left join.
- // 2. shuffle replicate NL hint: pick cartesian product if join type is
inner like.
- //
- // If there is no hint or the hints are not applicable, we follow these
rules one by one:
- // 1. Pick broadcast nested loop join if one side is small enough to
broadcast. If only left
- // side is broadcast-able and it's left join, or only right side is
broadcast-able and
- // it's right join, we skip this rule. If both sides are small,
broadcasts the smaller
- // side for inner and full joins, broadcasts the left side for right
join, and broadcasts
- // right side for left join.
- // 2. Pick cartesian product if join type is inner like.
- // 3. Pick broadcast nested loop join as the final solution. It may OOM
but we don't have
- // other choice. It broadcasts the smaller side for inner and full
joins, broadcasts the
- // left side for right join, and broadcasts right side for left join.
- case logical.Join(left, right, joinType, condition, hint) =>
- val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType
== FullOuter) {
- getSmallerSide(left, right)
- } else {
- // For perf reasons, `BroadcastNestedLoopJoinExec` prefers to
broadcast left side if
- // it's a right join, and broadcast right side if it's a left join.
- // TODO: revisit it. If left side is much smaller than the right side,
it may be better
- // to broadcast the left side even if it's a left join.
- if (canBuildBroadcastLeft(joinType)) BuildLeft else BuildRight
- }
-
- def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
- val maybeBuildSide = if (buildLeft && buildRight) {
- Some(desiredBuildSide)
- } else if (buildLeft) {
- Some(BuildLeft)
- } else if (buildRight) {
- Some(BuildRight)
- } else {
- None
- }
-
- maybeBuildSide.map { buildSide =>
- Seq(joins.BroadcastNestedLoopJoinExec(
- planLater(left), planLater(right), buildSide, joinType, condition))
- }
- }
-
- def createCartesianProduct() = {
- if (joinType.isInstanceOf[InnerLike]) {
- Some(Seq(joins.CartesianProductExec(planLater(left),
planLater(right), condition)))
- } else {
- None
- }
- }
-
- def createJoinWithoutHint() = {
- createBroadcastNLJoin(canBroadcastBySize(left, conf),
canBroadcastBySize(right, conf))
- .orElse(createCartesianProduct())
- .getOrElse {
- // This join could be very slow or OOM
- Seq(joins.BroadcastNestedLoopJoinExec(
- planLater(left), planLater(right), desiredBuildSide, joinType,
condition))
- }
- }
-
- createBroadcastNLJoin(hintToBroadcastLeft(hint),
hintToBroadcastRight(hint))
- .orElse {
- if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else
None
- }
- .getOrElse(createJoinWithoutHint())
-
- // --- Cases where this strategy does not apply
---------------------------------------------
- case _ => Nil
- }
-
- override def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean =
{
- val size = plan.stats.sizeInBytes
- size >= 0 && size <= conf.autoBroadcastJoinThreshold &&
JoinMemoryManager.acquireMemory(size.toLong)
- }
-}
-
-object JoinMemoryManager extends Logging {
-
- @GuardedBy("this")
- private[this] var memoryUsed: Long = 0
-
- def acquireMemory(numBytesToAcquire: Long): Boolean = synchronized {
- assert(numBytesToAcquire >= 0)
- val enoughMemory = numBytesToAcquire <= (maxMemoryJoinCanUse - memoryUsed)
- if (enoughMemory) {
- memoryUsed += numBytesToAcquire
- logInfo(s"Acquire $numBytesToAcquire bytes for BHJ, memory used
$memoryUsed, max memory BHJ can use $maxMemoryJoinCanUse.")
- } else {
- logInfo("Driver memory is not enough for BHJ.")
- }
- enoughMemory
- }
-
- private def maxMemoryJoinCanUse: Long = {
- val joinMemoryFraction =
KylinConfig.getInstanceFromEnv.getJoinMemoryFraction
- (Runtime.getRuntime.maxMemory() * joinMemoryFraction).toLong
- }
-
- def releaseAllMemory(): Unit = synchronized {
- memoryUsed = 0
- }
-
-}
\ No newline at end of file
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 32f6219..0d4352b 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -55,10 +55,7 @@ import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.spark.SparkConf;
-import org.apache.spark.sql.execution.KylinJoinSelection;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.SparkSessionExtensions;
-import org.apache.spark.sql.execution.SparkStrategy;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
import org.apache.spark.util.Utils;
import org.apache.spark.utils.ResourceUtils;
@@ -67,8 +64,6 @@ import org.apache.spark.utils.YarnInfoFetcherUtils;
import org.apache.kylin.engine.spark.common.util.TimeZoneUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
public abstract class SparkApplication {
private static final Logger logger =
LoggerFactory.getLogger(SparkApplication.class);
@@ -287,18 +282,10 @@ public abstract class SparkApplication {
}
}
- ss = SparkSession.builder().withExtensions(new
AbstractFunction1<SparkSessionExtensions, BoxedUnit>() {
- @Override
- public BoxedUnit apply(SparkSessionExtensions v1) {
- v1.injectPlannerStrategy(new
AbstractFunction1<SparkSession, SparkStrategy>() {
- @Override
- public SparkStrategy apply(SparkSession session) {
- return new KylinJoinSelection(session);
- }
- });
- return BoxedUnit.UNIT;
- }
-
}).enableHiveSupport().config(sparkConf).config("mapreduce.fileoutputcommitter.marksuccessfuljobs",
"false")
+ ss = SparkSession.builder()
+ .enableHiveSupport()
+ .config(sparkConf)
+
.config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
.getOrCreate();
if (isJobOnCluster(sparkConf)) {