This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch branch-4.4.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5d32919f46117213249c60574f77e3f9bb66ed90
Author: stiga-huang <[email protected]>
AuthorDate: Thu Apr 18 10:32:58 2024 +0800

    IMPALA-13009: Fix catalogd not sending deletion updates for some dropped 
partitions
    
    *Background*
    
    Since IMPALA-3127, catalogd sends incremental partition updates based on
    the last sent table snapshot ('maxSentPartitionId_' to be specific).
    Dropped partitions since the last catalog update are tracked in
    'droppedPartitions_' of HdfsTable. When catalogd collects the next
    catalog update, these dropped partitions are included as deletions.
    HdfsTable then clears the set.
    See details in CatalogServiceCatalog#addHdfsPartitionsToCatalogDelta().
    
    If an HdfsTable is invalidated, it's replaced with an IncompleteTable
    which doesn't track any partitions. The HdfsTable object is then added
    to the deleteLog so catalogd can send deletion updates for all its
    partitions. The same happens if the HdfsTable is dropped. However, the
    previously dropped partitions are not collected in this case, which
    results in a leak in the catalog topic if the partition name is not
    reused anymore. Note that in the catalog topic, the key of a partition
    update consists of the table name and the partition name. So if the
    partition is added back to the table, the topic key will be reused,
    which resolves the leak.
    
    The leak will be observed when a coordinator restarts. In the initial
    catalog update sent from the statestore, the coordinator will find some
    partition updates that are not referenced by the HdfsTable (assuming the
    table is used again after the INVALIDATE). Then a Precondition check
    fails and the table is not added to the coordinator.
    
    *Overview of the patch*
    
    This patch fixes the leak by also collecting the dropped partitions when
    adding the HdfsTable to the deleteLog. A new field, dropped_partitions,
    is added in THdfsTable to collect them. It's only used when catalogd
    collects catalog updates.
    
    Removes the Precondition check in coordinator and just reports the stale
    partitions since IMPALA-12831 could also introduce them.
    
    Also adds a log line in CatalogOpExecutor.alterTableDropPartition() to
    show the dropped partition names for better diagnostics.
    
    Tests
     - Added e2e tests
    
    Change-Id: I12a68158dca18ee48c9564ea16b7484c9f5b5d21
    Reviewed-on: http://gerrit.cloudera.org:8080/21326
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
    (cherry picked from commit ee21427d26620b40d38c706b4944d2831f84f6f5)
---
 common/thrift/CatalogObjects.thrift                |   4 +
 .../impala/catalog/CatalogServiceCatalog.java      | 107 +++++++++++-----
 .../java/org/apache/impala/catalog/HdfsTable.java  |  15 ++-
 .../org/apache/impala/catalog/ImpaladCatalog.java  |  16 ++-
 .../apache/impala/service/CatalogOpExecutor.java   |   2 +
 tests/common/impala_test_suite.py                  |  34 ++++--
 tests/custom_cluster/test_partition.py             | 134 +++++++++++++++++++++
 tests/metadata/test_recover_partitions.py          |   4 -
 8 files changed, 262 insertions(+), 54 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift 
b/common/thrift/CatalogObjects.thrift
index a0e28a92a..30b856e7e 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -518,6 +518,10 @@ struct THdfsTable {
 
   // Bucket information for HDFS tables
   16: optional TBucketInfo bucket_info
+
+  // Recently dropped partitions that are not yet synced to the catalog topic.
+  // Only used in catalogd.
+  17: optional list<THdfsPartition> dropped_partitions
 }
 
 struct THBaseTable {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index fc8fa029e..b58970ea7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -1063,38 +1063,7 @@ public class CatalogServiceCatalog extends Catalog {
           Catalog.toCatalogObjectKey(removedObject))) {
         ctx.addCatalogObject(removedObject, true);
       }
