This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new c518d3c81 IMPALA-13501: Clean up uncommitted Iceberg files after
validation check failure
c518d3c81 is described below
commit c518d3c8182749b83f938105c605c1b67755513c
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Fri Nov 22 11:43:21 2024 +0100
IMPALA-13501: Clean up uncommitted Iceberg files after validation check
failure
Iceberg supports multiple writers with optimistic concurrency.
Each writer can write new files which are then added to the table
after a validation check to ensure that the commit does not conflict
with other modifications made during the execution.
When there is a conflicting change that cannot be resolved, the
newly written files cannot be committed to the table, so they used
to become orphan files on the file system. Orphan files
can accumulate over time, taking up a lot of storage space. They do
not belong to the table because they are not referenced by any snapshot
and therefore they can't be removed by expiring snapshots.
This change introduces automatic cleanup of uncommitted files
after an unsuccessful DML operation to prevent creating orphan files.
No cleanup is done if Iceberg throws CommitStateUnknownException
because the update success or failure is unknown in this case.
Testing:
- E2E test: Injected ValidationException with debug option.
- stress test: Added a method to check that no orphan files were
created after failed conflicting commits.
Change-Id: Ibe59546ebf3c639b75b53dfa1daba37cef50eb21
Reviewed-on: http://gerrit.cloudera.org:8080/22189
Reviewed-by: Daniel Becker <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
.../apache/impala/service/CatalogOpExecutor.java | 47 +++++++----
.../impala/service/IcebergCatalogOpExecutor.java | 93 ++++++++++++----------
.../java/org/apache/impala/util/DebugUtils.java | 7 ++
tests/query_test/test_iceberg.py | 40 +++++++++-
tests/stress/test_update_stress.py | 33 +++++++-
5 files changed, 157 insertions(+), 63 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 32e2e33cd..33eb0d8f3 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.mr.Catalogs;
import org.apache.impala.analysis.AlterTableSortByStmt;
@@ -7570,25 +7571,37 @@ public class CatalogOpExecutor {
throws ImpalaException {
FeIcebergTable iceTbl = (FeIcebergTable)table;
org.apache.iceberg.Transaction iceTxn =
IcebergUtil.getIcebergTransaction(iceTbl);
- IcebergCatalogOpExecutor.execute(iceTbl, iceTxn,
- update.getIceberg_operation());
- catalogTimeline.markEvent("Executed Iceberg operation " +
- update.getIceberg_operation().getOperation());
- if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) {
- // Add catalog service id and the 'newCatalogVersion' to the table
parameters.
- // This way we can avoid reloading the table on self-events (Iceberg
generates
- // an ALTER TABLE statement to set the new metadata_location).
- modification.registerInflightEvent();
- IcebergCatalogOpExecutor.addCatalogVersionToTxn(
- iceTxn, catalog_.getCatalogServiceId(),
modification.newVersionNumber());
- catalogTimeline.markEvent("Updated table properties");
- }
+ try {
+ DebugUtils.executeDebugAction(
+ update.getDebug_action(), DebugUtils.ICEBERG_CONFLICT);
+ IcebergCatalogOpExecutor.execute(iceTbl, iceTxn,
update.getIceberg_operation());
+ catalogTimeline.markEvent("Executed Iceberg operation " +
+ update.getIceberg_operation().getOperation());
+ if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) {
+ // Add catalog service id and the 'newCatalogVersion' to the table
parameters.
+ // This way we can avoid reloading the table on self-events (Iceberg
generates
+ // an ALTER TABLE statement to set the new metadata_location).
+ modification.registerInflightEvent();
+ IcebergCatalogOpExecutor.addCatalogVersionToTxn(
+ iceTxn, catalog_.getCatalogServiceId(),
modification.newVersionNumber());
+ catalogTimeline.markEvent("Updated table properties");
+ }
- if (update.isSetDebug_action()) {
- String debugAction = update.getDebug_action();
- DebugUtils.executeDebugAction(debugAction, DebugUtils.ICEBERG_COMMIT);
+ DebugUtils.executeDebugAction(update.getDebug_action(),
DebugUtils.ICEBERG_COMMIT);
+ iceTxn.commitTransaction();
+ // If we have no information about the success of the commit, we should
not delete
+ // anything.
+ } catch (CommitStateUnknownException u) {
+ throw new ImpalaRuntimeException(u.getMessage(), u);
+ // If the commit failed, the newly written files should be deleted to
avoid creating
+ // orphan files in the table. Only data/delete files need cleanup from
Impala, Iceberg
+ // deletes the metadata files created for this update.
+ } catch (Exception e) {
+
IcebergCatalogOpExecutor.cleanupUncommittedFiles(update.getIceberg_operation());
+ LOG.info("Cleaned up uncommitted data files after failing to commit them
to " +
+ "table {}", table.getFullName());
+ throw new ImpalaRuntimeException(e.getMessage(), e);
}
- iceTxn.commitTransaction();
modification.markInflightEventRegistrationComplete();
}
diff --git
a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 4435ad9e1..1d1a50e8d 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -18,7 +18,6 @@
package org.apache.impala.service;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,7 +44,6 @@ import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;
import org.apache.iceberg.hive.HiveCatalog;
@@ -77,6 +75,8 @@ import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.impala.common.FileSystemUtil.deleteIfExists;
+
/**
* This is a helper for the CatalogOpExecutor to provide Iceberg related DDL
functionality
* such as creating and dropping tables from Iceberg.
@@ -381,19 +381,15 @@ public class IcebergCatalogOpExecutor {
DeleteFile deleteFile = createDeleteFile(feIcebergTable, buf);
rowDelta.addDeletes(deleteFile);
}
- try {
- // Validate that there are no conflicting data files, because if data
files are
- // added in the meantime, they potentially contain records that should
have been
- // affected by this DELETE operation.
- rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
- rowDelta.validateNoConflictingDataFiles();
- rowDelta.validateDataFilesExist(
- icebergOp.getData_files_referenced_by_position_deletes());
- rowDelta.validateDeletedFiles();
- rowDelta.commit();
- } catch (ValidationException e) {
- throw new ImpalaRuntimeException(e.getMessage(), e);
- }
+ // Validate that there are no conflicting data files, because if data
files are
+ // added in the meantime, they potentially contain records that should
have been
+ // affected by this DELETE operation.
+ rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
+ rowDelta.validateNoConflictingDataFiles();
+ rowDelta.validateDataFilesExist(
+ icebergOp.getData_files_referenced_by_position_deletes());
+ rowDelta.validateDeletedFiles();
+ rowDelta.commit();
}
private static void updateRows(FeIcebergTable feIcebergTable, Transaction
txn,
@@ -409,22 +405,18 @@ public class IcebergCatalogOpExecutor {
DataFile dataFile = createDataFile(feIcebergTable, buf);
rowDelta.addRows(dataFile);
}
- try {
- // Validate that there are no conflicting data files, because if data
files are
- // added in the meantime, they potentially contain records that should
have been
- // affected by this UPDATE operation. Also validate that there are no
conflicting
- // delete files, because we don't want to revive records that have been
deleted
- // in the meantime.
- rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
- rowDelta.validateNoConflictingDataFiles();
- rowDelta.validateNoConflictingDeleteFiles();
- rowDelta.validateDataFilesExist(
- icebergOp.getData_files_referenced_by_position_deletes());
- rowDelta.validateDeletedFiles();
- rowDelta.commit();
- } catch (ValidationException e) {
- throw new ImpalaRuntimeException(e.getMessage(), e);
- }
+ // Validate that there are no conflicting data files, because if data
files are
+ // added in the meantime, they potentially contain records that should
have been
+ // affected by this UPDATE operation. Also validate that there are no
conflicting
+ // delete files, because we don't want to revive records that have been
deleted
+ // in the meantime.
+ rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
+ rowDelta.validateNoConflictingDataFiles();
+ rowDelta.validateNoConflictingDeleteFiles();
+ rowDelta.validateDataFilesExist(
+ icebergOp.getData_files_referenced_by_position_deletes());
+ rowDelta.validateDeletedFiles();
+ rowDelta.commit();
}
private static DataFile createDataFile(FeIcebergTable feIcebergTable,
ByteBuffer buf)
@@ -488,11 +480,7 @@ public class IcebergCatalogOpExecutor {
DataFile dataFile = createDataFile(feIcebergTable, buf);
batchWrite.addFile(dataFile);
}
- try {
- batchWrite.commit();
- } catch (ValidationException e) {
- throw new ImpalaRuntimeException(e.getMessage(), e);
- }
+ batchWrite.commit();
}
private static Metrics buildDataFileMetrics(FbIcebergDataFile dataFile) {
@@ -564,12 +552,8 @@ public class IcebergCatalogOpExecutor {
DataFile dataFile = createDataFile(feIcebergTable, buf);
rewrite.addFile(dataFile);
}
- try {
- rewrite.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
- rewrite.commit();
- } catch (ValidationException e) {
- throw new ImpalaRuntimeException(e.getMessage(), e);
- }
+ rewrite.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
+ rewrite.commit();
}
/**
@@ -594,4 +578,29 @@ public class IcebergCatalogOpExecutor {
String.valueOf(version));
updateProps.commit();
}
+
+ /**
+ * When a write operation fails the validation check due to conflicts, the
data/delete
+ * files created by it cannot be committed to the table: they would become
orphan files.
+ * This method deletes these uncommitted data/delete files from the file
system.
+ * @param icebergOp the failed DML operation that contains the list of newly
written
+ * data and delete files which have to be cleaned up.
+ */
+ protected static void cleanupUncommittedFiles(TIcebergOperationParam
icebergOp) {
+ if (icebergOp.isSetIceberg_data_files_fb()) {
+ deleteIcebergFiles(icebergOp.getIceberg_data_files_fb());
+ }
+ if (icebergOp.isSetIceberg_delete_files_fb()) {
+ deleteIcebergFiles(icebergOp.getIceberg_delete_files_fb());
+ }
+ }
+
+ private static void deleteIcebergFiles(Iterable<ByteBuffer> icebergFiles) {
+ for (ByteBuffer buf : icebergFiles) {
+ String pathString =
FbIcebergDataFile.getRootAsFbIcebergDataFile(buf).path();
+ if (pathString != null) {
+ deleteIfExists(new org.apache.hadoop.fs.Path((pathString)));
+ }
+ }
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 781acd766..9f7750d8f 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Random;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,6 +60,9 @@ public class DebugUtils {
// debug action label for Iceberg transaction commit.
public static final String ICEBERG_COMMIT = "catalogd_iceberg_commit";
+ // debug action label for Iceberg validation check failure.
+ public static final String ICEBERG_CONFLICT = "catalogd_iceberg_conflict";
+
// debug action label for Iceberg create table.
public static final String ICEBERG_CREATE = "catalogd_iceberg_create";
@@ -196,6 +200,9 @@ public class DebugUtils {
case "commitfailedexception":
exceptionToThrow = new CommitFailedException(param);
break;
+ case "validationexception":
+ exceptionToThrow = new ValidationException(param);
+ break;
case "icebergalreadyexistsexception":
exceptionToThrow = new org.apache.iceberg.exceptions.
AlreadyExistsException("Table already exists");
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 91e08723f..ab51bab13 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -44,7 +44,7 @@ from tests.common.file_utils import (
create_iceberg_table_from_directory,
create_table_from_parquet)
from tests.shell.util import run_impala_shell_cmd
-from tests.util.filesystem_utils import get_fs_path, IS_HDFS, WAREHOUSE
+from tests.util.filesystem_utils import get_fs_path, IS_HDFS, WAREHOUSE,
FILESYSTEM_PREFIX
from tests.util.get_parquet_metadata import get_parquet_metadata
from tests.util.iceberg_util import cast_ts, quote, get_snapshots,
IcebergCatalogs
@@ -2001,6 +2001,44 @@ class TestIcebergV2Table(IcebergTestSuite):
def test_merge_star(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-merge-star', vector, unique_database)
+ def test_cleanup(self, unique_database):
+ """Test that all uncommitted files written by Impala are removed from
the file
+ system when a DML commit to an Iceberg table fails, and that the effects
of the
+ failed operation are not visible."""
+ table_name = "iceberg_cleanup_failure"
+ fq_tbl_name = unique_database + "." + table_name
+ # The query options that inject an iceberg validation check failure.
+ fail_ice_commit_options = {'debug_action':
+ 'CATALOGD_ICEBERG_CONFLICT:EXCEPTION@'
+ 'ValidationException@'
+ 'simulated validation check failure'}
+ # Create an iceberg table and insert a row.
+ self.execute_query_expect_success(self.client, """CREATE TABLE {0} (i
int)
+ STORED BY ICEBERG TBLPROPERTIES
('format-version'='2')""".format(fq_tbl_name))
+ self.execute_query_expect_success(self.client,
+ "insert into {0} values (1)".format(fq_tbl_name))
+
+ # Run a query that would update a row, but pass the query options that
+ # will cause the iceberg validation check to fail.
+ err = self.execute_query_expect_failure(self.client,
+ "update {0} set i=2 where i=1".format(fq_tbl_name),
+ query_options=fail_ice_commit_options)
+ # Check that we get the error message.
+ assert error_msg_expected(
+ str(err), "ValidationException: simulated validation check failure")
+ # Check that the table content was not updated.
+ data = self.execute_query_expect_success(self.client,
+ "select * from {0}".format(fq_tbl_name))
+ assert len(data.data) == 1
+ assert data.data[0] == '1'
+
+ # Check that the uncommitted data and delete files are removed from the
file system
+ # and only the first data file remains.
+ table_location = "{0}/test-warehouse/{1}.db/{2}/data".format(
+ FILESYSTEM_PREFIX, unique_database, table_name)
+ files_result = check_output(["hdfs", "dfs", "-ls", table_location])
+ assert "Found 1 items" in files_result
+
# Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables.
Note, that most
# of the test coverage is in TestIcebergV2Table.test_read_position_deletes but
since it
diff --git a/tests/stress/test_update_stress.py
b/tests/stress/test_update_stress.py
index 1148f7ce4..bb966c864 100644
--- a/tests/stress/test_update_stress.py
+++ b/tests/stress/test_update_stress.py
@@ -21,12 +21,13 @@ import pytest
import random
import time
from multiprocessing import Value
+from subprocess import check_output
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.parametrize import UniqueDatabase
from tests.common.test_dimensions import create_exec_option_dimension
from tests.stress.stress_util import run_tasks, Task
-from tests.util.filesystem_utils import IS_HDFS
+from tests.util.filesystem_utils import FILESYSTEM_PREFIX, IS_HDFS
# Longer-running UPDATE tests are executed here
@@ -238,7 +239,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
"""Issues DELETE and UPDATE statements in parallel in a way that some
invariants must be true when a spectator process inspects the table."""
- tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database
+ tbl_suffix = "test_concurrent_deletes_and_updates"
+ tbl_name = unique_database + "." + tbl_suffix
self.client.set_configuration_option("SYNC_DDL", "true")
self.client.execute("""create table {0} (id int, j bigint)
stored as iceberg
@@ -259,6 +261,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
result = self.client.execute("select count(*) from {}".format(tbl_name))
assert result.data == ['0']
+ self.check_no_orphan_files(unique_database, tbl_suffix)
+
@pytest.mark.execute_serially
@UniqueDatabase.parametrize(sync_ddl=True)
def test_iceberg_deletes_and_updates_and_optimize(self, unique_database):
@@ -266,7 +270,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
invariants must be true when a spectator process inspects the table.
An optimizer thread also invokes OPTMIZE regularly on the table."""
- tbl_name = "%s.test_concurrent_write_and_optimize" % unique_database
+ tbl_suffix = "test_concurrent_write_and_optimize"
+ tbl_name = unique_database + "." + tbl_suffix
self.client.set_configuration_option("SYNC_DDL", "true")
self.client.execute("""create table {0} (id int, j bigint)
stored as iceberg
@@ -293,3 +298,25 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
result = self.client.execute("select count(*) from {}".format(tbl_name))
assert result.data == ['0']
+
+ self.check_no_orphan_files(unique_database, tbl_suffix)
+
+ def check_no_orphan_files(self, unique_database, table_name):
+ # Check that the uncommitted data and delete files are removed from the
file system
+ # and only those files remain that are reachable through valid snapshots.
+ data_files_in_tbl_result = self.client.execute(
+ "select file_path from {}.{}.all_files;".format(unique_database,
table_name))
+ data_files_in_tbl = [row.split('/test-warehouse')[1]
+ for row in data_files_in_tbl_result.data]
+
+ table_location = "{0}/test-warehouse/{1}.db/{2}/data".format(
+ FILESYSTEM_PREFIX, unique_database, table_name)
+ data_files_on_fs_result = check_output(["hdfs", "dfs", "-ls",
table_location])
+ # The first row of the HDFS result is a summary, the following lines
contain
+ # 1 file each.
+ data_files_on_fs_rows = data_files_on_fs_result.strip().split('\n')[1:]
+ data_files_on_fs = [row.split()[-1].split('/test-warehouse')[1]
+ for row in data_files_on_fs_rows]
+
+ assert len(data_files_in_tbl) == len(data_files_on_fs_rows)
+ assert set(data_files_on_fs) == set(data_files_in_tbl)