sririshindra commented on code in PR #49814:
URL: https://github.com/apache/spark/pull/49814#discussion_r1946734535


##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -722,6 +722,9 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     appStatusSource.foreach(_env.metricsSystem.registerSource(_))
     _plugins.foreach(_.registerMetrics(applicationId))
+
+    new CallerContext("DRIVER", config.get(APP_CALLER_CONTEXT),
+      Option(applicationId), applicationAttemptId).setCurrentContext()

Review Comment:
   Done
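
   As a side note, the shape of the resulting context string can be sketched in plain Scala. This is a hypothetical helper, not Spark's actual `CallerContext` implementation: the field names, separator, and `SPARK_` prefix here are illustrative only. The 128-byte cap mirrors Hadoop's default `hadoop.caller.context.max.size`, beyond which Hadoop truncates the context anyway.

   ```scala
   // Hypothetical sketch of assembling a caller-context string from the
   // "from" component (e.g. DRIVER), app id, and attempt id. Names and
   // separators are illustrative, not Spark's exact format.
   def buildCallerContext(
       from: String,
       appId: Option[String],
       attemptId: Option[String] = None): String = {
     val parts = Seq(
       Some(s"SPARK_$from"),
       appId.map(id => s"APPID_$id"),
       attemptId.map(id => s"ATTEMPT_$id"))
     // Hadoop caps the caller context length (hadoop.caller.context.max.size,
     // 128 by default), so anything longer would be truncated downstream.
     parts.flatten.mkString("_").take(128)
   }
   ```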



##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:
##########
@@ -171,6 +172,11 @@ private[hive] class HiveClientImpl(
   private def newState(): SessionState = {
     val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader))
     val state = new SessionState(hiveConf)
+    // When SessionState is initialized, the caller context is overridden by hive
+    // so we need to reset it back to the DRIVER
+    new CallerContext("DRIVER",
+      sparkConf.get(APP_CALLER_CONTEXT),
+      Option(sparkConf.getOption("spark.app.id").getOrElse(""))).setCurrentContext()

Review Comment:
   Done.



##########
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:
##########
@@ -171,6 +172,11 @@ private[hive] class HiveClientImpl(
   private def newState(): SessionState = {
     val hiveConf = newHiveConf(sparkConf, hadoopConf, extraConfig, Some(initClassLoader))
     val state = new SessionState(hiveConf)
+    // When SessionState is initialized, the caller context is overridden by hive
+    // so we need to reset it back to the DRIVER

Review Comment:
   I see, I didn't know that Iceberg constructs its own Hive client instance. I am not quite sure how the caller context is being propagated, but I do know it is propagated properly, based on testing: I ran both batch and streaming jobs that write to and read from an Iceberg table.
   
   To find the best place to set the caller context, I first identified everywhere it is set by putting a breakpoint on the setCurrent method in CallerContext.java in the Hadoop library. I then let a Spark streaming job (and also a batch job) on a cluster write to and read from an Iceberg table, and recorded the stack traces where setCurrent is called by attaching a remote debugger. After identifying the execution paths shared by all these stack traces, I concluded that the best place to reset the caller context in Spark is here in HiveClientImpl, after the SessionState is initialized.
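   
   For anyone who wants to reproduce this, here is a sketch of how the driver JVM can be opened up for a remote debugger using the standard JDWP agent. The port, `suspend=n`, and job name are illustrative choices, not necessarily the exact setup I used:
   
   ```shell
   # Illustrative: enable JDWP on the Spark driver so a remote debugger
   # can set a breakpoint on org.apache.hadoop.ipc.CallerContext.setCurrent.
   # Port 5005 and suspend=n are arbitrary here.
   spark-submit \
     --conf "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005" \
     your_job.py
   ```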
   
   Like you said, maybe Iceberg constructs its own Hive client instance, but perhaps that code path doesn't set the caller context? I have checked the Iceberg codebase and found the place where a Hive client is created, but I am not sure whether [newClient](https://github.com/apache/iceberg/blob/afda8be25652d44d9339a79c6797b6bf20c55bd6/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveClientPool.java#L60) in turn calls the setCurrent method.
   
   Here are the stack traces where setCurrent is called in my testing.
   
   ```
   Breakpoint reached
        at org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148)
        at org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopCallerContext(Hadoop23Shims.java:546)
        at org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopSessionContext(Hadoop23Shims.java:556)
        at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:508)
        at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:475)
        at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:471)
        at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:195)
        at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:143)
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1)
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:316)
        at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:539)
        at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:399)
        at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:70)
        at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:69)
        at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:223)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$2058.2002899646.apply$mcZ$sp(Unknown Source:-1)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
        at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
        at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
        at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:180)
        at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:178)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:70)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder$$Lambda$1492.867369720.apply(Unknown Source:-1)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:123)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:123)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isGlobalTempViewDB(SessionCatalog.scala:1001)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getRawLocalOrGlobalTempView(SessionCatalog.scala:716)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isTempView(SessionCatalog.scala:1008)
        at org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:282)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:834)
   Breakpoint reached at org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148)
   Breakpoint reached
        at org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148)
        at org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopCallerContext(Hadoop23Shims.java:546)
        at org.apache.hadoop.hive.shims.Hadoop23Shims.setHadoopSessionContext(Hadoop23Shims.java:556)
        at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:510)
        at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:475)
        at org.apache.hadoop.hive.ql.session.SessionState.<init>(SessionState.java:471)
        at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:195)
        at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:143)
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1)
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:316)
        at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:539)
        at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:399)
        at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:70)
        at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:69)
        at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:223)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$2058.2002899646.apply$mcZ$sp(Unknown Source:-1)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
        at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
        at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
        at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:180)
        at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:178)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:70)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder$$Lambda$1492.867369720.apply(Unknown Source:-1)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:123)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:123)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isGlobalTempViewDB(SessionCatalog.scala:1001)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getRawLocalOrGlobalTempView(SessionCatalog.scala:716)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isTempView(SessionCatalog.scala:1008)
        at org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:282)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:834)
   Breakpoint reached at org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148)
   Breakpoint reached
        at org.apache.hadoop.ipc.CallerContext.setCurrent(CallerContext.java:148)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.spark.util.CallerContext.setCurrentContext(Utils.scala:3414)
        at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:202)
        at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:143)
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(NativeConstructorAccessorImpl.java:-1)
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:316)
        at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:539)
        at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:399)
        at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:70)
        at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:69)
        at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:223)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$2058.2002899646.apply$mcZ$sp(Unknown Source:-1)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:101)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
        at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
        at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
        at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:180)
        at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:178)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:70)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder$$Lambda$1492.867369720.apply(Unknown Source:-1)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:123)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:123)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isGlobalTempViewDB(SessionCatalog.scala:1001)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getRawLocalOrGlobalTempView(SessionCatalog.scala:716)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isTempView(SessionCatalog.scala:1008)
        at org.apache.spark.sql.streaming.DataStreamWriter.toTable(DataStreamWriter.scala:282)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:834)
   ```
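   
   The ordering these traces show (SessionState's constructor overrides the caller context via Hadoop23Shims, then Spark resets it right after) can be sketched with a mock. This is not the real Hadoop API, just a thread-local stand-in, and the `HIVE_SSN_ID`/`SPARK_DRIVER` strings are illustrative:
   
   ```scala
   // Mock of Hadoop's per-thread caller context.
   object MockCallerContext {
     private val current = new ThreadLocal[String]
     def setCurrent(ctx: String): Unit = current.set(ctx)
     def getCurrent: String = current.get
   }
   
   // Stand-in for `new SessionState(hiveConf)`, which internally calls
   // Hadoop23Shims.setHadoopSessionContext as seen in the traces above.
   def initSessionState(): Unit =
     MockCallerContext.setCurrent("HIVE_SSN_ID:abc")
   
   def newState(): Unit = {
     initSessionState()
     // ...then reset back to the driver context, as HiveClientImpl now does.
     MockCallerContext.setCurrent("SPARK_DRIVER")
   }
   ```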



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

