This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new fb789df3b IMPALA-13684: Improve waitForHmsEvent() to only wait for
related events
fb789df3b is described below
commit fb789df3be7cfbbae519248de6437aa87bad0963
Author: stiga-huang <[email protected]>
AuthorDate: Tue Mar 4 09:16:28 2025 +0800
IMPALA-13684: Improve waitForHmsEvent() to only wait for related events
waitForHmsEvent is a catalogd RPC for coordinators to send the requested
db/table names to catalogd and wait until it's safe (i.e. no stale
metadata) to start analyzing the statement. The wait time is configured
by query option sync_hms_events_wait_time_s. Currently, when this option
is enabled, catalogd waits until it syncs to the latest HMS event
regardless of what the query is.
This patch reduces waiting by only checking related events and waiting
until the last related event has been processed. In the ideal case, if
there are no pending events that are related, the query doesn't need to
wait.
Related pending events are determined as follows:
- For queries that need the db list, i.e. SHOW DATABASES, check pending
CREATE/ALTER/DROP_DATABASE events on all dbs. ALTER_DATABASE events
are checked in case the ownership changes and impacts visibility.
- For db statements like SHOW FUNCTIONS, CREATE/ALTER/DROP DATABASE,
check pending CREATE/ALTER/DROP events on that db.
- For db statements that require the table list, i.e. SHOW TABLES,
also check CREATE_TABLE, DROP_TABLE events under that db.
- For table statements,
- check all database events on related db names.
- If there are loaded transactional tables, check all the pending
COMMIT_TXN, ABORT_TXN events. Note that these events might modify
multiple transactional tables and we don't know their table names
until they are processed. To be safe, wait for all transactional
events.
- For all the other table names,
- if they are all missing/unloaded in the catalog, check all the
pending CREATE_TABLE, DROP_TABLE events on them for their
existence.
- Otherwise, when some of them are loaded, check all the table events on
them. Note that we can fetch events on multiple tables under the
same db in a single fetch.
If the statement has a SELECT part, views will be expanded so underlying
tables will be checked as well. For performance, this feature assumes
that views won't be changed to tables, and vice versa. This is a rare
use case in regular jobs. Users should use INVALIDATE in such cases.
This patch leverages the HMS API to fetch events of several tables under
the same db in batch. MetastoreEventsProcessor.MetaDataFilter is
improved for this.
Tests:
- Added test for multiple tables in a single query.
- Added test with views.
- Added test for transactional tables.
- Ran CORE tests.
Change-Id: Ic033b7e197cd19505653c3ff80c4857cc474bcfc
Reviewed-on: http://gerrit.cloudera.org:8080/22571
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
common/thrift/CatalogService.thrift | 3 +
.../org/apache/impala/compat/MetastoreShim.java | 1 +
.../org/apache/impala/compat/MetastoreShim.java | 7 +-
.../impala/catalog/CatalogServiceCatalog.java | 16 +-
.../impala/catalog/Hive3MetastoreShimBase.java | 4 +
.../impala/catalog/events/MetastoreEvents.java | 2 +
.../catalog/events/MetastoreEventsProcessor.java | 394 ++++++++++++++++++++-
.../apache/impala/service/CatalogOpExecutor.java | 2 +-
.../java/org/apache/impala/service/Frontend.java | 6 +-
tests/custom_cluster/test_events_custom_configs.py | 22 +-
tests/metadata/test_event_processing.py | 191 +++++++++-
11 files changed, 629 insertions(+), 19 deletions(-)
diff --git a/common/thrift/CatalogService.thrift
b/common/thrift/CatalogService.thrift
index 6702e3c62..ecf2c8f75 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -791,6 +791,9 @@ struct TWaitForHmsEventRequest {
// Descriptors of catalog objects that might be used by the query.
6: optional list<CatalogObjects.TCatalogObject> object_descs
+
+ // Whether to check tables used by views.
+ 7: optional bool should_expand_views = false
}
struct TWaitForHmsEventResponse {
diff --git
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 98c96f1d2..76e579b29 100644
---
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -622,6 +622,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
* CDP Hive-3 only function.
*/
public static class CommitTxnEvent extends MetastoreEvent {
+ public static final String EVENT_TYPE = "COMMIT_TXN";
public CommitTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
NotificationEvent event) {
diff --git
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 23fc2562b..ad6eab6e5 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -830,6 +830,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
* tables.
*/
public static class CommitTxnEvent extends MetastoreEvent {
+ public static final String EVENT_TYPE = "COMMIT_TXN";
private final CommitTxnMessage commitTxnMessage_;
private final long txnId_;
private Set<TableWriteId> tableWriteIds_ = Collections.emptySet();
@@ -1105,9 +1106,9 @@ public class MetastoreShim extends Hive3MetastoreShimBase
{
!metaDataFilter.getDbName().isEmpty()) {
eventRequest.setDbName(metaDataFilter.getDbName());
}
- if (metaDataFilter.getTableName() != null &&
- !metaDataFilter.getTableName().isEmpty()) {
-
eventRequest.setTableNames(Arrays.asList(metaDataFilter.getTableName()));
+ if (metaDataFilter.getTableNames() != null &&
+ !metaDataFilter.getTableNames().isEmpty()) {
+ eventRequest.setTableNames(metaDataFilter.getTableNames());
}
}
}
diff --git
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 237ad7498..aef45a8fe 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -33,6 +33,7 @@ import static
org.apache.impala.thrift.TCatalogObjectType.TABLE;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -41,6 +42,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -61,21 +63,27 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.analysis.Path;
import org.apache.impala.analysis.TableName;
+import org.apache.impala.analysis.TableRef;
import org.apache.impala.authorization.AuthorizationDelta;
import org.apache.impala.authorization.AuthorizationManager;
import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.catalog.FeFsTable.Utils;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.events.ExternalEventsProcessor;
+import org.apache.impala.catalog.events.MetastoreEvents;
import
org.apache.impala.catalog.events.MetastoreEvents.EventFactoryForSyncToLatestEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import
org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
+import org.apache.impala.catalog.events.MetastoreNotificationException;
import org.apache.impala.catalog.events.MetastoreNotificationFetchException;
import org.apache.impala.catalog.events.NoOpEventProcessor;
import org.apache.impala.catalog.events.SelfEventContext;
@@ -4331,11 +4339,13 @@ public class CatalogServiceCatalog extends Catalog {
public TWaitForHmsEventResponse waitForHmsEvent(TWaitForHmsEventRequest req)
{
LOG.info("waitForHmsEvent request: want_minimal_response={},
coordinator={}, " +
- "timeout_s={}, want_db_list={}, want_table_list={}, objects=[{}]",
+ "timeout_s={}, want_db_list={}, want_table_list={}, objects=[{}],
" +
+ "should_expand_views={}",
req.header.want_minimal_response, req.header.coordinator_hostname,
req.timeout_s,
req.want_db_list, req.want_table_list, !req.isSetObject_descs() ? "" :
req.object_descs.stream().map(Catalog::toCatalogObjectKey)
- .collect(Collectors.joining(", ")));
+ .collect(Collectors.joining(", ")),
+ req.should_expand_views);
TWaitForHmsEventResponse res = new TWaitForHmsEventResponse();
if (!(metastoreEventProcessor_ instanceof MetastoreEventsProcessor)) {
res.setStatus(new TStatus(TErrorCode.RPC_GENERAL_ERROR,
@@ -4345,7 +4355,7 @@ public class CatalogServiceCatalog extends Catalog {
}
MetastoreEventsProcessor eventsProcessor =
(MetastoreEventsProcessor) metastoreEventProcessor_;
- TStatus status = eventsProcessor.waitForSyncUpToCurrentEvent(req.timeout_s
* 1000L);
+ TStatus status = eventsProcessor.waitForMostRecentMetadata(req);
if (status.status_code != TErrorCode.OK) {
res.setStatus(status);
LOG.error(String.join("\n", status.error_msgs));
diff --git
a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
index 9fb6b7f37..11a22c52b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;
@@ -258,6 +259,9 @@ public class Hive3MetastoreShimBase {
.of(TableType.EXTERNAL_TABLE, TableType.MANAGED_TABLE,
TableType.VIRTUAL_VIEW,
TableType.MATERIALIZED_VIEW);
+ public static final List<String> HIVE_VIEW_TYPE =
+ Collections.singletonList("VIRTUAL_VIEW");
+
/**
* mapping between the HMS-3 type the Impala types
*/
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 6dd934dfe..7b848d3a6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -2268,6 +2268,7 @@ public class MetastoreEvents {
* MetastoreEvent for ALTER_DATABASE event type
*/
public static class AlterDatabaseEvent extends MetastoreDatabaseEvent {
+ public static final String EVENT_TYPE = "ALTER_DATABASE";
// metastore database object as parsed from NotificationEvent message
private final Database alteredDatabase_;
@@ -3257,6 +3258,7 @@ public class MetastoreEvents {
* tables.
*/
public static class AbortTxnEvent extends MetastoreEvent {
+ public static final String EVENT_TYPE = "ABORT_TXN";
private final long txnId_;
private Set<TableWriteId> tableWriteIds_ = Collections.emptySet();
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 79cf8c305..313e91405 100644
---
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -26,26 +26,37 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import java.time.LocalDateTime;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.impala.analysis.TableRef;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.Db;
@@ -54,23 +65,36 @@ import
org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.MetastoreClientInstantiationException;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.IncompleteTable;
+import org.apache.impala.catalog.View;
import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
+import org.apache.impala.catalog.events.MetastoreEvents.AlterDatabaseEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.CreateDatabaseEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.CreateTableEvent;
import org.apache.impala.catalog.events.MetastoreEvents.DropDatabaseEvent;
import org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
+import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType;
import org.apache.impala.common.Metrics;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.Reference;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.CatalogOpExecutor;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TDatabase;
import org.apache.impala.thrift.TErrorCode;
import org.apache.impala.thrift.TEventBatchProgressInfo;
import org.apache.impala.thrift.TEventProcessorMetrics;
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
+import org.apache.impala.thrift.TImpalaTableType;
import org.apache.impala.thrift.TStatus;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TWaitForHmsEventRequest;
import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.EventSequence;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.impala.util.NoOpEventSequence;
import org.apache.impala.util.ThreadNameAnnotator;
@@ -395,7 +419,7 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
NotificationEventRequest eventRequest = new NotificationEventRequest();
eventRequest.setMaxEvents(batchSize);
eventRequest.setLastEvent(currentEventId);
- // Need to set table/dbnames in the request according to the filter
+ // Need to set table/db names in the request according to the filter
MetastoreShim.setNotificationEventRequestWithFilter(eventRequest,
metaDataFilter);
NotificationEventResponse notificationEventResponse =
@@ -1488,10 +1512,30 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
}
public static class MetaDataFilter {
+ public static final List<String> DB_EVENT_TYPES = Lists.newArrayList(
+ AlterDatabaseEvent.EVENT_TYPE, CreateDatabaseEvent.EVENT_TYPE,
+ DropDatabaseEvent.EVENT_TYPE);
+ // Event types required to check for a SHOW TABLES statement.
ALTER_DATABASE is
+ // required to check changes on the ownership.
+ public static final List<String> TABLE_LIST_EVENT_TYPES =
Lists.newArrayList(
+ AlterDatabaseEvent.EVENT_TYPE, CreateDatabaseEvent.EVENT_TYPE,
+ DropDatabaseEvent.EVENT_TYPE, CreateTableEvent.EVENT_TYPE,
+ DropTableEvent.EVENT_TYPE);
+ public static final List<String> TABLE_EXIST_EVENT_TYPES =
Lists.newArrayList(
+ CreateTableEvent.EVENT_TYPE, DropTableEvent.EVENT_TYPE);
+ public static final List<String> TABLE_EVENT_TYPES =
+ Stream.of(
+ MetastoreEventType.CREATE_TABLE, MetastoreEventType.DROP_TABLE,
+ MetastoreEventType.ALTER_TABLE, MetastoreEventType.ADD_PARTITION,
+ MetastoreEventType.ALTER_PARTITION,
MetastoreEventType.ALTER_PARTITIONS,
+ MetastoreEventType.DROP_PARTITION, MetastoreEventType.INSERT,
+ MetastoreEventType.INSERT_PARTITIONS, MetastoreEventType.RELOAD,
+ MetastoreEventType.COMMIT_COMPACTION_EVENT
+ ).map(MetastoreEventType::toString).collect(Collectors.toList());
public NotificationFilter filter_;
public String catName_;
public String dbName_;
- public String tableName_;
+ public List<String> tableNames_;
public MetaDataFilter(NotificationFilter notificationFilter) {
this.filter_ = notificationFilter; // if this null then don't build
event filter
@@ -1507,7 +1551,17 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
public MetaDataFilter(NotificationFilter notificationFilter, String
catName,
String databaseName, String tblName) {
this(notificationFilter, catName, databaseName);
- this.tableName_ = tblName;
+ if (tblName != null && !tblName.isEmpty()) {
+ this.tableNames_ = Arrays.asList(tblName);
+ }
+ }
+
+ public MetaDataFilter(NotificationFilter notificationFilter, String
catName,
+ String databaseName, List<String> tblNames) {
+ this(notificationFilter, catName, databaseName);
+ if (tblNames != null && !tblNames.isEmpty()) {
+ this.tableNames_ = tblNames;
+ }
}
public void setNotificationFilter(NotificationFilter notificationFilter) {
@@ -1526,12 +1580,18 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
return dbName_;
}
- public String getTableName() {
- return tableName_;
+ public List<String> getTableNames() {
+ return tableNames_;
}
}
- public TStatus waitForSyncUpToCurrentEvent(long timeoutMs) {
+ /**
+ * Wait until the catalog doesn't have stale metadata that could be used by
the query.
+ * The min required event id is calculated based on the pending HMS events
on requested
+ * db/table names. We then wait until that event is processed by the
EventProcessor.
+ */
+ public TStatus waitForMostRecentMetadata(TWaitForHmsEventRequest req) {
+ long timeoutMs = req.timeout_s * 1000L;
TStatus res = new TStatus();
// Only waits when event-processor is in ACTIVE/PAUSED states. PAUSED
states happen
// at startup or when global invalidate is running, so it's ok to wait for.
@@ -1543,13 +1603,22 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
return res;
}
long waitForEventId;
+ long latestEventId;
try {
- waitForEventId = getCurrentEventId();
+ latestEventId = getCurrentEventId();
} catch (MetastoreNotificationFetchException e) {
res.setStatus_code(TErrorCode.GENERAL);
res.addToError_msgs("Failed to fetch current HMS event id: " +
e.getMessage());
return res;
}
+ try {
+ waitForEventId = getMinEventIdToWaitFor(req);
+ } catch (Throwable e) {
+ LOG.warn("Failed to check the min required event id to wait for.
Fallback to use " +
+ "the latest event id {}. Query might wait longer than it needs.",
+ latestEventId, e);
+ waitForEventId = latestEventId;
+ }
long lastSyncedEventId = getLastSyncedEventId();
long startMs = System.currentTimeMillis();
long sleepIntervalMs = Math.min(timeoutMs, hmsEventSyncSleepIntervalMs_);
@@ -1583,4 +1652,315 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
res.setStatus_code(TErrorCode.OK);
return res;
}
+
+ /**
+ * Find the min required event id that should be synced to avoid the query
using
+ * stale metadata.
+ */
+ private long getMinEventIdToWaitFor(TWaitForHmsEventRequest req)
+ throws MetastoreNotificationFetchException, TException, CatalogException
{
+ if (req.should_expand_views) expandViews(req);
+ // requiredEventId starts from the last synced event id. While checking
pending
+ // events of a target, we just fetch events after requiredEventId, since
events
+ // before it are decided to be waited for.
+ long requiredEventId = getLastSyncedEventId();
+ if (req.want_db_list) {
+ return getMinRequiredEventIdForDbList(requiredEventId);
+ }
+ if (req.isSetObject_descs()) {
+ // Step 1: Collect db/table names requested by the query.
+ // Group table names by the db name. HMS events on tables in the same db
can be
+ // checked by one RPC.
+ Set<String> dbNames = new HashSet<>();
+ Map<String, List<String>> db2Tables = new HashMap<>();
+ for (TCatalogObject catalogObject: req.getObject_descs()) {
+ if (catalogObject.isSetDb()) {
+ dbNames.add(catalogObject.getDb().db_name);
+ } else if (catalogObject.isSetTable()) {
+ TTable table = catalogObject.getTable();
+ if (catalog_.getDb(table.db_name) == null) {
+ // We will check existence of missing dbs. Once the missing db is
added,
+ // the underlying tables are also added (as IncompleteTables). So
don't
+ // need to check table events. I.e. there are no stale metadata on
those
+ // tables.
+ dbNames.add(table.db_name);
+ } else {
+ db2Tables.computeIfAbsent(table.db_name, k -> new ArrayList<>())
+ .add(table.tbl_name);
+ }
+ }
+ }
+ // Step 2: Check DB events
+ requiredEventId = getMinRequiredEventIdForDb(requiredEventId, dbNames,
+ req.want_table_list);
+ // Step 3: Check transactional events if there are transactional tables.
+ // Such events (COMMIT_TXN, ABORT_TXN) don't have the db/table names
since they
+ // might modify multiple transactional tables. If there are
transactional tables,
+ // wait for all the transactional events.
+ requiredEventId = getMinRequiredTxnEventId(requiredEventId, db2Tables);
+ // Step 4: Check table events
+ requiredEventId = getMinRequiredTableEventId(requiredEventId, db2Tables);
+ }
+ return requiredEventId;
+ }
+
+ private static void doForAllObjectNames(TWaitForHmsEventRequest req,
+ Function<TTableName, Object> tblFn, @Nullable Function<String, Object>
dbFn) {
+ for (TCatalogObject catalogObject : req.getObject_descs()) {
+ if (catalogObject.isSetTable()) {
+ TTable table = catalogObject.getTable();
+ tblFn.apply(new TTableName(table.db_name, table.tbl_name));
+ } else if (dbFn != null && catalogObject.isSetDb()) {
+ dbFn.apply(catalogObject.getDb().getDb_name());
+ }
+ }
+ }
+
+ /**
+ * Expand views used by the query so we know what additional tables need to
wait for
+ * their metadata to be synced. Throws exceptions if we can't expand any
views.
+ */
+ private void expandViews(TWaitForHmsEventRequest req)
+ throws TException, CatalogException {
+ if (!req.isSetObject_descs()) return;
+ // Check all the table names in the request. Expand views and add
underlying tables
+ // into the queue if they are new.
+ Queue<TTableName> uncheckedNames = new ArrayDeque<>();
+ doForAllObjectNames(req, uncheckedNames::offer, /*dbFn*/null);
+ Set<TTableName> checkedNames = new HashSet<>();
+ String loadReason = "expand view to check HMS events on underlying tables";
+ try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+ while (!uncheckedNames.isEmpty()) {
+ TTableName tblName = uncheckedNames.poll();
+ checkedNames.add(tblName);
+ Db db = catalog_.getDb(tblName.db_name);
+ Table tbl = null;
+ // Case 1: handle loaded table/views.
+ // If 'tbl' is a loaded view, reload it in case the view definition is
modified.
+ // Note that loading a view from HMS is cheaper than checking HMS
events. So we
+ // always reload it regardless of whether there are pending events on
it.
+ if (db != null) {
+ tbl = db.getTable(tblName.table_name);
+ if (tbl instanceof View) {
+ catalog_.reloadTable(tbl, loadReason, getLastSyncedEventId(),
+ /*unused*/true, EventSequence.getUnusedTimeline());
+ expandView((View) tbl, checkedNames, uncheckedNames);
+ continue;
+ }
+ if (tbl instanceof IncompleteTable && tbl.isLoaded()) {
+ throw new CatalogException(String.format("Cannot expand view %s.%s
due to " +
+ "failures in metadata loading", tblName.db_name,
tblName.table_name),
+ ((IncompleteTable) tbl).getCause());
+ }
+ // Ignore loaded tables.
+ if (tbl != null && tbl.isLoaded()) continue;
+ }
+ // Case 2: handle unloaded/missing tables/views.
+ // Check the metadata in HMS directly. Note that this is cheaper than
checking
+ // HMS events. Only trigger metadata loading for views. Unloaded
tables will
+ // remain unloaded which helps EventProcessor to skip most of their
events.
+ List<TableMeta> metaRes = null;
+ try {
+ metaRes = msClient.getHiveClient().getTableMeta(
+ tblName.db_name, tblName.table_name,
MetastoreShim.HIVE_VIEW_TYPE);
+ } catch (UnknownDBException | NoSuchObjectException e) {
+ // Ignore non-existing db/tables
+ }
+ // 'metaRes' could be null since we are just fetching views.
+ if (metaRes != null && !metaRes.isEmpty() &&
TImpalaTableType.VIEW.equals(
+
MetastoreShim.mapToInternalTableType(metaRes.get(0).getTableType()))) {
+ // View exists in HMS. If it's missing in the cache, invalidate it
to bring
+ // it up.
+ if (db == null || tbl == null) {
+ catalog_.invalidateTable(tblName, /*unused*/new Reference<>(),
+ /*unused*/new Reference<>(),
EventSequence.getUnusedTimeline());
+ }
+ tbl = catalog_.getOrLoadTable(tblName.db_name, tblName.table_name,
loadReason,
+ /*validWriteIdList*/null);
+ if (tbl instanceof View) {
+ expandView((View) tbl, checkedNames, uncheckedNames);
+ } else {
+ // The view could be dropped concurrently or loading its metadata
might fail.
+ throw new CatalogException(String.format("Failed to expand view
%s.%s",
+ tblName.db_name, tblName.table_name));
+ }
+ }
+ }
+ }
+ // Add new table names we found in view expansion to 'req'.
+ // Remove table names in 'checkedNames' that already exist in 'req' and
collect
+ // existing db names.
+ Set<String> existingDbNames = new HashSet<>();
+ doForAllObjectNames(req, checkedNames::remove, existingDbNames::add);
+ // Add new tables and dbs to 'req'.
+ for (TTableName tblName : checkedNames) {
+ TCatalogObject tblDesc = new TCatalogObject(TCatalogObjectType.TABLE, 0);
+ tblDesc.setTable(new TTable(tblName.db_name, tblName.table_name));
+ req.addToObject_descs(tblDesc);
+ if (!existingDbNames.contains(tblName.db_name)) {
+ existingDbNames.add(tblName.db_name);
+ TCatalogObject dbDesc = new
TCatalogObject(TCatalogObjectType.DATABASE, 0);
+ dbDesc.setDb(new TDatabase(tblName.db_name));
+ req.addToObject_descs(dbDesc);
+ }
+ }
+ }
+
+ private void expandView(View view, Set<TTableName> checkedNames,
+ Queue<TTableName> uncheckedNames) throws CatalogException {
+ for (TableRef tblRef : view.getQueryStmt().collectTableRefs()) {
+ List<String> strs = tblRef.getPath();
+ // Table names in the view definition should all be fully qualified.
+ // Add a check here in case something wrong happens in Hive.
+ if (strs.size() < 2) {
+ String str = String.join(".", strs);
+ LOG.error("Illegal table name found in view {}: {}. View
definition:\n{}",
+ view.getFullName(), str,
view.getMetaStoreTable().getViewExpandedText());
+ throw new CatalogException(String.format(
+ "Illegal table name found in view %s: %s", view.getFullName(),
str));
+ }
+ TTableName name = new TTableName(strs.get(0), strs.get(1));
+ if (!checkedNames.contains(name)) {
+ uncheckedNames.add(name);
+ LOG.info("Found new table name used by view {}: {}.{}",
view.getFullName(),
+ name.db_name, name.table_name);
+ }
+ }
+ }
+
+ /**
+ * Get the min required event id to avoid the query using stale metadata on
the
+ * requested dbs.
+ */
+ private long getMinRequiredEventIdForDb(long startEventId, Set<String>
dbNames,
+ boolean wantTableList) throws MetastoreNotificationFetchException {
+ long requiredEventId = startEventId;
+ for (String dbName : dbNames) {
+ // For SHOW TABLES, also check the CREATE/DROP table events.
+ List<String> eventTypes = wantTableList ?
+ MetaDataFilter.TABLE_LIST_EVENT_TYPES :
MetaDataFilter.DB_EVENT_TYPES;
+ NotificationFilter filter = e -> dbName.equalsIgnoreCase(e.getDbName())
+ && MetastoreShim.isDefaultCatalog(e.getCatName())
+ && eventTypes.contains(e.getEventType());
+ // Use 'requiredEventId' as the startEventId since events before it are
decided
+ // to wait for.
+ List<NotificationEvent> dbEvents = getNextMetastoreEventsInBatches(
+ catalog_, /*startEventId*/ requiredEventId, filter,
+ eventTypes.toArray(new String[0]));
+ if (dbEvents.isEmpty()) {
+ LOG.info("No pending events found on db {}", dbName);
+ continue;
+ }
+ NotificationEvent e = dbEvents.get(dbEvents.size() - 1);
+ LOG.info("Found db {} has pending event. EventId:{} EventType:{}",
+ dbName, e.getEventId(), e.getEventType());
+ requiredEventId = Math.max(requiredEventId, e.getEventId());
+ }
+ return requiredEventId;
+ }
+
+ /**
+ * Check if there are any transactional tables loaded in catalog
+ */
+ private boolean hasLoadedTxnTables(Map<String, List<String>> db2Tables) {
+ for (String dbName : db2Tables.keySet()) {
+ Db db = catalog_.getDb(dbName);
+ if (db == null) continue;
+ for (String tableName : db2Tables.get(dbName)) {
+ Table table = db.getTable(tableName);
+ if (table instanceof HdfsTable && AcidUtils.isTransactionalTable(
+ table.getMetaStoreTable().getParameters())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean hasLoadedTables(Map<String, List<String>> db2Tables) {
+ for (String dbName : db2Tables.keySet()) {
+ Db db = catalog_.getDb(dbName);
+ if (db == null) continue;
+ for (String tableName : db2Tables.get(dbName)) {
+ Table table = db.getTable(tableName);
+ if (table != null && !(table instanceof IncompleteTable)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private long getMinRequiredTxnEventId(long startEventId,
+ Map<String, List<String>> db2Tables) throws
MetastoreNotificationFetchException {
+ if (!hasLoadedTxnTables(db2Tables)) return startEventId;
+ NotificationFilter filter = e ->
+ MetastoreShim.CommitTxnEvent.EVENT_TYPE.equals(e.getEventType())
+ ||
MetastoreEvents.AbortTxnEvent.EVENT_TYPE.equals(e.getEventType());
+ List<NotificationEvent> txnEvents =
getNextMetastoreEventsInBatches(catalog_,
+ startEventId, filter, MetastoreShim.CommitTxnEvent.EVENT_TYPE,
+ MetastoreEvents.AbortTxnEvent.EVENT_TYPE);
+ if (!txnEvents.isEmpty()) {
+ NotificationEvent e = txnEvents.get(txnEvents.size() - 1);
+ LOG.info("Some of the requested tables are transactional tables. Found
{} " +
+ "pending transactional events. The last event is EventId: {} " +
+ "EventType: {}", txnEvents.size(), e.getEventId(), e.getEventType());
+ return e.getEventId();
+ }
+ LOG.info("Some of the requested tables are transactional tables. " +
+ "No pending transactional events found.");
+ return startEventId;
+ }
+
+ private long getMinRequiredTableEventId(long startEventId,
+ Map<String, List<String>> db2Tables) throws
MetastoreNotificationFetchException {
+ long requiredEventId = startEventId;
+ // If all requested tables are unloaded, just check CREATE/DROP table
events.
+ List<String> eventTypes = hasLoadedTables(db2Tables) ?
+ MetaDataFilter.TABLE_EVENT_TYPES :
MetaDataFilter.TABLE_EXIST_EVENT_TYPES;
+ for (String dbName : db2Tables.keySet()) {
+ Set<String> tblNames = new HashSet<>(db2Tables.get(dbName));
+ NotificationFilter filter = e -> dbName.equalsIgnoreCase(e.getDbName())
+ && tblNames.contains(e.getTableName().toLowerCase())
+ && MetastoreShim.isDefaultCatalog(e.getCatName())
+ && eventTypes.contains(e.getEventType());
+ MetaDataFilter metaDataFilter = new MetaDataFilter(filter,
+ MetastoreShim.getDefaultCatalogName(), dbName,
db2Tables.get(dbName));
+ List<NotificationEvent> tableEvents =
getNextMetastoreEventsWithFilterInBatches(
+ catalog_, requiredEventId, metaDataFilter, EVENTS_BATCH_SIZE_PER_RPC,
+ eventTypes.toArray(new String[0]));
+ if (tableEvents.isEmpty()) {
+ LOG.info("No pending events on specified tables under db {}", dbName);
+ continue;
+ }
+ NotificationEvent e = tableEvents.get(tableEvents.size() - 1);
+ requiredEventId = Math.max(requiredEventId, e.getEventId());
+ LOG.info("Found {} pending events on table {} of db {}. The last event
is " +
+ "EventId: {} EventType: {} Table: {}",
+ tableEvents.size(), String.join(",", db2Tables.get(dbName)), dbName,
+ e.getEventId(), e.getEventType(), e.getTableName());
+ }
+ return requiredEventId;
+ }
+
+ /**
+ * Get the min required event id to avoid the query using stale db list.
+ */
+ private long getMinRequiredEventIdForDbList(long startEventId)
+ throws MetastoreNotificationFetchException {
+ NotificationFilter filter = e ->
MetastoreShim.isDefaultCatalog(e.getCatName())
+ && MetaDataFilter.DB_EVENT_TYPES.contains(e.getEventType());
+ List<NotificationEvent> dbEvents = getNextMetastoreEventsInBatches(
+ catalog_, startEventId, filter,
+ MetaDataFilter.DB_EVENT_TYPES.toArray(new String[0]));
+ if (dbEvents.isEmpty()) {
+ LOG.info("No events need to be synced for db list");
+ return startEventId;
+ }
+ NotificationEvent e = dbEvents.get(dbEvents.size() - 1);
+ LOG.info("Found {} pending events on db list. The last one is EventId: {}
" +
+ "EventType: {} db: {}", dbEvents.size(), e.getEventId(),
e.getEventType(),
+ e.getDbName());
+ return e.getEventId();
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 883e21fcd..4461c03f5 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2357,7 +2357,7 @@ public class CatalogOpExecutor {
}
/**
- * Fetches CreateDatabase and CreateTable events of a db if events
processing is active.
+ * Fetches DropDatabase and DropTable events of a db if events processing is
active.
* Returns an empty list if not active. Also updates the given
'catalogTimeline'.
*/
private List<NotificationEvent> getNextMetastoreDropEventsForDbIfEnabled(
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index c61cc272c..cf0223c9a 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -77,6 +77,7 @@ import org.apache.impala.analysis.CommentOnStmt;
import org.apache.impala.analysis.CopyTestCaseStmt;
import org.apache.impala.analysis.CreateDataSrcStmt;
import org.apache.impala.analysis.CreateDropRoleStmt;
+import org.apache.impala.analysis.CreateTableAsSelectStmt;
import org.apache.impala.analysis.CreateUdaStmt;
import org.apache.impala.analysis.CreateUdfStmt;
import org.apache.impala.analysis.DescribeTableStmt;
@@ -2294,7 +2295,10 @@ public class Frontend {
dbDesc.setDb(new TDatabase(name));
req.addToObject_descs(dbDesc);
}
- LOG.info("Waiting for HMS events on the dbs: {} and tables: {}",
+ // Only statements that have SELECT need to expand views.
+ req.setShould_expand_views(stmt instanceof QueryStmt
+ || stmt instanceof DmlStatementBase || stmt instanceof
CreateTableAsSelectStmt);
+ LOG.info("Waiting for HMS events on dbs: {} and tables: {}",
String.join(", ", dbNames),
tableNames.stream().map(TableName::toString).collect(Collectors.joining(", ")));
}
diff --git a/tests/custom_cluster/test_events_custom_configs.py
b/tests/custom_cluster/test_events_custom_configs.py
index 1b6f29670..4ef2738ff 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -22,7 +22,6 @@ import re
from os import getenv
from time import sleep
-
from impala_thrift_gen.hive_metastore.ttypes import FireEventRequest
from impala_thrift_gen.hive_metastore.ttypes import FireEventRequestData
from impala_thrift_gen.hive_metastore.ttypes import InsertEventRequestData
@@ -1617,3 +1616,24 @@ class
TestEventSyncFailures(TestEventProcessingCustomConfigsBase):
# SELECT gets the new row if waiting for enough time
results = self.execute_query_expect_success(client, query,
EVENT_SYNC_QUERY_OPTIONS)
assert len(results.data) == 1
+
+
class TestEventSyncWaiting(TestEventProcessingCustomConfigsBase):

  @CustomClusterTestSuite.with_args(
      catalogd_args="--debug_actions=catalogd_event_processing_delay:SLEEP@200")
  def test_hms_event_sync_with_commit_compaction(self, vector, unique_database):
    """Test waiting for COMMIT_COMPACTION_EVENT. There is always a COMMIT_TXN
    event before it so use a sleep of 200ms to delay processing it."""
    client = self.default_impala_client(vector.get_value('protocol'))
    client.set_configuration({"sync_hms_events_wait_time_s": 10})
    txn_tbl = unique_database + ".foo"
    self.run_stmt_in_hive("""create transactional table {} partitioned by(p)
        as select 0 as i, 0 as p""".format(txn_tbl))
    # After a second insert in Hive, Impala should see both delta files.
    self.run_stmt_in_hive("insert into {} select 1,0".format(txn_tbl))
    files = self.execute_query_expect_success(client, "show files in " + txn_tbl)
    assert len(files.data) == 2
    # Minor compaction merges the deltas; SHOW FILES must wait for the pending
    # COMMIT_COMPACTION event before it can report a single file.
    self.run_stmt_in_hive(
        "alter table {} partition(p=0) compact 'minor' and wait".format(txn_tbl))
    files = self.execute_query_expect_success(client, "show files in " + txn_tbl)
    assert len(files.data) == 1
diff --git a/tests/metadata/test_event_processing.py
b/tests/metadata/test_event_processing.py
index 9315d99dc..f04a4ccfb 100644
--- a/tests/metadata/test_event_processing.py
+++ b/tests/metadata/test_event_processing.py
@@ -19,6 +19,7 @@ from subprocess import check_call
import pytest
import re
import time
+import threading
from tests.common.test_dimensions import (
create_single_exec_option_dimension,
@@ -309,11 +310,30 @@ class TestEventSyncWaiting(ImpalaTestSuite):
add_mandatory_exec_option(cls, 'sync_hms_events_wait_time_s',
PROCESSING_TIMEOUT_S)
add_mandatory_exec_option(cls, 'sync_hms_events_strict_mode', True)
- def test_hms_event_sync(self, vector, unique_database):
+ @pytest.mark.execute_serially
+ def test_event_processor_pauses(self, vector, unique_database):
+ client = self.create_impala_client_from_vector(vector)
+ tbl = unique_database + ".foo"
+
+ # Create a table in Hive and submit a query on it when EP is paused.
+ client.execute(":event_processor('pause')")
+ self.run_stmt_in_hive("create table {} as select 1".format(tbl))
+
+ # execute_async() is not really async that it returns after query planning
finishes.
+ # So we use execute_query_expect_success here and resume EP in a
background thread.
+ def resume_event_processor():
+ time.sleep(2)
+ client = self.create_impala_client_from_vector(vector)
+ client.execute(":event_processor('start')")
+ resume_ep_thread = threading.Thread(target=resume_event_processor)
+ resume_ep_thread.start()
+ res = self.execute_query_expect_success(client, "select * from " + tbl)
+ assert res.data == ['1']
+
+ def test_hms_event_sync_basic(self, vector, unique_database):
"""Verify query option sync_hms_events_wait_time_s should protect the
query by
waiting until Impala sync the HMS changes."""
- client = self.default_impala_client(vector.get_value('protocol'))
- client.set_configuration(vector.get_exec_option_dict())
+ client = self.create_impala_client_from_vector(vector)
tbl_name = unique_database + ".tbl"
label = "Synced events from Metastore"
# Test DESCRIBE on new table created in Hive
@@ -414,3 +434,168 @@ class TestEventSyncWaiting(ImpalaTestSuite):
assert res.data == ["j\tint\t"]
assert res.log == ''
self.verify_timeline_item("Query Compilation", label, res.runtime_profile)
+
+ def test_hms_event_sync_multiple_tables(self, vector, unique_database):
+ client = self.create_impala_client_from_vector(vector)
+ for i in range(3):
+ self.execute_query("create table {0}.tbl{1} (i
int)".format(unique_database, i))
+ res = self.execute_query_expect_success(client, """
+ select t1.i from {0}.tbl0 t0, {0}.tbl1 t1, {0}.tbl2 t2
+ where t0.i = t1.i and t1.i = t2.i""".format(unique_database))
+ assert len(res.data) == 0
+
+ for i in range(3):
+ self.run_stmt_in_hive("insert into table {0}.tbl{1} select 1".format(
+ unique_database, i))
+ res = self.execute_scalar_expect_success(client, """
+ select t1.i from {0}.tbl0 t0, {0}.tbl1 t1, {0}.tbl2 t2
+ where t0.i = t1.i and t1.i = t2.i""".format(unique_database))
+ assert res == "1"
+
+ def test_hms_event_sync_with_view(self, vector, unique_database):
+ client = self.create_impala_client_from_vector(vector)
+ tbl = unique_database + ".foo"
+ view = unique_database + ".foo_view"
+ count_stmt = "select count(*) from {}".format(view)
+ self.execute_query("create table {}(i int)".format(tbl))
+ self.execute_query("create view {} as select * from {}".format(view, tbl))
+ # Run a query to make the metadata loaded so they can be stale later.
+ res = self.execute_scalar(count_stmt)
+ assert res == '0'
+
+ # Modify the table in Hive and read the view in Impala
+ self.run_stmt_in_hive("insert into {} select 1".format(tbl))
+ res = self.execute_query_expect_success(client, count_stmt)
+ assert res.data[0] == '1'
+
+ # Modify the view in Hive and read it in Impala
+ self.run_stmt_in_hive(
+ "alter view {} as select * from {} where i > 1".format(view, tbl))
+ res = self.execute_query_expect_success(client, count_stmt)
+ assert res.data[0] == '0'
+
+ def test_hms_event_sync_with_view_partitioned(self, vector, unique_database):
+ client = self.create_impala_client_from_vector(vector)
+ tbl = unique_database + ".foo"
+ view = unique_database + ".foo_view"
+ select_stmt = "select * from " + view
+ self.execute_query("create table {}(i int) partitioned by(p
int)".format(tbl))
+ self.execute_query("create view {} as select * from {} where
p>0".format(view, tbl))
+ res = self.execute_query_expect_success(client, select_stmt)
+ assert len(res.data) == 0
+
+ # Ingest data in Hive and read the view in Impala
+ # Add a new partition that will be filtered out by the view
+ self.run_stmt_in_hive("insert into {} select 0, 0".format(tbl))
+ res = self.execute_query_expect_success(client, select_stmt)
+ assert len(res.data) == 0
+ # Add a new partition that will show up in the view
+ self.run_stmt_in_hive("insert into {} select 1, 1".format(tbl))
+ res = self.execute_scalar_expect_success(client, select_stmt)
+ assert res == '1\t1'
+ # Add a new partition and alter the view to only show it
+ self.run_stmt_in_hive("insert into {} select 2, 2".format(tbl))
+ self.run_stmt_in_hive("alter view {} as select * from {} where
p>1".format(view, tbl))
+ res = self.execute_scalar_expect_success(client, select_stmt)
+ assert res == '2\t2'
+
+ def test_hms_event_sync_compute_stats(self, vector, unique_database):
+ client = self.create_impala_client_from_vector(vector)
+ tbl = unique_database + ".foo"
+ self.execute_query("create table {}(i int) partitioned by(p
int)".format(tbl))
+ # Add one partition in Hive and compute incremental stats on that
partition in Impala
+ self.run_stmt_in_hive("insert into {} select 0,0".format(tbl))
+ res = self.execute_query_expect_success(
+ client, "compute incremental stats {} partition(p=0)".format(tbl))
+ assert res.data == ['Updated 1 partition(s) and 1 column(s).']
+ # Add one partition in Hive and compute incremental stats on that table in
Impala
+ self.run_stmt_in_hive("insert into {} select 1,1 union all select
2,2".format(tbl))
+ res = self.execute_query_expect_success(
+ client, "compute incremental stats {}".format(tbl))
+ assert res.data == ['Updated 2 partition(s) and 1 column(s).']
+ # Drop two partitions in Hive and compute stats on that table in Impala.
The
+ # incremental stats will be replaced with non-incremental stats so the
remaining
+ # partition is updated.
+ self.run_stmt_in_hive("alter table {} drop partition(p<2)".format(tbl))
+ res = self.execute_query_expect_success(
+ client, "compute stats {}".format(tbl))
+ assert res.data == ['Updated 1 partition(s) and 1 column(s).']
+
+ def test_hms_event_sync_ctas(self, vector, unique_database):
+ client = self.create_impala_client_from_vector(vector)
+ tbl = unique_database + ".foo"
+ tmp_tbl = unique_database + ".tmp"
+ self.execute_query("create table {}(i int) partitioned by(p
int)".format(tbl))
+ # Add one partition in Hive and use the table in Impala
+ self.run_stmt_in_hive("insert into {} select 0,0".format(tbl))
+ res = self.execute_query_expect_success(
+ client, "create table {} as select * from {}".format(tmp_tbl, tbl))
+ assert res.data == ['Inserted 1 row(s)']
+ # Insert one row into the same partition in Hive and use the table in
Impala
+ self.run_stmt_in_hive("insert into {} select 1,0".format(tbl))
+ res = self.execute_query_expect_success(
+ client, "create table {}_2 as select * from {}".format(tmp_tbl, tbl))
+ assert res.data == ['Inserted 2 row(s)']
+ # Truncate the table in Hive and use it in Impala
+ self.run_stmt_in_hive("truncate table {}".format(tbl))
+ res = self.execute_query_expect_success(
+ client, "create table {}_3 as select * from {}".format(tmp_tbl, tbl))
+ assert res.data == ['Inserted 0 row(s)']
+
+ # Create a table in Hive before CTAS of it in Impala
+ self.run_stmt_in_hive("create table {}_4(i int) partitioned by(p
int)".format(tbl))
+ exception = self.execute_query_expect_failure(
+ client, "create table {}_4 as select 1,1".format(tbl))
+ assert 'Table already exists: {}_4'.format(tbl) in str(exception)
+
+ def test_hms_event_sync_insert(self, vector, unique_database):
+ client = self.create_impala_client_from_vector(vector)
+ tbl = unique_database + ".foo"
+ tmp_tbl = unique_database + ".tmp"
+ self.execute_query("create table {}(i int) partitioned by(p
int)".format(tbl))
+ self.execute_query("create table {}(i int) partitioned by(p
int)".format(tmp_tbl))
+ insert_stmt = "insert into {} partition (p) select * from
{}".format(tmp_tbl, tbl)
+ # Add one partition in Hive and use the table in INSERT in Impala
+ self.run_stmt_in_hive("insert into {} select 0,0".format(tbl))
+ res = self.execute_query_expect_success(client, insert_stmt)
+ # Result rows are "partition_name: num_rows_inserted" for each modified
partitions
+ assert res.data == ['p=0: 1']
+ # Insert one row into the same partition in Hive and use the table in
INSERT in Impala
+ self.run_stmt_in_hive("insert into {} select 1,0".format(tbl))
+ res = self.execute_query_expect_success(client, insert_stmt)
+ assert res.data == ['p=0: 2']
+ # Add another new partition in Hive and use the table in INSERT in Impala
+ self.run_stmt_in_hive("insert into {} select 2,2".format(tbl))
+ res = self.execute_query_expect_success(client, insert_stmt)
+ assert res.data == ['p=0: 2', 'p=2: 1']
+ # Drop one partition in Hive and use the table in INSERT in Impala
+ self.run_stmt_in_hive("alter table {} drop partition(p=0)".format(tbl))
+ res = self.execute_query_expect_success(client, insert_stmt)
+ assert res.data == ['p=2: 1']
+ # Truncate the table in Hive and use it in INSERT in Impala
+ self.run_stmt_in_hive("truncate table {}".format(tbl))
+ res = self.execute_query_expect_success(client, insert_stmt)
+ assert len(res.data) == 0
+
+ def test_hms_event_sync_txn(self, vector, unique_database):
+ client = self.create_impala_client_from_vector(vector)
+ tbl = unique_database + ".foo"
+ self.run_stmt_in_hive(
+ "create transactional table {}(i int) partitioned by(p
int)".format(tbl))
+ # Load the table in Impala
+ self.execute_query_expect_success(client, "describe " + tbl)
+ # Insert the table in Hive and check it in Impala immediately
+ self.run_stmt_in_hive("insert into {} select 0,0".format(tbl))
+ res = self.execute_query_expect_success(client, "select * from " + tbl)
+ assert res.data == ['0\t0']
+ # Insert the table in Hive again and check number of rows in Impala
+ self.run_stmt_in_hive("insert into {} select 1,0".format(tbl))
+ res = self.execute_query_expect_success(client, "select count(*) from " +
tbl)
+ assert res.data == ['2']
+ res = self.execute_query_expect_success(client, "show files in " + tbl)
+ assert len(res.data) == 2
+ # Trigger compaction in Hive
+ self.run_stmt_in_hive(
+ "alter table {} partition(p=0)compact 'minor' and wait".format(tbl))
+ res = self.execute_query_expect_success(client, "show files in " + tbl)
+ assert len(res.data) == 1