This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 9e0649b9c IMPALA-12867: Filter files to OPTIMIZE based on file size
9e0649b9c is described below
commit 9e0649b9ceac86643d69afeb62d32e01bbc43717
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Mon Mar 25 10:00:59 2024 +0100
IMPALA-12867: Filter files to OPTIMIZE based on file size
The OPTIMIZE TABLE statement is currently used to rewrite the entire
Iceberg table. With the 'FILE_SIZE_THRESHOLD_MB' option, the user can
specify a file size limit to rewrite only small files.
Syntax: OPTIMIZE TABLE <table_name> [(FILE_SIZE_THRESHOLD_MB=<value>)];
The value of the threshold is the file size in MBs. It must be a
non-negative integer. Data files larger than the given limit will only
be rewritten if they are referenced from delete files.
If only 1 file is selected in a partition, it will not be rewritten.
If the threshold is 0, only the delete files and the referenced data
files will be rewritten.
IMPALA-12839: 'Optimizing empty table should be no-op' is also
resolved in this patch.
With the file selection option, the OPTIMIZE operation can run
in 3 different modes:
- REWRITE_ALL: The entire table is rewritten. Either because the
compaction was triggered by a simple 'OPTIMIZE TABLE' command
without a specified 'FILE_SIZE_THRESHOLD_MB' parameter, or
because all files of the table are deletes/referenced by deletes
or are smaller than the limit.
- PARTIAL: If the value of the 'FILE_SIZE_THRESHOLD_MB' parameter is
specified, then only the small data files without deletes are selected
and the delete files are merged. Large data files without deletes
are kept to avoid unnecessary resource-consuming writes.
- NOOP: When no files qualify for the selection criteria, there is
no need to rewrite any files. This is a no-operation.
Testing:
- Parser test
- FE unit tests
- E2E tests
Change-Id: Icfbb589513aacdb68a86c1aec4a0d39b12091820
Reviewed-on: http://gerrit.cloudera.org:8080/21388
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/dml-exec-state.cc | 7 +-
be/src/runtime/dml-exec-state.h | 3 +-
be/src/service/client-request-state.cc | 20 ++-
common/thrift/CatalogService.thrift | 4 +
common/thrift/Query.thrift | 23 +++
fe/src/main/cup/sql-parser.cup | 23 +--
.../org/apache/impala/analysis/OptimizeStmt.java | 143 ++++++++++++++++--
.../java/org/apache/impala/analysis/TableRef.java | 18 ++-
.../org/apache/impala/catalog/FeIcebergTable.java | 12 +-
.../apache/impala/planner/IcebergScanPlanner.java | 6 +-
.../java/org/apache/impala/service/Frontend.java | 30 +++-
.../impala/service/IcebergCatalogOpExecutor.java | 22 ++-
.../impala/util/IcebergOptimizeFileFilter.java | 154 ++++++++++++++++++++
fe/src/main/jflex/sql-scanner.flex | 1 +
.../apache/impala/analysis/AnalyzeStmtsTest.java | 2 +-
.../org/apache/impala/analysis/ParserTest.java | 7 +
.../apache/impala/util/IcebergFileFilterTest.java | 160 +++++++++++++++++++++
.../queries/QueryTest/iceberg-negative.test | 4 +-
.../queries/QueryTest/iceberg-optimize.test | 77 ++++++++++
tests/query_test/test_iceberg.py | 123 ++++++++++++++++
20 files changed, 790 insertions(+), 49 deletions(-)
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index 49e7f05bc..24cec2992 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -157,7 +157,8 @@ int64_t DmlExecState::GetNumModifiedRows() {
return result;
}
-bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update)
{
+bool DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update,
+ const TFinalizeParams& finalize_params) {
lock_guard<mutex> l(lock_);
for (const PartitionStatusMap::value_type& partition :
per_partition_status_) {
TUpdatedPartition updatedPartition;
@@ -171,6 +172,10 @@ bool
DmlExecState::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
}
catalog_update->updated_partitions[partition.first] = updatedPartition;
}
+ if (finalize_params.__isset.iceberg_params
+ && finalize_params.iceberg_params.operation ==
TIcebergOperation::OPTIMIZE) {
+ return true;
+ }
return catalog_update->updated_partitions.size() != 0;
}
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
index 9c6a4e87f..aa6439edf 100644
--- a/be/src/runtime/dml-exec-state.h
+++ b/be/src/runtime/dml-exec-state.h
@@ -109,7 +109,8 @@ class DmlExecState {
/// Populates 'catalog_update' with PartitionStatusMap data.
/// Returns true if a catalog update is required, false otherwise.
- bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
+ bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update,
+ const TFinalizeParams& finalize_params);
/// For HDFS (and other Hadoop FileSystem) INSERT, moves all temporary
staging files
/// to their final destinations, as indicated by 'params', and creates new
partitions
diff --git a/be/src/service/client-request-state.cc
b/be/src/service/client-request-state.cc
index c91037392..ea2465b4f 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1548,7 +1548,7 @@ Status ClientRequestState::UpdateCatalog() {
catalog_update.__set_debug_action(exec_req.query_options.debug_action);
}
DmlExecState* dml_exec_state = GetCoordinator()->dml_exec_state();
- if (!dml_exec_state->PrepareCatalogUpdate(&catalog_update)) {
+ if (!dml_exec_state->PrepareCatalogUpdate(&catalog_update,
finalize_params)) {
VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
<< PrintId(query_id()) << ")";
} else {
@@ -1575,7 +1575,8 @@ Status ClientRequestState::UpdateCatalog() {
TIcebergOperationParam& cat_ice_op = catalog_update.iceberg_operation;
catalog_update.__isset.iceberg_operation = true;
if (!CreateIcebergCatalogOps(finalize_params, &cat_ice_op)) {
- // No change, no need to update catalog.
+ VLOG_QUERY << "No Iceberg partitions altered, not updating metastore
"
+ << "(query id: " << PrintId(query_id()) << ")";
return Status::OK();
}
}
@@ -1671,8 +1672,19 @@ bool ClientRequestState::CreateIcebergCatalogOps(
update_catalog = false;
}
} else if (ice_finalize_params.operation == TIcebergOperation::OPTIMIZE) {
- cat_ice_op->__set_iceberg_data_files_fb(
- dml_exec_state->CreateIcebergDataFilesVector());
+ DCHECK(ice_finalize_params.__isset.optimize_params);
+ const TIcebergOptimizeParams& optimize_params =
ice_finalize_params.optimize_params;
+ if (optimize_params.mode == TIcebergOptimizationMode::NOOP) {
+ update_catalog = false;
+ } else {
+ cat_ice_op->__set_iceberg_data_files_fb(
+ dml_exec_state->CreateIcebergDataFilesVector());
+ if (optimize_params.mode == TIcebergOptimizationMode::PARTIAL) {
+ DCHECK(optimize_params.__isset.selected_data_files_without_deletes);
+ cat_ice_op->__set_replaced_data_files_without_deletes(
+ optimize_params.selected_data_files_without_deletes);
+ }
+ }
}
if (!update_catalog) query_events_->MarkEvent("No-op Iceberg DML statement");
return update_catalog;
diff --git a/common/thrift/CatalogService.thrift
b/common/thrift/CatalogService.thrift
index 66b1455bb..728c35bb0 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -247,6 +247,10 @@ struct TIcebergOperationParam {
// The data files referenced by the position delete files.
7: optional list<string> data_files_referenced_by_position_deletes
+
+ // Data files without deletes that are replaced by OPTIMIZE operation. Set
only if there
+ // is file filtering. Unset in case of full table compaction, which rewrites
all files.
+ 8: optional set<string> replaced_data_files_without_deletes;
}
// Per-partion info needed by Catalog to handle an INSERT.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 6fb804106..42745aa4e 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -940,6 +940,26 @@ struct TPlanExecInfo {
per_node_scan_ranges
}
+// Determines the type of the OPTIMIZE operation. Based on the number of files
selected
+// for compaction, it can be
+// 1. a full table rewrite,
+// 2. a partial optimization with file filtering, or
+// 3. no-op with no selected files.
+enum TIcebergOptimizationMode {
+ REWRITE_ALL = 0
+ PARTIAL = 1
+ NOOP = 2
+}
+
+struct TIcebergOptimizeParams {
+ 1: required TIcebergOptimizationMode mode;
+
+ // Stores the file paths to the data files without deletes that are targeted
by the
+ // OPTIMIZE operation. Set only if the mode is PARTIAL, which means that
data files are
+ // filtered (by size).
+ 2: optional set<string> selected_data_files_without_deletes;
+}
+
struct TIcebergDmlFinalizeParams {
// Type of the Iceberg operation
1: required Types.TIcebergOperation operation
@@ -949,6 +969,9 @@ struct TIcebergDmlFinalizeParams {
// Stores the Iceberg snapshot id of the target table for this DML operation.
3: optional i64 initial_snapshot_id;
+
+ // Stores additional information about the OPTIMIZE operation.
+ 4: optional TIcebergOptimizeParams optimize_params;
}
// Metadata required to finalize a query - that is, to clean up after the
query is done.
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index ce41abef8..3fe20eb66 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -305,13 +305,13 @@ terminal
KW_ARRAY, KW_AS, KW_ASC, KW_AUTHORIZATION, KW_AVRO, KW_BETWEEN, KW_BIGINT,
KW_BINARY,
KW_BLOCKSIZE, KW_BOOLEAN, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST,
KW_CHANGE,
KW_CHAR, KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT,
KW_COMPRESSION,
- KW_COMPUTE, KW_CONSTRAINT, KW_CONVERT, KW_COPY, KW_CREATE, KW_CROSS,
KW_CUBE, KW_CURRENT, KW_DATA,
- KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT,
KW_DELETE,
+ KW_COMPUTE, KW_CONSTRAINT, KW_CONVERT, KW_COPY, KW_CREATE, KW_CROSS,
KW_CUBE, KW_CURRENT,
+ KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DECIMAL,
KW_DEFAULT, KW_DELETE,
KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISABLE, KW_DISTINCT, KW_DIV,
KW_DOUBLE,
KW_DROP, KW_ELSE, KW_ENABLE, KW_ENCODING, KW_END, KW_ENFORCED, KW_ESCAPED,
KW_EXCEPT, KW_EXECUTE,
KW_EXISTS, KW_EXPLAIN, KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS,
KW_FILEFORMAT,
- KW_FILES, KW_FINALIZE_FN, KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR,
KW_FOREIGN,
- KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS,
KW_GRANT,
+ KW_FILES, KW_FILE_SIZE_THRESHOLD_MB, KW_FINALIZE_FN, KW_FIRST, KW_FLOAT,
KW_FOLLOWING, KW_FOR,
+ KW_FOREIGN, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION,
KW_FUNCTIONS, KW_GRANT,
KW_GROUP, KW_GROUPING, KW_HASH, KW_HUDIPARQUET, KW_IGNORE, KW_HAVING,
KW_ICEBERG, KW_IF,
KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT,
KW_INT,
KW_INTERMEDIATE, KW_INTERSECT, KW_INTERVAL, KW_INTO, KW_INVALIDATE,
KW_IREGEXP, KW_IS,
@@ -324,11 +324,11 @@ terminal
KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED,
KW_PURGE,
KW_RANGE, KW_RCFILE, KW_RECOVER, KW_REFERENCES, KW_REFRESH, KW_REGEXP,
KW_RELY,
KW_RENAME, KW_REPEATABLE, KW_REPLACE, KW_REPLICATION, KW_RESTRICT,
KW_RETURNS,
- KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROLLUP, KW_ROW,
KW_ROWS, KW_RWSTORAGE, KW_SCHEMA,
- KW_SCHEMAS, KW_SELECT, KW_SELECTIVITY, KW_SEMI, KW_SEQUENCEFILE,
KW_SERDEPROPERTIES, KW_SERIALIZE_FN,
- KW_SET, KW_SHOW, KW_SMALLINT, KW_SETS, KW_SORT, KW_SPEC,
KW_STORAGE_HANDLER_URI, KW_STORED, KW_STRAIGHT_JOIN,
- KW_STRING, KW_STRUCT, KW_SYMBOL, KW_SYSTEM_TIME, KW_SYSTEM_VERSION,
- KW_TABLE, KW_TABLES, KW_TABLESAMPLE, KW_TBLPROPERTIES,
+ KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROLLUP, KW_ROW,
KW_ROWS, KW_RWSTORAGE,
+ KW_SCHEMA, KW_SCHEMAS, KW_SELECT, KW_SELECTIVITY, KW_SEMI, KW_SEQUENCEFILE,
KW_SERDEPROPERTIES,
+ KW_SERIALIZE_FN, KW_SET, KW_SHOW, KW_SMALLINT, KW_SETS, KW_SORT, KW_SPEC,
+ KW_STORAGE_HANDLER_URI, KW_STORED, KW_STRAIGHT_JOIN, KW_STRING, KW_STRUCT,
KW_SYMBOL,
+ KW_SYSTEM_TIME, KW_SYSTEM_VERSION, KW_TABLE, KW_TABLES, KW_TABLESAMPLE,
KW_TBLPROPERTIES,
KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE,
KW_STATS,
KW_TO, KW_TRUE, KW_UDF, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UNIQUE,
KW_UNKNOWN,
KW_UNNEST, KW_UNSET, KW_UPDATE, KW_UPDATE_FN, KW_UPSERT, KW_USE, KW_USING,
KW_VALIDATE,
@@ -1003,6 +1003,9 @@ delete_stmt ::=
optimize_stmt ::=
KW_OPTIMIZE KW_TABLE table_name:table
{: RESULT = new OptimizeStmt(table); :}
+ | KW_OPTIMIZE KW_TABLE table_name:table LPAREN KW_FILE_SIZE_THRESHOLD_MB
EQUAL
+ INTEGER_LITERAL:size RPAREN
+ {: RESULT = new OptimizeStmt(table, size.intValue()); :}
;
opt_query_stmt ::=
@@ -4341,6 +4344,8 @@ word ::=
{: RESULT = r.toString(); :}
| KW_FIELDS:r
{: RESULT = r.toString(); :}
+ | KW_FILE_SIZE_THRESHOLD_MB:r
+ {: RESULT = r.toString(); :}
| KW_FILEFORMAT:r
{: RESULT = r.toString(); :}
| KW_FILES:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
index 0ab6efb69..59f4f9b09 100644
--- a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
@@ -18,33 +18,61 @@
package org.apache.impala.analysis;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.iceberg.DataFile;
import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.IcebergContentFileStore;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.ByteUnits;
+import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.planner.DataSink;
import org.apache.impala.planner.TableSink;
import org.apache.impala.rewrite.ExprRewriter;
+import org.apache.impala.thrift.TIcebergOptimizationMode;
import org.apache.impala.thrift.TSortingOrder;
import org.apache.impala.util.ExprUtil;
+import org.apache.impala.util.IcebergOptimizeFileFilter;
import org.apache.impala.util.IcebergUtil;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
- * Representation of an OPTIMIZE statement used to execute table maintenance
tasks in
- * Iceberg tables, such as:
- * 1. compacting small files,
- * 2. merging delete deltas,
- * 3. rewriting the table according to the latest partition spec and schema.
+ * Representation of an OPTIMIZE TABLE statement used to execute table
maintenance tasks
+ * in Iceberg tables. It operates in 2+1 modes: full table compaction, partial
+ * compaction and no-operation. The mode is decided during analysis.
+ * 1. Full table compaction: It rewrites the entire table
+ * - compacting small files,
+ * - merging delete deltas,
+ * - rewriting the table according to the latest partition spec and schema.
+ * 2. Partial table compaction: This mode has an additional parameter,
+ * 'FILE_SIZE_THRESHOLD_MB', that puts an upper limit on the size of data
files without
+ * deletes to be selected for compaction. The operation executes the following
tasks:
+ * - compacting data files without deletes that are smaller than the given
threshold,
+ * - merging all delete deltas and compacting all data files with deletes,
+ * - rewriting the selected files according to the latest partition spec and
schema.
+ * Note that the use of the latest schema and partition spec for the entire
table is not
+ * guaranteed in partial compaction mode.
+ * 3. No-operation: covers every case when there is no change made to the
table:
+ * - the table is empty, or
+ * - there were no files that qualified for the selection criteria in partial
mode.
*/
public class OptimizeStmt extends DmlStatementBase {
-
// Target table name as seen by the parser.
private final TableName originalTableName_;
+ // Data files larger than the given value will only be compacted if they are
referenced
+ // from delete files.
+ // The value comes from FILE_SIZE_THRESHOLD_MB, but is converted to bytes.
+ private final long fileSizeThreshold_;
/////////////////////////////////////////
// BEGIN: Members that need to be reset()
@@ -70,6 +98,22 @@ public class OptimizeStmt extends DmlStatementBase {
private TSortingOrder sortingOrder_ = TSortingOrder.LEXICAL;;
// Exprs corresponding to the partition fields of the table.
protected List<Expr> partitionKeyExprs_ = new ArrayList<>();
+ // File paths of data files without deletes selected for compaction after
file size
+ // filtering.
+ private Set<String> selectedIcebergFilePaths_ = new HashSet<>();
+ // Describes the mode of this OPTIMIZE operation. Decided during analysis.
+ // NOOP: The table was empty or no files were selected for compaction. This
means that
+ // the operation has no effect.
+ // PARTIAL: In this mode only the selected files are compacted, all others
will remain
+ // unchanged. Files that will be selected:
+ // - data files without deletes that are smaller than fileSizeThreshold_,
+ // - all delete files,
+ // - all data files with deletes.
+ // Possible only if FILE_SIZE_THRESHOLD_MB is set.
+ // REWRITE_ALL: Rewrite all files of the table. This will ensure that the
optimized
+ // table has the latest schema and partition spec. Possible if
FILE_SIZE_THRESHOLD_MB is
+ // not set in the SQL command or large enough that all files get selected.
+ private TIcebergOptimizationMode mode_;
// END: Members that need to be reset()
/////////////////////////////////////////
@@ -77,12 +121,29 @@ public class OptimizeStmt extends DmlStatementBase {
public OptimizeStmt(TableName tableName) {
tableName_ = tableName;
originalTableName_ = tableName_;
+ fileSizeThreshold_ = -1;
+ }
+
+ public OptimizeStmt(TableName tableName, int fileSizeMb) {
+ tableName_ = tableName;
+ originalTableName_ = tableName_;
+ fileSizeThreshold_ = ((long) fileSizeMb) * ByteUnits.MEGABYTE;
}
private OptimizeStmt(OptimizeStmt other) {
super(other);
tableName_ = other.tableName_;
originalTableName_ = other.originalTableName_;
+ fileSizeThreshold_ = other.fileSizeThreshold_;
+ tableRef_ = other.tableRef_;
+ sourceStmt_ = other.sourceStmt_;
+ resultExprs_ = other.resultExprs_;
+ sortExprs_ = other.sortExprs_;
+ sortColumns_ = other.sortColumns_;
+ sortingOrder_ = other.sortingOrder_;
+ partitionKeyExprs_ = other.partitionKeyExprs_;
+ selectedIcebergFilePaths_ = other.selectedIcebergFilePaths_;
+ mode_ = other.mode_;
}
@Override
@@ -104,12 +165,11 @@ public class OptimizeStmt extends DmlStatementBase {
if (!(table_ instanceof FeIcebergTable)) {
throw new AnalysisException("OPTIMIZE is only supported for Iceberg
tables.");
}
- // TODO: IMPALA-12839 Optimizing empty table should be no-op.
- if (((FeIcebergTable) table_).getContentFileStore().getNumFiles() == 0) {
- throw new AnalysisException(String.format(
- "Table '%s' is empty.", table_.getFullName()));
- }
- IcebergUtil.validateIcebergTableForInsert((FeIcebergTable) table_);
+ FeIcebergTable iceTable = (FeIcebergTable) table_;
+ IcebergUtil.validateIcebergTableForInsert(iceTable);
+
+ selectFiles(iceTable);
+
prepareExpressions(analyzer);
createSourceStmt(analyzer);
setMaxTableSinks(analyzer_.getQueryOptions().getMax_fs_writers());
@@ -133,6 +193,8 @@ public class OptimizeStmt extends DmlStatementBase {
sortColumns_.clear();
sortingOrder_ = TSortingOrder.LEXICAL;
partitionKeyExprs_.clear();
+ selectedIcebergFilePaths_.clear();
+ mode_ = null;
}
public DataSink createDataSink() {
@@ -143,6 +205,50 @@ public class OptimizeStmt extends DmlStatementBase {
return tableSink;
}
+ private void selectFiles(FeIcebergTable iceTable) throws AnalysisException {
+ try {
+ GroupedContentFiles contentFiles = IcebergUtil.getIcebergFiles(
+ iceTable, Lists.newArrayList(), null);
+ if (contentFiles.isEmpty()) {
+ mode_ = TIcebergOptimizationMode.NOOP;
+ } else {
+ IcebergOptimizeFileFilter.FilterArgs args =
+ new IcebergOptimizeFileFilter.FilterArgs(contentFiles,
fileSizeThreshold_);
+ IcebergOptimizeFileFilter.FileFilteringResult filterResult =
+ IcebergOptimizeFileFilter.filterFilesBySize(args);
+ mode_ = filterResult.getOptimizationMode();
+
+ if (mode_ == TIcebergOptimizationMode.PARTIAL) {
+ List<HdfsPartition.FileDescriptor> selectedDataFilesWithoutDeletes =
+ dataFilesWithoutDeletesToFileDescriptors(
+ filterResult.getSelectedFilesWithoutDeletes(), iceTable);
+
tableRef_.setSelectedDataFilesForOptimize(selectedDataFilesWithoutDeletes);
+ collectAbsolutePaths(selectedDataFilesWithoutDeletes);
+ }
+ }
+ } catch (Exception e) {
+ throw new AnalysisException(e);
+ }
+ }
+
+ private List<HdfsPartition.FileDescriptor>
dataFilesWithoutDeletesToFileDescriptors(
+ List<DataFile> contentFiles, FeIcebergTable iceTable)
+ throws IOException, ImpalaRuntimeException {
+ GroupedContentFiles selectedContentFiles = new GroupedContentFiles();
+ selectedContentFiles.dataFilesWithoutDeletes = contentFiles;
+ IcebergContentFileStore selectedFiles = FeIcebergTable.Utils
+ .loadAllPartition(iceTable, selectedContentFiles);
+ return selectedFiles.getDataFilesWithoutDeletes();
+ }
+
+ private void collectAbsolutePaths(List<HdfsPartition.FileDescriptor>
selectedFiles) {
+ for (HdfsPartition.FileDescriptor fileDesc : selectedFiles) {
+ org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(
+ fileDesc.getAbsolutePath(((FeIcebergTable)
table_).getHdfsBaseDir()));
+ selectedIcebergFilePaths_.add(path.toUri().toString());
+ }
+ }
+
private void createSourceStmt(Analyzer analyzer) throws AnalysisException {
List<TableRef> tableRefs = Arrays.asList(tableRef_);
List<Column> columns = table_.getColumns();
@@ -170,7 +276,8 @@ public class OptimizeStmt extends DmlStatementBase {
private SlotRef createSlotRef(Analyzer analyzer, String colName)
throws AnalysisException {
- List<String> path = Path.createRawPath(tableRef_.getUniqueAlias(),
colName);
+ List<String> path = org.apache.impala.analysis.Path
+ .createRawPath(tableRef_.getUniqueAlias(), colName);
SlotRef ref = new SlotRef(path);
ref.analyze(analyzer);
return ref;
@@ -223,6 +330,16 @@ public class OptimizeStmt extends DmlStatementBase {
@Override
public List<Expr> getResultExprs() { return resultExprs_; }
+ public TIcebergOptimizationMode getOptimizationMode() {
+ Preconditions.checkState(isAnalyzed());
+ return mode_;
+ }
+
+ public Set<String> getSelectedIcebergFilePaths() {
+ Preconditions.checkState(isAnalyzed());
+ return selectedIcebergFilePaths_;
+ }
+
@Override
public void collectTableRefs(List<TableRef> tblRefs) {
tblRefs.add(new TableRef(tableName_.toPath(), null));
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index d94008795..d46c82cba 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -31,9 +31,9 @@ import java.util.Set;
import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.FeFsTable;
-import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.planner.JoinNode.DistributionMode;
import org.apache.impala.rewrite.ExprRewriter;
@@ -163,6 +163,10 @@ public class TableRef extends StmtNode {
// FOR SYSTEM_TIME AS OF <timestamp> or FOR SYSTEM_TIME AS OF <version>
clause.
protected TimeTravelSpec timeTravelSpec_;
+ // Iceberg data files without deletes selected for OPTIMIZE from this table
ref.
+ // Used only in PARTIAL optimization mode, otherwise it is null.
+ private List<HdfsPartition.FileDescriptor>
selectedDataFilesWithoutDeletesForOptimize_;
+
// END: Members that need to be reset()
/////////////////////////////////////////
@@ -265,6 +269,8 @@ public class TableRef extends StmtNode {
columns_ = new LinkedHashMap<>(other.columns_);
isHidden_ = other.isHidden_;
zippingUnnestType_ = other.zippingUnnestType_;
+ selectedDataFilesWithoutDeletesForOptimize_ =
+ other.selectedDataFilesWithoutDeletesForOptimize_;
}
@Override
@@ -805,6 +811,7 @@ public class TableRef extends StmtNode {
correlatedTupleIds_.clear();
desc_ = null;
if (timeTravelSpec_ != null) timeTravelSpec_.reset();
+ selectedDataFilesWithoutDeletesForOptimize_ = null;
}
public boolean isTableMaskingView() { return false; }
@@ -843,6 +850,15 @@ public class TableRef extends StmtNode {
return res;
}
+ public void setSelectedDataFilesForOptimize(
+ List<HdfsPartition.FileDescriptor> fileDescs) {
+ selectedDataFilesWithoutDeletesForOptimize_ = fileDescs;
+ }
+
+ public List<HdfsPartition.FileDescriptor> getSelectedDataFilesForOptimize() {
+ return selectedDataFilesWithoutDeletesForOptimize_;
+ }
+
void migratePropertiesTo(TableRef other) {
other.aliases_ = aliases_;
other.onClause_ = onClause_;
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 98a13865c..af2f9efdd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -768,12 +768,12 @@ public interface FeIcebergTable extends FeFsTable {
* path, using org.apache.hadoop.fs.Path to normalize the paths.
*/
public static IcebergContentFileStore loadAllPartition(
- IcebergTable table, GroupedContentFiles icebergFiles)
+ FeIcebergTable table, GroupedContentFiles icebergFiles)
throws IOException, ImpalaRuntimeException {
Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap = new
HashMap<>();
- Collection<HdfsPartition> partitions =
- ((HdfsTable)table.getFeFsTable()).partitionMap_.values();
- for (HdfsPartition partition : partitions) {
+ Collection<? extends FeFsPartition> partitions =
+ FeCatalogUtils.loadAllPartitions(table.getFeFsTable());
+ for (FeFsPartition partition : partitions) {
for (FileDescriptor fileDesc : partition.getFileDescriptors()) {
Path path = new
Path(fileDesc.getAbsolutePath(table.getHdfsBaseDir()));
hdfsFileDescMap.put(path.toUri().getPath(), fileDesc);
@@ -801,7 +801,7 @@ public interface FeIcebergTable extends FeFsTable {
}
private static Pair<String, HdfsPartition.FileDescriptor> getPathHashAndFd(
- ContentFile<?> contentFile, IcebergTable table,
+ ContentFile<?> contentFile, FeIcebergTable table,
Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap) throws
IOException {
String pathHash = IcebergUtil.getFilePathHash(contentFile);
HdfsPartition.FileDescriptor fd = getOrCreateIcebergFd(
@@ -809,7 +809,7 @@ public interface FeIcebergTable extends FeFsTable {
return new Pair<>(pathHash, fd);
}
- private static FileDescriptor getOrCreateIcebergFd(IcebergTable table,
+ private static FileDescriptor getOrCreateIcebergFd(FeIcebergTable table,
Map<String, HdfsPartition.FileDescriptor> hdfsFileDescMap,
ContentFile<?> contentFile) throws IllegalArgumentException,
IOException {
Path path = new Path(contentFile.path().toString());
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
index e059c221d..b51832c66 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java
@@ -176,7 +176,11 @@ public class IcebergScanPlanner {
private void setFileDescriptorsBasedOnFileStore() throws ImpalaException {
IcebergContentFileStore fileStore = getIceTable().getContentFileStore();
- dataFilesWithoutDeletes_ = fileStore.getDataFilesWithoutDeletes();
+ if (tblRef_.getSelectedDataFilesForOptimize() != null) {
+ dataFilesWithoutDeletes_ = tblRef_.getSelectedDataFilesForOptimize();
+ } else {
+ dataFilesWithoutDeletes_ = fileStore.getDataFilesWithoutDeletes();
+ }
dataFilesWithDeletes_ = fileStore.getDataFilesWithDeletes();
positionDeleteFiles_ = new HashSet<>(fileStore.getPositionDeleteFiles());
initEqualityIds(fileStore.getEqualityDeleteFiles());
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 1c0f9a0fa..d1d19ba5b 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -87,6 +87,7 @@ import org.apache.impala.analysis.DropTableOrViewStmt;
import org.apache.impala.analysis.GrantRevokePrivStmt;
import org.apache.impala.analysis.GrantRevokeRoleStmt;
import org.apache.impala.analysis.InsertStmt;
+import org.apache.impala.analysis.OptimizeStmt;
import org.apache.impala.analysis.Parser;
import org.apache.impala.analysis.QueryStmt;
import org.apache.impala.analysis.ResetMetadataStmt;
@@ -194,6 +195,8 @@ import org.apache.impala.thrift.TLoadDataReq;
import org.apache.impala.thrift.TLoadDataResp;
import org.apache.impala.thrift.TMetadataOpRequest;
import org.apache.impala.thrift.TConvertTableRequest;
+import org.apache.impala.thrift.TIcebergOptimizationMode;
+import org.apache.impala.thrift.TIcebergOptimizeParams;
import org.apache.impala.thrift.TPlanExecInfo;
import org.apache.impala.thrift.TPlanFragment;
import org.apache.impala.thrift.TPoolConfig;
@@ -2761,8 +2764,8 @@ public class Frontend {
addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
analysisResult.getUpdateStmt(), TIcebergOperation.UPDATE);
} else if (analysisResult.isOptimizeStmt()) {
- addFinalizationParamsForIcebergModify(queryCtx, queryExecRequest,
- analysisResult.getOptimizeStmt(), TIcebergOperation.OPTIMIZE);
+ addFinalizationParamsForIcebergOptimize(queryCtx, queryExecRequest,
+ analysisResult.getOptimizeStmt());
}
}
return result;
@@ -2827,8 +2830,8 @@ public class Frontend {
}
/**
- * Add the finalize params to the queryExecRequest for a non-INSERT Iceberg
DML
- * statement: DELETE, UPDATE and OPTIMIZE.
+ * Add the finalize params to the queryExecRequest for an Iceberg modify
statement:
+ * DELETE and UPDATE.
*/
private static void addFinalizationParamsForIcebergModify(TQueryCtx queryCtx,
TQueryExecRequest queryExecRequest, DmlStatementBase dmlStmt,
@@ -2848,6 +2851,25 @@ public class Frontend {
queryExecRequest.setFinalize_params(finalizeParams);
}
+ private static void addFinalizationParamsForIcebergOptimize(TQueryCtx
queryCtx,
+ TQueryExecRequest queryExecRequest, OptimizeStmt optimizeStmt) {
+ FeTable targetTable = optimizeStmt.getTargetTable();
+ Preconditions.checkState(targetTable instanceof FeIcebergTable);
+ TFinalizeParams finalizeParams = addFinalizationParamsForDml(
+ queryCtx, targetTable, false);
+ TIcebergDmlFinalizeParams iceFinalizeParams =
addFinalizationParamsForIcebergDml(
+ (FeIcebergTable)targetTable, TIcebergOperation.OPTIMIZE);
+ TIcebergOptimizeParams optimizeParams = new TIcebergOptimizeParams();
+ optimizeParams.setMode(optimizeStmt.getOptimizationMode());
+ if (optimizeStmt.getOptimizationMode() ==
TIcebergOptimizationMode.PARTIAL) {
+ optimizeParams.setSelected_data_files_without_deletes(
+ optimizeStmt.getSelectedIcebergFilePaths());
+ }
+ iceFinalizeParams.setOptimize_params(optimizeParams);
+ finalizeParams.setIceberg_params(iceFinalizeParams);
+ queryExecRequest.setFinalize_params(finalizeParams);
+ }
+
private static TIcebergDmlFinalizeParams addFinalizationParamsForIcebergDml(
FeIcebergTable iceTable, TIcebergOperation iceOperation) {
TIcebergDmlFinalizeParams iceFinalizeParams = new
TIcebergDmlFinalizeParams();
diff --git
a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 404050471..3c9feb412 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -366,7 +366,7 @@ public class IcebergCatalogOpExecutor {
case INSERT: appendFiles(feIcebergTable, txn, icebergOp); break;
case DELETE: deleteRows(feIcebergTable, txn, icebergOp); break;
case UPDATE: updateRows(feIcebergTable, txn, icebergOp); break;
- case OPTIMIZE: rewriteTable(feIcebergTable, txn, icebergOp); break;
+ case OPTIMIZE: optimizeTable(feIcebergTable, txn, icebergOp); break;
default: throw new ImpalaRuntimeException(
"Unknown Iceberg operation: " + icebergOp.operation);
}
@@ -521,7 +521,7 @@ public class IcebergCatalogOpExecutor {
nullValueCounts, null, lowerBounds, upperBounds);
}
- private static void rewriteTable(FeIcebergTable feIcebergTable, Transaction
txn,
+ private static void optimizeTable(FeIcebergTable feIcebergTable, Transaction
txn,
TIcebergOperationParam icebergOp) throws ImpalaRuntimeException {
GroupedContentFiles contentFiles;
try {
@@ -533,11 +533,21 @@ public class IcebergCatalogOpExecutor {
throw new ImpalaRuntimeException(e.getMessage(), e);
}
RewriteFiles rewrite = txn.newRewrite();
- // Delete current data files from table.
- for (DataFile dataFile : contentFiles.dataFilesWithDeletes) {
- rewrite.deleteFile(dataFile);
+ // Delete current data files from table if the operation is a full table
rewrite.
+ // If there was file filtering, keep the data files without deletes that
were not
+ // selected, and delete only the selected, rewritten files.
+ if (icebergOp.isSetReplaced_data_files_without_deletes()) {
+ for (DataFile dataFile : contentFiles.dataFilesWithoutDeletes) {
+ if
(icebergOp.replaced_data_files_without_deletes.contains(dataFile.path())) {
+ rewrite.deleteFile(dataFile);
+ }
+ }
+ } else {
+ for (DataFile dataFile : contentFiles.dataFilesWithoutDeletes) {
+ rewrite.deleteFile(dataFile);
+ }
}
- for (DataFile dataFile : contentFiles.dataFilesWithoutDeletes) {
+ for (DataFile dataFile : contentFiles.dataFilesWithDeletes) {
rewrite.deleteFile(dataFile);
}
// Delete current delete files from table.
diff --git
a/fe/src/main/java/org/apache/impala/util/IcebergOptimizeFileFilter.java
b/fe/src/main/java/org/apache/impala/util/IcebergOptimizeFileFilter.java
new file mode 100644
index 000000000..bbb6dee7d
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/IcebergOptimizeFileFilter.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.iceberg.DataFile;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TIcebergOptimizationMode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class provides file filtering for Iceberg files, based on file size.
Used by
+ * OptimizeStmt to filter small data files to compact.
+ * It also determines the mode of optimization: full table compaction, partial
compaction
+ * or none (no-op).
+ */
+public class IcebergOptimizeFileFilter {
+
+ public static class FileFilteringResult {
+ // Defines whether the operation is a partial or full table compaction or no-op.
+ private final TIcebergOptimizationMode optimizationMode_;
+ // Contains the selected data files without deletes to compact.
+ private final List<DataFile> selectedFilesWithoutDeletes_;
+
+ public FileFilteringResult(TIcebergOptimizationMode mode,
+ List<DataFile> selectedFiles) {
+ optimizationMode_ = mode;
+ selectedFilesWithoutDeletes_ = selectedFiles;
+ }
+
+ public TIcebergOptimizationMode getOptimizationMode() {
+ return optimizationMode_;
+ }
+
+ public List<DataFile> getSelectedFilesWithoutDeletes() {
+ return selectedFilesWithoutDeletes_;
+ }
+ }
+
+
+ public static class FilterArgs {
+ // File size threshold in bytes. -1 indicates full table compaction with
no filtering.
+ private final long fileSizeThreshold_;
+ private final GroupedContentFiles filesToFilter_;
+
+ public FilterArgs(GroupedContentFiles filesToFilter, long
fileSizeThreshold) {
+ filesToFilter_ = filesToFilter;
+ fileSizeThreshold_ = fileSizeThreshold;
+ }
+ }
+
+ /**
+ * Filters Iceberg data files without deletes based on file size. Files
smaller than the
+ * given threshold will be selected for rewriting. All delete files and data
files with
+ * deletes should be compacted regardless of the filter criteria.
+ * If there is only 1 file per partition that qualifies for the selection
criteria,
+ * it should be excluded from the result, since rewriting it would be
redundant.
+ * @param args contains the file size threshold and the files to filter
+ * @return the mode of the optimization and the selected data files without
deletes in
+ * case of PARTIAL mode
+ */
+ public static FileFilteringResult filterFilesBySize(FilterArgs args) {
+ // Stores the selected data files without deletes.
+ List<DataFile> selectedFiles = new ArrayList<>();
+
+ // Select data files without deletes only if the file size threshold is
positive.
+ // If the file size threshold is 0, no data files without deletes will be
selected,
+ // only the delete files will be merged. It would be unnecessary to
iterate through
+ // the file descriptors.
+ if (args.fileSizeThreshold_ > 0) {
+ // Stores the number of selected data files without deletes plus all
data files with
+ // deletes per partition and the list of file paths to the selected data
files
+ // without deletes. The key is the partition hash.
+ Map<Integer, Pair<Integer, List<DataFile>>> filesPerPartition = new
HashMap<>();
+ // Add files from contentFiles.dataFilesWithoutDeletes filtered by
file_size.
+ // Only data files without deletes have to be filtered, all other files
will be
+ // rewritten.
+ // Group file descriptors per partition; if there is only 1 file per
partition,
+ // do not rewrite it.
+ for (DataFile dataFile : args.filesToFilter_.dataFilesWithoutDeletes) {
+ if (dataFile.fileSizeInBytes() < args.fileSizeThreshold_) {
+ int hashValue = dataFile.partition().hashCode();
+ Pair<Integer, List<DataFile>> partition =
filesPerPartition.computeIfAbsent(
+ hashValue, k -> new Pair<>(0, new ArrayList<>()));
+ partition.first += 1;
+ partition.second.add(dataFile);
+ }
+ }
+ // Also count data files with deletes when counting the files per
partition. Since
+ // all of them will be rewritten, we do not add the path, just the
number.
+ for (DataFile dataFile : args.filesToFilter_.dataFilesWithDeletes) {
+ int hashValue = dataFile.partition().hashCode();
+ if (filesPerPartition.get(hashValue) != null) {
+ filesPerPartition.get(hashValue).first += 1;
+ }
+ }
+ // If there are multiple data files in the partition, add all data files
without
+ // deletes to the selected files.
+ for (Pair<Integer, List<DataFile>> partition :
filesPerPartition.values()) {
+ if (partition.first > 1) {
+ selectedFiles.addAll(partition.second);
+ }
+ }
+ } else if (args.fileSizeThreshold_ < 0) {
+ // Select all files if FILE_SIZE_THRESHOLD_MB was not specified. We must
still
+ // calculate the optimization mode since the operation could be
'REWRITE_ALL'
+ // or 'NOOP'.
+ Preconditions.checkState(args.fileSizeThreshold_ == -1);
+ selectedFiles = args.filesToFilter_.dataFilesWithoutDeletes;
+ }
+ TIcebergOptimizationMode mode =
+ calculateOptimizationMode(args.filesToFilter_, selectedFiles);
+ if (mode != TIcebergOptimizationMode.PARTIAL) {
+ selectedFiles.clear();
+ }
+ return new FileFilteringResult(mode, selectedFiles);
+ }
+
+ private static TIcebergOptimizationMode calculateOptimizationMode(
+ GroupedContentFiles filesToFilter, List<DataFile> selectedFiles) {
+ // Check if no files are selected for optimization, and set the operation
to no-op.
+ if (selectedFiles.isEmpty() &&
filesToFilter.dataFilesWithDeletes.isEmpty()) {
+ Preconditions.checkState(filesToFilter.positionDeleteFiles.isEmpty() &&
+ filesToFilter.equalityDeleteFiles.isEmpty());
+ return TIcebergOptimizationMode.NOOP;
+ } else {
+ if (selectedFiles.size() ==
filesToFilter.dataFilesWithoutDeletes.size()) {
+ return TIcebergOptimizationMode.REWRITE_ALL;
+ } else {
+ return TIcebergOptimizationMode.PARTIAL;
+ }
+ }
+ }
+}
diff --git a/fe/src/main/jflex/sql-scanner.flex
b/fe/src/main/jflex/sql-scanner.flex
index 50ba0b52a..33b067dfe 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -137,6 +137,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
keywordMap.put("fields", SqlParserSymbols.KW_FIELDS);
keywordMap.put("fileformat", SqlParserSymbols.KW_FILEFORMAT);
keywordMap.put("files", SqlParserSymbols.KW_FILES);
+ keywordMap.put("file_size_threshold_mb",
SqlParserSymbols.KW_FILE_SIZE_THRESHOLD_MB);
keywordMap.put("finalize_fn", SqlParserSymbols.KW_FINALIZE_FN);
keywordMap.put("first", SqlParserSymbols.KW_FIRST);
keywordMap.put("float", SqlParserSymbols.KW_FLOAT);
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 2f9405a67..abb1e71bc 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -4682,7 +4682,7 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
testNumberOfMembers(ValuesStmt.class, 0);
// Also check TableRefs.
- testNumberOfMembers(TableRef.class, 30);
+ testNumberOfMembers(TableRef.class, 31);
testNumberOfMembers(BaseTableRef.class, 0);
testNumberOfMembers(InlineViewRef.class, 10);
}
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index a9a5f4a16..a4cf578d3 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -1967,6 +1967,13 @@ public class ParserTest extends FrontendTestBase {
@Test
public void TestOptimize() {
ParsesOk("optimize table t");
+ ParsesOk("optimize table t (file_size_threshold_mb=10)");
+ ParsesOk("optimize table t (file_size_threshold_mb=0)");
+ ParserError("optimize table t file_size_threshold_mb=10");
+ ParserError("optimize table t file_size_threshold_mb 10");
+ ParserError("optimize table t (file_size_threshold_mb=-10)");
+ ParserError("optimize table t (file_size_threshold_mb=0.1)");
+ ParserError("optimize table t 10");
ParserError("optimize t");
ParserError("optimize table t for system_time as of now()");
ParserError("optimize table t for system_version as of 12345");
diff --git a/fe/src/test/java/org/apache/impala/util/IcebergFileFilterTest.java
b/fe/src/test/java/org/apache/impala/util/IcebergFileFilterTest.java
new file mode 100644
index 000000000..25ce1217c
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/IcebergFileFilterTest.java
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.impala.catalog.iceberg.GroupedContentFiles;
+import org.apache.impala.thrift.TIcebergOptimizationMode;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class IcebergFileFilterTest {
+
+ private final static List<Integer> fileSizes =
+ Arrays.asList(2, 10, 11, 100, 101, 200, 222, 250);
+
+ private static PartitionSpec partitionSpec = buildPartitionSpec();
+
+ private static PartitionSpec buildPartitionSpec() {
+ Schema schema =
+ new Schema(
+ required(1, "a", Types.IntegerType.get()),
+ required(2, "b", Types.StringType.get()),
+ required(3, "data", Types.IntegerType.get()));
+ return PartitionSpec.builderFor(schema).identity("a").build();
+ }
+
+ private static DataFile buildDataFile(long fileSizeInBytes, String path,
+ String filename) {
+ DataFile df = DataFiles.builder(partitionSpec)
+ .withPath(path + "/" + filename + ".parquet")
+ .withFileSizeInBytes(fileSizeInBytes)
+ .withPartitionPath(path)
+ .withRecordCount(1)
+ .build();
+ return df;
+ }
+
+ private void checkFiltering(GroupedContentFiles contentFiles, int
fileSizeThreshold,
+ TIcebergOptimizationMode expectedMode, Set<String> expectedPaths) {
+ IcebergOptimizeFileFilter.FileFilteringResult result =
+ IcebergOptimizeFileFilter.filterFilesBySize(
+ new IcebergOptimizeFileFilter.FilterArgs(contentFiles,
fileSizeThreshold));
+ assertEquals(result.getSelectedFilesWithoutDeletes().size(),
+ expectedPaths != null ? expectedPaths.size() : 0);
+ assertEquals(result.getOptimizationMode(), expectedMode);
+ if (expectedMode == TIcebergOptimizationMode.PARTIAL) {
+ Preconditions.checkState(expectedPaths != null);
+ for (DataFile df : result.getSelectedFilesWithoutDeletes()) {
+ assertTrue(expectedPaths.contains(df.path()));
+ }
+ } else {
+ Preconditions.checkState(expectedPaths == null);
+ assertTrue(result.getSelectedFilesWithoutDeletes().isEmpty());
+ }
+ }
+
+ @Test
+ public void testUnpartitioned() {
+ GroupedContentFiles contentFiles = new GroupedContentFiles();
+
+ for (long fileSize : fileSizes) {
+ DataFile df = buildDataFile(fileSize, "a=1", "size_" + fileSize);
+ contentFiles.dataFilesWithoutDeletes.add(df);
+ }
+
+ checkFiltering(contentFiles, 0, TIcebergOptimizationMode.NOOP, null);
+ checkFiltering(contentFiles, 2, TIcebergOptimizationMode.NOOP, null);
+
+ Set<String> filePaths = new HashSet<>();
+ Collections.addAll(filePaths,
+ "a=1/size_2.parquet", "a=1/size_10.parquet", "a=1/size_11.parquet");
+ checkFiltering(contentFiles, 100, TIcebergOptimizationMode.PARTIAL,
filePaths);
+ checkFiltering(contentFiles, 500, TIcebergOptimizationMode.REWRITE_ALL,
null);
+ }
+
+ @Test
+ public void testPartitioned() {
+ GroupedContentFiles contentFiles = new GroupedContentFiles();
+
+ for (int i = 0; i < fileSizes.size(); i++) {
+ int size = fileSizes.get(i);
+ DataFile df = buildDataFile(size, "a=" + i % 3, "size_" + size);
+ contentFiles.dataFilesWithoutDeletes.add(df);
+ }
+ // Add data files that are alone in their partitions.
+ contentFiles.dataFilesWithoutDeletes.add(buildDataFile(100, "a=3",
"size_100"));
+ contentFiles.dataFilesWithoutDeletes.add(buildDataFile(120, "a=4",
"size_120"));
+
+ /*
+ Naming of the data files: size_[file_size](.parquet).
+ The content of the table per partition so far:
+ a=0: size_2, size_100, size_222
+ a=1: size_10, size_101, size_250
+ a=2: size_11, size_200
+ a=3: size_100
+ a=4: size_120
+ */
+
+ // Only a=0/size_2 meets the filtering criteria, but it is the only
selected file from
+ // the partition, so it will not be rewritten.
+ checkFiltering(contentFiles, 5, TIcebergOptimizationMode.NOOP, null);
+
+ // Add data files with deletes to check if they are considered in the 1
file per
+ // partition rule.
+ contentFiles.dataFilesWithDeletes.add(buildDataFile(10, "a=1", "d10"));
+ contentFiles.dataFilesWithDeletes.add(buildDataFile(100, "a=4", "d100"));
+
+ // No data files without deletes were selected, but there are data files
with deletes,
+ // so this is not a NOOP. Delete files should be merged with corresponding
data files.
+ Set<String> filePaths = new HashSet<>();
+ checkFiltering(contentFiles, 0, TIcebergOptimizationMode.PARTIAL,
filePaths);
+
+ // Data files without deletes selected: a=0/size_2, a=1/size_10,
a=2/size_11.
+ // There is another data file with deletes in a=1 (d10), so they will be
merged, but
+ // the other 2 files will not be rewritten since they are the only data
files in their
+ // partition.
+ filePaths.add("a=1/size_10.parquet");
+ checkFiltering(contentFiles, 12, TIcebergOptimizationMode.PARTIAL,
filePaths);
+
+ // a=2/size_11 and a=3/size_100 are the only files in their partitions, so
they will
+ // not be selected.
+ Collections.addAll(filePaths, "a=0/size_2.parquet", "a=0/size_100.parquet",
+ "a=1/size_101.parquet", "a=4/size_120.parquet");
+ checkFiltering(contentFiles, 200, TIcebergOptimizationMode.PARTIAL,
filePaths);
+
+ // a=3/size_100 is the only file in the a=3 partition, so it will not be
selected.
+ Collections.addAll(filePaths, "a=0/size_222.parquet",
"a=1/size_250.parquet",
+ "a=2/size_11.parquet", "a=2/size_200.parquet");
+ checkFiltering(contentFiles, 500, TIcebergOptimizationMode.PARTIAL,
filePaths);
+ }
+}
\ No newline at end of file
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 605df7f5f..effb641fc 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -772,10 +772,10 @@ AnalysisException: Impala can only write Parquet data
files, while table 'functi
---- QUERY
CREATE TABLE ice_complex (id BIGINT NULL, int_array ARRAY<INT> NULL) STORED AS
ICEBERG
TBLPROPERTIES ('format-version'='2');
-# Empty table cannot be optimized. TODO: IMPALA-12839 Optimizing empty table
should be no-op.
+# Tables with complex types cannot be optimized.
optimize table ice_complex;
---- CATCH
-AnalysisException: Table '$DATABASE.ice_complex' is empty.
+AnalysisException: Impala does not support writing tables with complex types.
Table '$DATABASE.ice_complex' has column 'int_array' with type: ARRAY<INT>
====
---- QUERY
# ICEBERG__DATA__SEQUENCE__NUMBER is not supported for non-Iceberg tables.
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
index 4da54a8e4..ce3a8a483 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
@@ -1,5 +1,58 @@
====
---- QUERY
+CREATE TABLE ice_optimize_part (int_col int, string_col string, bool_col
boolean)
+PARTITIONED BY SPEC(int_col)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2');
+====
+---- QUERY
+# Insert a value then delete everything from the table.
+INSERT INTO ice_optimize_part VALUES(1, 'one', true);
+DELETE FROM ice_optimize_part WHERE bool_col=true;
+SHOW FILES IN ice_optimize_part;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/int_col=1/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/int_col=1/delete-.*parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+# Testing that OPTIMIZE merged the data and delete files and the result is an
empty snapshot.
+OPTIMIZE TABLE ice_optimize_part;
+SHOW FILES IN ice_optimize_part;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+# Testing that OPTIMIZE empty table is no-op.
+DESCRIBE HISTORY ice_optimize_part;
+---- LABELS
+creation_time,snapshot_id,parent_id,is_current_ancestor
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'.*','.*','NULL','TRUE'
+row_regex:'.*','.*','.*','TRUE'
+row_regex:'.*','.*','.*','TRUE'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+OPTIMIZE TABLE ice_optimize_part;
+DESCRIBE HISTORY ice_optimize_part;
+---- LABELS
+creation_time,snapshot_id,parent_id,is_current_ancestor
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'.*','.*','NULL','TRUE'
+row_regex:'.*','.*','.*','TRUE'
+row_regex:'.*','.*','.*','TRUE'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
CREATE TABLE ice_optimize (int_col int, string_col string, bool_col boolean)
STORED BY ICEBERG
TBLPROPERTIES ('format-version'='2');
@@ -28,6 +81,30 @@ Path,Size,Partition,EC Policy
STRING, STRING, STRING, STRING
====
---- QUERY
+# Testing that OPTIMIZE empty table is no-op.
+DESCRIBE HISTORY ice_optimize;
+---- LABELS
+creation_time,snapshot_id,parent_id,is_current_ancestor
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'.*','.*','NULL','TRUE'
+row_regex:'.*','.*','.*','TRUE'
+row_regex:'.*','.*','.*','TRUE'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+OPTIMIZE TABLE ice_optimize;
+DESCRIBE HISTORY ice_optimize;
+---- LABELS
+creation_time,snapshot_id,parent_id,is_current_ancestor
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'.*','.*','NULL','TRUE'
+row_regex:'.*','.*','.*','TRUE'
+row_regex:'.*','.*','.*','TRUE'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
# Insert rows one by one to write multiple small files.
INSERT INTO ice_optimize VALUES(1, 'one', true);
INSERT INTO ice_optimize VALUES(2, 'two', false);
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 4a0311b61..48ae18232 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -17,6 +17,7 @@
from __future__ import absolute_import, division, print_function
from builtins import range
+from collections import defaultdict, namedtuple
import datetime
import logging
import os
@@ -1790,6 +1791,128 @@ class TestIcebergV2Table(IcebergTestSuite):
tbl_name, snapshot_before_last.get_snapshot_id()))
assert result_after_opt.data.sort() == result_time_travel.data.sort()
+ def _check_file_filtering(self, tbl_name, threshold_mb, mode,
had_partition_evolution):
+ threshold_bytes = threshold_mb * 1024 * 1024
+ DATA_FILE = "0"
+ DELETE_FILE = "1"
+ FileMetadata = namedtuple('FileMetadata', 'content, path, partition, size')
+ metadata_query = """select content, file_path, `partition`,
file_size_in_bytes
+ from {0}.`files`;""".format(tbl_name)
+ result = self.execute_query(metadata_query)
+ files_before = set()
+ files_per_partition = defaultdict(set)
+ for line in result.data:
+ file = FileMetadata._make(line.split("\t"))
+ partition = file.partition
+ files_per_partition[partition].add(file)
+ files_before.add(file)
+
+ selected_files = set()
+ partitions_with_removed_files = set()
+ for partition, files in files_per_partition.items():
+ if len(files) > 1:
+ num_small_files = 0
+ # count small datafiles
+ for file in files:
+ if file.content == DATA_FILE and int(file.size) < threshold_bytes:
+ num_small_files += 1
+ for file in files:
+ # We assume that a delete file in a partition references all data
files in
+ # that partition, because we cannot differentiate between data
files
+ # with/without deletes.
+ if file.content == DELETE_FILE:
+ selected_files.update(files)
+ partitions_with_removed_files.add(partition)
+ break
+ # Only merge small files if there are at least 2 of them.
+ elif num_small_files > 1 and int(file.size) < threshold_bytes:
+ selected_files.add(file)
+ partitions_with_removed_files.add(partition)
+
+ self.execute_query(
+ "OPTIMIZE TABLE {0} (file_size_threshold_mb={1});".format(tbl_name,
threshold_mb))
+ optimized_result = self.execute_query(metadata_query)
+ files_after = set()
+ for line in optimized_result.data:
+ file = FileMetadata._make(line.split("\t"))
+ files_after.add(file)
+
+ # Check the resulting files and the modified partitions after the OPTIMIZE
operation.
+ # files_after = files_before - selected_files + 1 new file per partition
+ # The result should not contain the files that were selected and should
contain 1 new
+ # file per written partition.
+ unchanged_files = files_before - selected_files
+ # Check that files that were not selected are still present in the result.
+ assert unchanged_files.issubset(files_after)
+ # Check that selected files are rewritten and not present in the result.
+ assert selected_files.isdisjoint(files_after)
+ new_files = files_after - unchanged_files
+ assert new_files == files_after - files_before
+
+ if mode == "NOOP":
+ assert selected_files == set([])
+ assert files_after == files_before
+ elif mode == "REWRITE_ALL":
+ assert selected_files == files_before
+ assert files_after.isdisjoint(files_before)
+ elif mode == "PARTIAL":
+ assert selected_files < files_before and selected_files != set([])
+ assert unchanged_files < files_after and unchanged_files != set([])
+ assert unchanged_files == files_after.intersection(files_before)
+
+ # Check that all delete files were merged.
+ for file in files_after:
+ assert file.content == DATA_FILE
+ # Check that there's only one new file in every partition.
+ partitions_with_new_files = set()
+ for file in new_files:
+ assert file.partition not in partitions_with_new_files
+ partitions_with_new_files.add(file.partition)
+ assert len(new_files) == len(partitions_with_new_files)
+
+ # WITH PARTITION EVOLUTION
+ # Only new partitions are written to.
+ # WITHOUT PARTITION EVOLUTION
+ if not had_partition_evolution:
+ # Check that 1 new content file is written in every updated partition.
+ assert len(new_files) == len(partitions_with_removed_files)
+ assert partitions_with_new_files == partitions_with_removed_files
+
+ def test_optimize_file_filtering(self, unique_database):
+ tbl_name = unique_database + ".ice_optimize_filter"
+ self.execute_query("""CREATE TABLE {0} partitioned by spec (l_linenumber)
+ STORED BY ICEBERG TBLPROPERTIES ('format-version'='2')
+ AS SELECT * FROM tpch_parquet.lineitem
+ WHERE l_quantity < 10;""".format(tbl_name))
+ self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem
+ WHERE l_quantity>=10 AND
l_quantity<=12;""".format(tbl_name))
+ # There are no delete files in the table, so this should be a no-op. Check
that no new
+ # snapshot was created.
+ self._check_file_filtering(tbl_name, 0, "NOOP", False)
+ assert len(get_snapshots(self.client, tbl_name)) == 2
+ self._check_file_filtering(tbl_name, 5, "PARTIAL", False)
+ self._check_file_filtering(tbl_name, 50, "PARTIAL", False)
+ # Check that the following is a no-op, since the table is already in a
compact form.
+ self._check_file_filtering(tbl_name, 100, "NOOP", False)
+ self.execute_query("""UPDATE {0} SET l_linenumber=7 WHERE l_linenumber>4
AND
+ l_linestatus='F';""".format(tbl_name))
+ self._check_file_filtering(tbl_name, 6, "PARTIAL", False)
+ self.execute_query("""ALTER TABLE {0} SET PARTITION SPEC(l_linestatus);"""
+ .format(tbl_name))
+ self.execute_query("""UPDATE {0} SET l_shipmode='AIR' WHERE
l_shipmode='MAIL'
+ AND l_linenumber<4;""".format(tbl_name))
+ self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem
+ WHERE l_quantity=13 AND
l_linenumber<3;""".format(tbl_name))
+ self.execute_query("""INSERT INTO {0} SELECT * FROM tpch_parquet.lineitem
+ WHERE l_quantity=14 AND
l_linenumber<3;""".format(tbl_name))
+ # Merges the delete files and rewrites the small files.
+ self._check_file_filtering(tbl_name, 2, "PARTIAL", True)
+ # Rewrites the remaining small files (2MB <= file_size < 100MB).
+ self._check_file_filtering(tbl_name, 100, "PARTIAL", True)
+ self.execute_query("""UPDATE {0} SET l_shipmode='AIR' WHERE
l_shipmode='MAIL';"""
+ .format(tbl_name))
+ # All partitions have delete files, therefore the entire table is
rewritten.
+ self._check_file_filtering(tbl_name, 100, "REWRITE_ALL", True)
# Tests to exercise the DIRECTED distribution mode for V2 Iceberg tables.
Note, that most
# of the test coverage is in TestIcebergV2Table.test_read_position_deletes but
since it