Thanks Mich,
1)

On Thu, Dec 5, 2024 at 1:16 AM Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Matteo,
>
> 1) You have an incompatible Metastore: The Hive Metastore version used by
> the EMR cluster (2.3.9) doesn't support the get_partition_locations method
> directly. Spark 3.5 tries to use this method, leading to fallback and
> increased (Hive Metastore Service) HMS calls.
>
> 2) Large Number of Partitions: Your table has a very high number of
> partitions (250,000). Listing all partitions with
> listPartitionsWithAuthInfo can be slow, especially with potential network
> issues causing retries problem: Your  Spark 3.5 application writing to a
> Hive table with 250000 partitions experiences slowness compared to Spark
> 2.4. The driver gets stuck waiting for HMS (Hive Metastore Service) calls.
>
> Suggestions
>
> 1) You can optimize Partition Handling by coalescing Partitions, before
> writing. Consider using coalesce to reduce the number of partitions written
> to. This can significantly reduce HMS calls and improve performance.
>
> 2) check this link
> <https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html>
> for more info .
>
> HTH
>
> Mich Talebzadeh,
>
> Architect | Data Science | Financial Crime | GDPR & Compliance Specialist
> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
> London <https://en.wikipedia.org/wiki/Imperial_College_London>
> London, United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* The information provided is correct to the best of my
> knowledge but of course cannot be guaranteed . It is essential to note
> that, as with any advice, quote "one test result is worth one-thousand
> expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
> Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".
>
>
> On Wed, 4 Dec 2024 at 21:47, Matteo Moci <mox...@gmail.com> wrote:
>
>> Hello Community,
>> The Spark 3.5 application I am working on shows slowness, right at the
>> time of writing to a Hive table.
>>
>> I'd like to ask you some hints on how to mitigate this behaviour, if
>> possible.
>>
>> The same application using Spark 2.4 ran "fine" within reasonable times,
>> with minimal cluster idle cpu times, while on Spark 3, the driver is stuck
>> for a long time waiting for HMS.
>> Some more context:
>>
>>    - the app runs on an Amazon EMR cluster 7.2.0 [1], that uses its own
>>    "distribution" of Spark 3.5.1, Hive 3.1.3, and with some optimizations I
>>    don't know the details of. From what I understood, this is why line 
>> numbers
>>    of stack traces in cluster logs don't always match with the open source
>>    code of Spark.
>>    - Hive MetaStore is using Hive 2 - not sure exactly which version
>>    - the app reads from a hive table and writes the output data to other
>>    3 hive tables. Let's focus on 1 of them, because that's where the app is
>>    stuck
>>    - I think we can leave the table it reads from outside of the
>>    equation, because the driver thread dumps show it is stuck at this point,
>>    when inserting in the output table:
>>
>> java.base@17.0.13/sun.nio.ch.Net.poll(Native Method)
>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:186)
>> java.base@17.0.13
>> /sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:290)
>> java.base@17.0.13
>> /sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:314)
>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
>> java.base@17.0.13/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
>> java.base@17.0.13/java.net.Socket$SocketInputStream.read(Socket.java:966)
>> java.base@17.0.13
>> /java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
>> java.base@17.0.13
>> /java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
>> java.base@17.0.13/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
>> => holding Monitor(java.io.BufferedInputStream@1632069162)
>>
>> app//org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
>> app//org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
>>
>> app//org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
>>
>> app//org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
>>
>> app//org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
>> app//org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
>>
>> app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions_ps_with_auth(ThriftHiveMetastore.java:2601)
>>
>> app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_ps_with_auth(ThriftHiveMetastore.java:2583)
>>
>> app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsWithAuthInfo(HiveMetaStoreClient.java:1212)
>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> java.base@17.0.13
>> /jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>> java.base@17.0.13
>> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>
>> app//org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:177)
>> app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown
>> Source)
>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> java.base@17.0.13
>> /jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>> java.base@17.0.13
>> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>> app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2350)
>> => holding
>> Monitor(org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler@244249800
>> )
>> app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown
>> Source)
>>
>> app//org.apache.hadoop.hive.ql.metadata.Hive.listPartitionLocationsWithoutExtension(Hive.java:2797)
>>
>> app//org.apache.hadoop.hive.ql.metadata.Hive.doGetPartitionLocations(Hive.java:2775)
>>
>> app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2754)
>>
>> app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2727)
>>
>> app//org.apache.spark.sql.hive.client.Shim_v0_12.getPartitionLocations(HiveShim.scala:673)
>>
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionLocations$3(HiveClientImpl.scala:798)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3952/0x00007f8c193a3068.apply(Unknown
>> Source)
>>
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$doGetPartitions$1(HiveClientImpl.scala:845)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3953/0x00007f8c193a3628.apply(Unknown
>> Source)
>>
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:303)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3366/0x00007f8c19188000.apply(Unknown
>> Source)
>>
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:234)
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:233)
>> => holding
>> Monitor(org.apache.spark.sql.hive.client.IsolatedClientLoader@1088141517)
>>
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:283)
>>
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.doGetPartitions(HiveClientImpl.scala:838)
>>
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:796)
>>
>> app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:790)
>>
>> app//org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionLocations$1(HiveExternalCatalog.scala:1307)
>> app//org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$3941/0x00007f8c1939eb90.apply(Unknown
>> Source)
>> app//org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
>> => holding Monitor(org.apache.spark.sql.hive.HiveExternalCatalog@845450683
>> )
>>
>> app//org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionLocations(HiveExternalCatalog.scala:1303)
>>
>> app//org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionLocations(ExternalCatalogWithListener.scala:254)
>>
>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocationsOptimized(SessionCatalog.scala:1360)
>>
>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$listPartitionLocations$1(SessionCatalog.scala:1341)
>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog$$Lambda$3935/0x00007f8c193932c0.apply(Unknown
>> Source)
>> app//org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:554)
>>
>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.withCommonListMetrics(SessionCatalog.scala:1375)
>>
>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocations(SessionCatalog.scala:1340)
>>
>> app//org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:107)
>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
>> => holding
>> Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@1800917126
>> )
>>
>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
>>
>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
>>
>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:535)
>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$5823/0x00007f8c19973f98.apply(Unknown
>> Source)
>>
>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:574)
>>
>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:535)
>>
>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$3552/0x00007f8c19243128.apply(Unknown
>> Source)
>>
>> app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>>
>> app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
>>
>> app//org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:138)
>>
>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:174)
>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3562/0x00007f8c19245730.apply(Unknown
>> Source)
>>
>> app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>>
>> app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
>>
>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:174)
>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3561/0x00007f8c19245000.apply(Unknown
>> Source)
>>
>> app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:285)
>>
>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:173)
>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3553/0x00007f8c192433f8.apply(Unknown
>> Source)
>> app//org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
>>
>> app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
>>
>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
>>
>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
>>
>> app//org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
>> app//org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$3197/0x00007f8c190ddbb0.apply(Unknown
>> Source)
>>
>> app//org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
>>
>> app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org
>> $apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
>>
>> app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
>>
>> app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
>>
>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>>
>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>>
>> app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
>>
>> app//org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
>> app//org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101)
>> => holding Monitor(org.apache.spark.sql.execution.QueryExecution@310792592
>> )
>>
>> app//org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
>>
>> app//org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:164)
>>
>> app//org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:874)
>>
>> app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:495)
>>
>> app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:456)
>> x.Application.run(Application.java:N) <---------------------- this is the
>> application code
>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> java.base@17.0.13
>> /jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>> java.base@17.0.13
>> /jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>
>> app//org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741)
>>
>>    - at line x.Application.main(Application.java:N) there is:
>>
>> dataset.write().mode(SaveMode.Overwrite).insertInto("table_name"); //
>> dataset is a Dataset<Row>
>>
>>    - on cluster logs we see:
>>
>> 24/12/04 10:16:22 INFO Hive: Metastore does not support
>> listPartitionLocations operation; falling back to using
>> listPartitionsWithAuthInfo operation from now on
>> org.apache.hadoop.hive.metastore.IMetaStoreClient$IncompatibleMetastoreException:
>> Metastore doesn't support listPartitionLocation: Invalid method name:
>> 'get_partition_locations'
>> at
>> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.newIncompatibleMetastoreException(HiveMetaStoreClient.java:2579)
>> ~[hive-metastore-2.3.9-amzn-3.jar:2.3.9-amzn-3]
>>
>>    - on cluster logs we see around 5 times this log:
>>
>> 24/12/04 11:33:42 WARN RetryingMetaStoreClient: MetaStoreClient lost
>> connection. Attempting to reconnect (3 of 5) after 15s.
>> listPartitionsWithAuthInfo
>> org.apache.thrift.transport.TTransportException:
>> java.net.SocketTimeoutException: Read timed out
>>
>>    - and on hivemetastore server logs we see (I am told that the -1
>>    there means "get all the partitions"):
>>
>> 2024-12-04T10:33:11,357 [pool-4-thread-173] DEBUG
>> x.server.invocation-log:168 - #get_partitions_ps_with_auth('x',
>> 'table_name', '[, , , ]', -1, 'client-x', []): entered
>>
>>    - the table the app tries to write to has 1 year worth of data
>>    - Spark 2 is capable to write around 650 partitions daily, around 1Tb
>>    worth of data, so there's a total of around 250000 partitions
>>    - the application uses spark.sql.sources.partitionOverwriteMode:
>>    dynamic so it's expected the 650 partitions are completely
>>    overwritten, leaving the others untouched
>>
>> I have some options but wanted to check if you have any pointers forward,
>> specifically how to speed this up :D
>>
>> Thanks a lot and sorry for the wall of text!
>>
>> Matteo
>>
>> [1]
>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-7.x.html
>>
>>

-- 
Matteo Moci
https://twitter.com/matteomoci <http://mox.fm>

Reply via email to