Liu Zhao created KYLIN-5271:
-------------------------------

             Summary: Query memory leaks
                 Key: KYLIN-5271
                 URL: https://issues.apache.org/jira/browse/KYLIN-5271
             Project: Kylin
          Issue Type: Bug
          Components: Query Engine
    Affects Versions: v4.0.1
            Reporter: Liu Zhao


The query thread clones a SparkSession and stores it in a ThreadLocal. 
However, if an exception occurs during the Calcite-to-SparkPlan conversion, 
the SparkSession in the ThreadLocal is never removed. More importantly, if 
Spark restarts later, the SparkSession left in the ThreadLocal is no longer 
usable, and any query on this thread fails with the exception: Caused by: 
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext

This stopped SparkContext was created at:



org.apache.spark.sql.SparderContext$$anon$4.run(SparderContext.scala:150)

java.lang.Thread.run(Thread.java:748)


// put SparkSession into ThreadLocal
{code:java}
/**
 * Holder for the SparkSession (paired with a UdfManager) that the calling
 * query thread works with.
 *
 * The pair is created on the first call to [[current]] and stays in the slot
 * until [[remove]] is called.
 */
object SparderContextFacade extends Logging {

  /** Slot holding the cloned session together with the UdfManager built for it. */
  final val CURRENT_SPARKSESSION: InternalThreadLocal[Pair[SparkSession, UdfManager]] =
    new InternalThreadLocal[Pair[SparkSession, UdfManager]]()

  /**
   * Returns the session/UdfManager pair held in the slot, populating the slot
   * first when it is empty.
   *
   * When the slot is empty, the original SparkSession is cloned and a
   * UdfManager is created for the clone via `createWithoutBuildInFunc`.
   *
   * @return the pair currently stored in [[CURRENT_SPARKSESSION]]
   */
  def current(): Pair[SparkSession, UdfManager] = {
    val slotIsEmpty = Option(CURRENT_SPARKSESSION.get()).isEmpty
    if (slotIsEmpty) {
      val clonedSession = SparderContext.getOriginalSparkSession.cloneSession()
      val udfManager = UdfManager.createWithoutBuildInFunc(clonedSession)
      CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](clonedSession, udfManager))
    }
    // Always hand back whatever the slot holds, freshly stored or pre-existing.
    CURRENT_SPARKSESSION.get()
  }

  /** Clears the slot so that the next [[current]] call clones a new session. */
  def remove(): Unit = CURRENT_SPARKSESSION.remove()
}
{code}

// remove SparkSession from ThreadLocal
// org.apache.kylin.query.runtime.plans.ResultPlan
{code:java}
    /**
     * Collects the query result from `df` and then releases the per-query state.
     *
     * For `ResultType.NORMAL` the rows are returned as `Left`, for
     * `ResultType.SCALA` the scalar values are returned as `Right`. When
     * `SparderContext.needCompute()` is false, an empty enumerable of the
     * matching shape is returned and `df` is not collected.
     *
     * The cleanup calls run in `finally`, so an exception thrown while
     * collecting no longer skips them and leaves the thread's cloned
     * SparkSession behind. Failures that happen before this method is invoked
     * (for example while the plan is being built) are not covered here and
     * still need cleanup at the call site.
     *
     * @param df         DataFrame to collect
     * @param rowType    row type passed through to the collect helpers
     * @param resultType selects row (`NORMAL`) or scalar (`SCALA`) collection
     * @return `Left` with rows or `Right` with scalar values
     */
    def getResult(df: DataFrame, rowType: RelDataType, resultType: ResultType)
  : Either[Enumerable[Array[Any]], Enumerable[Any]] = withScope(df) {
    try {
      resultType match {
        case ResultType.NORMAL =>
          if (SparderContext.needCompute()) {
            Left(ResultPlan.collectEnumerable(df, rowType))
          } else {
            Left(Linq4j.asEnumerable(Array.empty[Array[Any]]))
          }
        case ResultType.SCALA =>
          if (SparderContext.needCompute()) {
            Right(ResultPlan.collectScalarEnumerable(df, rowType))
          } else {
            Right(Linq4j.asEnumerable(Lists.newArrayList[Any]()))
          }
      }
    } finally {
      // Nested so the thread's session is still closed if cleanQueryInfo() throws.
      try {
        SparderContext.cleanQueryInfo()
      } finally {
        SparderContext.closeThreadSparkSession()
      }
    }
  }
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to