This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new c50d682a6f [#7415] fix(core): Use doWithCommitAndFetchResult in
deleteColumnsByLegacyTimeline (#7428)
c50d682a6f is described below
commit c50d682a6f244ee9556bf9b641646bfa475e8417
Author: liuxian <[email protected]>
AuthorDate: Wed Jun 18 21:18:55 2025 +0800
[#7415] fix(core): Use doWithCommitAndFetchResult in
deleteColumnsByLegacyTimeline (#7428)
<!--
1. Title: [#<issue>] <type>(<scope>): <subject>
Examples:
- "[#123] feat(operator): support xxx"
- "[#233] fix: check null before access result in xxx"
- "[MINOR] refactor: fix typo in variable name"
- "[MINOR] docs: fix typo in README"
- "[#255] test: fix flaky test NameOfTheTest"
Reference: https://www.conventionalcommits.org/en/v1.0.0/
2. If the PR is unfinished, please mark this PR as draft.
-->
### What changes were proposed in this pull request?
This PR modifies the deleteColumnsByLegacyTimeline method in the
TableColumnMetaService class, changing the transaction handling from
doWithoutCommitAndFetchResult to doWithCommitAndFetchResult.
### Why are the changes needed?
This change is necessary because the deleteColumnsByLegacyTimeline
method is a standalone operation that needs to commit its transaction
immediately after execution. Using doWithCommitAndFetchResult ensures:
1. The deletion operation executes and commits within its own
transaction, rather than depending on an external transaction
2. Consistency with garbage collection operations for other entity types
(like MetalakeMeta, CatalogMeta, etc.) which also use
doWithCommitAndFetchResult
3. Prevention of database locking issues caused by long-running
transactions
4. Immediate visibility of deletion results to other transactions
Fix: #7415
### Does this PR introduce _any_ user-facing change?
No, this is an improvement to the internal implementation and does not
affect user interfaces or functionality.
### How was this patch tested?
The modified method was verified through the JUnit test
TestTableColumnMetaService.testDeleteColumnsByLegacyTimeline, which
ensures:
1. The system correctly identifies and deletes columns with a specific
legacy timeline
2. The batch deletion functionality (with limit parameter) works
properly
3. The deletion operation is idempotent: repeated calls produce the
same end state and do not cause errors
Co-authored-by: liuxian131 <[email protected]>
---
.../relational/service/TableColumnMetaService.java | 3 +-
.../service/TestTableColumnMetaService.java | 119 +++++++++++++++++++++
2 files changed, 120 insertions(+), 2 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
index 1527811fa9..0fb78573be 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableColumnMetaService.java
@@ -106,8 +106,7 @@ public class TableColumnMetaService {
}
public int deleteColumnsByLegacyTimeline(Long legacyTimeline, int limit) {
- // deleteColumns will be done in the outside transaction, so we don't do
commit here.
- return SessionUtils.doWithoutCommitAndFetchResult(
+ return SessionUtils.doWithCommitAndFetchResult(
TableColumnMapper.class,
mapper -> mapper.deleteColumnPOsByLegacyTimeline(legacyTimeline,
limit));
}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
index a0e9f3427f..b6e1ef2843 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
@@ -19,7 +19,12 @@
package org.apache.gravitino.storage.relational.service;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -34,7 +39,9 @@ import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.storage.relational.mapper.TableColumnMapper;
import org.apache.gravitino.storage.relational.po.ColumnPO;
+import org.apache.gravitino.storage.relational.session.SqlSessions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.Lists;
@@ -547,4 +554,116 @@ public class TestTableColumnMetaService extends
TestJDBCBackend {
Assertions.assertEquals(expectedColumn.auditInfo(),
column.auditInfo());
});
}
+
+ @Test
+ public void testDeleteColumnsByLegacyTimeline() throws IOException {
+ String catalogName = "catalog1";
+ String schemaName = "schema1";
+ createParentEntities(METALAKE_NAME, catalogName, schemaName, auditInfo);
+
+ List<ColumnEntity> columns = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ columns.add(
+ ColumnEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("column_" + i)
+ .withPosition(i)
+ .withComment("comment_" + i)
+ .withDataType(Types.StringType.get())
+ .withNullable(true)
+ .withAutoIncrement(false)
+ .withAuditInfo(auditInfo)
+ .build());
+ }
+
+ TableEntity createdTable =
+ TableEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("legacy_table")
+ .withNamespace(Namespace.of(METALAKE_NAME, catalogName,
schemaName))
+ .withColumns(columns)
+ .withAuditInfo(auditInfo)
+ .build();
+
+ TableMetaService.getInstance().insertTable(createdTable, false);
+ long now = System.currentTimeMillis();
+ long legacyTimeline = now - 100000; // Past timestamp
+ Connection connection = null;
+ PreparedStatement stmt = null;
+ try {
+ connection = SqlSessions.getSqlSession().getConnection();
+ for (ColumnEntity column : columns) {
+ String sql =
+ "UPDATE "
+ + TableColumnMapper.COLUMN_TABLE_NAME
+ + " SET deleted_at = ? WHERE column_id = ?";
+ stmt = connection.prepareStatement(sql);
+ stmt.setLong(1, legacyTimeline);
+ stmt.setLong(2, column.id());
+ stmt.executeUpdate();
+ stmt.close();
+ }
+ SqlSessions.commitAndCloseSqlSession();
+ } catch (Exception e) {
+ SqlSessions.rollbackAndCloseSqlSession();
+ throw new IOException("Failed to update column deleted_at timestamp", e);
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ }
+ int count = countColumnsByTableId(legacyTimeline);
+ Assertions.assertEquals(5, count, "Should have 5 columns with legacy
timeline");
+ TableColumnMetaService service = TableColumnMetaService.getInstance();
+ service.deleteColumnsByLegacyTimeline(now, 3);
+ count = countColumnsByTableId(legacyTimeline);
+ Assertions.assertEquals(2, count, "Should have 2 columns remaining");
+ service.deleteColumnsByLegacyTimeline(now, 10);
+ count = countColumnsByTableId(legacyTimeline);
+ Assertions.assertEquals(0, count, "Should have no columns remaining");
+ Assertions.assertTrue(
+
MetalakeMetaService.getInstance().deleteMetalake(NameIdentifier.of(METALAKE_NAME),
true));
+ }
+
+ private int countColumnsByTableId(long legacyTimeline) throws IOException {
+ int count = 0;
+ Connection connection = null;
+ PreparedStatement stmt = null;
+ ResultSet rs = null;
+ try {
+ connection = SqlSessions.getSqlSession().getConnection();
+ String sql =
+ "SELECT COUNT(*) FROM " + TableColumnMapper.COLUMN_TABLE_NAME + "
WHERE deleted_at = ?";
+ stmt = connection.prepareStatement(sql);
+ stmt.setLong(1, legacyTimeline);
+ rs = stmt.executeQuery();
+ if (rs.next()) {
+ count = rs.getInt(1);
+ }
+ SqlSessions.commitAndCloseSqlSession();
+ } catch (Exception e) {
+ SqlSessions.rollbackAndCloseSqlSession();
+ throw new IOException("Failed to count columns with legacy timeline", e);
+ } finally {
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
+ }
+ }
+ return count;
+ }
}