Liu Zhao created KYLIN-5271: ------------------------------- Summary: Query memory leaks Key: KYLIN-5271 URL: https://issues.apache.org/jira/browse/KYLIN-5271 Project: Kylin Issue Type: Bug Components: Query Engine Affects Versions: v4.0.1 Reporter: Liu Zhao
The query thread clones a SparkSession and puts it into a ThreadLocal. However, if an exception occurs during the Calcite-to-SparkPlan conversion, the SparkSession in the ThreadLocal is never removed. More importantly, if Spark restarts later, the SparkSession left in the ThreadLocal still refers to the stopped SparkContext, so every subsequent query on this thread fails with the following exception: Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext This stopped SparkContext was created at: org.apache.spark.sql.SparderContext$$anon$4.run(SparderContext.scala:150) java.lang.Thread.run(Thread.java:748) // put SparkSession into ThreadLocal {code:java} object SparderContextFacade extends Logging { final val CURRENT_SPARKSESSION: InternalThreadLocal[Pair[SparkSession, UdfManager]] = new InternalThreadLocal[Pair[SparkSession, UdfManager]]() def current(): Pair[SparkSession, UdfManager] = { if (CURRENT_SPARKSESSION.get() == null) { val spark = SparderContext.getOriginalSparkSession.cloneSession() CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](spark, UdfManager.createWithoutBuildInFunc(spark))) } CURRENT_SPARKSESSION.get() } def remove(): Unit = { CURRENT_SPARKSESSION.remove() } } {code} // remove SparkSession from ThreadLocal // org.apache.kylin.query.runtime.plans.ResultPlan {code:java} def getResult(df: DataFrame, rowType: RelDataType, resultType: ResultType) : Either[Enumerable[Array[Any]], Enumerable[Any]] = withScope(df) { val result: Either[Enumerable[Array[Any]], Enumerable[Any]] = resultType match { case ResultType.NORMAL => if (SparderContext.needCompute()) { Left(ResultPlan.collectEnumerable(df, rowType)) } else { Left(Linq4j.asEnumerable(Array.empty[Array[Any]])) } case ResultType.SCALA => if (SparderContext.needCompute()) { Right(ResultPlan.collectScalarEnumerable(df, rowType)) } else { Right(Linq4j.asEnumerable(Lists.newArrayList[Any]())) } } SparderContext.cleanQueryInfo() SparderContext.closeThreadSparkSession() result } {code} Because the cleanup calls at the end of getResult are not placed in a finally block, they are skipped whenever an exception is thrown before or inside getResult. -- This message was sent by 
Atlassian Jira (v8.20.10#820010)