-      // If this is a HdfsTable and incremental metadata updates are enabled, 
make sure we
-      // send deletes for removed partitions. So we won't leak partition topic 
entries in
-      // the statestored catalog topic. Partitions are only included as 
objects in topic
-      // updates if incremental metadata updates are enabled. Don't need this 
if
-      // incremental metadata updates are disabled, because in this case the 
table
-      // snapshot will be sent as a complete object. See more details in
-      // addTableToCatalogDeltaHelper().
-      if (BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()
-          && removedObject.type == TCatalogObjectType.TABLE
-          && removedObject.getTable().getTable_type() == 
TTableType.HDFS_TABLE) {
-        THdfsTable hdfsTable = removedObject.getTable().getHdfs_table();
-        Preconditions.checkState(
-            !hdfsTable.has_full_partitions && hdfsTable.has_partition_names,
-            /*errorMessage*/hdfsTable);
-        String tblName = removedObject.getTable().db_name + "."
-            + removedObject.getTable().tbl_name;
-        PartitionMetaSummary deleteSummary = 
createPartitionMetaSummary(tblName);
-        for (THdfsPartition part : hdfsTable.partitions.values()) {
-          Preconditions.checkState(part.id >= 
HdfsPartition.INITIAL_PARTITION_ID
-              && part.db_name != null
-              && part.tbl_name != null
-              && part.partition_name != null, /*errorMessage*/part);
-          TCatalogObject removedPart = new TCatalogObject(HDFS_PARTITION,
-              removedObject.getCatalog_version());
-          removedPart.setHdfs_partition(part);
-          if (!ctx.updatedCatalogObjects.contains(
-              Catalog.toCatalogObjectKey(removedPart))) {
-            ctx.addCatalogObject(removedPart, true, deleteSummary);
-          }
-        }
-        if (deleteSummary.hasUpdates()) LOG.info(deleteSummary.toString());
-      }
+      collectPartitionDeletion(ctx, removedObject);
     }
     // Each topic update should contain a single "TCatalog" object which is 
used to
     // pass overall state on the catalog, such as the current version and the
@@ -1122,6 +1091,80 @@ public class CatalogServiceCatalog extends Catalog {
     return ctx.toVersion;
   }
 
