On Thu, Dec 5, 2024 at 8:21 AM Matteo Moci <mox...@gmail.com> wrote:

> Thanks a lot Mich,
> 1) you mean coalescing the partitions that are about to be written? I don't
> think this will take less time, because all partitions have data. The
> problem seems to be that it asks HMS for all partitions, even though it's
> only writing 650.
> Would an improvement here be something that benefits Spark?
>
I mean: I find it strange that Spark gets metadata of all partitions even
if it is only writing some.
But I don't know the underlying logic, so I will have to look at the code
first.
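
To make the concern concrete, here is a minimal sketch in plain Java of how I
understand the fallback request shown in the metastore log quoted further
down: a partial partition spec whose values are all empty matches every
partition, and a max of -1 means "no limit". This is not the HMS or Spark
implementation; the class name, the method name and the partition values are
hypothetical and only there for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class PartialSpecSketch {

    // Returns the partitions whose values match the partial spec.
    // An empty string in the spec acts as a wildcard for that column;
    // max < 0 means "no limit" (assumed meaning of the -1 in the HMS log).
    public static List<List<String>> listMatching(List<List<String>> partitions,
                                                  List<String> partialSpec,
                                                  int max) {
        List<List<String>> result = new ArrayList<>();
        for (List<String> values : partitions) {
            if (max >= 0 && result.size() >= max) {
                break; // limit reached
            }
            boolean matches = true;
            for (int i = 0; i < partialSpec.size() && i < values.size(); i++) {
                String wanted = partialSpec.get(i);
                if (!wanted.isEmpty() && !wanted.equals(values.get(i))) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                result.add(values);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical table with 4 partition columns, like the '[, , , ]' spec in the log.
        List<List<String>> partitions = List.of(
                List.of("2024", "12", "03", "a"),
                List.of("2024", "12", "04", "a"),
                List.of("2024", "12", "04", "b"));

        // All-empty spec with max = -1: every partition is returned.
        System.out.println(listMatching(partitions, List.of("", "", "", ""), -1).size());

        // A spec pinned to one day returns only that day's partitions.
        System.out.println(listMatching(partitions, List.of("2024", "12", "04", ""), -1).size());
    }
}
```

With around 250000 partitions, the all-empty spec is the expensive shape. I
still have to check in the code whether the insert path could pass a narrower
spec when only about 650 partitions are being overwritten.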


>
> 2) so you are saying that the fallback method is more expensive than the
> first one, right?
> I will try to use the old HMS by passing the configuration at runtime, as
> this page says is possible:
> https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
>
>
> On Thu, Dec 5, 2024 at 8:17 AM Matteo Moci <mox...@gmail.com> wrote:
>
>> Thanks Mich,
>> 1)
>>
>> On Thu, Dec 5, 2024 at 1:16 AM Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Hi Matteo,
>>>
>>> 1) You have an incompatible Metastore: The Hive Metastore version used
>>> by the EMR cluster (2.3.9) doesn't support the get_partition_locations
>>> method directly. Spark 3.5 tries to use this method, leading to a fallback
>>> and increased Hive Metastore Service (HMS) calls.
>>>
>>> 2) Large number of partitions: Your table has a very high number of
>>> partitions (250,000). Listing all partitions with
>>> listPartitionsWithAuthInfo can be slow, especially with potential network
>>> issues causing retries.
>>>
>>> Problem: Your Spark 3.5 application writing to a Hive table with 250,000
>>> partitions experiences slowness compared to Spark 2.4. The driver gets
>>> stuck waiting for HMS calls.
>>>
>>> Suggestions
>>>
>>> 1) You can optimize partition handling by coalescing partitions before
>>> writing. Consider using coalesce to reduce the number of partitions written
>>> to. This can significantly reduce HMS calls and improve performance.
>>>
>>> 2) Check this link
>>> <https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html>
>>> for more info.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>>
>>> Architect | Data Science | Financial Crime | GDPR & Compliance Specialist
>>> PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial
>>> College London <https://en.wikipedia.org/wiki/Imperial_College_London>
>>> London, United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice, "one test result is worth one thousand
>>> expert opinions" (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>
>>>
>>> On Wed, 4 Dec 2024 at 21:47, Matteo Moci <mox...@gmail.com> wrote:
>>>
>>>> Hello Community,
>>>> The Spark 3.5 application I am working on slows down right at the
>>>> time of writing to a Hive table.
>>>>
>>>> I'd like to ask for some hints on how to mitigate this behaviour, if
>>>> possible.
>>>>
>>>> The same application using Spark 2.4 ran "fine" within reasonable
>>>> times, with minimal cluster idle cpu times, while on Spark 3, the driver is
>>>> stuck for a long time waiting for HMS.
>>>> Some more context:
>>>>
>>>>    - the app runs on an Amazon EMR cluster 7.2.0 [1], that uses its
>>>>    own "distribution" of Spark 3.5.1 and Hive 3.1.3, with some
>>>>    optimizations I don't know the details of. From what I understood,
>>>>    this is why line numbers of stack traces in cluster logs don't always
>>>>    match the open source code of Spark.
>>>>    - Hive MetaStore is using Hive 2 - not sure exactly which version
>>>>    - the app reads from a Hive table and writes the output data to
>>>>    3 other Hive tables. Let's focus on 1 of them, because that's where
>>>>    the app is stuck
>>>>    - I think we can leave the table it reads from out of the
>>>>    equation, because the driver thread dumps show it is stuck at this
>>>>    point, when inserting into the output table:
>>>>
>>>> java.base@17.0.13/sun.nio.ch.Net.poll(Native Method)
>>>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.park(NioSocketImpl.java:186)
>>>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:290)
>>>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:314)
>>>> java.base@17.0.13/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
>>>> java.base@17.0.13/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
>>>> java.base@17.0.13/java.net.Socket$SocketInputStream.read(Socket.java:966)
>>>> java.base@17.0.13/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
>>>> java.base@17.0.13/java.io.BufferedInputStream.read1(BufferedInputStream.java:284)
>>>> java.base@17.0.13/java.io.BufferedInputStream.read(BufferedInputStream.java:343)
>>>> => holding Monitor(java.io.BufferedInputStream@1632069162)
>>>> app//org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:127)
>>>> app//org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
>>>> app//org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:425)
>>>> app//org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:321)
>>>> app//org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:225)
>>>> app//org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
>>>> app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_partitions_ps_with_auth(ThriftHiveMetastore.java:2601)
>>>> app//org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_ps_with_auth(ThriftHiveMetastore.java:2583)
>>>> app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsWithAuthInfo(HiveMetaStoreClient.java:1212)
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>> java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>>> app//org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:177)
>>>> app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown Source)
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>> java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>>> app//org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2350)
>>>> => holding Monitor(org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler@244249800)
>>>> app/jdk.proxy2/jdk.proxy2.$Proxy106.listPartitionsWithAuthInfo(Unknown Source)
>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.listPartitionLocationsWithoutExtension(Hive.java:2797)
>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.doGetPartitionLocations(Hive.java:2775)
>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2754)
>>>> app//org.apache.hadoop.hive.ql.metadata.Hive.getPartitionLocations(Hive.java:2727)
>>>> app//org.apache.spark.sql.hive.client.Shim_v0_12.getPartitionLocations(HiveShim.scala:673)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionLocations$3(HiveClientImpl.scala:798)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3952/0x00007f8c193a3068.apply(Unknown Source)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$doGetPartitions$1(HiveClientImpl.scala:845)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3953/0x00007f8c193a3628.apply(Unknown Source)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:303)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl$$Lambda$3366/0x00007f8c19188000.apply(Unknown Source)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:234)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:233)
>>>> => holding Monitor(org.apache.spark.sql.hive.client.IsolatedClientLoader@1088141517)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:283)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.doGetPartitions(HiveClientImpl.scala:838)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:796)
>>>> app//org.apache.spark.sql.hive.client.HiveClientImpl.getPartitionLocations(HiveClientImpl.scala:790)
>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionLocations$1(HiveExternalCatalog.scala:1307)
>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog$$Lambda$3941/0x00007f8c1939eb90.apply(Unknown Source)
>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
>>>> => holding Monitor(org.apache.spark.sql.hive.HiveExternalCatalog@845450683)
>>>> app//org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionLocations(HiveExternalCatalog.scala:1303)
>>>> app//org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionLocations(ExternalCatalogWithListener.scala:254)
>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocationsOptimized(SessionCatalog.scala:1360)
>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$listPartitionLocations$1(SessionCatalog.scala:1341)
>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog$$Lambda$3935/0x00007f8c193932c0.apply(Unknown Source)
>>>> app//org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:554)
>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.withCommonListMetrics(SessionCatalog.scala:1375)
>>>> app//org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionLocations(SessionCatalog.scala:1340)
>>>> app//org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:107)
>>>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
>>>> => holding Monitor(org.apache.spark.sql.execution.command.DataWritingCommandExec@1800917126)
>>>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
>>>> app//org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:535)
>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$5823/0x00007f8c19973f98.apply(Unknown Source)
>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:574)
>>>> app//org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:535)
>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:126)
>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1$$Lambda$3552/0x00007f8c19243128.apply(Unknown Source)
>>>> app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:138)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:174)
>>>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3562/0x00007f8c19245730.apply(Unknown Source)
>>>> app//org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:264)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:174)
>>>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3561/0x00007f8c19245000.apply(Unknown Source)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:285)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:173)
>>>> app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$3553/0x00007f8c192433f8.apply(Unknown Source)
>>>> app//org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:901)
>>>> app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:123)
>>>> app//org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:114)
>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$3197/0x00007f8c190ddbb0.apply(Unknown Source)
>>>> app//org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:77)
>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
>>>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:34)
>>>> app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:297)
>>>> app//org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:293)
>>>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>>>> app//org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:34)
>>>> app//org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
>>>> app//org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:114)
>>>> app//org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:101)
>>>> => holding Monitor(org.apache.spark.sql.execution.QueryExecution@310792592)
>>>> app//org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:99)
>>>> app//org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:164)
>>>> app//org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:874)
>>>> app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:495)
>>>> app//org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:456)
>>>> x.Application.run(Application.java:N) <---------------------- this is the application code
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> java.base@17.0.13/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>>>> java.base@17.0.13/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> java.base@17.0.13/java.lang.reflect.Method.invoke(Method.java:569)
>>>> app//org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:741)
>>>>
>>>>    - at line x.Application.main(Application.java:N) there is:
>>>>
>>>> // dataset is a Dataset<Row>
>>>> dataset.write().mode(SaveMode.Overwrite).insertInto("table_name");
>>>>
>>>>    - in the cluster logs we see:
>>>>
>>>> 24/12/04 10:16:22 INFO Hive: Metastore does not support listPartitionLocations operation; falling back to using listPartitionsWithAuthInfo operation from now on
>>>> org.apache.hadoop.hive.metastore.IMetaStoreClient$IncompatibleMetastoreException: Metastore doesn't support listPartitionLocation: Invalid method name: 'get_partition_locations'
>>>> at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.newIncompatibleMetastoreException(HiveMetaStoreClient.java:2579) ~[hive-metastore-2.3.9-amzn-3.jar:2.3.9-amzn-3]
>>>>
>>>>    - in the cluster logs we see this message around 5 times:
>>>>
>>>> 24/12/04 11:33:42 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (3 of 5) after 15s. listPartitionsWithAuthInfo
>>>> org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
>>>>
>>>>    - and on hivemetastore server logs we see (I am told that the -1
>>>>    there means "get all the partitions"):
>>>>
>>>> 2024-12-04T10:33:11,357 [pool-4-thread-173] DEBUG x.server.invocation-log:168 - #get_partitions_ps_with_auth('x', 'table_name', '[, , , ]', -1, 'client-x', []): entered
>>>>
>>>>    - the table the app tries to write to has 1 year's worth of data
>>>>    - Spark 2 is able to write around 650 partitions daily, around
>>>>    1 TB worth of data, so there's a total of around 250,000 partitions
>>>>    - the application uses spark.sql.sources.partitionOverwriteMode:
>>>>    dynamic so it's expected that the 650 partitions are completely
>>>>    overwritten, leaving the others untouched
>>>>
>>>> I have some options but wanted to check if you have any pointers
>>>> forward, specifically how to speed this up :D
>>>>
>>>> Thanks a lot and sorry for the wall of text!
>>>>
>>>> Matteo
>>>>
>>>> [1]
>>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-release-app-versions-7.x.html
>>>>
>>>>
>>
>> --
>> Matteo Moci
>> https://twitter.com/matteomoci <http://mox.fm>
>>
>>
>
> --
> Matteo Moci
> https://twitter.com/matteomoci <http://mox.fm>
>
>

-- 
Matteo Moci
https://twitter.com/matteomoci <http://mox.fm>
