This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ce9b927d5 IMPALA-13599: Reduce the number of interactions with
alter_partition() HMS API
ce9b927d5 is described below
commit ce9b927d547ac4290275fede4843288bbf97a429
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Tue Dec 10 22:04:20 2024 -0800
IMPALA-13599: Reduce the number of interactions with alter_partition()
HMS API
The "drop incremental stats" and "set/unset cached" operations in Impala
call the alter_partition() HMS API once per partition in the table. This
patch removes these unnecessary round trips by using the batched
alter_partitions() HMS API instead, reducing the interaction count to
ceil(#partitions/500) calls, since bulkAlterPartitions() sends at most
500 partitions per HMS RPC.
Note: This doesn't change the number of ALTER_PARTITION events
generated by HMS. Once HIVE-27746 is included in the Impala build, this
patch will additionally reduce the event count to ceil(#partitions/500)
ALTER_PARTITIONS events.
Testing:
- Added an end-to-end test to verify that the batched HMS API is called
only once per operation.
Change-Id: I2f2f1d9637e8be9c931da0415a17dd0839637e4c
Reviewed-on: http://gerrit.cloudera.org:8080/22197
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../apache/impala/service/CatalogOpExecutor.java | 28 ++++++++++++----------
tests/custom_cluster/test_events_custom_configs.py | 23 ++++++++++++++++++
2 files changed, 38 insertions(+), 13 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 33eb0d8f3..8c7eb73d3 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2741,14 +2741,17 @@ public class CatalogOpExecutor {
return;
}
+ // List of partitions that were modified as part of this operation.
+ List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
for (HdfsPartition partition : partitions) {
if (partition.getPartitionStatsCompressed() != null) {
HdfsPartition.Builder partBuilder = new
HdfsPartition.Builder(partition);
partBuilder.dropPartitionStats();
- applyAlterPartition(table, partBuilder, catalogTimeline);
- hdfsTbl.updatePartition(partBuilder);
+ modifiedParts.add(partBuilder);
}
}
+ bulkAlterPartitions(table, modifiedParts, null,
UpdatePartitionMethod.IN_PLACE,
+ catalogTimeline);
}
loadTableMetadata(table, modification.newVersionNumber(),
/*reloadFileMetadata=*/false,
@@ -6155,6 +6158,8 @@ public class CatalogOpExecutor {
// partitions.
Collection<? extends FeFsPartition> parts =
FeCatalogUtils.loadAllPartitions(hdfsTable);
+ // List of partitions that were modified as part of this operation.
+ List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
for (FeFsPartition fePartition: parts) {
// TODO(todd): avoid downcast
HdfsPartition partition = (HdfsPartition) fePartition;
@@ -6185,14 +6190,11 @@ public class CatalogOpExecutor {
}
}
- // Update the partition metadata.
- try {
- applyAlterPartition(tbl, partBuilder, catalogTimeline);
- } finally {
- ((HdfsTable) tbl).markDirtyPartition(partBuilder);
- }
+ modifiedParts.add(partBuilder);
}
}
+ bulkAlterPartitions(tbl, modifiedParts, null,
UpdatePartitionMethod.MARK_DIRTY,
+ catalogTimeline);
} else {
loadFileMetadata = true;
}
@@ -6211,19 +6213,19 @@ public class CatalogOpExecutor {
if (tbl.getNumClusteringCols() > 0) {
Collection<? extends FeFsPartition> parts =
FeCatalogUtils.loadAllPartitions(hdfsTable);
+ // List of partitions that were modified as part of this operation.
+ List<HdfsPartition.Builder> modifiedParts = Lists.newArrayList();
for (FeFsPartition fePartition: parts) {
// TODO(todd): avoid downcast
HdfsPartition partition = (HdfsPartition) fePartition;
if (partition.isMarkedCached()) {
HdfsPartition.Builder partBuilder = new
HdfsPartition.Builder(partition);
HdfsCachingUtil.removePartitionCacheDirective(partBuilder);
- try {
- applyAlterPartition(tbl, partBuilder, catalogTimeline);
- } finally {
- ((HdfsTable) tbl).markDirtyPartition(partBuilder);
- }
+ modifiedParts.add(partBuilder);
}
}
+ bulkAlterPartitions(tbl, modifiedParts, null,
UpdatePartitionMethod.MARK_DIRTY,
+ catalogTimeline);
} else {
loadFileMetadata = true;
}
diff --git a/tests/custom_cluster/test_events_custom_configs.py
b/tests/custom_cluster/test_events_custom_configs.py
index c3f9bd648..16c52def8 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -1469,6 +1469,29 @@ class
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
assert '1\t00%3A00%3A00' in res.data
assert '2\t00%253A00%253A00' in res.data
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--use_local_catalog=true",
+ catalogd_args="--catalog_topic_mode=minimal",
+ disable_log_buffering=True, cluster_size=1)
+ def test_bulk_alter_partitions(self, unique_database):
+ test_tbl = unique_database + ".bulk_alter_partitions"
+ self.client.execute("create table {} (id int) partitioned by (p int)"
+ .format(test_tbl))
+ self.run_stmt_in_hive("SET hive.exec.dynamic.partition.mode=nonstrict; "
+ "insert into {} partition(p) values
(0,0),(1,1),(2,2)".format(test_tbl))
+ self.client.execute("select * from {}".format(test_tbl))
+ self.client.execute("compute incremental stats {}".format(test_tbl))
+ self.client.execute("drop incremental stats {}
partition(p>=0)".format(test_tbl))
+ EventProcessorUtils.wait_for_event_processing(self)
+ log_regex = r"HMS alterPartitions done on {}/{} partitions of table {}" \
+ .format(3, 3, test_tbl)
+ # we see the above twice, once for compute stats and second for drop stats
+ self.assert_catalogd_log_contains('INFO', log_regex, expected_count=2,
timeout_s=20)
+ self.client.execute("alter table {} partition(p>=0) set cached in
'testPool'"
+ .format(test_tbl))
+ self.assert_catalogd_log_contains('INFO', log_regex, expected_count=3,
timeout_s=20)
+
@SkipIfFS.hive
class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):