sririshindra commented on code in PR #49814: URL: https://github.com/apache/spark/pull/49814#discussion_r1946734535
########## core/src/main/scala/org/apache/spark/SparkContext.scala: ########## @@ -722,6 +722,9 @@ class SparkContext(config: SparkConf) extends Logging { } appStatusSource.foreach(_env.metricsSystem.registerSource(_)) _plugins.foreach(_.registerMetrics(applicationId)) + + new CallerContext("DRIVER", config.get(APP_CALLER_CONTEXT), + Option(applicationId), applicationAttemptId).setCurrentContext() Review Comment: Done ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala: ########## @@ -171,6 +172,11 @@ private[hive] class HiveClientImpl( private def newState(): SessionState = { val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader)) val state = new SessionState(hiveConf) + // When SessionState is initialized, the caller context is overridden by hive + // so we need to reset it back to the DRIVER + new CallerContext("DRIVER", + sparkConf.get(APP_CALLER_CONTEXT), + Option(sparkConf.getOption("spark.app.id").getOrElse(""))).setCurrentContext() Review Comment: Done. ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala: ########## @@ -171,6 +172,11 @@ private[hive] class HiveClientImpl( private def newState(): SessionState = { val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader)) val state = new SessionState(hiveConf) + // When SessionState is initialized, the caller context is overridden by hive + // so we need to reset it back to the DRIVER Review Comment: I see, I didn't know that iceberg constructs its own hive client instance. I am not quite sure how it is being propagated. But I know it is being propagated properly based on testing. I tested this by running both batch and streaming jobs that do writes and reads from an Iceberg table. 
The way I identified the best location to set the caller context is by first identifying all the places it is being set elsewhere, by setting a breakpoint on the setCurrent method in the CallerContext.java file in the hadoop library. I then let a spark streaming (and also a batch) job on a cluster write/read from an iceberg table and recorded the stack traces where the setCurrent method is being called by attaching a remote debugger. I then identified the shared execution paths in all these stack traces and decided that the best place to reset the caller context in spark is here in the HiveClientImpl file, after the SessionState is initialized. Like you said, maybe iceberg constructs its own hive client instance, but maybe that code path doesn't set the caller context? I have checked the Iceberg codebase and found the location where a hive client is being created ([newClient](https://github.com/apache/iceberg/blob/afda8be25652d44d9339a79c6797b6bf20c55bd6/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java#L60)), but I am not sure whether that in turn calls the setCurrent method. Here are the stack traces where the setCurrent method was called in my testing. 
``` Breakpoint reached at org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148) at org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopCallerContext(Hadoop23Shims.java:546) at org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopSessionContext(Hadoop23Shims.java:556) at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:508) at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:475) at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:471) at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:195) at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:143) at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1) at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:490) at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:316) at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:539) at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:399) at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:70) at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:69) at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:223) at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$2058.2002899646.apply$mcZ$sp(Unknown Source:-1) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101) at 
org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223) at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150) at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144) at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:180) at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:178) at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:70) at org.apache.spark.sql.hive.HiveSessionStateBuilder$$Lambda$1492.867369720.apply(Unknown Source:-1) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:123) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:123) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isGlobalTempViewDB(SessionCatalog.scala:1001) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getRawLocalOrGlobalTempView(SessionCatalog.scala:716) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isTempView(SessionCatalog.scala:1008) at org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:282) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at 
py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:834) Breakpoint reached at org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148) Breakpoint reached at org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148) at org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopCallerContext(Hadoop23Shims.java:546) at org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopSessionContext(Hadoop23Shims.java:556) at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:510) at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:475) at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:471) at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:195) at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:143) at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1) at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:490) at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:316) at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:539) at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:399) at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:70) at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:69) at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:223) at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$2058.2002899646.apply$mcZ$sp(Unknown Source:-1) at 
scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101) at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223) at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150) at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144) at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:180) at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:178) at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:70) at org.apache.spark.sql.hive.HiveSessionStateBuilder$$Lambda$1492.867369720.apply(Unknown Source:-1) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:123) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:123) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isGlobalTempViewDB(SessionCatalog.scala:1001) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getRawLocalOrGlobalTempView(SessionCatalog.scala:716) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isTempView(SessionCatalog.scala:1008) at org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:282) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:834) Breakpoint reached at org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148) Breakpoint reached at org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:566) at org.apache.spark.util.CallerContext.setCurrentContext(Utils.scala:3414) at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:202) at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:143) at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1) at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:490) at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:316) at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:539) at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:399) at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:70) at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:69) at 
org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:223) at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$2058.2002899646.apply$mcZ$sp(Unknown Source:-1) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101) at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223) at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150) at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144) at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:180) at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:178) at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:70) at org.apache.spark.sql.hive.HiveSessionStateBuilder$$Lambda$1492.867369720.apply(Unknown Source:-1) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:123) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:123) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isGlobalTempViewDB(SessionCatalog.scala:1001) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getRawLocalOrGlobalTempView(SessionCatalog.scala:716) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isTempView(SessionCatalog.scala:1008) at org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:282) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:566) at 
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:834) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org