This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ce9b927d5 IMPALA-13599: Reduce the number of interactions with 
alter_partition() HMS API
ce9b927d5 is described below

commit ce9b927d547ac4290275fede4843288bbf97a429
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Tue Dec 10 22:04:20 2024 -0800

    IMPALA-13599: Reduce the number of interactions with alter_partition()
    HMS API
    
    Drop incremental stats and set/unset cached operations in Impala call
    the alter_partition() HMS API once for every partition in the table.
    This patch removes these unnecessary round trips by batching the
    updates through the alter_partitions() HMS API instead, so that only
    about #partitions/500 calls are made rather than one per partition,
    since bulkAlterPartitions() has a limit of 500 partitions per HMS RPC.
    
    Note: This doesn't change the number of ALTER_PARTITION events
    generated by HMS. Once HIVE-27746 is included in the Impala build, this
    patch will bring a further benefit: HMS will generate only
    #partitions/500 ALTER_PARTITIONS events.
    
    Testing:
    - Added an end-to-end test to verify that the HMS API is called only
      once per operation.
    
    Change-Id: I2f2f1d9637e8be9c931da0415a17dd0839637e4c
    Reviewed-on: http://gerrit.cloudera.org:8080/22197
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 28 ++++++++++++----------
 tests/custom_cluster/test_events_custom_configs.py | 23 ++++++++++++++++++
 2 files changed, 38 insertions(+), 13 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 33eb0d8f3..8c7eb73d3 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2741,14 +2741,17 @@ public class CatalogOpExecutor {
           return;
         }
 
+        // List of partitions that were modified as part of this operation.
+        List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
         for (HdfsPartition partition : partitions) {
           if (partition.getPartitionStatsCompressed() != null) {
             HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
             partBuilder.dropPartitionStats();
-            applyAlterPartition(table, partBuilder, catalogTimeline);
-            hdfsTbl.updatePartition(partBuilder);
+            modifiedParts.add(partBuilder);
           }
         }
+        bulkAlterPartitions(table, modifiedParts, null, 
UpdatePartitionMethod.IN_PLACE,
+            catalogTimeline);
       }
       loadTableMetadata(table, modification.newVersionNumber(),
           /*reloadFileMetadata=*/false,
@@ -6155,6 +6158,8 @@ public class CatalogOpExecutor {
         // partitions.
         Collection<? extends FeFsPartition> parts =
             FeCatalogUtils.loadAllPartitions(hdfsTable);
+        // List of partitions that were modified as part of this operation.
+        List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
         for (FeFsPartition fePartition: parts) {
           // TODO(todd): avoid downcast
           HdfsPartition partition = (HdfsPartition) fePartition;
@@ -6185,14 +6190,11 @@ public class CatalogOpExecutor {
               }
             }
 
-            // Update the partition metadata.
-            try {
-              applyAlterPartition(tbl, partBuilder, catalogTimeline);
-            } finally {
-              ((HdfsTable) tbl).markDirtyPartition(partBuilder);
-            }
+            modifiedParts.add(partBuilder);
           }
         }
+        bulkAlterPartitions(tbl, modifiedParts, null, 
UpdatePartitionMethod.MARK_DIRTY,
+            catalogTimeline);
       } else {
         loadFileMetadata = true;
       }
@@ -6211,19 +6213,19 @@ public class CatalogOpExecutor {
       if (tbl.getNumClusteringCols() > 0) {
         Collection<? extends FeFsPartition> parts =
             FeCatalogUtils.loadAllPartitions(hdfsTable);
+        // List of partitions that were modified as part of this operation.
+        List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
         for (FeFsPartition fePartition: parts) {
           // TODO(todd): avoid downcast
           HdfsPartition partition = (HdfsPartition) fePartition;
           if (partition.isMarkedCached()) {
             HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
             HdfsCachingUtil.removePartitionCacheDirective(partBuilder);
-            try {
-              applyAlterPartition(tbl, partBuilder, catalogTimeline);
-            } finally {
-              ((HdfsTable) tbl).markDirtyPartition(partBuilder);
-            }
+            modifiedParts.add(partBuilder);
           }
         }
+        bulkAlterPartitions(tbl, modifiedParts, null, 
UpdatePartitionMethod.MARK_DIRTY,
+            catalogTimeline);
       } else {
         loadFileMetadata = true;
       }
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index c3f9bd648..16c52def8 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1469,6 +1469,29 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     assert '1\t00%3A00%3A00' in res.data
     assert '2\t00%253A00%253A00' in res.data
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal",
+    disable_log_buffering=True, cluster_size=1)
+  def test_bulk_alter_partitions(self, unique_database):
+    test_tbl = unique_database + ".bulk_alter_partitions"
+    self.client.execute("create table {} (id int) partitioned by (p int)"
+        .format(test_tbl))
+    self.run_stmt_in_hive("SET hive.exec.dynamic.partition.mode=nonstrict; "
+        "insert into {} partition(p) values 
(0,0),(1,1),(2,2)".format(test_tbl))
+    self.client.execute("select * from {}".format(test_tbl))
+    self.client.execute("compute incremental stats {}".format(test_tbl))
+    self.client.execute("drop incremental stats {} 
partition(p>=0)".format(test_tbl))
+    EventProcessorUtils.wait_for_event_processing(self)
+    log_regex = r"HMS alterPartitions done on {}/{} partitions of table {}" \
+        .format(3, 3, test_tbl)
+    # we see the above twice, once for compute stats and second for drop stats
+    self.assert_catalogd_log_contains('INFO', log_regex, expected_count=2, 
timeout_s=20)
+    self.client.execute("alter table {} partition(p>=0) set cached in 
'testPool'"
+        .format(test_tbl))
+    self.assert_catalogd_log_contains('INFO', log_regex, expected_count=3, 
timeout_s=20)
+
 
 @SkipIfFS.hive
 class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):

Reply via email to