On Thu, Dec 5, 2024 at 8:21 AM Matteo Moci <mox...@gmail.com> wrote:
> Thanks a lot Mich,
>
> 1) Do you mean coalescing the partitions that are about to be written? I
> don't think that will take less time, because all partitions have data.
> The problem seems to be that Spark asks HMS for all partitions, even
> though it is only writing 650. Would fixing that be an improvement worth
> contributing back to Spark? I mean: I find it strange that Spark fetches
> metadata for all partitions even if it is only writing some. But I don't
> know the underlying logic, so I will have to look at the code first.
>
> 2) So you are saying that the fallback method is more expensive than the
> first one, right? I will try to target the old HMS version by passing the
> configuration at runtime, as described in
> https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
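>
> Concretely, I plan to try something like the sketch below (untested; the
> 2.3.9 value is taken from the hive-metastore-2.3.9-amzn-3.jar seen in the
> logs, and I don't know yet whether the EMR distribution honors these
> settings the same way open source Spark does):
>
>     import org.apache.spark.sql.SparkSession;
>
>     SparkSession spark = SparkSession.builder()
>         // match Spark's Hive client to the metastore version actually
>         // deployed, so it stops attempting newer Thrift calls like
>         // get_partition_locations and falling back expensively
>         .config("spark.sql.hive.metastore.version", "2.3.9")
>         // "builtin" should work here since Spark 3.5's bundled Hive
>         // client is 2.3.9
>         .config("spark.sql.hive.metastore.jars", "builtin")
>         .enableHiveSupport()
>         .getOrCreate();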
>
> On Thu, Dec 5, 2024 at 8:17 AM Matteo Moci <mox...@gmail.com> wrote:
>
>> Thanks Mich,
>> 1)
>>
>> On Thu, Dec 5, 2024 at 1:16 AM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Hi Matteo,
>>>
>>> 1) Incompatible Metastore: the Hive Metastore version used by the EMR
>>> cluster (2.3.9) doesn't support the get_partition_locations method
>>> directly. Spark 3.5 tries to use this method, leading to a fallback and
>>> increased HMS (Hive Metastore Service) calls.
>>>
>>> 2) Large number of partitions: your table has a very high number of
>>> partitions (250,000). Listing all of them with listPartitionsWithAuthInfo
>>> can be slow, especially when network issues cause retries. That matches
>>> the problem you describe: your Spark 3.5 application writing to a Hive
>>> table with 250,000 partitions is slow compared to Spark 2.4, and the
>>> driver gets stuck waiting for HMS calls.
>>>
>>> Suggestions
>>>
>>> 1) You can optimize partition handling by coalescing partitions before
>>> writing. Consider using coalesce to reduce the number of partitions
>>> written out; see the sketch below. This can significantly reduce HMS
>>> calls and improve performance.
>>>
>>> 2) Check this link for more info:
>>> https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
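>>>
>>> As a rough sketch of 1), reusing the snippet from your mail (the 64 is
>>> an arbitrary example value; pick it based on your output size):
>>>
>>>     // dataset is the Dataset<Row> the application already builds
>>>     dataset
>>>         .coalesce(64)  // fewer write tasks -> fewer output files
>>>         .write()
>>>         .mode(SaveMode.Overwrite)
>>>         .insertInto("table_name");
>>>
>>> Note that coalesce here controls the number of parallel write tasks and
>>> output files, not the table's partition layout itself.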
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Architect | Data Science | Financial Crime | GDPR & Compliance Specialist
>>> PhD, Imperial College London
>>> London, United Kingdom
>>>
>>> View my LinkedIn profile:
>>> https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/
>>>
>>> https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice: "one test result is worth one-thousand expert
>>> opinions" (Wernher von Braun).
>>>
>>> On Wed, 4 Dec 2024 at 21:47, Matteo Moci <mox...@gmail.com> wrote:
>>>
>>>> Hello Community,
>>>> The Spark 3.5 application I am working on shows slowness right at the
>>>> time of writing to a Hive table.
>>>>
>>>> I'd like to ask you for some hints on how to mitigate this behaviour,
>>>> if possible.
>>>>
>>>> The same application using Spark 2.4 ran "fine" within reasonable
>>>> times, with minimal cluster idle CPU time, while on Spark 3 the driver
>>>> is stuck for a long time waiting for HMS.
>>>>
>>>> Some more context:
>>>>
>>>> - the app runs on an Amazon EMR 7.2.0 cluster [1], which uses its own
>>>> "distribution" of Spark 3.5.1 and Hive 3.1.3, with some optimizations
>>>> I don't know the details of. From what I understood, this is why line
>>>> numbers in stack traces from cluster logs don't always match the open
>>>> source code of Spark.
>>>> - the Hive MetaStore is using Hive 2 - not sure exactly which version
>>>> - the app reads from a Hive table and writes the output data to 3
>>>> other Hive tables. Let's focus on 1 of them, because that's where the
>>>> app is stuck.
>>>> - I think we can leave the table it reads from out of the equation,
>>>> because the driver thread dumps show it is stuck at this point, when
>>>> inserting into the output table:
>>>>
>>>> java.base@17.0.13/sun.nio.ch.Net.poll(Native Method)
>>>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:186)
>>>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:290)
>>>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:314)
>>>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
>>>> java.base@17.0.13/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
>>>> java.base@17.0.13/java.net.Socket$SocketInputStream.read(Socket.java:966)
>>>> java.base@17.0.13/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
>>>> java.base@17.0.13/java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
>>>> java.base@17.0.13/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
>>>> => holding Monitor(java.io.BufferedInputStream@1632069162)
>>>> app//org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
>>>> app//org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
>>>> app//org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
>>>> app//org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
>>>> app//org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
>>>> app//org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
>>>> app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions_ps_with_auth(ThriftHiveMetastore.java:2601)
>>>> app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_ps_with_auth(ThriftHiveMetastore.java:2583)
>>>> app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsWithAuthInfo(HiveMetaStoreClient.java:1212)
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>> java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>>> app//org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:177)
>>>> app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown Source)
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>> java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>>> app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2350)
>>>> => holding Monitor(org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler@244249800)
>>>> app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown Source)
>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.listPartitionLocationsWithoutExtension(Hive.java:2797)
>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.doGetPartitionLocations(Hive.java:2775)
>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2754)
>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2727)
>>>> app//org.apache.spark.sql.hive.client.Shim_v0_12.getPartitionLocations(HiveShim.scala:673)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionLocations$3(HiveClientImpl.scala:798)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3952/0x00007f8c193a3068.apply(Unknown Source)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$doGetPartitions$1(HiveClientImpl.scala:845)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3953/0x00007f8c193a3628.apply(Unknown Source)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:303)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3366/0x00007f8c19188000.apply(Unknown Source)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:234)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:233)
>>>> => holding Monitor(org.apache.spark.sql.hive.client.IsolatedClientLoader@1088141517)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:283)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.doGetPartitions(HiveClientImpl.scala:838)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:796)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:790)
>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionLocations$1(HiveExternalCatalog.scala:1307)
>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$3941/0x00007f8c1939eb90.apply(Unknown Source)
>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
>>>> => holding Monitor(org.apache.spark.sql.hive.HiveExternalCatalog@845450683)
>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionLocations(HiveExternalCatalog.scala:1303)
>>>> app//org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionLocations(ExternalCatalogWithListener.scala:254)
>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocationsOptimized(SessionCatalog.scala:1360)
>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$listPartitionLocations$1(SessionCatalog.scala:1341)
>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog$$Lambda$3935/0x00007f8c193932c0.apply(Unknown Source)
>>>> app//org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:554)
>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.withCommonListMetrics(SessionCatalog.scala:1375)
>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocations(SessionCatalog.scala:1340)
>>>> app//org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:107)
>>>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
>>>> => holding Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@1800917126)
>>>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
>>>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:535)
>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$5823/0x00007f8c19973f98.apply(Unknown Source)
>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:574)
>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:535)
>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$3552/0x00007f8c19243128.apply(Unknown Source)
>>>> app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:138)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:174)
>>>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3562/0x00007f8c19245730.apply(Unknown Source)
>>>> app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:174)
>>>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3561/0x00007f8c19245000.apply(Unknown Source)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:285)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:173)
>>>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3553/0x00007f8c192433f8.apply(Unknown Source)
>>>> app//org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$3197/0x00007f8c190ddbb0.apply(Unknown Source)
>>>> app//org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
>>>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
>>>> app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
>>>> app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
>>>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>>>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
>>>> app//org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
>>>> app//org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101)
>>>> => holding Monitor(org.apache.spark.sql.execution.QueryExecution@310792592)
>>>> app//org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
>>>> app//org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:164)
>>>> app//org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:874)
>>>> app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:495)
>>>> app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:456)
>>>> x.Application.run(Application.java:N) <---------------------- this is the application code
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>> java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>>> app//org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741)
>>>>
>>>> - at line x.Application.main(Application.java:N) there is:
>>>>
>>>> dataset.write().mode(SaveMode.Overwrite).insertInto("table_name"); // dataset is a Dataset<Row>
>>>>
>>>> - in the cluster logs we see:
>>>>
>>>> 24/12/04 10:16:22 INFO Hive: Metastore does not support listPartitionLocations operation; falling back to using listPartitionsWithAuthInfo operation from now on
>>>> org.apache.hadoop.hive.metastore.IMetaStoreClient$IncompatibleMetastoreException: Metastore doesn't support listPartitionLocation: Invalid method name: 'get_partition_locations'
>>>> at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.newIncompatibleMetastoreException(HiveMetaStoreClient.java:2579) ~[hive-metastore-2.3.9-amzn-3.jar:2.3.9-amzn-3]
>>>>
>>>> - in the cluster logs we also see this log around 5 times:
>>>>
>>>> 24/12/04 11:33:42 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (3 of 5) after 15s. listPartitionsWithAuthInfo
>>>> org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
>>>>
>>>> - and in the Hive Metastore server logs we see (I am told that the -1 there means "get all the partitions"):
>>>>
>>>> 2024-12-04T10:33:11,357 [pool-4-thread-173] DEBUG x.server.invocation-log:168 - #get_partitions_ps_with_auth('x', 'table_name', '[, , , ]', -1, 'client-x', []): entered
>>>>
>>>> - the table the app tries to write to has 1 year's worth of data
>>>> - Spark 2 is capable of writing around 650 partitions daily, around 1 TB worth of data, so there is a total of around 250000 partitions
>>>> - the application uses spark.sql.sources.partitionOverwriteMode: dynamic, so the expectation is that the 650 partitions are completely overwritten, leaving the others untouched (a simplified sketch of the write path follows this list)
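>>>>
>>>> For clarity, this is roughly the write path, simplified (the input
>>>> table name and the transformations are placeholders for the real
>>>> application code):
>>>>
>>>>     import org.apache.spark.sql.Dataset;
>>>>     import org.apache.spark.sql.Row;
>>>>     import org.apache.spark.sql.SaveMode;
>>>>     import org.apache.spark.sql.SparkSession;
>>>>
>>>>     SparkSession spark = SparkSession.builder()
>>>>         // overwrite only the partitions present in the incoming
>>>>         // data, leaving the rest of the ~250000 untouched
>>>>         .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
>>>>         .enableHiveSupport()
>>>>         .getOrCreate();
>>>>
>>>>     // "input_table" is a placeholder; transformations omitted
>>>>     Dataset<Row> dataset = spark.table("input_table");
>>>>     // rewrites the ~650 daily partitions present in dataset
>>>>     dataset.write().mode(SaveMode.Overwrite).insertInto("table_name");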
>>>>
>>>> I have some options, but wanted to check if you have any pointers,
>>>> specifically on how to speed this up :D
>>>>
>>>> Thanks a lot and sorry for the wall of text!
>>>>
>>>> Matteo
>>>>
>>>> [1] https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-7.x.html
>>>>
>>
>> --
>> Matteo Moci
>> https://twitter.com/matteomoci
>>
>
> --
> Matteo Moci
> https://twitter.com/matteomoci
>

--
Matteo Moci
https://twitter.com/matteomoci