Hi Matteo, 1) You have an incompatible Metastore: The Hive Metastore version used by the EMR cluster (2.3.9) doesn't support the get_partition_locations method directly. Spark 3.5 tries to use this method, leading to fallback and increased (Hive Metastore Service) HMS calls.
2) Large Number of Partitions: Your table has a very high number of partitions (250,000). Listing all partitions with listPartitionsWithAuthInfo can be slow, especially with potential network issues causing retries problem: Your Spark 3.5 application writing to a Hive table with 250000 partitions experiences slowness compared to Spark 2.4. The driver gets stuck waiting for HMS (Hive Metastore Service) calls. Suggestions 1) You can optimize Partition Handling by coalescing Partitions, before writing. Consider using coalesce to reduce the number of partitions written to. This can significantly reduce HMS calls and improve performance. 2) check this link <https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html> for more info . HTH Mich Talebzadeh, Architect | Data Science | Financial Crime | GDPR & Compliance Specialist PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London> London, United Kingdom view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/> https://en.everybodywiki.com/Mich_Talebzadeh *Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)". On Wed, 4 Dec 2024 at 21:47, Matteo Moci <mox...@gmail.com> wrote: > Hello Community, > The Spark 3.5 application I am working on shows slowness, right at the > time of writing to a Hive table. > > I'd like to ask you some hints on how to mitigate this behaviour, if > possible. > > The same application using Spark 2.4 ran "fine" within reasonable times, > with minimal cluster idle cpu times, while on Spark 3, the driver is stuck > for a long time waiting for HMS. > Some more context: > > - the app runs on an Amazon EMR cluster 7.2.0 [1], that uses its own > "distribution" of Spark 3.5.1, Hive 3.1.3, and with some optimizations I > don't know the details of. From what I understood, this is why line numbers > of stack traces in cluster logs don't always match with the open source > code of Spark. > - Hive MetaStore is using Hive 2 - not sure exactly which version > - the app reads from a hive table and writes the output data to other > 3 hive tables. Let's focus on 1 of them, because that's where the app is > stuck > - I think we can leave the table it reads from outside of the > equation, because the driver thread dumps show it is stuck at this point, > when inserting in the output table: > > java.base@17.0.13/sun.nio.ch.Net.poll(Native Method) > java.base@17.0.13/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:186) > java.base@17.0.13 > /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:290) > java.base@17.0.13 > /sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:314) > java.base@17.0.13/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355) > java.base@17.0.13/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808) > java.base@17.0.13/java.net.Socket$SocketInputStream.read(Socket.java:966) > java.base@17.0.13 > /java.io.BufferedInputStream.fill(BufferedInputStream.java:244) > java.base@17.0.13 > /java.io.BufferedInputStream.read1(BufferedInputStream.java:284) > java.base@17.0.13/java.io.BufferedInputStream.read(BufferedInputStream.java:343) > => holding Monitor(java.io.BufferedInputStream@1632069162) > > app//org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127) > app//org.apache.thrift.transport.TTransport.readAll(TTransport.java:86) > > app//org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425) > > app//org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321) > > app//org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225) > app//org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) > > app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions_ps_with_auth(ThriftHiveMetastore.java:2601) > > app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_ps_with_auth(ThriftHiveMetastore.java:2583) > > app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsWithAuthInfo(HiveMetaStoreClient.java:1212) > java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > java.base@17.0.13 > /jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > java.base@17.0.13 > /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569) > > app//org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:177) > app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown > Source) > java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > java.base@17.0.13 > /jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > java.base@17.0.13 > /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569) > app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2350) > => holding > Monitor(org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler@244249800 > ) > app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown > Source) > > app//org.apache.hadoop.hive.ql.metadata.Hive.listPartitionLocationsWithoutExtension(Hive.java:2797) > > app//org.apache.hadoop.hive.ql.metadata.Hive.doGetPartitionLocations(Hive.java:2775) > > app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2754) > > app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2727) > > app//org.apache.spark.sql.hive.client.Shim_v0_12.getPartitionLocations(HiveShim.scala:673) > > app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionLocations$3(HiveClientImpl.scala:798) > app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3952/0x00007f8c193a3068.apply(Unknown > Source) > > app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$doGetPartitions$1(HiveClientImpl.scala:845) > app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3953/0x00007f8c193a3628.apply(Unknown > Source) > > app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:303) > app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3366/0x00007f8c19188000.apply(Unknown > Source) > > app//org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:234) > app//org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:233) > => holding > Monitor(org.apache.spark.sql.hive.client.IsolatedClientLoader@1088141517) > > app//org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:283) > > app//org.apache.spark.sql.hive.client.HiveClientImpl.doGetPartitions(HiveClientImpl.scala:838) > > app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:796) > > app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:790) > > app//org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionLocations$1(HiveExternalCatalog.scala:1307) > app//org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$3941/0x00007f8c1939eb90.apply(Unknown > Source) > app//org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102) > => holding Monitor(org.apache.spark.sql.hive.HiveExternalCatalog@845450683 > ) > > app//org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionLocations(HiveExternalCatalog.scala:1303) > > app//org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionLocations(ExternalCatalogWithListener.scala:254) > > app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocationsOptimized(SessionCatalog.scala:1360) > > app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$listPartitionLocations$1(SessionCatalog.scala:1341) > app//org.apache.spark.sql.catalyst.catalog.SessionCatalog$$Lambda$3935/0x00007f8c193932c0.apply(Unknown > Source) > app//org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:554) > > app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.withCommonListMetrics(SessionCatalog.scala:1375) > > app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocations(SessionCatalog.scala:1340) > > app//org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:107) > app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113) > => holding > Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@1800917126 > ) > > app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111) > > app//org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125) > > app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:535) > app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$5823/0x00007f8c19973f98.apply(Unknown > Source) > > app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:574) > > app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:535) > > app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126) > app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$3552/0x00007f8c19243128.apply(Unknown > Source) > > app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108) > > app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264) > > app//org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:138) > > app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:174) > app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3562/0x00007f8c19245730.apply(Unknown > Source) > > app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108) > > app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264) > > app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:174) > app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3561/0x00007f8c19245000.apply(Unknown > Source) > > app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:285) > > app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:173) > app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3553/0x00007f8c192433f8.apply(Unknown > Source) > app//org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901) > > app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70) > > app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123) > > app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114) > > app//org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519) > app//org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$3197/0x00007f8c190ddbb0.apply(Unknown > Source) > > app//org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77) > > app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519) > app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org > $apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34) > > app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297) > > app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293) > > app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34) > > app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34) > > app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495) > > app//org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114) > app//org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101) > => holding Monitor(org.apache.spark.sql.execution.QueryExecution@310792592 > ) > > app//org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99) > > app//org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:164) > > app//org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:874) > > app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:495) > > app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:456) > x.Application.run(Application.java:N) <---------------------- this is the > application code > java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > java.base@17.0.13 > /jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > java.base@17.0.13 > /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569) > > app//org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741) > > - at line x.Application.main(Application.java:N) there is: > > dataset.write().mode(SaveMode.Overwrite).insertInto("table_name"); // > dataset is a Dataset<Row> > > - on cluster logs we see: > > 24/12/04 10:16:22 INFO Hive: Metastore does not support > listPartitionLocations operation; falling back to using > listPartitionsWithAuthInfo operation from now on > org.apache.hadoop.hive.metastore.IMetaStoreClient$IncompatibleMetastoreException: > Metastore doesn't support listPartitionLocation: Invalid method name: > 'get_partition_locations' > at > org.apache.hadoop.hive.metastore.HiveMetaStoreClient.newIncompatibleMetastoreException(HiveMetaStoreClient.java:2579) > ~[hive-metastore-2.3.9-amzn-3.jar:2.3.9-amzn-3] > > - on cluster logs we see around 5 times this log: > > 24/12/04 11:33:42 WARN RetryingMetaStoreClient: MetaStoreClient lost > connection. Attempting to reconnect (3 of 5) after 15s. > listPartitionsWithAuthInfo > org.apache.thrift.transport.TTransportException: > java.net.SocketTimeoutException: Read timed out > > - and on hivemetastore server logs we see (I am told that the -1 there > means "get all the partitions"): > > 2024-12-04T10:33:11,357 [pool-4-thread-173] DEBUG > x.server.invocation-log:168 - #get_partitions_ps_with_auth('x', > 'table_name', '[, , , ]', -1, 'client-x', []): entered > > - the table the app tries to write to has 1 year worth of data > - Spark 2 is capable to write around 650 partitions daily, around 1Tb > worth of data, so there's a total of around 250000 partitions > - the application uses spark.sql.sources.partitionOverwriteMode: > dynamic so it's expected the 650 partitions are completely > overwritten, leaving the others untouched > > I have some options but wanted to check if you have any pointers forward, > specifically how to speed this up :D > > Thanks a lot and sorry for the wall of text! > > Matteo > > [1] > https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-7.x.html > >