Hello Community,
The Spark 3.5 application I am working on slows down considerably, right at
the point of writing to a Hive table.

I'd like to ask you for some hints on how to mitigate this behaviour, if
possible.

The same application on Spark 2.4 ran "fine", within reasonable times and
with minimal idle CPU time on the cluster, while on Spark 3 the driver is
stuck for a long time waiting for the Hive MetaStore (HMS).
Some more context:

   - the app runs on an Amazon EMR 7.2.0 cluster [1], which uses its own
   "distribution" of Spark 3.5.1 and Hive 3.1.3, with some optimizations I
   don't know the details of. From what I understand, this is why line numbers
   in stack traces from the cluster logs don't always match the open source
   code of Spark.
   - Hive MetaStore is using Hive 2 - not sure exactly which version
   - the app reads from a Hive table and writes the output data to 3 other
   Hive tables. Let's focus on 1 of them, because that's where the app gets stuck
   - I think we can leave the table it reads from out of the equation,
   because the driver thread dumps show the driver is stuck at this point
   while inserting into the output table:

java.base@17.0.13/sun.nio.ch.Net.poll(Native Method)
java.base@17.0.13/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:186)
java.base@17.0.13/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:290)
java.base@17.0.13/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:314)
java.base@17.0.13/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
java.base@17.0.13/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
java.base@17.0.13/java.net.Socket$SocketInputStream.read(Socket.java:966)
java.base@17.0.13/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
java.base@17.0.13/java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
java.base@17.0.13/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
=> holding Monitor(java.io.BufferedInputStream@1632069162)
app//org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
app//org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
app//org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
app//org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
app//org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
app//org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions_ps_with_auth(ThriftHiveMetastore.java:2601)
app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_ps_with_auth(ThriftHiveMetastore.java:2583)
app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsWithAuthInfo(HiveMetaStoreClient.java:1212)
java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
app//org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:177)
app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown Source)
java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2350)
=> holding Monitor(org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler@244249800)
app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown Source)
app//org.apache.hadoop.hive.ql.metadata.Hive.listPartitionLocationsWithoutExtension(Hive.java:2797)
app//org.apache.hadoop.hive.ql.metadata.Hive.doGetPartitionLocations(Hive.java:2775)
app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2754)
app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2727)
app//org.apache.spark.sql.hive.client.Shim_v0_12.getPartitionLocations(HiveShim.scala:673)
app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionLocations$3(HiveClientImpl.scala:798)
app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3952/0x00007f8c193a3068.apply(Unknown Source)
app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$doGetPartitions$1(HiveClientImpl.scala:845)
app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3953/0x00007f8c193a3628.apply(Unknown Source)
app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:303)
app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3366/0x00007f8c19188000.apply(Unknown Source)
app//org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:234)
app//org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:233)
=> holding Monitor(org.apache.spark.sql.hive.client.IsolatedClientLoader@1088141517)
app//org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:283)
app//org.apache.spark.sql.hive.client.HiveClientImpl.doGetPartitions(HiveClientImpl.scala:838)
app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:796)
app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:790)
app//org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionLocations$1(HiveExternalCatalog.scala:1307)
app//org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$3941/0x00007f8c1939eb90.apply(Unknown Source)
app//org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
=> holding Monitor(org.apache.spark.sql.hive.HiveExternalCatalog@845450683)
app//org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionLocations(HiveExternalCatalog.scala:1303)
app//org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionLocations(ExternalCatalogWithListener.scala:254)
app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocationsOptimized(SessionCatalog.scala:1360)
app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$listPartitionLocations$1(SessionCatalog.scala:1341)
app//org.apache.spark.sql.catalyst.catalog.SessionCatalog$$Lambda$3935/0x00007f8c193932c0.apply(Unknown Source)
app//org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:554)
app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.withCommonListMetrics(SessionCatalog.scala:1375)
app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocations(SessionCatalog.scala:1340)
app//org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:107)
app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
=> holding Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@1800917126)
app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
app//org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:535)
app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$5823/0x00007f8c19973f98.apply(Unknown Source)
app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:574)
app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:535)
app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$3552/0x00007f8c19243128.apply(Unknown Source)
app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
app//org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:138)
app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:174)
app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3562/0x00007f8c19245730.apply(Unknown Source)
app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:174)
app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3561/0x00007f8c19245000.apply(Unknown Source)
app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:285)
app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:173)
app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3553/0x00007f8c192433f8.apply(Unknown Source)
app//org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
app//org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
app//org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$3197/0x00007f8c190ddbb0.apply(Unknown Source)
app//org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
app//org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
app//org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101)
=> holding Monitor(org.apache.spark.sql.execution.QueryExecution@310792592)
app//org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
app//org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:164)
app//org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:874)
app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:495)
app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:456)
x.Application.run(Application.java:N) <---------------------- this is the application code
java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
app//org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741)

   - at line x.Application.main(Application.java:N) there is:

dataset.write().mode(SaveMode.Overwrite).insertInto("table_name"); // dataset is a Dataset<Row>

   - in the cluster logs we see:

24/12/04 10:16:22 INFO Hive: Metastore does not support listPartitionLocations operation; falling back to using listPartitionsWithAuthInfo operation from now on
org.apache.hadoop.hive.metastore.IMetaStoreClient$IncompatibleMetastoreException: Metastore doesn't support listPartitionLocation: Invalid method name: 'get_partition_locations'
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.newIncompatibleMetastoreException(HiveMetaStoreClient.java:2579) ~[hive-metastore-2.3.9-amzn-3.jar:2.3.9-amzn-3]

   - in the cluster logs we see this log message around 5 times:

24/12/04 11:33:42 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (3 of 5) after 15s. listPartitionsWithAuthInfo
org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out

   - and in the Hive MetaStore server logs we see the following (I am told
   that the -1 there means "get all the partitions"; see the sketch right
   after this log excerpt):

2024-12-04T10:33:11,357 [pool-4-thread-173] DEBUG x.server.invocation-log:168 - #get_partitions_ps_with_auth('x', 'table_name', '[, , , ]', -1, 'client-x', []): entered
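
   - just to make the shape of that call concrete, below is a minimal,
   hypothetical sketch (not our application code - Spark's HiveClientImpl
   builds and manages this client internally) of the metastore call the
   driver thread is blocked in; the db/table/user names and the 4 partition
   columns are assumptions taken from the logs above:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Partition;

public class ListPartitionsSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical standalone client, only to illustrate the call shape;
        // in the real app Spark creates and manages the metastore client itself.
        HiveMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());

        // Empty strings mean "no filter" on any of the (assumed) 4 partition
        // columns, and maxParts = -1 asks for *all* matching partitions, which
        // matches the server log:
        // #get_partitions_ps_with_auth('x', 'table_name', '[, , , ]', -1, ...)
        List<String> partialSpec = Arrays.asList("", "", "", "");
        List<Partition> partitions = client.listPartitionsWithAuthInfo(
                "x", "table_name", partialSpec, (short) -1,
                "client-x", Collections.emptyList());

        // With ~250000 partitions this single synchronous RPC has to build and
        // ship a huge response, which is where the driver waits on the socket.
        System.out.println("partitions returned: " + partitions.size());
        client.close();
    }
}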

   - the table the app tries to write to holds 1 year's worth of data
   - with Spark 2 the app writes around 650 partitions daily, around 1 TB
   worth of data, so the table has a total of around 250000 partitions
   - the application uses spark.sql.sources.partitionOverwriteMode: dynamic,
   so the expectation is that those 650 partitions are completely overwritten
   while the others are left untouched (see the sketch right after this list)
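
For completeness, this is roughly how the write side is set up - a sketch
with hypothetical names (WriteSketch, writeOutput); the real session,
configuration, and dataset are of course built elsewhere in the app:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class WriteSketch {
    // Hypothetical helper, only to show how the overwrite is expected to behave.
    static void writeOutput(SparkSession spark, Dataset<Row> dataset) {
        // Dynamic mode: only the ~650 partitions present in `dataset` should be
        // replaced; all other partitions of the target table should stay untouched.
        spark.conf().set("spark.sql.sources.partitionOverwriteMode", "dynamic");

        // Same call as x.Application.run in the thread dump above.
        dataset.write().mode(SaveMode.Overwrite).insertInto("table_name");
    }
}

Despite dynamic mode, the thread dump above shows
InsertIntoHadoopFsRelationCommand listing the partition locations of the
whole target table through the metastore before the write starts, and that
is the call that times out and gets retried.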

I have some options in mind, but I wanted to check whether you have any
pointers, specifically on how to speed this up :D

Thanks a lot and sorry for the wall of text!

Matteo

[1]
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-7.x.html
