Thanks Mich,

On Thu, Dec 5, 2024 at 1:16 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> Hi Matteo,
>
> 1) You have an incompatible Metastore: the Hive Metastore version used by
> the EMR cluster (2.3.9) doesn't support the get_partition_locations method
> directly. Spark 3.5 tries to use this method, leading to a fallback and an
> increased number of HMS (Hive Metastore Service) calls.
>
> 2) Large number of partitions: your table has a very high number of
> partitions (250,000). Listing all of them with listPartitionsWithAuthInfo
> can be slow, especially when network issues cause retries. The result is
> exactly the problem you describe: your Spark 3.5 application writing to a
> Hive table with 250,000 partitions is slow compared to Spark 2.4, and the
> driver gets stuck waiting on HMS calls.
>
> Suggestions
>
> 1) You can optimize partition handling by coalescing partitions before
> writing. Consider using coalesce to reduce the number of partitions
> written to; this can significantly reduce HMS calls and improve
> performance. (See the sketch just below this message.)
>
> 2) Check this link
> <https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html>
> for more info; a second sketch below shows two of the settings documented
> there.
>
> HTH
>
> Mich Talebzadeh
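As an illustration of suggestion 1, a minimal sketch, assuming a Dataset<Row> named dataset as in Matteo's code; the wrapper class, the table name, and the partition count of 650 are placeholders, not part of the original application:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class CoalescedWriteSketch { // hypothetical wrapper class
        // Coalesce before the insert so fewer write tasks (and hence
        // fewer output files and per-file operations) are produced.
        public static void writeCoalesced(Dataset<Row> dataset) {
            dataset.coalesce(650) // placeholder: tune to the data volume
                   .write()
                   .mode(SaveMode.Overwrite)
                   .insertInto("table_name"); // placeholder table name
        }
    }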
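And for suggestion 2: among other things, the linked page documents spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars, which control the Hive metastore client Spark talks through. A hedged sketch of setting them at session creation; the values are illustrative, and whether EMR's packaged Spark honors them the same way as upstream is an assumption:

    import org.apache.spark.sql.SparkSession;

    public class MetastoreVersionSketch { // hypothetical wrapper class
        public static SparkSession build() {
            return SparkSession.builder()
                    .appName("hms-version-sketch") // hypothetical name
                    // Must be set before the session starts; values are
                    // illustrative and depend on the actual cluster.
                    .config("spark.sql.hive.metastore.version", "2.3.9")
                    .config("spark.sql.hive.metastore.jars", "builtin")
                    .enableHiveSupport()
                    .getOrCreate();
        }
    }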
> On Wed, 4 Dec 2024 at 21:47, Matteo Moci <mox...@gmail.com> wrote:
>
>> Hello Community,
>>
>> The Spark 3.5 application I am working on shows slowness, right at the
>> time of writing to a Hive table.
>>
>> I'd like to ask you for some hints on how to mitigate this behaviour, if
>> possible.
>>
>> The same application using Spark 2.4 ran "fine" within reasonable times,
>> with minimal cluster idle CPU time, while on Spark 3 the driver is stuck
>> for a long time waiting for HMS.
>>
>> Some more context:
>>
>> - the app runs on an Amazon EMR 7.2.0 cluster [1] that uses its own
>> "distribution" of Spark 3.5.1 and Hive 3.1.3, with some optimizations I
>> don't know the details of. From what I understand, this is why line
>> numbers in stack traces from cluster logs don't always match the open
>> source Spark code.
>> - the Hive MetaStore is using Hive 2 - not sure exactly which version.
>> - the app reads from a Hive table and writes the output data to three
>> other Hive tables. Let's focus on one of them, because that's where the
>> app is stuck.
>> - I think we can leave the table it reads from out of the equation,
>> because the driver thread dumps show it is stuck at this point, when
>> inserting into the output table:
>>
>> java.base@17.0.13/sun.nio.ch.Net.poll(Native Method)
>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:186)
>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:290)
>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:314)
>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
>> java.base@17.0.13/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
>> java.base@17.0.13/java.net.Socket$SocketInputStream.read(Socket.java:966)
>> java.base@17.0.13/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
>> java.base@17.0.13/java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
>> java.base@17.0.13/java.io.BufferedInputStream.read(BufferedInputStream.java:343) => holding Monitor(java.io.BufferedInputStream@1632069162)
>> app//org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
>> app//org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
>> app//org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
>> app//org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
>> app//org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
>> app//org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
>> app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions_ps_with_auth(ThriftHiveMetastore.java:2601)
>> app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_ps_with_auth(ThriftHiveMetastore.java:2583)
>> app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsWithAuthInfo(HiveMetaStoreClient.java:1212)
>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>> java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>> app//org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:177)
>> app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown Source)
>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>> java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>> app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2350) => holding Monitor(org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler@244249800)
>> app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown Source)
>> app//org.apache.hadoop.hive.ql.metadata.Hive.listPartitionLocationsWithoutExtension(Hive.java:2797)
>> app//org.apache.hadoop.hive.ql.metadata.Hive.doGetPartitionLocations(Hive.java:2775)
>> app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2754)
>> app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2727)
>> app//org.apache.spark.sql.hive.client.Shim_v0_12.getPartitionLocations(HiveShim.scala:673)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionLocations$3(HiveClientImpl.scala:798)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3952/0x00007f8c193a3068.apply(Unknown Source)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$doGetPartitions$1(HiveClientImpl.scala:845)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3953/0x00007f8c193a3628.apply(Unknown Source)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:303)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3366/0x00007f8c19188000.apply(Unknown Source)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:234)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:233) => holding Monitor(org.apache.spark.sql.hive.client.IsolatedClientLoader@1088141517)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:283)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.doGetPartitions(HiveClientImpl.scala:838)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:796)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:790)
>> app//org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionLocations$1(HiveExternalCatalog.scala:1307)
>> app//org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$3941/0x00007f8c1939eb90.apply(Unknown Source)
>> app//org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102) => holding Monitor(org.apache.spark.sql.hive.HiveExternalCatalog@845450683)
>> app//org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionLocations(HiveExternalCatalog.scala:1303)
>> app//org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionLocations(ExternalCatalogWithListener.scala:254)
>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocationsOptimized(SessionCatalog.scala:1360)
>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$listPartitionLocations$1(SessionCatalog.scala:1341)
>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog$$Lambda$3935/0x00007f8c193932c0.apply(Unknown Source)
>> app//org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:554)
>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.withCommonListMetrics(SessionCatalog.scala:1375)
>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocations(SessionCatalog.scala:1340)
>> app//org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:107)
>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113) => holding Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@1800917126)
>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:535)
>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$5823/0x00007f8c19973f98.apply(Unknown Source)
>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:574)
>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:535)
>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$3552/0x00007f8c19243128.apply(Unknown Source)
>> app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>> app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
>> app//org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:138)
>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:174)
>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3562/0x00007f8c19245730.apply(Unknown Source)
>> app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>> app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:174)
>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3561/0x00007f8c19245000.apply(Unknown Source)
>> app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:285)
>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:173)
>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3553/0x00007f8c192433f8.apply(Unknown Source)
>> app//org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
>> app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
>> app//org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
>> app//org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$3197/0x00007f8c190ddbb0.apply(Unknown Source)
>> app//org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
>> app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
>> app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
>> app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>> app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
>> app//org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
>> app//org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@310792592)
>> app//org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
>> app//org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:164)
>> app//org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:874)
>> app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:495)
>> app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:456)
>> x.Application.run(Application.java:N) <---------------------- this is the application code
>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>> java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>> app//org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741)
>>
>> - at line x.Application.main(Application.java:N) there is (see the sketch
>> after this list):
>>
>> dataset.write().mode(SaveMode.Overwrite).insertInto("table_name"); // dataset is a Dataset<Row>
>>
>> - on cluster logs we see:
>>
>> 24/12/04 10:16:22 INFO Hive: Metastore does not support listPartitionLocations operation; falling back to using listPartitionsWithAuthInfo operation from now on
>> org.apache.hadoop.hive.metastore.IMetaStoreClient$IncompatibleMetastoreException: Metastore doesn't support listPartitionLocation: Invalid method name: 'get_partition_locations'
>> at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.newIncompatibleMetastoreException(HiveMetaStoreClient.java:2579) ~[hive-metastore-2.3.9-amzn-3.jar:2.3.9-amzn-3]
>>
>> - on cluster logs we also see this log around 5 times:
>>
>> 24/12/04 11:33:42 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (3 of 5) after 15s. listPartitionsWithAuthInfo
>> org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
>>
>> - and on the Hive Metastore server logs we see (I am told that the -1
>> there means "get all the partitions"):
>>
>> 2024-12-04T10:33:11,357 [pool-4-thread-173] DEBUG x.server.invocation-log:168 - #get_partitions_ps_with_auth('x', 'table_name', '[, , , ]', -1, 'client-x', []): entered
>>
>> - the table the app tries to write to has one year's worth of data
>> - Spark 2 is capable of writing around 650 partitions daily, around 1 TB
>> worth of data, so there is a total of around 250,000 partitions
>> - the application uses spark.sql.sources.partitionOverwriteMode: dynamic,
>> so the expectation is that the 650 partitions are completely overwritten,
>> leaving the others untouched
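For reference, a self-contained sketch of the write path described in the bullets above, under assumed names (the WritePathSketch class, the app name, and input_table are placeholders; table_name and the config key are from the thread). With dynamic partition overwrite, the Overwrite + insertInto combination replaces only the partitions present in the incoming data:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class WritePathSketch {                     // hypothetical class
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("write-path-sketch")      // hypothetical name
                    // Dynamic mode: only the partitions present in the
                    // incoming data (the ~650 daily ones here) are
                    // overwritten; the rest of the ~250,000 existing
                    // partitions stay untouched.
                    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
                    .enableHiveSupport()
                    .getOrCreate();

            Dataset<Row> dataset = spark.table("input_table"); // placeholder

            dataset.write().mode(SaveMode.Overwrite).insertInto("table_name");
        }
    }

The dynamic setting is what makes the overwrite safe for the untouched partitions; with the default static mode, SaveMode.Overwrite would drop them.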
>> I have some options, but I wanted to check whether you have any pointers,
>> specifically on how to speed this up :D
>>
>> Thanks a lot, and sorry for the wall of text!
>>
>> Matteo
>>
>> [1] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-7.x.html