This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 66f057e09f [core] Add rollback schema to Table and Catalog (#7498)
66f057e09f is described below

commit 66f057e09f5a25fbe39b40c30d464ddba7b472ca
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 23 12:09:48 2026 +0800

    [core] Add rollback schema to Table and Catalog (#7498)
    
    PR Summary: Adds a rollbackSchema(long schemaId) API to both Table and
    Catalog interfaces, allowing users to roll back a table's schema to a
    previous version. All schema versions newer than the target are deleted,
    with validation to prevent deletion when snapshots/tags/changelogs still
    reference newer schemas.
---
 docs/static/rest-catalog-open-api.yaml             |  44 +++++++++
 .../main/java/org/apache/paimon/rest/RESTApi.java  |  19 ++++
 .../java/org/apache/paimon/rest/ResourcePaths.java |  11 +++
 .../rest/requests/RollbackSchemaRequest.java       |  46 +++++++++
 .../java/org/apache/paimon/catalog/Catalog.java    |  16 ++++
 .../org/apache/paimon/catalog/DelegateCatalog.java |   6 ++
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  12 +++
 .../org/apache/paimon/schema/SchemaManager.java    |  54 +++++++++++
 .../paimon/table/AbstractFileStoreTable.java       |  18 ++++
 .../apache/paimon/table/CatalogEnvironment.java    |  16 ++++
 .../paimon/table/DelegatedFileStoreTable.java      |   5 +
 .../java/org/apache/paimon/table/FormatTable.java  |   5 +
 .../org/apache/paimon/table/ReadonlyTable.java     |   8 ++
 .../main/java/org/apache/paimon/table/Table.java   |   4 +
 .../org/apache/paimon/utils/SnapshotManager.java   |   5 +-
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  32 +++++++
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  91 ++++++++++++++++++
 .../apache/paimon/schema/SchemaManagerTest.java    | 103 +++++++++++++++++++++
 .../apache/paimon/table/SimpleTableTestBase.java   |  51 ++++++++++
 tools/maven/suppressions.xml                       |   1 +
 20 files changed, 545 insertions(+), 2 deletions(-)

diff --git a/docs/static/rest-catalog-open-api.yaml 
b/docs/static/rest-catalog-open-api.yaml
index 237d9a987e..a4bda071f9 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -669,6 +669,42 @@ paths:
                   $ref: '#/components/examples/TagNotExistError'
         "500":
           $ref: '#/components/responses/ServerErrorResponse'
+  /v1/{prefix}/databases/{database}/tables/{table}/rollback-schema:
+    post:
+      tags:
+        - table
+      summary: Rollback table schema
+      operationId: rollbackSchema
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/RollbackSchemaRequest'
+      responses:
+        "200":
+          description: Success, no content
+        "401":
+          $ref: '#/components/responses/UnauthorizedErrorResponse'
+        "404":
+          $ref: '#/components/responses/TableNotExistErrorResponse'
+        "500":
+          $ref: '#/components/responses/ServerErrorResponse'
   /v1/{prefix}/databases/{database}/tables/{table}/token:
     get:
       tags:
@@ -2936,6 +2972,14 @@ components:
           type: integer
           format: int64
           nullable: true
+    RollbackSchemaRequest:
+      type: object
+      required:
+        - schemaId
+      properties:
+        schemaId:
+          type: integer
+          format: int64
     Instant:
       anyOf:
         - $ref: '#/components/schemas/SnapshotInstant'
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index 1e549a116c..a939c5ce16 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -51,6 +51,7 @@ import 
org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
 import org.apache.paimon.rest.requests.RegisterTableRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.requests.ResetConsumerRequest;
+import org.apache.paimon.rest.requests.RollbackSchemaRequest;
 import org.apache.paimon.rest.requests.RollbackTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.AuthTableQueryResponse;
@@ -711,6 +712,24 @@ public class RESTApi {
                 restAuthFunction);
     }
 
+    /**
+     * Rollback schema for table.
+     *
+     * @param identifier database name and table name.
+     * @param schemaId the target schema version to rollback to
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 means the 
table not exists
+     * @throws ForbiddenException Exception thrown on HTTP 403 means don't 
have the permission for
+     *     this table
+     */
+    public void rollbackSchema(Identifier identifier, long schemaId) {
+        RollbackSchemaRequest request = new RollbackSchemaRequest(schemaId);
+        client.post(
+                resourcePaths.rollbackSchemaTable(
+                        identifier.getDatabaseName(), 
identifier.getObjectName()),
+                request,
+                restAuthFunction);
+    }
+
     /**
      * Create table.
      *
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index baf0b8a4af..466ac975b4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -120,6 +120,17 @@ public class ResourcePaths {
                 ROLLBACK);
     }
 
+    public String rollbackSchemaTable(String databaseName, String objectName) {
+        return SLASH.join(
+                V1,
+                prefix,
+                DATABASES,
+                encodeString(databaseName),
+                TABLES,
+                encodeString(objectName),
+                "rollback-schema");
+    }
+
     public String registerTable(String databaseName) {
         return SLASH.join(V1, prefix, DATABASES, encodeString(databaseName), 
REGISTER);
     }
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackSchemaRequest.java
 
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackSchemaRequest.java
new file mode 100644
index 0000000000..f35d22ca80
--- /dev/null
+++ 
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/RollbackSchemaRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Request for rollback table schema. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RollbackSchemaRequest implements RESTRequest {
+
+    private static final String FIELD_SCHEMA_ID = "schemaId";
+
+    @JsonProperty(FIELD_SCHEMA_ID)
+    private final long schemaId;
+
+    @JsonCreator
+    public RollbackSchemaRequest(@JsonProperty(FIELD_SCHEMA_ID) long schemaId) 
{
+        this.schemaId = schemaId;
+    }
+
+    @JsonGetter(FIELD_SCHEMA_ID)
+    public long getSchemaId() {
+        return schemaId;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 3947a950e5..8924dc6ae1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -809,6 +809,22 @@ public interface Catalog extends AutoCloseable {
     void rollbackTo(Identifier identifier, Instant instant, @Nullable Long 
fromSnapshot)
             throws Catalog.TableNotExistException;
 
+    /**
+     * Rollback table schema to a specific schema version. All schema versions 
greater than the
+     * target will be deleted. This operation will fail if any snapshot, tag, 
or changelog
+     * references a schema version greater than the target.
+     *
+     * @param identifier path of the table
+     * @param schemaId the target schema version to rollback to
+     * @throws Catalog.TableNotExistException if the table does not exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    default void rollbackSchema(Identifier identifier, long schemaId)
+            throws Catalog.TableNotExistException {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Create a new branch for this table. By default, an empty branch will be 
created using the
      * latest schema. If you provide {@code #fromTag}, a branch will be 
created from the tag and the
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 81558ac68a..4a8eeabe50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -222,6 +222,12 @@ public abstract class DelegateCatalog implements Catalog {
         wrapped.rollbackTo(identifier, instant, fromSnapshot);
     }
 
+    @Override
+    public void rollbackSchema(Identifier identifier, long schemaId)
+            throws Catalog.TableNotExistException {
+        wrapped.rollbackSchema(identifier, schemaId);
+    }
+
     @Override
     public void createBranch(Identifier identifier, String branch, @Nullable 
String fromTag)
             throws TableNotExistException, BranchAlreadyExistException, 
TagNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 7b053d120c..dbb2a8fd48 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -442,6 +442,18 @@ public class RESTCatalog implements Catalog {
         }
     }
 
+    @Override
+    public void rollbackSchema(Identifier identifier, long schemaId)
+            throws Catalog.TableNotExistException {
+        try {
+            api.rollbackSchema(identifier, schemaId);
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
     private TableMetadata loadTableMetadata(Identifier identifier) throws 
TableNotExistException {
         // if the table is system table, we need to load table metadata from 
the system table's data
         // table
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index a896d3f63f..dbc605b96c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -45,10 +45,12 @@ import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.LazyField;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.utils.TagManager;
 
 import 
org.apache.paimon.shade.guava30.com.google.common.collect.FluentIterable;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -66,10 +68,12 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -1099,6 +1103,56 @@ public class SchemaManager implements Serializable {
         fileIO.deleteQuietly(toSchemaPath(schemaId));
     }
 
+    /**
+     * Rollback to a specific schema version. All schema versions greater than 
the target will be
+     * deleted. This operation will fail if any snapshot, tag, or changelog 
references a schema
+     * version greater than the target.
+     *
+     * @param targetSchemaId the schema version to rollback to.
+     * @param snapshotManager the snapshot manager to check snapshot 
references.
+     * @param tagManager the tag manager to check tag references.
+     * @param changelogManager the changelog manager to check changelog 
references.
+     */
+    public void rollbackTo(
+            long targetSchemaId,
+            SnapshotManager snapshotManager,
+            TagManager tagManager,
+            ChangelogManager changelogManager)
+            throws IOException {
+        checkArgument(schemaExists(targetSchemaId), "Schema %s does not 
exist.", targetSchemaId);
+
+        // Collect all schemaIds referenced by snapshots, tags, and changelogs
+        Set<Long> usedSchemaIds = new HashSet<>();
+
+        snapshotManager.pickOrLatest(
+                snapshot -> {
+                    usedSchemaIds.add(snapshot.schemaId());
+                    return false;
+                });
+        tagManager.taggedSnapshots().forEach(s -> 
usedSchemaIds.add(s.schemaId()));
+        changelogManager.changelogs().forEachRemaining(c -> 
usedSchemaIds.add(c.schemaId()));
+
+        // Check if any referenced schema is newer than the target
+        Optional<Long> conflict =
+                usedSchemaIds.stream().filter(id -> id > 
targetSchemaId).min(Long::compareTo);
+        if (conflict.isPresent()) {
+            throw new RuntimeException(
+                    String.format(
+                            "Cannot rollback to schema %d, schema %d is still 
referenced by snapshots/tags/changelogs.",
+                            targetSchemaId, conflict.get()));
+        }
+
+        // Delete all schemas newer than the target
+        List<Long> toBeDeleted =
+                listAllIds().stream()
+                        .filter(id -> id > targetSchemaId)
+                        .collect(Collectors.toList());
+        toBeDeleted.sort((o1, o2) -> Long.compare(o2, o1));
+        for (Long id : toBeDeleted) {
+            fileIO.delete(toSchemaPath(id), false);
+        }
+    }
+
     public static void checkAlterTableOption(
             Map<String, String> options, String key, @Nullable String 
oldValue, String newValue) {
         if (CoreOptions.IMMUTABLE_OPTIONS.contains(key)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 014df441ac..52b9d162a5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -72,6 +72,8 @@ import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cach
 import javax.annotation.Nullable;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
@@ -80,6 +82,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.SortedMap;
 import java.util.function.BiConsumer;
+import java.util.function.LongConsumer;
 
 import static org.apache.paimon.CoreOptions.PATH;
 
@@ -558,6 +561,21 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         }
     }
 
+    @Override
+    public void rollbackSchema(long schemaId) {
+        LongConsumer schemaRollback = 
catalogEnvironment.catalogSchemaRollback();
+        if (schemaRollback != null) {
+            schemaRollback.accept(schemaId);
+        } else {
+            try {
+                schemaManager()
+                        .rollbackTo(schemaId, snapshotManager(), tagManager(), 
changelogManager());
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+
     public Snapshot findSnapshot(long fromSnapshotId) throws 
SnapshotNotExistException {
         SnapshotManager snapshotManager = snapshotManager();
         Snapshot snapshot = null;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java 
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index 882ad952b5..dac63fc178 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -39,6 +39,7 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.Optional;
+import java.util.function.LongConsumer;
 
 /** Catalog environment in table which contains log factory, metastore client 
factory. */
 public class CatalogEnvironment implements Serializable {
@@ -143,6 +144,21 @@ public class CatalogEnvironment implements Serializable {
         return null;
     }
 
+    @Nullable
+    public LongConsumer catalogSchemaRollback() {
+        if (catalogLoader != null && supportsVersionManagement) {
+            Catalog catalog = catalogLoader.load();
+            return schemaId -> {
+                try {
+                    catalog.rollbackSchema(identifier, schemaId);
+                } catch (Catalog.TableNotExistException e) {
+                    throw new RuntimeException(e);
+                }
+            };
+        }
+        return null;
+    }
+
     @Nullable
     public SnapshotLoader snapshotLoader() {
         if (catalogLoader == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index b92c4c1630..9eaa4f8d2e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -255,6 +255,11 @@ public abstract class DelegatedFileStoreTable implements 
FileStoreTable {
         wrapped.rollbackTo(tagName);
     }
 
+    @Override
+    public void rollbackSchema(long schemaId) {
+        wrapped.rollbackSchema(schemaId);
+    }
+
     @Override
     public void createBranch(String branchName) {
         wrapped.createBranch(branchName);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index f1fbc74c12..a2b01fb7f5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -374,6 +374,11 @@ public interface FormatTable extends Table {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    default void rollbackSchema(long schemaId) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     default void createBranch(String branchName) {
         throw new UnsupportedOperationException();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
index c5b8deeca9..55f14e7025 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java
@@ -217,6 +217,14 @@ public interface ReadonlyTable extends InnerTable {
                         this.getClass().getSimpleName()));
     }
 
+    @Override
+    default void rollbackSchema(long schemaId) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Readonly Table %s does not support rollbackSchema.",
+                        this.getClass().getSimpleName()));
+    }
+
     @Override
     default void createBranch(String branchName) {
         throw new UnsupportedOperationException(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java 
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 8d49d7206e..53e99ea45f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -157,6 +157,10 @@ public interface Table extends Serializable {
     @Experimental
     void rollbackTo(String tagName);
 
+    /** Rollback table's schema to a specific schema version. */
+    @Experimental
+    void rollbackSchema(long schemaId);
+
     /** Create an empty branch. */
     @Experimental
     void createBranch(String branchName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 2fb624c71e..414ef3f5b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -272,11 +272,12 @@ public class SnapshotManager implements Serializable {
         }
 
         for (long snapshotId = latestId; snapshotId >= earliestId; 
snapshotId--) {
-            if (snapshotExists(snapshotId)) {
-                Snapshot snapshot = snapshot(snapshotId);
+            try {
+                Snapshot snapshot = tryGetSnapshot(snapshotId);
                 if (predicate.test(snapshot)) {
                     return snapshot.id();
                 }
+            } catch (FileNotFoundException ignored) {
             }
         }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 81cb097f5a..bd9036a4b0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -63,6 +63,7 @@ import 
org.apache.paimon.rest.requests.ListPartitionsByNamesRequest;
 import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.requests.ResetConsumerRequest;
+import org.apache.paimon.rest.requests.RollbackSchemaRequest;
 import org.apache.paimon.rest.requests.RollbackTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.AuthTableQueryResponse;
@@ -104,6 +105,7 @@ import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.tag.Tag;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.LazyField;
 import org.apache.paimon.utils.Pair;
@@ -417,6 +419,10 @@ public class RESTCatalogServer {
                                 resources.length == 4
                                         && 
ResourcePaths.TABLES.equals(resources[1])
                                         && 
ResourcePaths.ROLLBACK.equals(resources[3]);
+                        boolean isRollbackSchema =
+                                resources.length == 4
+                                        && 
ResourcePaths.TABLES.equals(resources[1])
+                                        && 
"rollback-schema".equals(resources[3]);
                         boolean isPartitions =
                                 resources.length == 4
                                         && 
ResourcePaths.TABLES.equals(resources[1])
@@ -538,6 +544,8 @@ public class RESTCatalogServer {
                                                 .getTagName();
                                 return 
rollbackTableByTagNameHandle(identifier, tagName);
                             }
+                        } else if (isRollbackSchema) {
+                            return rollbackSchemaHandle(identifier, 
restAuthParameter.data());
                         } else if (isTable) {
                             return tableHandle(
                                     restAuthParameter.method(),
@@ -1009,6 +1017,30 @@ public class RESTCatalogServer {
                 new ErrorResponse(ErrorResponse.RESOURCE_TYPE_TAG, "" + 
tagName, "", 404), 404);
     }
 
+    private MockResponse rollbackSchemaHandle(Identifier identifier, String 
data) throws Exception {
+        RollbackSchemaRequest requestBody = RESTApi.fromJson(data, 
RollbackSchemaRequest.class);
+        if (noPermissionTables.contains(identifier.getFullName())) {
+            throw new Catalog.TableNoPermissionException(identifier);
+        }
+        if (!tableMetadataStore.containsKey(identifier.getFullName())) {
+            throw new Catalog.TableNotExistException(identifier);
+        }
+        FileStoreTable table = getFileTable(identifier);
+        long schemaId = requestBody.getSchemaId();
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
+        try {
+            schemaManager.rollbackTo(
+                    schemaId,
+                    table.snapshotManager(),
+                    table.tagManager(),
+                    new ChangelogManager(table.fileIO(), table.location(), 
null));
+        } catch (Exception e) {
+            return mockResponse(new ErrorResponse(null, null, e.getMessage(), 
500), 500);
+        }
+
+        return new MockResponse().setResponseCode(200);
+    }
+
     private void cleanSnapshot(Identifier identifier, Long snapshotId, Long 
latestSnapshotId)
             throws IOException {
         if (latestSnapshotId > snapshotId) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 35f2792f7b..0c20dfdc17 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -1933,6 +1933,97 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
         assertThat(table.latestSnapshot().get().id()).isEqualTo(2);
     }
 
+    @Test
+    public void testRollbackSchema() throws Exception {
+        Identifier identifier = Identifier.create("test_rollback_schema", 
"table_for_schema");
+        createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+
+        // get initial schema id
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
+        long firstSchemaId = schemaManager.latest().get().id();
+
+        // evolve schema
+        catalog.alterTable(identifier, SchemaChange.setOption("aa", "bb"), 
false);
+        long secondSchemaId = schemaManager.latest().get().id();
+        assertThat(secondSchemaId).isEqualTo(firstSchemaId + 1);
+
+        // rollback schema to first version
+        catalog.rollbackSchema(identifier, firstSchemaId);
+        assertThat(schemaManager.latest().get().id()).isEqualTo(firstSchemaId);
+        assertThat(schemaManager.schemaExists(secondSchemaId)).isFalse();
+
+        // rollback to non-existent schema should fail
+        assertThatThrownBy(() -> catalog.rollbackSchema(identifier, 999))
+                .isInstanceOf(Exception.class);
+    }
+
+    @Test
+    public void testRollbackSchemaFromTable() throws Exception {
+        Identifier identifier =
+                Identifier.create("test_rollback_schema", 
"table_for_schema_from_table");
+        createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+
+        // get initial schema id
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
+        long firstSchemaId = schemaManager.latest().get().id();
+
+        // evolve schema
+        catalog.alterTable(identifier, SchemaChange.setOption("aa", "bb"), 
false);
+        long secondSchemaId = schemaManager.latest().get().id();
+        assertThat(secondSchemaId).isEqualTo(firstSchemaId + 1);
+
+        // rollback schema to first version
+        table.rollbackSchema(firstSchemaId);
+        assertThat(schemaManager.latest().get().id()).isEqualTo(firstSchemaId);
+        assertThat(schemaManager.schemaExists(secondSchemaId)).isFalse();
+
+        // get schema from new table
+        table = (FileStoreTable) catalog.getTable(identifier);
+        assertThat(table.schema().id()).isEqualTo(1);
+    }
+
+    @Test
+    public void testRollbackSchemaFailedWithSnapshotReference() throws 
Exception {
+        Identifier identifier =
+                Identifier.create("test_rollback_schema_fail", 
"table_for_schema_fail");
+        createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
+        long firstSchemaId = schemaManager.latest().get().id();
+
+        // write data to create a snapshot referencing firstSchemaId
+        StreamTableWrite write = table.newWrite("commitUser");
+        StreamTableCommit commit = table.newCommit("commitUser");
+        write.write(GenericRow.of(1));
+        commit.commit(0, write.prepareCommit(false, 0));
+        write.close();
+        commit.close();
+
+        // evolve schema
+        catalog.alterTable(identifier, SchemaChange.setOption("aa", "bb"), 
false);
+        long secondSchemaId = schemaManager.latest().get().id();
+
+        // write data to create a snapshot referencing secondSchemaId
+        table = (FileStoreTable) catalog.getTable(identifier);
+        write = table.newWrite("commitUser");
+        commit = table.newCommit("commitUser");
+        write.write(GenericRow.of(2));
+        commit.commit(1, write.prepareCommit(false, 1));
+        write.close();
+        commit.close();
+
+        // rollback should fail because snapshot references secondSchemaId
+        assertThatThrownBy(() -> catalog.rollbackSchema(identifier, 
firstSchemaId))
+                .hasMessageContaining("Cannot rollback to schema " + 
firstSchemaId)
+                .hasMessageContaining(
+                        "schema "
+                                + secondSchemaId
+                                + " is still referenced by 
snapshots/tags/changelogs");
+    }
+
     @Test
     public void testDataTokenExpired() throws Exception {
         this.catalog = newRestCatalogWithDataToken();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index bc433ff18e..31496f8bb8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -40,7 +40,10 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FailingFileIO;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
@@ -791,4 +794,104 @@ public class SchemaManagerTest {
         table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
         
assertThat(table.options().get(DELETION_VECTORS_ENABLED.key())).isEqualTo("true");
     }
+
+    @Test
+    public void testRollbackSchemaSuccess() throws Exception {
+        SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+        manager.createTable(schema);
+        long firstSchemaId = manager.latest().get().id();
+
+        manager.commitChanges(SchemaChange.setOption("aa", "bb"));
+        long secondSchemaId = manager.latest().get().id();
+        assertThat(secondSchemaId).isEqualTo(firstSchemaId + 1);
+
+        manager.commitChanges(SchemaChange.setOption("cc", "dd"));
+        long thirdSchemaId = manager.latest().get().id();
+        assertThat(thirdSchemaId).isEqualTo(firstSchemaId + 2);
+
+        // rollback to first schema
+        SnapshotManager snapshotManager =
+                new SnapshotManager(LocalFileIO.create(), path, null, null, 
null);
+        TagManager tagManager = new TagManager(LocalFileIO.create(), path);
+        ChangelogManager changelogManager = new 
ChangelogManager(LocalFileIO.create(), path, null);
+        manager.rollbackTo(firstSchemaId, snapshotManager, tagManager, 
changelogManager);
+
+        assertThat(manager.latest().get().id()).isEqualTo(firstSchemaId);
+        assertThat(manager.schemaExists(secondSchemaId)).isFalse();
+        assertThat(manager.schemaExists(thirdSchemaId)).isFalse();
+    }
+
+    @Test
+    public void testRollbackSchemaFailedDueToSnapshotReference() throws 
Exception {
+        Schema appendOnlySchema =
+                new Schema(
+                        rowType.getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options,
+                        "");
+        Path tableRoot = new Path(tempDir.toString(), "table");
+        SchemaManager manager = new SchemaManager(LocalFileIO.create(), 
tableRoot);
+        manager.createTable(appendOnlySchema);
+        long firstSchemaId = manager.latest().get().id();
+
+        // write data to create a snapshot referencing firstSchemaId
+        FileStoreTable table = 
FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write =
+                
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
+        TableCommitImpl commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple")));
+        commit.commit(1, write.prepareCommit(false, 1));
+        write.close();
+        commit.close();
+
+        // evolve schema
+        manager.commitChanges(SchemaChange.setOption("aa", "bb"));
+        long secondSchemaId = manager.latest().get().id();
+
+        // write data to create a snapshot referencing secondSchemaId
+        table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot);
+        write = 
table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io"));
+        commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(2, 20L, BinaryString.fromString("banana")));
+        commit.commit(2, write.prepareCommit(false, 2));
+        write.close();
+        commit.close();
+
+        // rollback to first schema should fail because snapshot references 
secondSchemaId
+        SnapshotManager snapshotManager =
+                new SnapshotManager(LocalFileIO.create(), tableRoot, null, 
null, null);
+        TagManager tagManager = new TagManager(LocalFileIO.create(), 
tableRoot);
+        ChangelogManager changelogManager =
+                new ChangelogManager(LocalFileIO.create(), tableRoot, null);
+        assertThatThrownBy(
+                        () ->
+                                manager.rollbackTo(
+                                        firstSchemaId,
+                                        snapshotManager,
+                                        tagManager,
+                                        changelogManager))
+                .hasMessageContaining("Cannot rollback to schema " + 
firstSchemaId)
+                .hasMessageContaining(
+                        "schema "
+                                + secondSchemaId
+                                + " is still referenced by 
snapshots/tags/changelogs");
+    }
+
+    @Test
+    public void testRollbackSchemaNotExist() throws Exception {
+        SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+        manager.createTable(schema);
+
+        assertThatThrownBy(
+                        () ->
+                                manager.rollbackTo(
+                                        999,
+                                        new SnapshotManager(
+                                                LocalFileIO.create(), path, 
null, null, null),
+                                        new TagManager(LocalFileIO.create(), 
path),
+                                        new 
ChangelogManager(LocalFileIO.create(), path, null)))
+                .hasMessageContaining("Schema 999 does not exist");
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index 1b58b012cd..f50e9922f5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -1844,4 +1844,55 @@ public abstract class SimpleTableTestBase {
         assertThat(table.tagManager().allTagNames())
                 .containsExactlyInAnyOrderElementsOf(expectedTags);
     }
+
+    @Test
+    public void testRollbackSchemaSuccess() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        SchemaManager schemaManager = table.schemaManager();
+        long firstSchemaId = schemaManager.latest().get().id();
+
+        // evolve schema twice
+        schemaManager.commitChanges(SchemaChange.setOption("aa", "bb"));
+        long secondSchemaId = schemaManager.latest().get().id();
+        schemaManager.commitChanges(SchemaChange.setOption("cc", "dd"));
+        long thirdSchemaId = schemaManager.latest().get().id();
+
+        // rollback to first schema
+        table.rollbackSchema(firstSchemaId);
+        assertThat(schemaManager.latest().get().id()).isEqualTo(firstSchemaId);
+        assertThat(schemaManager.schemaExists(secondSchemaId)).isFalse();
+        assertThat(schemaManager.schemaExists(thirdSchemaId)).isFalse();
+    }
+
+    @Test
+    public void testRollbackSchemaFailedWithSnapshotReference() throws 
Exception {
+        FileStoreTable table = createFileStoreTable();
+        SchemaManager schemaManager = table.schemaManager();
+        long firstSchemaId = schemaManager.latest().get().id();
+
+        // write data to create a snapshot referencing firstSchemaId
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        write.write(rowData(0, 0, 0L));
+        commit.commit(0, write.prepareCommit(false, 0));
+        write.close();
+        commit.close();
+
+        // evolve schema
+        schemaManager.commitChanges(SchemaChange.setOption("aa", "bb"));
+
+        // write data to create a snapshot referencing secondSchemaId
+        table = table.copyWithLatestSchema();
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+        write.write(rowData(1, 10, 100L));
+        commit.commit(1, write.prepareCommit(false, 1));
+        write.close();
+        commit.close();
+
+        // rollback should fail because snapshot references secondSchemaId
+        FileStoreTable finalTable = table;
+        assertThatThrownBy(() -> finalTable.rollbackSchema(firstSchemaId))
+                .hasMessageContaining("Cannot rollback to schema");
+    }
 }
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 12490f9c20..5dc06563d8 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -26,6 +26,7 @@ under the License.
                <suppress files="DataTypes.java" checks="MethodNameCheck"/>
 
                <suppress files="CoreOptions.java" checks="FileLength"/>
+               <suppress files="RESTCatalogTest.java" checks="FileLength"/>
 
                <!-- target directory is not relevant for checkstyle -->
                <suppress


Reply via email to