This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 880690c7b IMPALA-12680: Fix NullPointerException during AlterTableAddPartitions
880690c7b is described below
commit 880690c7bd46e6af058d8a669bea41af4a131049
Author: Sai Hemanth Gantasala <[email protected]>
AuthorDate: Tue May 14 18:40:04 2024 -0700
IMPALA-12680: Fix NullPointerException during AlterTableAddPartitions
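When a global INVALIDATE METADATA runs concurrently with an
AlterTableAddPartition statement, a precondition check in
addHmsPartitions() can throw a NullPointerException. This happens
because Map<String, Long> partitionToEventId is initialized to null
when the event processor is not active. If the event processor becomes
active while the statement is still in progress (which the concurrent
INVALIDATE METADATA can cause), the null map reaches the
Preconditions.checkNotNull() call in addHmsPartitions().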
This change always initializes 'partitionToEventId' to an empty hash
map, regardless of the state of the event processor. If the event
processor is not active, addHmsPartitions() adds the partitions that
are fetched directly from the metastore.
Note: this change also fixes the same potential issue in
AlterTableRecoverPartitions.
Testing:
- Manually verified that the NullPointerException no longer occurs.
- Added a unit test that covers this scenario.
Change-Id: I730fed311ebc09762dccc152d9583d5394b0b9b3
Reviewed-on: http://gerrit.cloudera.org:8080/21430
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../apache/impala/service/CatalogOpExecutor.java | 28 ++++++++------
.../java/org/apache/impala/util/DebugUtils.java | 3 ++
.../events/MetastoreEventsProcessorTest.java | 45 +++++++++++++++++++++-
3 files changed, 64 insertions(+), 12 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 6b5ffa01d..17210b657 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -145,6 +145,7 @@ import
org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
import
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
+import
org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
import org.apache.impala.catalog.events.MetastoreNotificationException;
import org.apache.impala.catalog.monitor.CatalogMonitor;
import org.apache.impala.catalog.monitor.CatalogOperationTracker;
@@ -1270,7 +1271,7 @@ public class CatalogOpExecutor {
format = params.getSet_file_format_params().file_format;
}
alterTableAddPartitions(tbl, params.getAdd_partition_params(),
format,
- catalogTimeline, modification);
+ catalogTimeline, modification, debugAction);
reloadMetadata = false;
responseSummaryMsg = "New partition has been added to the table.";
break;
@@ -4600,8 +4601,8 @@ public class CatalogOpExecutor {
*/
private void alterTableAddPartitions(Table tbl,
TAlterTableAddPartitionParams addPartParams, THdfsFileFormat fileFormat,
- EventSequence catalogTimeline, InProgressTableModification modification)
- throws ImpalaException {
+ EventSequence catalogTimeline, InProgressTableModification modification,
+ String debugAction) throws ImpalaException {
Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
TableName tableName = tbl.getTableName();
@@ -4639,10 +4640,10 @@ public class CatalogOpExecutor {
List<Partition> difference = null;
try (MetaStoreClient msClient =
catalog_.getMetaStoreClient(catalogTimeline)) {
- Map<String, Long> partitionToEventId =
catalog_.isEventProcessingActive() ?
- Maps.newHashMap() : null;
+ Map<String, Long> partitionToEventId = Maps.newHashMap();
List<Partition> addedHmsPartitions =
addHmsPartitionsInTransaction(msClient,
- tbl, allHmsPartitionsToAdd, partitionToEventId, ifNotExists,
catalogTimeline);
+ tbl, allHmsPartitionsToAdd, partitionToEventId, ifNotExists,
catalogTimeline,
+ debugAction);
// Handle HDFS cache. This is done in a separate round bacause we have
to apply
// caching only to newly added partitions.
alterTableCachePartitions(msTbl, msClient, tableName, addedHmsPartitions,
@@ -5299,7 +5300,7 @@ public class CatalogOpExecutor {
*/
private List<Partition> addHmsPartitions(MetaStoreClient msClient,
Table tbl, List<Partition> allHmsPartitionsToAdd,
- @Nullable Map<String, Long> partitionToEventId, boolean ifNotExists,
+ Map<String, Long> partitionToEventId, boolean ifNotExists,
EventSequence catalogTimeline) throws ImpalaRuntimeException,
CatalogException {
long eventId = getCurrentEventId(msClient, catalogTimeline);
List<Partition> addedHmsPartitions = Lists
@@ -5332,7 +5333,6 @@ public class CatalogOpExecutor {
// add_partitions call above.
addedHmsPartitions.addAll(addedPartitions);
} else {
- Preconditions.checkNotNull(partitionToEventId);
addedHmsPartitions.addAll(partitionToEventSubMap.keySet());
// we cannot keep a mapping of Partition to event ids because the
// partition objects are changed later in the cachePartitions code
path.
@@ -5361,8 +5361,12 @@ public class CatalogOpExecutor {
*/
private List<Partition> addHmsPartitionsInTransaction(MetaStoreClient
msClient,
Table tbl, List<Partition> partitions, Map<String, Long>
partitionToEventId,
- boolean ifNotExists, EventSequence catalogTimeline) throws
ImpalaException {
+ boolean ifNotExists, EventSequence catalogTimeline, String debugAction)
+ throws ImpalaException {
if
(!AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) {
+ if (DebugUtils.hasDebugAction(debugAction,
DebugUtils.ENABLE_EVENT_PROCESSOR)) {
+ catalog_.startEventsProcessor();
+ }
return addHmsPartitions(msClient, tbl, partitions, partitionToEventId,
ifNotExists, catalogTimeline);
}
@@ -6302,10 +6306,12 @@ public class CatalogOpExecutor {
}
// Add partitions to metastore.
- Map<String, Long> partitionToEventId = catalog_.isEventProcessingActive() ?
- Maps.newHashMap() : null;
+ Map<String, Long> partitionToEventId = Maps.newHashMap();
String annotation = String.format("Recovering %d partitions for %s",
hmsPartitions.size(), tbl.getFullName());
+ if (DebugUtils.hasDebugAction(debugAction,
DebugUtils.ENABLE_EVENT_PROCESSOR)) {
+ catalog_.startEventsProcessor();
+ }
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation);
MetaStoreClient msClient =
catalog_.getMetaStoreClient(catalogTimeline)) {
List<Partition> addedPartitions = addHmsPartitions(msClient, tbl,
hmsPartitions,
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index f1b150fbc..67b05e4e4 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -83,6 +83,9 @@ public class DebugUtils {
// debug action label for introducing delay in loading table metadata.
public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay";
+ // debug action to enable eventProcessor
+ public static final String ENABLE_EVENT_PROCESSOR = "enable_event_processor";
+
/**
* Returns true if the label of action is set in the debugActions
*/
diff --git
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index c54c04f26..5a800ca60 100644
---
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -4056,6 +4056,40 @@ public class MetastoreEventsProcessorTest {
catalog_, currentEventId, metaDataFilter,
EVENTS_BATCH_SIZE_PER_RPC).size(), 8);
}
+ @Test
+ public void testAlterTableWithEpDisabled() throws Exception {
+ try {
+ createDatabaseFromImpala(TEST_DB_NAME, null);
+ String testTable = "testAlterTableNoError";
+ createTableFromImpala(TEST_DB_NAME, testTable, true);
+ eventsProcessor_.processEvents();
+ // set EP to paused state and execute Alter table add partition query
+ eventsProcessor_.pause();
+ long numberOfSelfEventsBefore =
+ eventsProcessor_.getMetrics()
+
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount();
+ TPartitionDef partitionDef = new TPartitionDef();
+ partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "100"));
+ partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "200"));
+ alterTableAddPartition(TEST_DB_NAME, testTable, partitionDef,
+ "enable_event_processor");
+ eventsProcessor_.processEvents();
+ assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
+ long numberOfSelfEventsAfter =
+ eventsProcessor_.getMetrics()
+
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount();
+ // expect ADD_PARTITION event to be skipped as self-event
+ assertEquals("Unexpected self events skipped: ", numberOfSelfEventsAfter,
+ numberOfSelfEventsBefore + 1);
+ } catch (NullPointerException ex) {
+ throw new CatalogException("Exception occured while applying
AlterTableEvent", ex);
+ } finally {
+ if (eventsProcessor_.getStatus() != EventProcessorStatus.ACTIVE) {
+ eventsProcessor_.start();
+ }
+ }
+ }
+
private void createDatabase(String catName, String dbName,
Map<String, String> params) throws TException {
try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
@@ -4353,8 +4387,17 @@ public class MetastoreEventsProcessorTest {
*/
private void alterTableAddPartition(
String dbName, String tblName, TPartitionDef partitionDef) throws
ImpalaException {
+ alterTableAddPartition(dbName, tblName,partitionDef, null);
+ }
+
+ private void alterTableAddPartition(String dbName, String tblName,
+ TPartitionDef partitionDef, String debugActions) throws ImpalaException {
TDdlExecRequest req = new TDdlExecRequest();
- req.setQuery_options(new TDdlQueryOptions());
+ TDdlQueryOptions queryOptions = new TDdlQueryOptions();
+ if (debugActions != null) {
+ queryOptions.setDebug_action(debugActions);
+ }
+ req.setQuery_options(queryOptions);
req.setDdl_type(TDdlType.ALTER_TABLE);
TAlterTableParams alterTableParams = new TAlterTableParams();
alterTableParams.setTable_name(new TTableName(dbName, tblName));