This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 421b43d65d [#10444] fix(core): batchSelect queries missing
version-info JOIN and field aliases (#10451)
421b43d65d is described below
commit 421b43d65d4517d7a0bc09aef9f8578ee88ba047
Author: Jerry Shao <[email protected]>
AuthorDate: Tue Mar 17 20:58:29 2026 +0800
[#10444] fix(core): batchSelect queries missing version-info JOIN and field
aliases (#10451)
### What changes were proposed in this pull request?
- Fixed `batchSelectTableByIdentifier` in `TableMetaBaseSQLProvider` to
include a LEFT JOIN with `table_version_info`, selecting all
version-info fields (`format`, `properties`, `partitioning`,
`sort_orders`, `distribution`, `indexes`, `comment`). Previously these
fields were omitted, causing incomplete `TableEntity` objects to be
stored in the entity cache.
- Removed the unused `batchSelectJobByIdentifier` from `JobMetaBaseSQLProvider`,
along with its `JobMetaMapper` and `JobMetaSQLProviderFactory` entries.
- Added regression tests in `TestTableMetaService` covering all
version-info fields and documenting that batch results do not include columns.
### Why are the changes needed?
`MetadataAuthzHelper.preloadToCache()` calls `entityStore().batchGet()`
for TABLE entities on every authorized request. This invokes
`batchSelectTableByIdentifier`, which was missing the LEFT JOIN with
`table_version_info`. As a result, incomplete `TableEntity` objects
(with no `format` or `properties`) were stored in the entity cache.
After a server restart, `tableFormatCache` (an in-memory Guava cache in
`GenericCatalogOperations`) is cleared. When loading a Delta/generic
lakehouse table, `tableOps()` misses the `tableFormatCache` and falls
back to `entityStore().get()`, which returns the incomplete cached
entity. This causes `Preconditions.checkArgument(format != null, "Table
format for %s is null...")` to throw an `IllegalArgumentException`.
Fix: #10444
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- Added `testBatchGetTableByIdentifierIncludesVersionInfoFields` to
verify all version-info fields (`format`, `properties`, `comment`,
`distribution`, `sortOrders`, `partitioning`, `indexes`) are returned by
`batchGetTableByIdentifier`.
- Added `testBatchGetTableByIdentifierDoesNotIncludeColumns` to document
that columns are fetched via a separate path (`getTableByIdentifier`)
and are intentionally not included in batch results.
---
.../storage/relational/mapper/JobMetaMapper.java | 6 -
.../mapper/JobMetaSQLProviderFactory.java | 7 --
.../provider/base/JobMetaBaseSQLProvider.java | 28 -----
.../provider/base/TableMetaBaseSQLProvider.java | 23 +++-
.../relational/service/TestTableMetaService.java | 125 +++++++++++++++++++++
5 files changed, 142 insertions(+), 47 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
index 66ed42469e..7bfe738595 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaMapper.java
@@ -74,10 +74,4 @@ public interface JobMetaMapper {
@SelectProvider(type = JobMetaSQLProviderFactory.class, method =
"batchSelectJobByRunIds")
List<JobPO> batchSelectJobByRunIds(
@Param("metalakeName") String metalakeName, @Param("jobRunIds")
List<Long> jobRunIds);
-
- @SelectProvider(type = JobMetaSQLProviderFactory.class, method =
"batchSelectJobByIdentifier")
- List<JobPO> batchSelectJobByIdentifier(
- @Param("metalakeName") String metalakeName,
- @Param("jobTemplateName") String jobTemplateName,
- @Param("jobNames") List<String> jobNames);
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
index 35e0839572..269e652504 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/JobMetaSQLProviderFactory.java
@@ -104,11 +104,4 @@ public class JobMetaSQLProviderFactory {
@Param("metalakeName") String metalakeName, @Param("jobRunIds")
List<Long> jobRunIds) {
return getProvider().batchSelectJobByRunIds(metalakeName, jobRunIds);
}
-
- public static String batchSelectJobByIdentifier(
- @Param("metalakeName") String metalakeName,
- @Param("jobTemplateName") String jobTemplateName,
- @Param("jobNames") List<String> jobNames) {
- return getProvider().batchSelectJobByIdentifier(metalakeName,
jobTemplateName, jobNames);
- }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
index 3c4c88ae59..054c093311 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/JobMetaBaseSQLProvider.java
@@ -183,34 +183,6 @@ public class JobMetaBaseSQLProvider {
+ " WHERE deleted_at < #{legacyTimeline} AND deleted_at > 0 LIMIT
#{limit}";
}
- public String batchSelectJobByIdentifier(
- @Param("metalakeName") String metalakeName,
- @Param("jobTemplateName") String jobTemplateName,
- @Param("jobNames") List<String> jobNames) {
- return "<script>"
- + "SELECT jm.job_run_id, jm.metalake_id, jm.job_template_id,"
- + " jm.job_execution_id, jm.job_run_status, jm.job_finished_at,"
- + " jm.audit_info, jm.current_version, jm.last_version, jm.deleted_at"
- + " FROM "
- + JobMetaMapper.TABLE_NAME
- + " jm"
- + " JOIN "
- + JobTemplateMetaMapper.TABLE_NAME
- + " jtm ON jm.job_template_id = jtm.job_template_id"
- + " JOIN "
- + MetalakeMetaMapper.TABLE_NAME
- + " mm ON jm.metalake_id = mm.metalake_id"
- + " WHERE mm.metalake_name = #{metalakeName}"
- + " AND jtm.job_template_name = #{jobTemplateName}"
- + " AND jm.job_run_id IN ("
- + "<foreach collection='jobNames' item='jobName' separator=','>"
- + "#{jobName}"
- + "</foreach>"
- + " )"
- + " AND jm.deleted_at = 0 AND jtm.deleted_at = 0 AND mm.deleted_at = 0"
- + "</script>";
- }
-
public String batchSelectJobByRunIds(
@Param("metalakeName") String metalakeName, @Param("jobRunIds")
List<Long> jobRunIds) {
return "<script>"
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
index 6770ebbc2e..60d8bf0d51 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/TableMetaBaseSQLProvider.java
@@ -350,19 +350,30 @@ public class TableMetaBaseSQLProvider {
tm.table_id AS tableId,
tm.table_name AS tableName,
tm.metalake_id AS metalakeId,
+ tm.catalog_id AS catalogId,
+ tm.schema_id AS schemaId,
tm.audit_info AS auditInfo,
tm.current_version AS currentVersion,
tm.last_version AS lastVersion,
- tm.deleted_at AS deletedAt
- FROM %s tm
- WHERE schema_id = #{schemaId}
- AND table_name IN
+ tm.deleted_at AS deletedAt,
+ tv.format AS format,
+ tv.properties AS properties,
+ tv.partitioning AS partitions,
+ tv.sort_orders AS sortOrders,
+ tv.distribution AS distribution,
+ tv.indexes AS indexes,
+ tv.comment AS comment
+ FROM %s tm LEFT JOIN %s tv
+ ON tm.table_id = tv.table_id AND tm.current_version = tv.version
+ AND tv.deleted_at = 0
+ WHERE tm.schema_id = #{schemaId}
+ AND tm.table_name IN
<foreach collection="tableNames" item="tableName" open="("
separator="," close=")">
#{tableName}
</foreach>
- AND deleted_at = 0
+ AND tm.deleted_at = 0
</script>
"""
- .formatted(TABLE_NAME);
+ .formatted(TABLE_NAME, TableVersionMapper.TABLE_NAME);
}
}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableMetaService.java
index 9c4eea387a..0c30660818 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableMetaService.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableMetaService.java
@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
@@ -30,6 +31,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.meta.BaseMetalake;
@@ -37,7 +39,19 @@ import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.meta.ColumnEntity;
import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.distributions.Strategy;
import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.sorts.SortDirection;
+import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.sorts.SortOrders;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes;
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.storage.relational.TestJDBCBackend;
@@ -235,6 +249,117 @@ public class TestTableMetaService extends TestJDBCBackend
{
compareTwoColumns(updatedTable2.columns(), retrievedTable2.columns());
}
+ @TestTemplate
+ public void testBatchGetTableByIdentifierIncludesVersionInfoFields() throws
IOException {
+ createAndInsertMakeLake(metalakeName);
+ createAndInsertCatalog(metalakeName, catalogName);
+ createAndInsertSchema(metalakeName, catalogName, schemaName);
+
+ Map<String, String> tableProps =
+ ImmutableMap.of(Table.PROPERTY_TABLE_FORMAT, "delta", "location",
"s3://bucket/path");
+ Distribution distribution = Distributions.of(Strategy.HASH, 4,
NamedReference.field("col1"));
+ SortOrder[] sortOrders =
+ new SortOrder[] {SortOrders.of(NamedReference.field("col1"),
SortDirection.ASCENDING)};
+ Transform[] partitioning = new Transform[] {Transforms.identity("col2")};
+ Index[] indexes = new Index[] {Indexes.primary("pk", new String[][]
{{"col1"}})};
+
+ TableEntity table =
+ TableEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("delta_table")
+ .withNamespace(NamespaceUtil.ofTable(metalakeName, catalogName,
schemaName))
+ .withProperties(tableProps)
+ .withComment("test table comment")
+ .withDistribution(distribution)
+ .withSortOrders(sortOrders)
+ .withPartitioning(partitioning)
+ .withIndexes(indexes)
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ TableMetaService.getInstance().insertTable(table, false);
+
+ NameIdentifier tableIdent =
+ NameIdentifier.of(metalakeName, catalogName, schemaName,
"delta_table");
+ List<TableEntity> results =
+
TableMetaService.getInstance().batchGetTableByIdentifier(List.of(tableIdent));
+
+ Assertions.assertEquals(1, results.size());
+ TableEntity result = results.get(0);
+
+ // Verify properties (including format) are returned
+ Assertions.assertNotNull(result.properties());
+ Assertions.assertEquals("delta",
result.properties().get(Table.PROPERTY_TABLE_FORMAT));
+ Assertions.assertEquals("s3://bucket/path",
result.properties().get("location"));
+
+ // Verify comment is returned
+ Assertions.assertEquals("test table comment", result.comment());
+
+ // Verify distribution is returned
+ Assertions.assertNotNull(result.distribution());
+ Assertions.assertEquals(distribution, result.distribution());
+
+ // Verify sort orders are returned
+ Assertions.assertNotNull(result.sortOrders());
+ Assertions.assertArrayEquals(sortOrders, result.sortOrders());
+
+ // Verify partitioning is returned — compare field references since
serialization may change
+ // the concrete implementation class (e.g., IdentityTransform ->
IdentityPartitioningDTO)
+ Assertions.assertNotNull(result.partitioning());
+ Assertions.assertEquals(partitioning.length, result.partitioning().length);
+ Assertions.assertEquals(
+ ((NamedReference.FieldReference)
partitioning[0].references()[0]).fieldName()[0],
+ ((NamedReference.FieldReference)
result.partitioning()[0].references()[0]).fieldName()[0]);
+
+ // Verify indexes are returned
+ Assertions.assertNotNull(result.indexes());
+ Assertions.assertArrayEquals(indexes, result.indexes());
+ }
+
+ @TestTemplate
+ public void testBatchGetTableByIdentifierDoesNotIncludeColumns() throws
IOException {
+ createAndInsertMakeLake(metalakeName);
+ createAndInsertCatalog(metalakeName, catalogName);
+ createAndInsertSchema(metalakeName, catalogName, schemaName);
+
+ ColumnEntity column =
+ ColumnEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("col1")
+ .withPosition(0)
+ .withDataType(Types.IntegerType.get())
+ .withNullable(true)
+ .withAutoIncrement(false)
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ TableEntity table =
+ TableEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("table_with_columns")
+ .withNamespace(NamespaceUtil.ofTable(metalakeName, catalogName,
schemaName))
+ .withColumns(List.of(column))
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ TableMetaService.getInstance().insertTable(table, false);
+
+ // Verify getTableByIdentifier (single-get path) returns columns
+ TableEntity singleGetResult =
+ TableMetaService.getInstance()
+ .getTableByIdentifier(
+ NameIdentifier.of(metalakeName, catalogName, schemaName,
"table_with_columns"));
+ Assertions.assertEquals(1, singleGetResult.columns().size());
+
+ // batchGetTableByIdentifier does not fetch columns (separate
table_column_meta table)
+ NameIdentifier tableIdent =
+ NameIdentifier.of(metalakeName, catalogName, schemaName,
"table_with_columns");
+ List<TableEntity> results =
+
TableMetaService.getInstance().batchGetTableByIdentifier(List.of(tableIdent));
+
+ Assertions.assertEquals(1, results.size());
+ Assertions.assertTrue(
+ results.get(0).columns().isEmpty(),
+ "batchGetTableByIdentifier does not fetch columns from
table_column_meta");
+ }
+
private void compareTwoColumns(
List<ColumnEntity> expectedColumns, List<ColumnEntity> actualColumns) {
Assertions.assertEquals(expectedColumns.size(), actualColumns.size());