+  /**
+   * Collects partition deletion from removed HdfsTable objects.
+   */
+  private void collectPartitionDeletion(GetCatalogDeltaContext ctx,
+      TCatalogObject removedObject) throws TException {
+    // If this is a HdfsTable and incremental metadata updates are enabled, 
make sure we
+    // send deletes for removed partitions. So we won't leak partition topic 
entries in
+    // the statestored catalog topic. Partitions are only included as objects 
in topic
+    // updates if incremental metadata updates are enabled. Don't need this if
+    // incremental metadata updates are disabled, because in this case the 
table
+    // snapshot will be sent as a complete object. See more details in
+    // addTableToCatalogDeltaHelper().
+    if (!BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()
+        || removedObject.type != TCatalogObjectType.TABLE
+        || removedObject.getTable().getTable_type() != TTableType.HDFS_TABLE) {
+      return;
+    }
+    THdfsTable hdfsTable = removedObject.getTable().getHdfs_table();
+    Preconditions.checkState(
+        !hdfsTable.has_full_partitions && hdfsTable.has_partition_names,
+        /*errorMessage*/hdfsTable);
+    String tblName = removedObject.getTable().db_name + "."
+        + removedObject.getTable().tbl_name;
+    PartitionMetaSummary deleteSummary = createPartitionMetaSummary(tblName);
+    Set<String> collectedPartNames = new HashSet<>();
+    for (THdfsPartition part : hdfsTable.partitions.values()) {
+      Preconditions.checkState(part.id >= HdfsPartition.INITIAL_PARTITION_ID
+          && part.db_name != null
+          && part.tbl_name != null
+          && part.partition_name != null, /*errorMessage*/part);
+      TCatalogObject removedPart = new TCatalogObject(HDFS_PARTITION,
+          removedObject.getCatalog_version());
+      removedPart.setHdfs_partition(part);
+      String partObjKey = Catalog.toCatalogObjectKey(removedPart);
+      boolean collected = false;
+      if (!ctx.updatedCatalogObjects.contains(partObjKey)) {
+        ctx.addCatalogObject(removedPart, true, deleteSummary);
+        collectedPartNames.add(part.partition_name);
+        collected = true;
+      }
+      LOG.trace("{} deletion of {} id={} from the active partition set " +
+              "of a removed/invalidated table (version={})",
+          collected ? "Collected" : "Skipped", partObjKey, part.id,
+          removedObject.getCatalog_version());
+    }
+    // Adds the recently dropped partitions that are not yet synced to the 
catalog
+    // topic.
+    if (hdfsTable.isSetDropped_partitions()) {
+      for (THdfsPartition part : hdfsTable.dropped_partitions) {
+        // If a partition is dropped and then re-added, the old instance is 
added to
+        // droppedPartitions and the new instance is in partitionMap of 
HdfsTable.
+        // So partitions collected in the above loop could have the same name 
here.
+        if (collectedPartNames.contains(part.partition_name)) continue;
+        TCatalogObject removedPart = new TCatalogObject(HDFS_PARTITION,
+            removedObject.getCatalog_version());
+        removedPart.setHdfs_partition(part);
+        String partObjKey = Catalog.toCatalogObjectKey(removedPart);
+        // Skip if there is an update of the partition collected. It could be
+        // collected from a new version of the HdfsTable and this comes from an
+        // invalidated HdfsTable.
+        boolean collected = false;
+        if (!ctx.updatedCatalogObjects.contains(partObjKey)) {
+          ctx.addCatalogObject(removedPart, true, deleteSummary);
+          collected = true;
+        }
+        LOG.trace("{} deletion of {} id={} from the dropped partition set " +
+                "of a removed/invalidated table (version={})",
+            collected ? "Collected" : "Skipped", partObjKey, part.id,
+            removedObject.getCatalog_version());
+      }
+    }
+    if (deleteSummary.hasUpdates()) LOG.info(deleteSummary.toString());
+  }
+
   /**
    * Evaluates if the information from an event (serviceId and versionNumber) 
matches to
    * the catalog object. If there is match, the in-flight version for that 
object is
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 45fb94e39..8954eab0b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -2079,10 +2079,14 @@ public class HdfsTable extends Table implements 
FeFsTable {
   public void validatePartitions(Set<Long> expectedPartitionIds)
       throws TableLoadingException {
     if (!partitionMap_.keySet().equals(expectedPartitionIds)) {
+      Set<Long> missingIds = new HashSet<>(expectedPartitionIds);
+      missingIds.removeAll(partitionMap_.keySet());
+      Set<Long> staleIds = new HashSet<>(partitionMap_.keySet());
+      staleIds.removeAll(expectedPartitionIds);
       throw new TableLoadingException(String.format("Error applying 
incremental updates" +
-              " on table %s: missing partition ids %s, stale partition ids %s",
-          getFullName(), 
expectedPartitionIds.removeAll(partitionMap_.keySet()),
-          partitionMap_.keySet().removeAll(expectedPartitionIds)));
+              " on table %s. Missing partition ids: %s. Stale partition ids: 
%s. Total " +
+              "partitions: %d.",
+          getFullName(), missingIds, staleIds, partitionMap_.size()));
     }
   }
 
@@ -2188,6 +2192,11 @@ public class HdfsTable extends Table implements 
FeFsTable {
       for (HdfsPartition part : partitionMap_.values()) {
         hdfsTable.partitions.put(part.getId(), part.toMinimalTHdfsPartition());
       }
+      // Adds the recently dropped partitions that are not yet synced to the 
catalog
+      // topic.
+      for (HdfsPartition part : droppedPartitions_) {
+        hdfsTable.addToDropped_partitions(part.toMinimalTHdfsPartition());
+      }
       hdfsTable.setHas_full_partitions(false);
       // The minimal catalog object of partitions contain the partition names.
       hdfsTable.setHas_partition_names(true);
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index 430736a07..2962325be 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -516,12 +516,17 @@ public class ImpaladCatalog extends Catalog implements 
FeCatalog {
         }
       }
       // Apply incremental updates.
+      List<String> stalePartitionNames = new ArrayList<>();
       for (THdfsPartition tPart : newPartitions) {
         // TODO: remove this after IMPALA-9937. It's only illegal in 
statestore updates,
         //  which indicates a leak of partitions in the catalog topic - a 
stale partition
         //  should already have a corresponding deletion so won't get here.
-        Preconditions.checkState(tHdfsTable.partitions.containsKey(tPart.id),
-            "Received stale partition in a statestore update: " + tPart);
+        if (!tHdfsTable.partitions.containsKey(tPart.id)) {
+          stalePartitionNames.add(tPart.partition_name);
+          LOG.warn("Stale partition: {}.{}:{} id={}", tPart.db_name, 
tPart.tbl_name,
+              tPart.partition_name, tPart.id);
+          continue;
+        }
         // The existing table could have a newer version than the last sent 
table version
         // in catalogd, so some partition instances may already exist here. 
This happens
         // when we have executed DDL/DMLs on this table on this coordinator 
since the last
@@ -541,6 +546,13 @@ public class ImpaladCatalog extends Catalog implements 
FeCatalog {
             tPart.id, tPart.partition_name, newHdfsTable.getFullName());
         numNewParts++;
       }
+      if (!stalePartitionNames.isEmpty()) {
+        LOG.warn("Received {} stale partitions of table {}.{} in the 
statestore update:" +
+                " {}. There are possible leaks in the catalog topic values. To 
resolve " +
+                "the leak, add them back and then drop them again.",
+            stalePartitionNames.size(), thriftTable.db_name, 
thriftTable.tbl_name,
+            String.join(",", stalePartitionNames));
+      }
       // Validate that all partitions are set.
       ((HdfsTable) 
newTable).validatePartitions(tHdfsTable.partitions.keySet());
       LOG.info("Applied incremental table updates on {} existing partitions of 
" +
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index f8475e5f2..6b5ffa01d 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -5526,6 +5526,8 @@ public class CatalogOpExecutor {
         try {
           msClient.getHiveClient().dropPartition(tableName.getDb(), 
tableName.getTbl(),
               part.getPartitionValuesAsStrings(true), dropOptions);
+          LOG.info("Dropped partition {}.{}:{} in Metastore",
+              tableName.getDb(), tableName.getTbl(), part.getPartitionName());
           ++numTargetedPartitions;
         } catch (NoSuchObjectException e) {
           if (!ifExists) {
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 40020e3d0..e5122676a 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1201,6 +1201,11 @@ class ImpalaTestSuite(BaseTestSuite):
     else:
       return "UNIDENTIFIED"
 
+  @classmethod
+  def has_value(cls, value, lines):
+    """Check if lines contain value."""
+    return any([line.find(value) != -1 for line in lines])
+
   def wait_for_db_to_appear(self, db_name, timeout_s):
     """Wait until the database with 'db_name' is present in the impalad's 
local catalog.
     Fail after timeout_s if the doesn't appear."""
@@ -1263,22 +1268,24 @@ class ImpalaTestSuite(BaseTestSuite):
         "Check failed to return True after {0} tries and {1} 
seconds{2}".format(
           count, timeout_s, error_msg_str))
 
-  def assert_impalad_log_contains(self, level, line_regex, expected_count=1, 
timeout_s=6):
+  def assert_impalad_log_contains(self, level, line_regex, expected_count=1, 
timeout_s=6,
+      dry_run=False):
     """
     Convenience wrapper around assert_log_contains for impalad logs.
     """
     return self.assert_log_contains(
-        "impalad", level, line_regex, expected_count, timeout_s)
+        "impalad", level, line_regex, expected_count, timeout_s, dry_run)
 
   def assert_catalogd_log_contains(self, level, line_regex, expected_count=1,
-      timeout_s=6):
+      timeout_s=6, dry_run=False):
     """
     Convenience wrapper around assert_log_contains for catalogd logs.
     """
     return self.assert_log_contains(
-        "catalogd", level, line_regex, expected_count, timeout_s)
+        "catalogd", level, line_regex, expected_count, timeout_s, dry_run)
 
-  def assert_log_contains(self, daemon, level, line_regex, expected_count=1, 
timeout_s=6):
+  def assert_log_contains(self, daemon, level, line_regex, expected_count=1, 
timeout_s=6,
+      dry_run=False):
     """
     Assert that the daemon log with specified level (e.g. ERROR, WARNING, 
INFO) contains
     expected_count lines with a substring matching the regex. When 
expected_count is -1,
@@ -1312,14 +1319,15 @@ class ImpalaTestSuite(BaseTestSuite):
             if re_result:
               found += 1
               last_re_result = re_result
-        if expected_count == -1:
-          assert found > 0, "Expected at least one line in file %s matching 
regex '%s'"\
-            ", but found none." % (log_file_path, line_regex)
-        else:
-          assert found == expected_count, \
-            "Expected %d lines in file %s matching regex '%s', but found %d 
lines. "\
-            "Last line was: \n%s" %\
-            (expected_count, log_file_path, line_regex, found, line)
+        if not dry_run:
+          if expected_count == -1:
+            assert found > 0, "Expected at least one line in file %s matching 
regex '%s'"\
+              ", but found none." % (log_file_path, line_regex)
+          else:
+            assert found == expected_count, \
+              "Expected %d lines in file %s matching regex '%s', but found %d 
lines. "\
+              "Last line was: \n%s" %\
+              (expected_count, log_file_path, line_regex, found, line)
         return last_re_result
       except AssertionError as e:
         # Re-throw the exception to the caller only when the timeout is 
expired. Otherwise
diff --git a/tests/custom_cluster/test_partition.py 
b/tests/custom_cluster/test_partition.py
index 89d80f2c1..a65296c34 100644
--- a/tests/custom_cluster/test_partition.py
+++ b/tests/custom_cluster/test_partition.py
@@ -19,6 +19,7 @@ from __future__ import absolute_import, division, 
print_function
 import logging
 import pytest
 import shutil
+import time
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfFS
@@ -77,3 +78,136 @@ class TestPartition(CustomClusterTestSuite):
         shutil.rmtree(local_file_dir)
       except OSError as e:
         LOG.info("Cannot remove directory %s, %s " % (local_file_dir, 
e.strerror))
+
+
+class TestPartitionDeletion(CustomClusterTestSuite):
+  """Tests catalogd sends deletion updates (i.e. isDeleted=true) for dropped 
partitions.
+     Use a normal catalog update frequency (2s) instead of the default one in 
custom
+     cluster tests (50ms) so the race conditions of IMPALA-13009 could 
happen."""
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestPartitionDeletion, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+
+    # It doesn't matter what the file format is. So just test on text/none.
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        (v.get_value('table_format').file_format == 'text'
+         and v.get_value('table_format').compression_codec == 'none'))
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms=2000",
+    impalad_args="--use_local_catalog=false",
+    catalogd_args="--catalog_topic_mode=full --hms_event_polling_interval_s=0")
+  def test_legacy_catalog_no_event_processing(self, unique_database):
+    self._test_partition_deletion(unique_database)
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms=2000",
+    impalad_args="--use_local_catalog=false",
+    catalogd_args="--catalog_topic_mode=full --hms_event_polling_interval_s=1")
+  def test_legacy_catalog_with_event_processing(self, unique_database):
+    self._test_partition_deletion(unique_database)
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms=2000",
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=0")
+  def test_local_catalog_no_event_processing(self, unique_database):
+    self._test_partition_deletion(unique_database)
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms=2000",
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal 
--hms_event_polling_interval_s=1")
+  def test_local_catalog_with_event_processing(self, unique_database):
+    self._test_partition_deletion(unique_database)
+
+  def _test_partition_deletion(self, unique_database):
+    tbl = unique_database + ".part_tbl"
+    self.client.execute("create table {}(i int) partitioned by (p 
int)".format(tbl))
+    self.client.execute("alter table {} add partition(p=0)".format(tbl))
+
+    #############################################################
+    # Test 1: DropPartition + Invalidate + Load
+    #
+    # Add and drop different partitions
+    for i in range(1, 4):
+      self.client.execute("alter table {} add partition(p={})".format(tbl, i))
+      # Wait 1s so catalogd has chance to propagate new partitions before we 
drop them.
+      time.sleep(1)
+      # If the following 3 statements are executed in a catalog topic update 
cycle, it
+      # covers the bug of IMPALA-13009.
+      self.client.execute("alter table {} drop partition(p>0)".format(tbl))
+      self.client.execute("invalidate metadata {}".format(tbl))
+      # Trigger metadata loading on this table
+      self.client.execute("describe {}".format(tbl))
+
+    res = self.client.execute("show partitions {}".format(tbl))
+    assert self.has_value("p=0", res.data)
+    # The last line is the total summary
+    assert len(res.data) == 2
+
+    # Check catalogd has sent deletions for dropped partitions if their 
updates have been
+    # sent before.
+    update_log_regex = "Collected . partition update.*HDFS_PARTITION:{}:.*p={}"
+    deletion_log_regex = "Collected . partition 
deletion.*HDFS_PARTITION:{}:.*p={}"
+    for i in range(1, 4):
+      update_found = self.assert_catalogd_log_contains("INFO",
+          update_log_regex.format(tbl, i), dry_run=True)
+      if update_found:
+        self.assert_catalogd_log_contains("INFO", 
deletion_log_regex.format(tbl, i))
+
+    # Restart impalad and check the partitions on it
+    self.cluster.impalads[0].restart()
+    self.client = self.create_impala_client()
+    new_res = self.client.execute("show partitions {}".format(tbl))
+    assert new_res.data == res.data
+    self.assert_impalad_log_contains("WARNING", "stale partition", 
expected_count=0)
+    self.assert_impalad_log_contains("ERROR", "stale partition", 
expected_count=0)
+
+    #############################################################
+    # Test 2: UpdatePartition + Invalidate + Load
+    #
+    # Updates the partition, invalidates the table and then reloads it. Checks 
the dropped
+    # version of the partition in the removed table version won't interfere 
with the
+    # update. Run this 5 times so they could happen inside a catalog update 
cycle.
+    self.client.execute("alter table {} add partition(p=5)".format(tbl))
+    for num in range(5):
+      self.client.execute("refresh {} partition(p=5)".format(tbl))
+      self.client.execute("invalidate metadata " + tbl)
+      res = self.client.execute("show partitions " + tbl)
+      assert self.has_value("p=0", res.data)
+      assert self.has_value("p=5", res.data)
+      # The last line is the total summary
+      assert len(res.data) == 3
+
+    # Restart impalad and check the partitions on it
+    self.cluster.impalads[0].restart()
+    self.client = self.create_impala_client()
+    new_res = self.client.execute("show partitions {}".format(tbl))
+    assert new_res.data == res.data
+    self.assert_impalad_log_contains("WARNING", "stale partition", 
expected_count=0)
+    self.assert_impalad_log_contains("ERROR", "stale partition", 
expected_count=0)
+
+    #############################################################
+    # Test 3: DropPartition + DropTable + CreateTable + Load
+    #
+    # Check no leaks if the partition and table are dropped sequentially.
+    self.client.execute("alter table {} drop partition(p=0)".format(tbl))
+    self.client.execute("drop table " + tbl)
+    time.sleep(2)
+    # Re-create and reload the HdfsTable so Impalad will see a HdfsTable with 
an empty
+    # partition map. Any leaked stale partitions in the catalog topic will be 
reported.
+    self.client.execute("create table {}(i int) partitioned by (p 
int)".format(tbl))
+    self.client.execute("describe " + tbl)
+    # Restart impalad and check the partitions on it
+    self.cluster.impalads[0].restart()
+    self.client = self.create_impala_client()
+    res = self.client.execute("show partitions {}".format(tbl))
+    assert len(res.data) == 1
+    self.assert_impalad_log_contains("WARNING", "stale partition", 
expected_count=0)
+    self.assert_impalad_log_contains("ERROR", "stale partition", 
expected_count=0)
diff --git a/tests/metadata/test_recover_partitions.py 
b/tests/metadata/test_recover_partitions.py
index 362c0fa83..4f0f4b081 100644
--- a/tests/metadata/test_recover_partitions.py
+++ b/tests/metadata/test_recover_partitions.py
@@ -457,10 +457,6 @@ class TestRecoverPartitions(ImpalaTestSuite):
         "ALTER TABLE %s RECOVER PARTITIONS failed to handle "\
         "invalid partition key values." % fq_tbl_name
 
-  def has_value(self, value, lines):
-    """Check if lines contain value."""
-    return any([line.find(value) != -1 for line in lines])
-
   def count_partition(self, lines):
     """Count the number of partitions in the lines."""
     return self.count_value(WAREHOUSE, lines)

Reply via email to