This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch branch-4.4.0 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5d32919f46117213249c60574f77e3f9bb66ed90 Author: stiga-huang <[email protected]> AuthorDate: Thu Apr 18 10:32:58 2024 +0800 IMPALA-13009: Fix catalogd not sending deletion updates for some dropped partitions *Background* Since IMPALA-3127, catalogd sends incremental partition updates based on the last sent table snapshot ('maxSentPartitionId_' to be specific). Dropped partitions since the last catalog update are tracked in 'droppedPartitions_' of HdfsTable. When catalogd collects the next catalog update, these dropped partitions are collected as deletions. HdfsTable then clears the set. See details in CatalogServiceCatalog#addHdfsPartitionsToCatalogDelta(). If an HdfsTable is invalidated, it's replaced with an IncompleteTable which doesn't track any partitions. The HdfsTable object is then added to the deleteLog so catalogd can send deletion updates for all its partitions. The same happens if the HdfsTable is dropped. However, the previously dropped partitions are not collected in this case, which results in a leak in the catalog topic if the partition name is not reused anymore. Note that in the catalog topic, the key of a partition update consists of the table name and the partition name. So if the partition is added back to the table, the topic key will be reused, which resolves the leak. The leak will be observed when a coordinator restarts. In the initial catalog update sent from the statestore, the coordinator will find some partition updates that are not referenced by the HdfsTable (assuming the table is used again after the INVALIDATE). Then a Precondition check fails and the table is not added to the coordinator. *Overview of the patch* This patch fixes the leak by also collecting the dropped partitions when adding the HdfsTable to the deleteLog. A new field, dropped_partitions, is added in THdfsTable to collect them. It's only used when catalogd collects catalog updates. 
Removes the Precondition check in coordinator and just reports the stale partitions since IMPALA-12831 could also introduce them. Also adds a log line in CatalogOpExecutor.alterTableDropPartition() to show the dropped partition names for better diagnostics. Tests - Added e2e tests Change-Id: I12a68158dca18ee48c9564ea16b7484c9f5b5d21 Reviewed-on: http://gerrit.cloudera.org:8080/21326 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> (cherry picked from commit ee21427d26620b40d38c706b4944d2831f84f6f5) --- common/thrift/CatalogObjects.thrift | 4 + .../impala/catalog/CatalogServiceCatalog.java | 107 +++++++++++----- .../java/org/apache/impala/catalog/HdfsTable.java | 15 ++- .../org/apache/impala/catalog/ImpaladCatalog.java | 16 ++- .../apache/impala/service/CatalogOpExecutor.java | 2 + tests/common/impala_test_suite.py | 34 ++++-- tests/custom_cluster/test_partition.py | 134 +++++++++++++++++++++ tests/metadata/test_recover_partitions.py | 4 - 8 files changed, 262 insertions(+), 54 deletions(-) diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index a0e28a92a..30b856e7e 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -518,6 +518,10 @@ struct THdfsTable { // Bucket information for HDFS tables 16: optional TBucketInfo bucket_info + + // Recently dropped partitions that are not yet synced to the catalog topic. + // Only used in catalogd. 
+ 17: optional list<THdfsPartition> dropped_partitions } struct THBaseTable { diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index fc8fa029e..b58970ea7 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -1063,38 +1063,7 @@ public class CatalogServiceCatalog extends Catalog { Catalog.toCatalogObjectKey(removedObject))) { ctx.addCatalogObject(removedObject, true); } - // If this is a HdfsTable and incremental metadata updates are enabled, make sure we - // send deletes for removed partitions. So we won't leak partition topic entries in - // the statestored catalog topic. Partitions are only included as objects in topic - // updates if incremental metadata updates are enabled. Don't need this if - // incremental metadata updates are disabled, because in this case the table - // snapshot will be sent as a complete object. See more details in - // addTableToCatalogDeltaHelper(). - if (BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled() - && removedObject.type == TCatalogObjectType.TABLE - && removedObject.getTable().getTable_type() == TTableType.HDFS_TABLE) { - THdfsTable hdfsTable = removedObject.getTable().getHdfs_table(); - Preconditions.checkState( - !hdfsTable.has_full_partitions && hdfsTable.has_partition_names, - /*errorMessage*/hdfsTable); - String tblName = removedObject.getTable().db_name + "." 
- + removedObject.getTable().tbl_name; - PartitionMetaSummary deleteSummary = createPartitionMetaSummary(tblName); - for (THdfsPartition part : hdfsTable.partitions.values()) { - Preconditions.checkState(part.id >= HdfsPartition.INITIAL_PARTITION_ID - && part.db_name != null - && part.tbl_name != null - && part.partition_name != null, /*errorMessage*/part); - TCatalogObject removedPart = new TCatalogObject(HDFS_PARTITION, - removedObject.getCatalog_version()); - removedPart.setHdfs_partition(part); - if (!ctx.updatedCatalogObjects.contains( - Catalog.toCatalogObjectKey(removedPart))) { - ctx.addCatalogObject(removedPart, true, deleteSummary); - } - } - if (deleteSummary.hasUpdates()) LOG.info(deleteSummary.toString()); - } + collectPartitionDeletion(ctx, removedObject); } // Each topic update should contain a single "TCatalog" object which is used to // pass overall state on the catalog, such as the current version and the @@ -1122,6 +1091,80 @@ public class CatalogServiceCatalog extends Catalog { return ctx.toVersion; } + /** + * Collects partition deletion from removed HdfsTable objects. + */ + private void collectPartitionDeletion(GetCatalogDeltaContext ctx, + TCatalogObject removedObject) throws TException { + // If this is a HdfsTable and incremental metadata updates are enabled, make sure we + // send deletes for removed partitions. So we won't leak partition topic entries in + // the statestored catalog topic. Partitions are only included as objects in topic + // updates if incremental metadata updates are enabled. Don't need this if + // incremental metadata updates are disabled, because in this case the table + // snapshot will be sent as a complete object. See more details in + // addTableToCatalogDeltaHelper(). 
+ if (!BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled() + || removedObject.type != TCatalogObjectType.TABLE + || removedObject.getTable().getTable_type() != TTableType.HDFS_TABLE) { + return; + } + THdfsTable hdfsTable = removedObject.getTable().getHdfs_table(); + Preconditions.checkState( + !hdfsTable.has_full_partitions && hdfsTable.has_partition_names, + /*errorMessage*/hdfsTable); + String tblName = removedObject.getTable().db_name + "." + + removedObject.getTable().tbl_name; + PartitionMetaSummary deleteSummary = createPartitionMetaSummary(tblName); + Set<String> collectedPartNames = new HashSet<>(); + for (THdfsPartition part : hdfsTable.partitions.values()) { + Preconditions.checkState(part.id >= HdfsPartition.INITIAL_PARTITION_ID + && part.db_name != null + && part.tbl_name != null + && part.partition_name != null, /*errorMessage*/part); + TCatalogObject removedPart = new TCatalogObject(HDFS_PARTITION, + removedObject.getCatalog_version()); + removedPart.setHdfs_partition(part); + String partObjKey = Catalog.toCatalogObjectKey(removedPart); + boolean collected = false; + if (!ctx.updatedCatalogObjects.contains(partObjKey)) { + ctx.addCatalogObject(removedPart, true, deleteSummary); + collectedPartNames.add(part.partition_name); + collected = true; + } + LOG.trace("{} deletion of {} id={} from the active partition set " + + "of a removed/invalidated table (version={})", + collected ? "Collected" : "Skipped", partObjKey, part.id, + removedObject.getCatalog_version()); + } + // Adds the recently dropped partitions that are not yet synced to the catalog + // topic. + if (hdfsTable.isSetDropped_partitions()) { + for (THdfsPartition part : hdfsTable.dropped_partitions) { + // If a partition is dropped and then re-added, the old instance is added to + // droppedPartitions and the new instance is in partitionMap of HdfsTable. + // So partitions collected in the above loop could have the same name here. 
+ if (collectedPartNames.contains(part.partition_name)) continue; + TCatalogObject removedPart = new TCatalogObject(HDFS_PARTITION, + removedObject.getCatalog_version()); + removedPart.setHdfs_partition(part); + String partObjKey = Catalog.toCatalogObjectKey(removedPart); + // Skip if there is an update of the partition collected. It could be + // collected from a new version of the HdfsTable and this comes from an + // invalidated HdfsTable. + boolean collected = false; + if (!ctx.updatedCatalogObjects.contains(partObjKey)) { + ctx.addCatalogObject(removedPart, true, deleteSummary); + collected = true; + } + LOG.trace("{} deletion of {} id={} from the dropped partition set " + + "of a removed/invalidated table (version={})", + collected ? "Collected" : "Skipped", partObjKey, part.id, + removedObject.getCatalog_version()); + } + } + if (deleteSummary.hasUpdates()) LOG.info(deleteSummary.toString()); + } + /** * Evaluates if the information from an event (serviceId and versionNumber) matches to * the catalog object. 
If there is match, the in-flight version for that object is diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 45fb94e39..8954eab0b 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -2079,10 +2079,14 @@ public class HdfsTable extends Table implements FeFsTable { public void validatePartitions(Set<Long> expectedPartitionIds) throws TableLoadingException { if (!partitionMap_.keySet().equals(expectedPartitionIds)) { + Set<Long> missingIds = new HashSet<>(expectedPartitionIds); + missingIds.removeAll(partitionMap_.keySet()); + Set<Long> staleIds = new HashSet<>(partitionMap_.keySet()); + staleIds.removeAll(expectedPartitionIds); throw new TableLoadingException(String.format("Error applying incremental updates" + - " on table %s: missing partition ids %s, stale partition ids %s", - getFullName(), expectedPartitionIds.removeAll(partitionMap_.keySet()), - partitionMap_.keySet().removeAll(expectedPartitionIds))); + " on table %s. Missing partition ids: %s. Stale partition ids: %s. Total " + + "partitions: %d.", + getFullName(), missingIds, staleIds, partitionMap_.size())); } } @@ -2188,6 +2192,11 @@ public class HdfsTable extends Table implements FeFsTable { for (HdfsPartition part : partitionMap_.values()) { hdfsTable.partitions.put(part.getId(), part.toMinimalTHdfsPartition()); } + // Adds the recently dropped partitions that are not yet synced to the catalog + // topic. + for (HdfsPartition part : droppedPartitions_) { + hdfsTable.addToDropped_partitions(part.toMinimalTHdfsPartition()); + } hdfsTable.setHas_full_partitions(false); // The minimal catalog object of partitions contain the partition names. 
hdfsTable.setHas_partition_names(true); diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java index 430736a07..2962325be 100644 --- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java @@ -516,12 +516,17 @@ public class ImpaladCatalog extends Catalog implements FeCatalog { } } // Apply incremental updates. + List<String> stalePartitionNames = new ArrayList<>(); for (THdfsPartition tPart : newPartitions) { // TODO: remove this after IMPALA-9937. It's only illegal in statestore updates, // which indicates a leak of partitions in the catalog topic - a stale partition // should already have a corresponding deletion so won't get here. - Preconditions.checkState(tHdfsTable.partitions.containsKey(tPart.id), - "Received stale partition in a statestore update: " + tPart); + if (!tHdfsTable.partitions.containsKey(tPart.id)) { + stalePartitionNames.add(tPart.partition_name); + LOG.warn("Stale partition: {}.{}:{} id={}", tPart.db_name, tPart.tbl_name, + tPart.partition_name, tPart.id); + continue; + } // The existing table could have a newer version than the last sent table version // in catalogd, so some partition instances may already exist here. This happens // when we have executed DDL/DMLs on this table on this coordinator since the last @@ -541,6 +546,13 @@ public class ImpaladCatalog extends Catalog implements FeCatalog { tPart.id, tPart.partition_name, newHdfsTable.getFullName()); numNewParts++; } + if (!stalePartitionNames.isEmpty()) { + LOG.warn("Received {} stale partitions of table {}.{} in the statestore update:" + + " {}. There are possible leaks in the catalog topic values. 
To resolve " + + "the leak, add them back and then drop them again.", + stalePartitionNames.size(), thriftTable.db_name, thriftTable.tbl_name, + String.join(",", stalePartitionNames)); + } // Validate that all partitions are set. ((HdfsTable) newTable).validatePartitions(tHdfsTable.partitions.keySet()); LOG.info("Applied incremental table updates on {} existing partitions of " + diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index f8475e5f2..6b5ffa01d 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -5526,6 +5526,8 @@ public class CatalogOpExecutor { try { msClient.getHiveClient().dropPartition(tableName.getDb(), tableName.getTbl(), part.getPartitionValuesAsStrings(true), dropOptions); + LOG.info("Dropped partition {}.{}:{} in Metastore", + tableName.getDb(), tableName.getTbl(), part.getPartitionName()); ++numTargetedPartitions; } catch (NoSuchObjectException e) { if (!ifExists) { diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 40020e3d0..e5122676a 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -1201,6 +1201,11 @@ class ImpalaTestSuite(BaseTestSuite): else: return "UNIDENTIFIED" + @classmethod + def has_value(cls, value, lines): + """Check if lines contain value.""" + return any([line.find(value) != -1 for line in lines]) + def wait_for_db_to_appear(self, db_name, timeout_s): """Wait until the database with 'db_name' is present in the impalad's local catalog. 
Fail after timeout_s if the doesn't appear.""" @@ -1263,22 +1268,24 @@ class ImpalaTestSuite(BaseTestSuite): "Check failed to return True after {0} tries and {1} seconds{2}".format( count, timeout_s, error_msg_str)) - def assert_impalad_log_contains(self, level, line_regex, expected_count=1, timeout_s=6): + def assert_impalad_log_contains(self, level, line_regex, expected_count=1, timeout_s=6, + dry_run=False): """ Convenience wrapper around assert_log_contains for impalad logs. """ return self.assert_log_contains( - "impalad", level, line_regex, expected_count, timeout_s) + "impalad", level, line_regex, expected_count, timeout_s, dry_run) def assert_catalogd_log_contains(self, level, line_regex, expected_count=1, - timeout_s=6): + timeout_s=6, dry_run=False): """ Convenience wrapper around assert_log_contains for catalogd logs. """ return self.assert_log_contains( - "catalogd", level, line_regex, expected_count, timeout_s) + "catalogd", level, line_regex, expected_count, timeout_s, dry_run) - def assert_log_contains(self, daemon, level, line_regex, expected_count=1, timeout_s=6): + def assert_log_contains(self, daemon, level, line_regex, expected_count=1, timeout_s=6, + dry_run=False): """ Assert that the daemon log with specified level (e.g. ERROR, WARNING, INFO) contains expected_count lines with a substring matching the regex. When expected_count is -1, @@ -1312,14 +1319,15 @@ class ImpalaTestSuite(BaseTestSuite): if re_result: found += 1 last_re_result = re_result - if expected_count == -1: - assert found > 0, "Expected at least one line in file %s matching regex '%s'"\ - ", but found none." % (log_file_path, line_regex) - else: - assert found == expected_count, \ - "Expected %d lines in file %s matching regex '%s', but found %d lines. 
"\ - "Last line was: \n%s" %\ - (expected_count, log_file_path, line_regex, found, line) + if not dry_run: + if expected_count == -1: + assert found > 0, "Expected at least one line in file %s matching regex '%s'"\ + ", but found none." % (log_file_path, line_regex) + else: + assert found == expected_count, \ + "Expected %d lines in file %s matching regex '%s', but found %d lines. "\ + "Last line was: \n%s" %\ + (expected_count, log_file_path, line_regex, found, line) return last_re_result except AssertionError as e: # Re-throw the exception to the caller only when the timeout is expired. Otherwise diff --git a/tests/custom_cluster/test_partition.py b/tests/custom_cluster/test_partition.py index 89d80f2c1..a65296c34 100644 --- a/tests/custom_cluster/test_partition.py +++ b/tests/custom_cluster/test_partition.py @@ -19,6 +19,7 @@ from __future__ import absolute_import, division, print_function import logging import pytest import shutil +import time from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.skip import SkipIfFS @@ -77,3 +78,136 @@ class TestPartition(CustomClusterTestSuite): shutil.rmtree(local_file_dir) except OSError as e: LOG.info("Cannot remove directory %s, %s " % (local_file_dir, e.strerror)) + + +class TestPartitionDeletion(CustomClusterTestSuite): + """Tests catalogd sends deletion updates (i.e. isDeleted=true) for dropped partitions. + Use a normal catalog update frequency (2s) instead of the default one in custom + cluster tests (50ms) so the race conditions of IMPALA-13009 could happen.""" + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def add_test_dimensions(cls): + super(TestPartitionDeletion, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension()) + + # It doesn't matter what the file format is. So just test on text/none. 
+ cls.ImpalaTestMatrix.add_constraint(lambda v: + (v.get_value('table_format').file_format == 'text' + and v.get_value('table_format').compression_codec == 'none')) + + @CustomClusterTestSuite.with_args( + statestored_args="--statestore_update_frequency_ms=2000", + impalad_args="--use_local_catalog=false", + catalogd_args="--catalog_topic_mode=full --hms_event_polling_interval_s=0") + def test_legacy_catalog_no_event_processing(self, unique_database): + self._test_partition_deletion(unique_database) + + @CustomClusterTestSuite.with_args( + statestored_args="--statestore_update_frequency_ms=2000", + impalad_args="--use_local_catalog=false", + catalogd_args="--catalog_topic_mode=full --hms_event_polling_interval_s=1") + def test_legacy_catalog_with_event_processing(self, unique_database): + self._test_partition_deletion(unique_database) + + @CustomClusterTestSuite.with_args( + statestored_args="--statestore_update_frequency_ms=2000", + impalad_args="--use_local_catalog=true", + catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=0") + def test_local_catalog_no_event_processing(self, unique_database): + self._test_partition_deletion(unique_database) + + @CustomClusterTestSuite.with_args( + statestored_args="--statestore_update_frequency_ms=2000", + impalad_args="--use_local_catalog=true", + catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=1") + def test_local_catalog_with_event_processing(self, unique_database): + self._test_partition_deletion(unique_database) + + def _test_partition_deletion(self, unique_database): + tbl = unique_database + ".part_tbl" + self.client.execute("create table {}(i int) partitioned by (p int)".format(tbl)) + self.client.execute("alter table {} add partition(p=0)".format(tbl)) + + ############################################################# + # Test 1: DropPartition + Invalidate + Load + # + # Add and drop different partitions + for i in range(1, 4): + self.client.execute("alter table {} add 
partition(p={})".format(tbl, i)) + # Wait 1s so catalogd has chance to propagate new partitions before we drop them. + time.sleep(1) + # If the following 3 statements are executed in a catalog topic update cycle, it + # covers the bug of IMPALA-13009. + self.client.execute("alter table {} drop partition(p>0)".format(tbl)) + self.client.execute("invalidate metadata {}".format(tbl)) + # Trigger metadata loading on this table + self.client.execute("describe {}".format(tbl)) + + res = self.client.execute("show partitions {}".format(tbl)) + assert self.has_value("p=0", res.data) + # The last line is the total summary + assert len(res.data) == 2 + + # Check catalogd has sent deletions for dropped partitions if their updates have been + # sent before. + update_log_regex = "Collected . partition update.*HDFS_PARTITION:{}:.*p={}" + deletion_log_regex = "Collected . partition deletion.*HDFS_PARTITION:{}:.*p={}" + for i in range(1, 4): + update_found = self.assert_catalogd_log_contains("INFO", + update_log_regex.format(tbl, i), dry_run=True) + if update_found: + self.assert_catalogd_log_contains("INFO", deletion_log_regex.format(tbl, i)) + + # Restart impalad and check the partitions on it + self.cluster.impalads[0].restart() + self.client = self.create_impala_client() + new_res = self.client.execute("show partitions {}".format(tbl)) + assert new_res.data == res.data + self.assert_impalad_log_contains("WARNING", "stale partition", expected_count=0) + self.assert_impalad_log_contains("ERROR", "stale partition", expected_count=0) + + ############################################################# + # Test 2: UpdatePartition + Invalidate + Load + # + # Updates the partition, invalidates the table and then reloads it. Checks the dropped + # version of the partition in the removed table version won't interfere with the + # update. Run this 5 times so they could happen inside a catalog update cycle. 
+ self.client.execute("alter table {} add partition(p=5)".format(tbl)) + for num in range(5): + self.client.execute("refresh {} partition(p=5)".format(tbl)) + self.client.execute("invalidate metadata " + tbl) + res = self.client.execute("show partitions " + tbl) + assert self.has_value("p=0", res.data) + assert self.has_value("p=5", res.data) + # The last line is the total summary + assert len(res.data) == 3 + + # Restart impalad and check the partitions on it + self.cluster.impalads[0].restart() + self.client = self.create_impala_client() + new_res = self.client.execute("show partitions {}".format(tbl)) + assert new_res.data == res.data + self.assert_impalad_log_contains("WARNING", "stale partition", expected_count=0) + self.assert_impalad_log_contains("ERROR", "stale partition", expected_count=0) + + ############################################################# + # Test 3: DropPartition + DropTable + CreateTable + Load + # + # Check no leaks if the partition and table are dropped sequentially. + self.client.execute("alter table {} drop partition(p=0)".format(tbl)) + self.client.execute("drop table " + tbl) + time.sleep(2) + # Re-create and reload the HdfsTable so Impalad will see a HdfsTable with an empty + # partition map. Any leaked stale partitions in the catalog topic will be reported. 
+ self.client.execute("create table {}(i int) partitioned by (p int)".format(tbl)) + self.client.execute("describe " + tbl) + # Restart impalad and check the partitions on it + self.cluster.impalads[0].restart() + self.client = self.create_impala_client() + res = self.client.execute("show partitions {}".format(tbl)) + assert len(res.data) == 1 + self.assert_impalad_log_contains("WARNING", "stale partition", expected_count=0) + self.assert_impalad_log_contains("ERROR", "stale partition", expected_count=0) diff --git a/tests/metadata/test_recover_partitions.py b/tests/metadata/test_recover_partitions.py index 362c0fa83..4f0f4b081 100644 --- a/tests/metadata/test_recover_partitions.py +++ b/tests/metadata/test_recover_partitions.py @@ -457,10 +457,6 @@ class TestRecoverPartitions(ImpalaTestSuite): "ALTER TABLE %s RECOVER PARTITIONS failed to handle "\ "invalid partition key values." % fq_tbl_name - def has_value(self, value, lines): - """Check if lines contain value.""" - return any([line.find(value) != -1 for line in lines]) - def count_partition(self, lines): """Count the number of partitions in the lines.""" return self.count_value(WAREHOUSE, lines)
