This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 66f057e09f [core] Add rollback schema to Table and Catalog (#7498)
66f057e09f is described below
commit 66f057e09f5a25fbe39b40c30d464ddba7b472ca
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 23 12:09:48 2026 +0800
[core] Add rollback schema to Table and Catalog (#7498)
PR Summary: Adds a rollbackSchema(long schemaId) API to both Table and
Catalog interfaces, allowing users to roll back a table's schema to a
previous version. All schema versions newer than the target are deleted,
with validation to prevent deletion when snapshots/tags/changelogs still
reference newer schemas.
---
docs/static/rest-catalog-open-api.yaml | 44 +++++++++
.../main/java/org/apache/paimon/rest/RESTApi.java | 19 ++++
.../java/org/apache/paimon/rest/ResourcePaths.java | 11 +++
.../rest/requests/RollbackSchemaRequest.java | 46 +++++++++
.../java/org/apache/paimon/catalog/Catalog.java | 16 ++++
.../org/apache/paimon/catalog/DelegateCatalog.java | 6 ++
.../java/org/apache/paimon/rest/RESTCatalog.java | 12 +++
.../org/apache/paimon/schema/SchemaManager.java | 54 +++++++++++
.../paimon/table/AbstractFileStoreTable.java | 18 ++++
.../apache/paimon/table/CatalogEnvironment.java | 16 ++++
.../paimon/table/DelegatedFileStoreTable.java | 5 +
.../java/org/apache/paimon/table/FormatTable.java | 5 +
.../org/apache/paimon/table/ReadonlyTable.java | 8 ++
.../main/java/org/apache/paimon/table/Table.java | 4 +
.../org/apache/paimon/utils/SnapshotManager.java | 5 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 32 +++++++
.../org/apache/paimon/rest/RESTCatalogTest.java | 91 ++++++++++++++++++
.../apache/paimon/schema/SchemaManagerTest.java | 103 +++++++++++++++++++++
.../apache/paimon/table/SimpleTableTestBase.java | 51 ++++++++++
tools/maven/suppressions.xml | 1 +
20 files changed, 545 insertions(+), 2 deletions(-)
diff --git a/docs/static/rest-catalog-open-api.yaml
b/docs/static/rest-catalog-open-api.yaml
index 237d9a987e..a4bda071f9 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -669,6 +669,42 @@ paths:
$ref: '#/components/examples/TagNotExistError'
"500":
$ref: '#/components/responses/ServerErrorResponse'
+ /v1/{prefix}/databases/{database}/tables/{table}/rollback-schema:
+ post:
+ tags:
+ - table
+ summary: Rollback table schema
+ operationId: rollbackSchema
+ parameters:
+ - name: prefix
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: database
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: table
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/RollbackSchemaRequest'
+ responses:
+ "200":
+ description: Success, no content
+ "401":
+ $ref: '#/components/responses/UnauthorizedErrorResponse'
+ "404":
+ $ref: '#/components/responses/TableNotExistErrorResponse'
+ "500":
+ $ref: '#/components/responses/ServerErrorResponse'
/v1/{prefix}/databases/{database}/tables/{table}/token:
get:
tags:
@@ -2936,6 +2972,14 @@ components:
type: integer
format: int64
nullable: true
+ RollbackSchemaRequest:
+ type: object
+ required:
+ - schemaId
+ properties:
+ schemaId:
+ type: integer
+ format: int64
Instant:
anyOf:
- $ref: '#/components/schemas/SnapshotInstant'
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index 1e549a116c..a939c5ce16 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -51,6 +51,7 @@ import
org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
import org.apache.paimon.rest.requests.RegisterTableRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.requests.ResetConsumerRequest;
+import org.apache.paimon.rest.requests.RollbackSchemaRequest;
import org.apache.paimon.rest.requests.RollbackTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.AuthTableQueryResponse;
@@ -711,6 +712,24 @@ public class RESTApi {
restAuthFunction);
}
+ /**
+ * Rollback schema for table.
+ *
+ * @param identifier database name and table name.
+ * @param schemaId the target schema version to rollback to
+ * @throws NoSuchResourceException Exception thrown on HTTP 404 means the
table not exists
+ * @throws ForbiddenException Exception thrown on HTTP 403 means don't
have the permission for
+ * this table
+ */
+ public void rollbackSchema(Identifier identifier, long schemaId) {
+ RollbackSchemaRequest request = new RollbackSchemaRequest(schemaId);
+ client.post(
+ resourcePaths.rollbackSchemaTable(
+ identifier.getDatabaseName(),
identifier.getObjectName()),
+ request,
+ restAuthFunction);
+ }
+
/**
* Create table.
*
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index baf0b8a4af..466ac975b4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -120,6 +120,17 @@ public class ResourcePaths {
ROLLBACK);
}
+ public String rollbackSchemaTable(String databaseName, String objectName) {
+ return SLASH.join(
+ V1,
+ prefix,
+ DATABASES,
+ encodeString(databaseName),
+ TABLES,
+ encodeString(objectName),
+ "rollback-schema");
+ }
+
public String registerTable(String databaseName) {
return SLASH.join(V1, prefix, DATABASES, encodeString(databaseName),
REGISTER);
}
diff --git
a/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackSchemaRequest.java
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackSchemaRequest.java
new file mode 100644
index 0000000000..f35d22ca80
--- /dev/null
+++
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackSchemaRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Request for rollback table schema. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RollbackSchemaRequest implements RESTRequest {
+
+ private static final String FIELD_SCHEMA_ID = "schemaId";
+
+ @JsonProperty(FIELD_SCHEMA_ID)
+ private final long schemaId;
+
+ @JsonCreator
+ public RollbackSchemaRequest(@JsonProperty(FIELD_SCHEMA_ID) long schemaId)
{
+ this.schemaId = schemaId;
+ }
+
+ @JsonGetter(FIELD_SCHEMA_ID)
+ public long getSchemaId() {
+ return schemaId;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 3947a950e5..8924dc6ae1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -809,6 +809,22 @@ public interface Catalog extends AutoCloseable {
void rollbackTo(Identifier identifier, Instant instant, @Nullable Long
fromSnapshot)
throws Catalog.TableNotExistException;
+ /**
+ * Rollback table schema to a specific schema version. All schema versions
greater than the
+ * target will be deleted. This operation will fail if any snapshot, tag,
or changelog
+ * references a schema version greater than the target.
+ *
+ * @param identifier path of the table
+ * @param schemaId the target schema version to rollback to
+ * @throws Catalog.TableNotExistException if the table does not exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
+ */
+ default void rollbackSchema(Identifier identifier, long schemaId)
+ throws Catalog.TableNotExistException {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Create a new branch for this table. By default, an empty branch will be
created using the
* latest schema. If you provide {@code #fromTag}, a branch will be
created from the tag and the
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 81558ac68a..4a8eeabe50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -222,6 +222,12 @@ public abstract class DelegateCatalog implements Catalog {
wrapped.rollbackTo(identifier, instant, fromSnapshot);
}
+ @Override
+ public void rollbackSchema(Identifier identifier, long schemaId)
+ throws Catalog.TableNotExistException {
+ wrapped.rollbackSchema(identifier, schemaId);
+ }
+
@Override
public void createBranch(Identifier identifier, String branch, @Nullable
String fromTag)
throws TableNotExistException, BranchAlreadyExistException,
TagNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 7b053d120c..dbb2a8fd48 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -442,6 +442,18 @@ public class RESTCatalog implements Catalog {
}
}
+ @Override
+ public void rollbackSchema(Identifier identifier, long schemaId)
+ throws Catalog.TableNotExistException {
+ try {
+ api.rollbackSchema(identifier, schemaId);
+ } catch (NoSuchResourceException e) {
+ throw new TableNotExistException(identifier);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
+ }
+
private TableMetadata loadTableMetadata(Identifier identifier) throws
TableNotExistException {
// if the table is system table, we need to load table metadata from
the system table's data
// table
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index a896d3f63f..dbc605b96c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -45,10 +45,12 @@ import org.apache.paimon.types.MapType;
import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.LazyField;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.TagManager;
import
org.apache.paimon.shade.guava30.com.google.common.collect.FluentIterable;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -66,10 +68,12 @@ import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -1099,6 +1103,56 @@ public class SchemaManager implements Serializable {
fileIO.deleteQuietly(toSchemaPath(schemaId));
}
+ /**
+ * Rollback to a specific schema version. All schema versions greater than
the target will be
+ * deleted. This operation will fail if any snapshot, tag, or changelog
references a schema
+ * version greater than the target.
+ *
+ * @param targetSchemaId the schema version to rollback to.
+ * @param snapshotManager the snapshot manager to check snapshot
references.
+ * @param tagManager the tag manager to check tag references.
+ * @param changelogManager the changelog manager to check changelog
references.
+ */
+ public void rollbackTo(
+ long targetSchemaId,
+ SnapshotManager snapshotManager,
+ TagManager tagManager,
+ ChangelogManager changelogManager)
+ throws IOException {
+ checkArgument(schemaExists(targetSchemaId), "Schema %s does not
exist.", targetSchemaId);
+
+ // Collect all schemaIds referenced by snapshots, tags, and changelogs
+ Set<Long> usedSchemaIds = new HashSet<>();
+
+ snapshotManager.pickOrLatest(
+ snapshot -> {
+ usedSchemaIds.add(snapshot.schemaId());
+ return false;
+ });
+ tagManager.taggedSnapshots().forEach(s ->
usedSchemaIds.add(s.schemaId()));
+ changelogManager.changelogs().forEachRemaining(c ->
usedSchemaIds.add(c.schemaId()));
+
+ // Check if any referenced schema is newer than the target
+ Optional<Long> conflict =
+ usedSchemaIds.stream().filter(id -> id >
targetSchemaId).min(Long::compareTo);
+ if (conflict.isPresent()) {
+ throw new RuntimeException(
+ String.format(
+ "Cannot rollback to schema %d, schema %d is still
referenced by snapshots/tags/changelogs.",
+ targetSchemaId, conflict.get()));
+ }
+
+ // Delete all schemas newer than the target
+ List<Long> toBeDeleted =
+ listAllIds().stream()
+ .filter(id -> id > targetSchemaId)
+ .collect(Collectors.toList());
+ toBeDeleted.sort((o1, o2) -> Long.compare(o2, o1));
+ for (Long id : toBeDeleted) {
+ fileIO.delete(toSchemaPath(id), false);
+ }
+ }
+
public static void checkAlterTableOption(
Map<String, String> options, String key, @Nullable String
oldValue, String newValue) {
if (CoreOptions.IMMUTABLE_OPTIONS.contains(key)) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 014df441ac..52b9d162a5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -72,6 +72,8 @@ import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cach
import javax.annotation.Nullable;
import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
@@ -80,6 +82,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.SortedMap;
import java.util.function.BiConsumer;
+import java.util.function.LongConsumer;
import static org.apache.paimon.CoreOptions.PATH;
@@ -558,6 +561,21 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
}
}
+ @Override
+ public void rollbackSchema(long schemaId) {
+ LongConsumer schemaRollback =
catalogEnvironment.catalogSchemaRollback();
+ if (schemaRollback != null) {
+ schemaRollback.accept(schemaId);
+ } else {
+ try {
+ schemaManager()
+ .rollbackTo(schemaId, snapshotManager(), tagManager(),
changelogManager());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+
public Snapshot findSnapshot(long fromSnapshotId) throws
SnapshotNotExistException {
SnapshotManager snapshotManager = snapshotManager();
Snapshot snapshot = null;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index 882ad952b5..dac63fc178 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -39,6 +39,7 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Optional;
+import java.util.function.LongConsumer;
/** Catalog environment in table which contains log factory, metastore client
factory. */
public class CatalogEnvironment implements Serializable {
@@ -143,6 +144,21 @@ public class CatalogEnvironment implements Serializable {
return null;
}
+ @Nullable
+ public LongConsumer catalogSchemaRollback() {
+ if (catalogLoader != null && supportsVersionManagement) {
+ Catalog catalog = catalogLoader.load();
+ return schemaId -> {
+ try {
+ catalog.rollbackSchema(identifier, schemaId);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+ return null;
+ }
+
@Nullable
public SnapshotLoader snapshotLoader() {
if (catalogLoader == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index b92c4c1630..9eaa4f8d2e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -255,6 +255,11 @@ public abstract class DelegatedFileStoreTable implements
FileStoreTable {
wrapped.rollbackTo(tagName);
}
+ @Override
+ public void rollbackSchema(long schemaId) {
+ wrapped.rollbackSchema(schemaId);
+ }
+
@Override
public void createBranch(String branchName) {
wrapped.createBranch(branchName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index f1fbc74c12..a2b01fb7f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -374,6 +374,11 @@ public interface FormatTable extends Table {
throw new UnsupportedOperationException();
}
+ @Override
+ default void rollbackSchema(long schemaId) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
default void createBranch(String branchName) {
throw new UnsupportedOperationException();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index c5b8deeca9..55f14e7025 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -217,6 +217,14 @@ public interface ReadonlyTable extends InnerTable {
this.getClass().getSimpleName()));
}
+ @Override
+ default void rollbackSchema(long schemaId) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Readonly Table %s does not support rollbackSchema.",
+ this.getClass().getSimpleName()));
+ }
+
@Override
default void createBranch(String branchName) {
throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 8d49d7206e..53e99ea45f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -157,6 +157,10 @@ public interface Table extends Serializable {
@Experimental
void rollbackTo(String tagName);
+ /** Rollback table's schema to a specific schema version. */
+ @Experimental
+ void rollbackSchema(long schemaId);
+
/** Create an empty branch. */
@Experimental
void createBranch(String branchName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 2fb624c71e..414ef3f5b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -272,11 +272,12 @@ public class SnapshotManager implements Serializable {
}
for (long snapshotId = latestId; snapshotId >= earliestId;
snapshotId--) {
- if (snapshotExists(snapshotId)) {
- Snapshot snapshot = snapshot(snapshotId);
+ try {
+ Snapshot snapshot = tryGetSnapshot(snapshotId);
if (predicate.test(snapshot)) {
return snapshot.id();
}
+ } catch (FileNotFoundException ignored) {
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 81cb097f5a..bd9036a4b0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -63,6 +63,7 @@ import
org.apache.paimon.rest.requests.ListPartitionsByNamesRequest;
import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.requests.ResetConsumerRequest;
+import org.apache.paimon.rest.requests.RollbackSchemaRequest;
import org.apache.paimon.rest.requests.RollbackTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.AuthTableQueryResponse;
@@ -104,6 +105,7 @@ import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.LazyField;
import org.apache.paimon.utils.Pair;
@@ -417,6 +419,10 @@ public class RESTCatalogServer {
resources.length == 4
&&
ResourcePaths.TABLES.equals(resources[1])
&&
ResourcePaths.ROLLBACK.equals(resources[3]);
+ boolean isRollbackSchema =
+ resources.length == 4
+ &&
ResourcePaths.TABLES.equals(resources[1])
+ &&
"rollback-schema".equals(resources[3]);
boolean isPartitions =
resources.length == 4
&&
ResourcePaths.TABLES.equals(resources[1])
@@ -538,6 +544,8 @@ public class RESTCatalogServer {
.getTagName();
return
rollbackTableByTagNameHandle(identifier, tagName);
}
+ } else if (isRollbackSchema) {
+ return rollbackSchemaHandle(identifier,
restAuthParameter.data());
} else if (isTable) {
return tableHandle(
restAuthParameter.method(),
@@ -1009,6 +1017,30 @@ public class RESTCatalogServer {
new ErrorResponse(ErrorResponse.RESOURCE_TYPE_TAG, "" +
tagName, "", 404), 404);
}
+ private MockResponse rollbackSchemaHandle(Identifier identifier, String
data) throws Exception {
+ RollbackSchemaRequest requestBody = RESTApi.fromJson(data,
RollbackSchemaRequest.class);
+ if (noPermissionTables.contains(identifier.getFullName())) {
+ throw new Catalog.TableNoPermissionException(identifier);
+ }
+ if (!tableMetadataStore.containsKey(identifier.getFullName())) {
+ throw new Catalog.TableNotExistException(identifier);
+ }
+ FileStoreTable table = getFileTable(identifier);
+ long schemaId = requestBody.getSchemaId();
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
+ try {
+ schemaManager.rollbackTo(
+ schemaId,
+ table.snapshotManager(),
+ table.tagManager(),
+ new ChangelogManager(table.fileIO(), table.location(),
null));
+ } catch (Exception e) {
+ return mockResponse(new ErrorResponse(null, null, e.getMessage(),
500), 500);
+ }
+
+ return new MockResponse().setResponseCode(200);
+ }
+
private void cleanSnapshot(Identifier identifier, Long snapshotId, Long
latestSnapshotId)
throws IOException {
if (latestSnapshotId > snapshotId) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 35f2792f7b..0c20dfdc17 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -1933,6 +1933,97 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
assertThat(table.latestSnapshot().get().id()).isEqualTo(2);
}
+ @Test
+ public void testRollbackSchema() throws Exception {
+ Identifier identifier = Identifier.create("test_rollback_schema",
"table_for_schema");
+ createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+
+ // get initial schema id
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
+ long firstSchemaId = schemaManager.latest().get().id();
+
+ // evolve schema
+ catalog.alterTable(identifier, SchemaChange.setOption("aa", "bb"),
false);
+ long secondSchemaId = schemaManager.latest().get().id();
+ assertThat(secondSchemaId).isEqualTo(firstSchemaId + 1);
+
+ // rollback schema to first version
+ catalog.rollbackSchema(identifier, firstSchemaId);
+ assertThat(schemaManager.latest().get().id()).isEqualTo(firstSchemaId);
+ assertThat(schemaManager.schemaExists(secondSchemaId)).isFalse();
+
+ // rollback to non-existent schema should fail
+ assertThatThrownBy(() -> catalog.rollbackSchema(identifier, 999))
+ .isInstanceOf(Exception.class);
+ }
+
+ @Test
+ public void testRollbackSchemaFromTable() throws Exception {
+ Identifier identifier =
+ Identifier.create("test_rollback_schema",
"table_for_schema_from_table");
+ createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+
+ // get initial schema id
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
+ long firstSchemaId = schemaManager.latest().get().id();
+
+ // evolve schema
+ catalog.alterTable(identifier, SchemaChange.setOption("aa", "bb"),
false);
+ long secondSchemaId = schemaManager.latest().get().id();
+ assertThat(secondSchemaId).isEqualTo(firstSchemaId + 1);
+
+ // rollback schema to first version
+ table.rollbackSchema(firstSchemaId);
+ assertThat(schemaManager.latest().get().id()).isEqualTo(firstSchemaId);
+ assertThat(schemaManager.schemaExists(secondSchemaId)).isFalse();
+
+ // get schema from new table
+ table = (FileStoreTable) catalog.getTable(identifier);
+ assertThat(table.schema().id()).isEqualTo(1);
+ }
+
+ @Test
+ public void testRollbackSchemaFailedWithSnapshotReference() throws
Exception {
+ Identifier identifier =
+ Identifier.create("test_rollback_schema_fail",
"table_for_schema_fail");
+ createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location());
+ long firstSchemaId = schemaManager.latest().get().id();
+
+ // write data to create a snapshot referencing firstSchemaId
+ StreamTableWrite write = table.newWrite("commitUser");
+ StreamTableCommit commit = table.newCommit("commitUser");
+ write.write(GenericRow.of(1));
+ commit.commit(0, write.prepareCommit(false, 0));
+ write.close();
+ commit.close();
+
+ // evolve schema
+ catalog.alterTable(identifier, SchemaChange.setOption("aa", "bb"),
false);
+ long secondSchemaId = schemaManager.latest().get().id();
+
+ // write data to create a snapshot referencing secondSchemaId
+ table = (FileStoreTable) catalog.getTable(identifier);
+ write = table.newWrite("commitUser");
+ commit = table.newCommit("commitUser");
+ write.write(GenericRow.of(2));
+ commit.commit(1, write.prepareCommit(false, 1));
+ write.close();
+ commit.close();
+
+ // rollback should fail because snapshot references secondSchemaId
+ assertThatThrownBy(() -> catalog.rollbackSchema(identifier,
firstSchemaId))
+ .hasMessageContaining("Cannot rollback to schema " +
firstSchemaId)
+ .hasMessageContaining(
+ "schema "
+ + secondSchemaId
+ + " is still referenced by
snapshots/tags/changelogs");
+ }
+
@Test
public void testDataTokenExpired() throws Exception {
this.catalog = newRestCatalogWithDataToken();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index bc433ff18e..31496f8bb8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -40,7 +40,10 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -791,4 +794,104 @@ public class SchemaManagerTest {
table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
assertThat(table.options().get(DELETION_VECTORS_ENABLED.key())).isEqualTo("true");
}
+
+ @Test
+ public void testRollbackSchemaSuccess() throws Exception {
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+ manager.createTable(schema);
+ long firstSchemaId = manager.latest().get().id();
+
+ manager.commitChanges(SchemaChange.setOption("aa", "bb"));
+ long secondSchemaId = manager.latest().get().id();
+ assertThat(secondSchemaId).isEqualTo(firstSchemaId + 1);
+
+ manager.commitChanges(SchemaChange.setOption("cc", "dd"));
+ long thirdSchemaId = manager.latest().get().id();
+ assertThat(thirdSchemaId).isEqualTo(firstSchemaId + 2);
+
+ // rollback to first schema
+ SnapshotManager snapshotManager =
+ new SnapshotManager(LocalFileIO.create(), path, null, null,
null);
+ TagManager tagManager = new TagManager(LocalFileIO.create(), path);
+ ChangelogManager changelogManager = new
ChangelogManager(LocalFileIO.create(), path, null);
+ manager.rollbackTo(firstSchemaId, snapshotManager, tagManager,
changelogManager);
+
+ assertThat(manager.latest().get().id()).isEqualTo(firstSchemaId);
+ assertThat(manager.schemaExists(secondSchemaId)).isFalse();
+ assertThat(manager.schemaExists(thirdSchemaId)).isFalse();
+ }
+
+ @Test
+ public void testRollbackSchemaFailedDueToSnapshotReference() throws
Exception {
+ Schema appendOnlySchema =
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options,
+ "");
+ Path tableRoot = new Path(tempDir.toString(), "table");
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(),
tableRoot);
+ manager.createTable(appendOnlySchema);
+ long firstSchemaId = manager.latest().get().id();
+
+ // write data to create a snapshot referencing firstSchemaId
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write =
+
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
+ TableCommitImpl commit = table.newCommit(commitUser);
+ write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple")));
+ commit.commit(1, write.prepareCommit(false, 1));
+ write.close();
+ commit.close();
+
+ // evolve schema
+ manager.commitChanges(SchemaChange.setOption("aa", "bb"));
+ long secondSchemaId = manager.latest().get().id();
+
+ // write data to create a snapshot referencing secondSchemaId
+ table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+ write =
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
+ commit = table.newCommit(commitUser);
+ write.write(GenericRow.of(2, 20L, BinaryString.fromString("banana")));
+ commit.commit(2, write.prepareCommit(false, 2));
+ write.close();
+ commit.close();
+
+ // rollback to first schema should fail because snapshot references
secondSchemaId
+ SnapshotManager snapshotManager =
+ new SnapshotManager(LocalFileIO.create(), tableRoot, null,
null, null);
+ TagManager tagManager = new TagManager(LocalFileIO.create(),
tableRoot);
+ ChangelogManager changelogManager =
+ new ChangelogManager(LocalFileIO.create(), tableRoot, null);
+ assertThatThrownBy(
+ () ->
+ manager.rollbackTo(
+ firstSchemaId,
+ snapshotManager,
+ tagManager,
+ changelogManager))
+ .hasMessageContaining("Cannot rollback to schema " +
firstSchemaId)
+ .hasMessageContaining(
+ "schema "
+ + secondSchemaId
+ + " is still referenced by
snapshots/tags/changelogs");
+ }
+
+ @Test
+ public void testRollbackSchemaNotExist() throws Exception {
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+ manager.createTable(schema);
+
+ assertThatThrownBy(
+ () ->
+ manager.rollbackTo(
+ 999,
+ new SnapshotManager(
+ LocalFileIO.create(), path,
null, null, null),
+ new TagManager(LocalFileIO.create(),
path),
+ new
ChangelogManager(LocalFileIO.create(), path, null)))
+ .hasMessageContaining("Schema 999 does not exist");
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 1b58b012cd..f50e9922f5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -1844,4 +1844,55 @@ public abstract class SimpleTableTestBase {
assertThat(table.tagManager().allTagNames())
.containsExactlyInAnyOrderElementsOf(expectedTags);
}
+
+ @Test
+ public void testRollbackSchemaSuccess() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ SchemaManager schemaManager = table.schemaManager();
+ long firstSchemaId = schemaManager.latest().get().id();
+
+ // evolve schema twice
+ schemaManager.commitChanges(SchemaChange.setOption("aa", "bb"));
+ long secondSchemaId = schemaManager.latest().get().id();
+ schemaManager.commitChanges(SchemaChange.setOption("cc", "dd"));
+ long thirdSchemaId = schemaManager.latest().get().id();
+
+ // rollback to first schema
+ table.rollbackSchema(firstSchemaId);
+ assertThat(schemaManager.latest().get().id()).isEqualTo(firstSchemaId);
+ assertThat(schemaManager.schemaExists(secondSchemaId)).isFalse();
+ assertThat(schemaManager.schemaExists(thirdSchemaId)).isFalse();
+ }
+
+ @Test
+ public void testRollbackSchemaFailedWithSnapshotReference() throws
Exception {
+ FileStoreTable table = createFileStoreTable();
+ SchemaManager schemaManager = table.schemaManager();
+ long firstSchemaId = schemaManager.latest().get().id();
+
+ // write data to create a snapshot referencing firstSchemaId
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ write.write(rowData(0, 0, 0L));
+ commit.commit(0, write.prepareCommit(false, 0));
+ write.close();
+ commit.close();
+
+ // evolve schema
+ schemaManager.commitChanges(SchemaChange.setOption("aa", "bb"));
+
+ // write data to create a snapshot referencing secondSchemaId
+ table = table.copyWithLatestSchema();
+ write = table.newWrite(commitUser);
+ commit = table.newCommit(commitUser);
+ write.write(rowData(1, 10, 100L));
+ commit.commit(1, write.prepareCommit(false, 1));
+ write.close();
+ commit.close();
+
+ // rollback should fail because snapshot references secondSchemaId
+ FileStoreTable finalTable = table;
+ assertThatThrownBy(() -> finalTable.rollbackSchema(firstSchemaId))
+ .hasMessageContaining("Cannot rollback to schema");
+ }
}
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 12490f9c20..5dc06563d8 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -26,6 +26,7 @@ under the License.
<suppress files="DataTypes.java" checks="MethodNameCheck"/>
<suppress files="CoreOptions.java" checks="FileLength"/>
+ <suppress files="RESTCatalogTest.java" checks="FileLength"/>
<!-- target directory is not relevant for checkstyle -->
<suppress