This is an automated email from the ASF dual-hosted git repository.
csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new b67a9cecb IMPALA-13593: Enable event processor to consume
ALTER_PARTITIONS events from metastore
b67a9cecb is described below
commit b67a9cecb33a7241ad99b33262173dd0287cfe3b
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Mon Dec 9 14:41:00 2024 -0800
IMPALA-13593: Enable event processor to consume ALTER_PARTITIONS events
from metastore
HIVE-27746 introduced the ALTER_PARTITIONS event type, an optimization
that collapses bulk ALTER_PARTITION events into a single event. The
components version is updated to pick up this change. Supporting this
event type in Impala significantly reduces the number of events the
event processor has to consume and helps it catch up with events
quickly.
This patch enables the event processor to consume ALTER_PARTITIONS
events. The downside is that the event message carries no
before_partitions object, so partitions can be refreshed even on
trivial changes to them. HIVE-29141 will address this concern.
Testing:
- Added an end-to-end test to verify that the ALTER_PARTITIONS event is
consumed. Larger timeouts are used in this test because flakiness was
observed when running it in a loop.
Change-Id: I009a87ef5e2c331272f9e2d7a6342cc860e64737
Reviewed-on: http://gerrit.cloudera.org:8080/22554
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Csaba Ringhofer <[email protected]>
---
be/src/catalog/catalog-server.cc | 5 +-
bin/create-test-configuration.sh | 7 +
bin/impala-config.sh | 22 ++--
.../org/apache/impala/compat/MetastoreShim.java | 8 ++
.../org/apache/impala/compat/MetastoreShim.java | 34 +++++
.../impala/catalog/Hive3MetastoreShimBase.java | 39 ++++++
.../impala/catalog/events/MetastoreEvents.java | 144 +++++++++++++++------
fe/src/test/resources/hive-site.xml.py | 5 +
tests/custom_cluster/test_events_custom_configs.py | 63 +++++++++
9 files changed, 275 insertions(+), 52 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 63b054109..2208bca77 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -241,12 +241,11 @@ DEFINE_string(default_skipped_hms_event_types,
"the latest event time in HMS.");
DEFINE_string(common_hms_event_types,
"ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,"
-
"ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,CREATE_TABLE,ALTER_TABLE,DROP_TABLE,"
+
"ALTER_PARTITIONS,CREATE_TABLE,ALTER_TABLE,DROP_TABLE,RELOAD,COMMIT_COMPACTION_EVENT"
"CREATE_DATABASE,ALTER_DATABASE,DROP_DATABASE,INSERT,OPEN_TXN,COMMIT_TXN,ABORT_TXN,"
"ALLOC_WRITE_ID_EVENT,ACID_WRITE_EVENT,BATCH_ACID_WRITE_EVENT,"
"UPDATE_TBL_COL_STAT_EVENT,DELETE_TBL_COL_STAT_EVENT,UPDATE_PART_COL_STAT_EVENT,"
-
"UPDATE_PART_COL_STAT_EVENT_BATCH,DELETE_PART_COL_STAT_EVENT,COMMIT_COMPACTION_EVENT,"
- "RELOAD",
+ "UPDATE_PART_COL_STAT_EVENT_BATCH,DELETE_PART_COL_STAT_EVENT,",
"Common HMS event types that will be used in eventTypeSkipList when
fetching events "
"from HMS. The strings come from constants in "
"org.apache.hadoop.hive.metastore.messaging.MessageBuilder. When bumping
Hive "
diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh
index f8c376e92..6eb51fde7 100755
--- a/bin/create-test-configuration.sh
+++ b/bin/create-test-configuration.sh
@@ -169,6 +169,13 @@ rm -f hive-site-housekeeping-on/hive-site.xml
ln -s "${CONFIG_DIR}/hive-site_housekeeping_on.xml" \
hive-site-housekeeping-on/hive-site.xml
+export HIVE_VARIANT=events_config_change
+$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py
hive-site_events_config.xml
+mkdir -p hive-site-events-config
+rm -f hive-site-events-config/hive-site.xml
+ln -s "${CONFIG_DIR}/hive-site_events_config.xml" \
+ hive-site-events-config/hive-site.xml
+
export HIVE_VARIANT=ranger_auth
HIVE_RANGER_CONF_DIR=hive-site-ranger-auth
$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py
hive-site_ranger_auth.xml
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 88aab1813..8a4ec56e6 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -240,19 +240,19 @@ fi
: ${IMPALA_TOOLCHAIN_HOST:=native-toolchain.s3.amazonaws.com}
export IMPALA_TOOLCHAIN_HOST
-export CDP_BUILD_NUMBER=58457853
+export CDP_BUILD_NUMBER=66846208
export CDP_MAVEN_REPOSITORY=\
"https://${IMPALA_TOOLCHAIN_HOST}/build/cdp_components/${CDP_BUILD_NUMBER}/maven"
-export CDP_AVRO_JAVA_VERSION=1.11.1.7.3.1.0-160
-export CDP_HADOOP_VERSION=3.1.1.7.3.1.0-160
-export CDP_HBASE_VERSION=2.4.17.7.3.1.0-160
-export CDP_HIVE_VERSION=3.1.3000.7.3.1.0-160
-export CDP_ICEBERG_VERSION=1.3.1.7.3.1.0-160
-export CDP_KNOX_VERSION=2.0.0.7.3.1.0-160
-export CDP_OZONE_VERSION=1.3.0.7.3.1.0-160
-export CDP_PARQUET_VERSION=1.12.3.7.3.1.0-160
-export CDP_RANGER_VERSION=2.4.0.7.3.1.0-160
-export CDP_TEZ_VERSION=0.9.1.7.3.1.0-160
+export CDP_AVRO_JAVA_VERSION=1.11.1.7.3.1.500-30
+export CDP_HADOOP_VERSION=3.1.1.7.3.1.500-30
+export CDP_HBASE_VERSION=2.4.17.7.3.1.500-30
+export CDP_HIVE_VERSION=3.1.3000.7.3.1.500-30
+export CDP_ICEBERG_VERSION=1.3.1.7.3.1.500-30
+export CDP_KNOX_VERSION=2.0.0.7.3.1.500-30
+export CDP_OZONE_VERSION=1.4.0.7.3.1.500-30
+export CDP_PARQUET_VERSION=1.12.3.7.3.1.500-30
+export CDP_RANGER_VERSION=2.4.0.7.3.1.500-30
+export CDP_TEZ_VERSION=0.9.1.7.3.1.500-30
# Ref: https://infra.apache.org/release-download-pages.html#closer
: ${APACHE_MIRROR:="https://www.apache.org/dyn/closer.cgi"}
diff --git
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index ac0a0657c..24fe4933b 100644
---
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -448,6 +448,14 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
throw new UnsupportedOperationException("Reload event is not supported.");
}
+ /**
+ * CDP Hive-3 only function.
+ */
+ public static AlterPartitionsInfo getFieldsFromAlterPartitionsEvent(
+ NotificationEvent event) throws MetastoreNotificationException {
+ throw new UnsupportedOperationException("AlterPartitions event is not
supported.");
+ }
+
/**
* CDP Hive-3 only function.
*/
diff --git
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index e80669919..10f6c3b6c 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.api.WriteNotificationLogBatchRequest;
import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.CommitCompactionMessage;
@@ -606,6 +607,39 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
return updatedFields;
}
+ /**
+ * This method extracts the table, partitions, and isTruncateOp fields from
the
+ * notification event and returns them in a AlterPartitionsInfo class
object.
+ *
+ * @param event Metastore notification event,
+ * @return an AlterPartitionsInfo object holding the parsed event fields.
+ */
+ public static AlterPartitionsInfo getFieldsFromAlterPartitionsEvent(
+ NotificationEvent event) throws MetastoreNotificationException{
+ Preconditions.checkNotNull(event.getMessage());
+ AlterPartitionsMessage alterPartitionsMessage =
+ MetastoreEventsProcessor.getMessageDeserializer()
+ .getAlterPartitionsMessage(event.getMessage());
+ AlterPartitionsInfo alterPartitionsInfo = null;
+ try {
+ Iterator<Partition> partitionsIterator = Preconditions.checkNotNull(
+ alterPartitionsMessage.getPartitionObjs().iterator());
+ List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter =
+ new ArrayList<>();
+ while (partitionsIterator.hasNext()) {
+ partitionsAfter.add(partitionsIterator.next());
+ }
+ org.apache.hadoop.hive.metastore.api.Table msTbl =
Preconditions.checkNotNull(
+ alterPartitionsMessage.getTableObj());
+ boolean isTruncateOp = alterPartitionsMessage.getIsTruncateOp();
+ alterPartitionsInfo = new AlterPartitionsInfo(msTbl, partitionsAfter,
+ isTruncateOp);
+ } catch (Exception e) {
+ throw new MetastoreNotificationException(e);
+ }
+ return alterPartitionsInfo;
+ }
+
/**
* This method extracts the partition name field from the
* notification event and returns it in the form of string.
diff --git
a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
index 11a22c52b..447636ea1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
@@ -842,4 +842,43 @@ public class Hive3MetastoreShimBase {
public static boolean validateColumnName(String name) {
return MetaStoreUtils.validateColumnName(name);
}
+
+ /**
+ * Constructs a new AlterPartitionsInfo object.
+ */
+ public static class AlterPartitionsInfo {
+ private final org.apache.hadoop.hive.metastore.api.Table msTable;
+ private final List<org.apache.hadoop.hive.metastore.api.Partition>
partitions;
+ private final boolean isTruncate;
+
+ public AlterPartitionsInfo(org.apache.hadoop.hive.metastore.api.Table
msTable,
+ List<org.apache.hadoop.hive.metastore.api.Partition> partitions,
+ boolean isTruncate) {
+ this.msTable = msTable;
+ this.partitions = partitions;
+ this.isTruncate = isTruncate;
+ }
+
+ /**
+ * Returns the Thrift representation of the table.
+ */
+ public org.apache.hadoop.hive.metastore.api.Table getMsTable() {
+ return msTable;
+ }
+
+ /**
+ * Returns the list of Thrift partition objects carried by the event.
+ * Can be null or empty if not applicable.
+ */
+ public List<org.apache.hadoop.hive.metastore.api.Partition>
getPartitions() {
+ return partitions;
+ }
+
+ /**
+ * Returns true if the alter operation was due to a TRUNCATE TABLE.
+ */
+ public boolean isTruncate() {
+ return isTruncate;
+ }
+ }
}
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 2405e91f6..1055b212c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Map;
@@ -128,6 +129,7 @@ public class MetastoreEvents {
DROP_DATABASE("DROP_DATABASE"),
ALTER_DATABASE("ALTER_DATABASE"),
ADD_PARTITION("ADD_PARTITION"),
+ ALTER_BATCH_PARTITIONS("ALTER_BATCH_PARTITIONS"),
ALTER_PARTITION("ALTER_PARTITION"),
ALTER_PARTITIONS("ALTER_PARTITIONS"),
DROP_PARTITION("DROP_PARTITION"),
@@ -227,6 +229,8 @@ public class MetastoreEvents {
return new DropPartitionEvent(catalogOpExecutor_, metrics, event);
case ALTER_PARTITION:
return new AlterPartitionEvent(catalogOpExecutor_, metrics, event);
+ case ALTER_PARTITIONS:
+ return new AlterPartitionsEvent(catalogOpExecutor_, metrics, event);
case RELOAD:
return new ReloadEvent(catalogOpExecutor_, metrics, event);
case INSERT:
@@ -1367,6 +1371,52 @@ public class MetastoreEvents {
return false;
}
+ protected void processAlterPartitionEvent(boolean isTruncateOp,
+ List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter)
+ throws CatalogException, MetastoreNotificationException {
+ // Reload the whole table if it's a transactional table or materialized
view.
+ // Materialized views are treated as a special case because it's
possible to
+ // receive partition event on MVs, but they are regular views in Impala.
That
+ // cause problems on the reloading partition logic which expects it to
be a
+ // HdfsTable.
+ if (AcidUtils.isTransactionalTable(msTbl_.getParameters())
+ || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
+ reloadTransactionalTable(partitionsAfter);
+ } else {
+ try {
+ // load file metadata only if storage descriptor of partitionAfter_
differs
+ // from sd of HdfsPartition. If the alter_partition event type is of
truncate
+ // then force load the file metadata.
+ FileMetadataLoadOpts fileMetadataLoadOpts =
+ isTruncateOp ? FileMetadataLoadOpts.FORCE_LOAD :
+ FileMetadataLoadOpts.LOAD_IF_SD_CHANGED;
+ reloadPartitions(partitionsAfter, fileMetadataLoadOpts,
getEventDesc(),
+ false);
+ } catch (CatalogException e) {
+ throw new MetastoreNotificationException(
+ debugString("Refresh partitions on table {} failed. Event " +
+ "processing cannot continue. Issue an invalidate command to
reset " +
+ "the event processor state.", getFullyQualifiedTblName(),
e));
+ }
+ }
+ }
+
+ protected void reloadTransactionalTable(
+ List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter)
+ throws CatalogException {
+ boolean incrementalRefresh =
+
BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
+ if (incrementalRefresh) {
+ reloadPartitionsFromEvent(partitionsAfter, getEventDesc()
+ + " FOR TRANSACTIONAL TABLE");
+ } else {
+ boolean notSkipped = reloadTableFromCatalog(true);
+ if (!notSkipped) {
+
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+ }
+ }
+ }
+
@Override
protected void process() throws MetastoreNotificationException,
CatalogException {
Timer.Context context = null;
@@ -2657,7 +2707,7 @@ public class MetastoreEvents {
@Override
protected MetastoreEventType getBatchEventType() {
- return MetastoreEventType.ALTER_PARTITIONS;
+ return MetastoreEventType.ALTER_BATCH_PARTITIONS;
}
@Override
@@ -2721,32 +2771,7 @@ public class MetastoreEvents {
getEventId());
return;
}
- // Reload the whole table if it's a transactional table or materialized
view.
- // Materialized views are treated as a special case because it's
possible to
- // receive partition event on MVs, but they are regular views in Impala.
That
- // cause problems on the reloading partition logic which expects it to
be a
- // HdfsTable.
- if (AcidUtils.isTransactionalTable(msTbl_.getParameters())
- || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
- reloadTransactionalTable();
- } else {
- // Refresh the partition that was altered.
- Preconditions.checkNotNull(partitionAfter_);
- try {
- // load file metadata only if storage descriptor of partitionAfter_
differs
- // from sd of HdfsPartition. If the alter_partition event type is of
truncate
- // then force load the file metadata.
- FileMetadataLoadOpts fileMetadataLoadOpts =
- isTruncateOp_ ? FileMetadataLoadOpts.FORCE_LOAD :
- FileMetadataLoadOpts.LOAD_IF_SD_CHANGED;
- reloadPartitions(Arrays.asList(partitionAfter_),
fileMetadataLoadOpts,
- getEventDesc(), false);
- } catch (CatalogException e) {
- throw new MetastoreNotificationNeedsInvalidateException(
- debugString("Refresh partition on table {} partition {} failed.",
- getFullyQualifiedTblName(), partName_), e);
- }
- }
+ processAlterPartitionEvent(isTruncateOp_,
Arrays.asList(partitionAfter_));
}
@Override
@@ -2775,20 +2800,63 @@ public class MetastoreEvents {
Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_,
partitionAfter_)),
partitionAfter_.getParameters());
}
+ }
- private void reloadTransactionalTable() throws CatalogException {
- boolean incrementalRefresh =
-
BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
- if (incrementalRefresh) {
- reloadPartitionsFromEvent(Collections.singletonList(partitionAfter_),
- getEventDesc() + " FOR TRANSACTIONAL TABLE");
- } else {
- boolean notSkipped = reloadTableFromCatalog(true);
- if (!notSkipped) {
-
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
- }
+ public static class AlterPartitionsEvent extends MetastoreTableEvent {
+ public static final String EVENT_TYPE = "ALTER_PARTITIONS";
+ // the list of partition objects of alter operation, as parsed from the
+ // NotificationEvent
+ private final List<org.apache.hadoop.hive.metastore.api.Partition>
partitionsAfter_;
+ private final boolean isTruncateOp_;
+
+ /**
+ * Prevent instantiation from outside should use MetastoreEventFactory
instead
+ */
+ private AlterPartitionsEvent(CatalogOpExecutor catalogOpExecutor, Metrics
metrics,
+ NotificationEvent event) throws MetastoreNotificationException {
+ super(catalogOpExecutor, metrics, event);
+ Preconditions.checkState(getEventType().equals(
+ MetastoreEventType.ALTER_PARTITIONS));
+ try {
+ MetastoreShim.AlterPartitionsInfo alterPartitionsInfo =
+ MetastoreShim.getFieldsFromAlterPartitionsEvent(event);
+ msTbl_ = alterPartitionsInfo.getMsTable();
+ partitionsAfter_ = alterPartitionsInfo.getPartitions();
+ isTruncateOp_ = alterPartitionsInfo.isTruncate();
+ } catch (Exception e) {
+ throw new MetastoreNotificationException(
+ debugString("Unable to parse the alter partition message"), e);
}
}
+
+ @Override
+ public void processTableEvent() throws MetastoreNotificationException,
+ CatalogException {
+ if (partitionsAfter_.isEmpty()) {
+
metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+ warnLog("Not processing the alter partitions event {} as no partitions
are " +
+ "received in the event.", getEventId());
+ return;
+ }
+ if (isSelfEvent()) {
+ infoLog("Not processing the event as it is a self-event");
+ return;
+ }
+
+ if (isOlderEvent(partitionsAfter_.get(0))) {
+ infoLog("Not processing the alter partition event {} as it is an older
event",
+ getEventId());
+ return;
+ }
+ processAlterPartitionEvent(isTruncateOp_, partitionsAfter_);
+ }
+
+ @Override
+ public SelfEventContext getSelfEventContext() {
+ return new SelfEventContext(dbName_, tblName_,
+ Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_,
+ partitionsAfter_.get(0))),
partitionsAfter_.get(0).getParameters());
+ }
}
/**
diff --git a/fe/src/test/resources/hive-site.xml.py
b/fe/src/test/resources/hive-site.xml.py
index d3a3609b4..d2bf9a9d9 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -95,6 +95,11 @@ elif VARIANT == 'housekeeping_on':
CONFIG.update({
'hive.metastore.housekeeping.threads.on': 'true',
})
+elif VARIANT == 'events_config_change':
+ # HMS config change needed for HIVE-27746 to emit ALTER_PARTITIONS event
+ CONFIG.update({
+ 'hive.metastore.alterPartitions.notification.v2.enabled': 'true',
+ })
# HBase-related configs.
# Impala processes need to connect to zookeeper on INTERNAL_LISTEN_HOST for
HBase.
diff --git a/tests/custom_cluster/test_events_custom_configs.py
b/tests/custom_cluster/test_events_custom_configs.py
index 5d3b11b26..7a0f7031d 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -39,6 +39,8 @@ from tests.util.iceberg_util import IcebergCatalogs
HIVE_SITE_HOUSEKEEPING_ON =\
getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-housekeeping-on'
+HIVE_SITE_ALTER_PARTITIONS_EVENT =\
+ getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-events-config'
TRUNCATE_TBL_STMT = 'truncate table'
# The statestore heartbeat and topic update frequency (ms). Set low for
testing.
STATESTORE_RPC_FREQUENCY_MS = 100
@@ -1694,6 +1696,67 @@ class
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
self.client.execute("""DROP DATABASE {}
CASCADE""".format(unique_database))
self.client.execute("""CREATE DATABASE {}""".format(unique_database))
+ @SkipIf.is_test_jdk
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--use_local_catalog=true",
+ catalogd_args="--catalog_topic_mode=minimal
--hms_event_polling_interval_s=1 "
+ "--debug_actions=catalogd_event_processing_delay:SLEEP@1000",
+ hive_conf_dir=HIVE_SITE_ALTER_PARTITIONS_EVENT)
+ def test_alter_partitions_event_from_metastore(self, unique_database):
+ tbl = unique_database + ".test_alter_partitions"
+ self.client.execute("create table {} (id int) partitioned by (year int)"
+ .format(tbl))
+
+ def _verify_alter_partitions_event(events):
+ event_found = False
+ for event in events:
+ if event.eventType == "ALTER_PARTITIONS":
+ event_found = True
+ else:
+ logging.debug("Found " + str(event))
+ return event_found
+
+ # Verify that test always generates single ALTER_PARTITIONS event
+ self.client.execute(
+ "insert into {} partition(year) values (0,2024), (1,2023), (2,2022)"
+ .format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self, 10)
+
+ # Case-I: compute stats from hive
+ parts_refreshed_before =
EventProcessorUtils.get_int_metric("partitions-refreshed")
+ batch_events_before =
EventProcessorUtils.get_int_metric("batch-events-created")
+ self.run_stmt_in_hive("analyze table {} compute statistics".format(tbl))
+ EventProcessorUtils.wait_for_event_processing(self, 10)
+ batch_events_after =
EventProcessorUtils.get_int_metric("batch-events-created")
+ parts_refreshed_after =
EventProcessorUtils.get_int_metric("partitions-refreshed")
+ assert batch_events_after == batch_events_before # verify there are no
new batches
+ assert parts_refreshed_after == parts_refreshed_before + 3
+
+ # Case-II: compute stats from impala
+ last_event_id =
EventProcessorUtils.get_current_notification_id(self.hive_client)
+ self.client.execute("compute stats {}".format(tbl))
+ events_skipped_before =
EventProcessorUtils.get_int_metric('events-skipped', 0)
+ EventProcessorUtils.wait_for_event_processing(self, 10)
+ events = EventProcessorUtils.get_next_notification(self.hive_client,
last_event_id)
+ # There will be COMMIT_TXN, ALLOC_WRITE_ID_EVENT, ALTER_PARTITIONS in any
order
+ events_skipped_after =
EventProcessorUtils.get_int_metric('events-skipped', 0)
+ assert _verify_alter_partitions_event(events)
+ assert events_skipped_after > events_skipped_before
+
+ # Case-III: truncate table from Impala
+ last_event_id =
EventProcessorUtils.get_current_notification_id(self.hive_client)
+ self.client.execute("truncate table {}".format(tbl))
+ events_skipped_before =
EventProcessorUtils.get_int_metric('events-skipped', 0)
+ EventProcessorUtils.wait_for_event_processing(self, 10)
+ events = EventProcessorUtils.get_next_notification(self.hive_client,
last_event_id)
+ events_skipped_after =
EventProcessorUtils.get_int_metric('events-skipped', 0)
+ assert _verify_alter_partitions_event(events)
+ assert events_skipped_after > events_skipped_before
+
+ # Case-IV: Truncate table from Hive is currently generating single
alter_partition
+ # events. HIVE-28668 will address it.
+
@SkipIfFS.hive
class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):