This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 075a851b4 [#5086] core(feat): Add logic to support manipulating 
columns in TableOperationDispatcher (#5127)
075a851b4 is described below

commit 075a851b488061f7464f2695458b6240d92f087e
Author: Jerry Shao <jerrys...@datastrato.com>
AuthorDate: Thu Oct 17 14:27:06 2024 +0800

    [#5086] core(feat): Add logic to support manipulating columns in 
TableOperationDispatcher (#5127)
    
    ### What changes were proposed in this pull request?
    
    Add core logic to support manipulating columns in table create/load/load
    operations.
    
    ### Why are the changes needed?
    
    With this PR, we can manage the columns during the table operations.
    
    Fix: #5086
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add new UTs to cover the new changes.
---
 .../jdbc/operation/JdbcTableOperations.java        |  13 +-
 .../operation/PostgreSqlTableOperations.java       |  11 +
 .../gravitino/catalog/EntityCombinedTable.java     |   8 +
 .../catalog/TableOperationDispatcher.java          | 236 +++++++++--
 .../org/apache/gravitino/meta/ColumnEntity.java    |  13 +
 .../test/java/org/apache/gravitino/TestColumn.java |   1 +
 .../catalog/TestTableNormalizeDispatcher.java      |   9 +-
 .../catalog/TestTableOperationDispatcher.java      | 464 +++++++++++++++++++++
 .../gravitino/connector/TestCatalogOperations.java | 137 +++++-
 scripts/h2/schema-0.7.0-h2.sql                     |   4 +-
 scripts/h2/upgrade-0.6.0-to-0.7.0-h2.sql           |   4 +-
 scripts/mysql/schema-0.7.0-mysql.sql               |   4 +-
 scripts/mysql/upgrade-0.6.0-to-0.7.0-mysql.sql     |   4 +-
 scripts/postgresql/schema-0.7.0-postgresql.sql     |   4 +-
 14 files changed, 870 insertions(+), 42 deletions(-)

diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
index e65926fd0..e9b6bf6ab 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
@@ -168,6 +168,15 @@ public abstract class JdbcTableOperations implements 
TableOperation {
     return builder;
   }
 
+  protected JdbcColumn.Builder getColumnBuilder(
+      ResultSet columnsResult, String databaseName, String tableName) throws 
SQLException {
+    JdbcColumn.Builder builder = null;
+    if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName)) {
+      builder = getBasicJdbcColumnInfo(columnsResult);
+    }
+    return builder;
+  }
+
   @Override
   public JdbcTable load(String databaseName, String tableName) throws 
NoSuchTableException {
     // We should handle case sensitivity and wild card issue in some catalog 
tables, take MySQL
@@ -188,8 +197,8 @@ public abstract class JdbcTableOperations implements 
TableOperation {
       ResultSet columns = getColumns(connection, databaseName, tableName);
       while (columns.next()) {
         // TODO(yunqing): check schema and catalog also
-        if (Objects.equals(columns.getString("TABLE_NAME"), tableName)) {
-          JdbcColumn.Builder columnBuilder = getBasicJdbcColumnInfo(columns);
+        JdbcColumn.Builder columnBuilder = getColumnBuilder(columns, 
databaseName, tableName);
+        if (columnBuilder != null) {
           boolean autoIncrement = getAutoIncrementInfo(columns);
           columnBuilder.withAutoIncrement(autoIncrement);
           jdbcColumns.add(columnBuilder.build());
diff --git 
a/catalogs/catalog-jdbc-postgresql/src/main/java/org/apache/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java
 
b/catalogs/catalog-jdbc-postgresql/src/main/java/org/apache/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java
index 639544105..775687abd 100644
--- 
a/catalogs/catalog-jdbc-postgresql/src/main/java/org/apache/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java
+++ 
b/catalogs/catalog-jdbc-postgresql/src/main/java/org/apache/gravitino/catalog/postgresql/operation/PostgreSqlTableOperations.java
@@ -108,6 +108,17 @@ public class PostgreSqlTableOperations extends 
JdbcTableOperations {
     return builder;
   }
 
+  @Override
+  protected JdbcColumn.Builder getColumnBuilder(
+      ResultSet columnsResult, String databaseName, String tableName) throws 
SQLException {
+    JdbcColumn.Builder builder = null;
+    if (Objects.equals(columnsResult.getString("TABLE_NAME"), tableName)
+        && Objects.equals(columnsResult.getString("TABLE_SCHEM"), 
databaseName)) {
+      builder = getBasicJdbcColumnInfo(columnsResult);
+    }
+    return builder;
+  }
+
   @Override
   protected String generateCreateTableSql(
       String tableName,
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java 
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
index 460835f51..4b0da1568 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
@@ -128,6 +128,14 @@ public final class EntityCombinedTable implements Table {
     return imported;
   }
 
+  public Table tableFromCatalog() {
+    return table;
+  }
+
+  public TableEntity tableFromGravitino() {
+    return tableEntity;
+  }
+
   @Override
   public Audit auditInfo() {
     AuditInfo mergedAudit =
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 4472859d9..b54f06887 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -24,9 +24,16 @@ import static 
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePro
 import static 
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
 import static 
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
 
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
@@ -41,6 +48,7 @@ import 
org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.lock.LockType;
 import org.apache.gravitino.lock.TreeLockUtils;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
@@ -97,26 +105,31 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
    */
   @Override
   public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
-    EntityCombinedTable table =
+    EntityCombinedTable entityCombinedTable =
         TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> 
internalLoadTable(ident));
 
-    if (!table.imported()) {
+    if (!entityCombinedTable.imported()) {
       // Load the schema to make sure the schema is imported.
       SchemaDispatcher schemaDispatcher = 
GravitinoEnv.getInstance().schemaDispatcher();
       NameIdentifier schemaIdent = 
NameIdentifier.of(ident.namespace().levels());
       schemaDispatcher.loadSchema(schemaIdent);
 
       // Import the table.
-      TreeLockUtils.doWithTreeLock(
-          schemaIdent,
-          LockType.WRITE,
-          () -> {
-            importTable(ident);
-            return null;
-          });
+      entityCombinedTable =
+          TreeLockUtils.doWithTreeLock(schemaIdent, LockType.WRITE, () -> 
importTable(ident));
     }
 
-    return table;
+    // Update the column entities in Gravitino store if the columns are 
different from the ones
+    // fetching from the underlying source.
+    TableEntity updatedEntity = updateColumnsIfNecessaryWhenLoad(ident, 
entityCombinedTable);
+
+    return EntityCombinedTable.of(entityCombinedTable.tableFromCatalog(), 
updatedEntity)
+        .withHiddenPropertiesSet(
+            getHiddenPropertyNames(
+                getCatalogIdentifier(ident),
+                HasPropertyMetadata::tablePropertiesMetadata,
+                entityCombinedTable.tableFromCatalog().properties()))
+        .withImported(entityCombinedTable.imported());
   }
 
   /**
@@ -215,11 +228,15 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                               .map(c -> ((TableChange.RenameTable) 
c).getNewName())
                               .reduce((c1, c2) -> c2)
                               .orElse(tableEntity.name());
+                      // Update the columns
+                      Pair<Boolean, List<ColumnEntity>> columnsUpdateResult =
+                          updateColumnsIfNecessary(alteredTable, tableEntity);
 
                       return TableEntity.builder()
                           .withId(tableEntity.id())
                           .withName(newName)
                           .withNamespace(ident.namespace())
+                          .withColumns(columnsUpdateResult.getRight())
                           .withAuditInfo(
                               AuditInfo.builder()
                                   
.withCreator(tableEntity.auditInfo().creator())
@@ -328,11 +345,11 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
         : droppedFromCatalog;
   }
 
-  private void importTable(NameIdentifier identifier) {
+  private EntityCombinedTable importTable(NameIdentifier identifier) {
     EntityCombinedTable table = internalLoadTable(identifier);
 
     if (table.imported()) {
-      return;
+      return table;
     }
 
     StringIdentifier stringId = null;
@@ -348,8 +365,8 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
       // of external system to correct it.
       LOG.warn(
           "The Table uid {} existed but still need to be imported, this could 
be happened "
-              + "when Table is renamed by external systems not controlled by 
Gravitino. In this case, "
-              + "we need to overwrite the stored entity to keep the 
consistency.",
+              + "when Table is renamed by external systems not controlled by 
Gravitino. In this "
+              + "case, we need to overwrite the stored entity to keep the 
consistency.",
           stringId);
       uid = stringId.id();
     } else {
@@ -357,18 +374,22 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
       uid = idGenerator.nextId();
     }
 
+    AuditInfo audit =
+        AuditInfo.builder()
+            .withCreator(table.auditInfo().creator())
+            .withCreateTime(table.auditInfo().createTime())
+            .withLastModifier(table.auditInfo().lastModifier())
+            .withLastModifiedTime(table.auditInfo().lastModifiedTime())
+            .build();
+    List<ColumnEntity> columnEntityList =
+        toColumnEntities(table.tableFromCatalog().columns(), audit);
     TableEntity tableEntity =
         TableEntity.builder()
             .withId(uid)
             .withName(identifier.name())
             .withNamespace(identifier.namespace())
-            .withAuditInfo(
-                AuditInfo.builder()
-                    .withCreator(table.auditInfo().creator())
-                    .withCreateTime(table.auditInfo().createTime())
-                    .withLastModifier(table.auditInfo().lastModifier())
-                    .withLastModifiedTime(table.auditInfo().lastModifiedTime())
-                    .build())
+            .withColumns(columnEntityList)
+            .withAuditInfo(audit)
             .build();
     try {
       store.put(tableEntity, true);
@@ -376,6 +397,13 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
       LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e);
       throw new RuntimeException("Fail to import the table entity to the 
store.", e);
     }
+
+    return EntityCombinedTable.of(table.tableFromCatalog(), tableEntity)
+        .withHiddenPropertiesSet(
+            getHiddenPropertyNames(
+                getCatalogIdentifier(identifier),
+                HasPropertyMetadata::tablePropertiesMetadata,
+                table.tableFromCatalog().properties()));
   }
 
   private EntityCombinedTable internalLoadTable(NameIdentifier ident) {
@@ -465,16 +493,23 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             NoSuchSchemaException.class,
             TableAlreadyExistsException.class);
 
+    AuditInfo audit =
+        AuditInfo.builder()
+            .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+            .withCreateTime(Instant.now())
+            .build();
+    List<ColumnEntity> columnEntityList =
+        Arrays.stream(columns)
+            .map(c -> ColumnEntity.toColumnEntity(c, idGenerator.nextId(), 
audit))
+            .collect(Collectors.toList());
+
     TableEntity tableEntity =
         TableEntity.builder()
             .withId(uid)
             .withName(ident.name())
             .withNamespace(ident.namespace())
-            .withAuditInfo(
-                AuditInfo.builder()
-                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
-                    .withCreateTime(Instant.now())
-                    .build())
+            .withColumns(columnEntityList)
+            .withAuditInfo(audit)
             .build();
 
     try {
@@ -492,4 +527,153 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             getHiddenPropertyNames(
                 catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
   }
+
+  private List<ColumnEntity> toColumnEntities(Column[] columns, AuditInfo 
audit) {
+    return columns == null
+        ? Collections.emptyList()
+        : Arrays.stream(columns)
+            .map(c -> ColumnEntity.toColumnEntity(c, idGenerator.nextId(), 
audit))
+            .collect(Collectors.toList());
+  }
+
+  private boolean isSameColumn(Column left, ColumnEntity right) {
+    return Objects.equal(left.name(), right.name())
+        && Objects.equal(left.dataType(), right.dataType())
+        && Objects.equal(left.comment(), right.comment())
+        && left.nullable() == right.nullable()
+        && left.autoIncrement() == right.autoIncrement()
+        && Objects.equal(left.defaultValue(), right.defaultValue());
+  }
+
+  private Pair<Boolean, List<ColumnEntity>> updateColumnsIfNecessary(
+      Table tableFromCatalog, TableEntity tableFromGravitino) {
+    if (tableFromCatalog == null || tableFromGravitino == null) {
+      LOG.warn(
+          "Cannot update columns for table when altering because table or 
table entity is "
+              + "null");
+      return Pair.of(false, Collections.emptyList());
+    }
+
+    Map<String, Column> columnsFromCatalogTable =
+        tableFromCatalog.columns() == null
+            ? Collections.emptyMap()
+            : Arrays.stream(tableFromCatalog.columns())
+                .collect(Collectors.toMap(Column::name, Function.identity()));
+    Map<String, ColumnEntity> columnsFromTableEntity =
+        tableFromGravitino.columns() == null
+            ? Collections.emptyMap()
+            : tableFromGravitino.columns().stream()
+                .collect(Collectors.toMap(ColumnEntity::name, 
Function.identity()));
+
+    // Check if columns need to be updated in Gravitino store
+    List<ColumnEntity> columnsToInsert = Lists.newArrayList();
+    boolean columnsNeedsUpdate = false;
+    for (Map.Entry<String, ColumnEntity> entry : 
columnsFromTableEntity.entrySet()) {
+      Column column = columnsFromCatalogTable.get(entry.getKey());
+      if (column == null) {
+        LOG.debug(
+            "Column {} is not found in the table from underlying source, it 
will be removed"
+                + " from the table entity",
+            entry.getKey());
+        columnsNeedsUpdate = true;
+
+      } else if (!isSameColumn(column, entry.getValue())) {
+        // If the column need to be updated, we create a new ColumnEntity with 
the same id
+        LOG.debug(
+            "Column {} is found in the table from underlying source, but it is 
different "
+                + "from the one in the table entity, it will be updated",
+            entry.getKey());
+
+        ColumnEntity updatedColumnEntity =
+            ColumnEntity.builder()
+                .withId(entry.getValue().id())
+                .withName(column.name())
+                .withDataType(column.dataType())
+                .withComment(column.comment())
+                .withNullable(column.nullable())
+                .withAutoIncrement(column.autoIncrement())
+                .withDefaultValue(column.defaultValue())
+                .withAuditInfo(
+                    AuditInfo.builder()
+                        .withCreator(entry.getValue().auditInfo().creator())
+                        
.withCreateTime(entry.getValue().auditInfo().createTime())
+                        
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                        .withLastModifiedTime(Instant.now())
+                        .build())
+                .build();
+
+        columnsNeedsUpdate = true;
+        columnsToInsert.add(updatedColumnEntity);
+
+      } else {
+        // If the column is the same, we keep the original ColumnEntity
+        columnsToInsert.add(entry.getValue());
+      }
+    }
+
+    // Check if there are new columns in the table from the underlying source
+    for (Map.Entry<String, Column> entry : columnsFromCatalogTable.entrySet()) 
{
+      if (!columnsFromTableEntity.containsKey(entry.getKey())) {
+        LOG.debug(
+            "Column {} is found in the table from underlying source but not in 
the table "
+                + "entity, it will be added to the table entity",
+            entry.getKey());
+        ColumnEntity newColumnEntity =
+            ColumnEntity.toColumnEntity(
+                entry.getValue(),
+                idGenerator.nextId(),
+                AuditInfo.builder()
+                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build());
+
+        columnsNeedsUpdate = true;
+        columnsToInsert.add(newColumnEntity);
+      }
+    }
+
+    return Pair.of(columnsNeedsUpdate, columnsToInsert);
+  }
+
+  private TableEntity updateColumnsIfNecessaryWhenLoad(
+      NameIdentifier tableIdent, EntityCombinedTable combinedTable) {
+    Pair<Boolean, List<ColumnEntity>> columnsUpdateResult =
+        updateColumnsIfNecessary(
+            combinedTable.tableFromCatalog(), 
combinedTable.tableFromGravitino());
+
+    // No need to update the columns
+    if (!columnsUpdateResult.getLeft()) {
+      return combinedTable.tableFromGravitino();
+    }
+
+    // Update the columns in the Gravitino store
+    return TreeLockUtils.doWithTreeLock(
+        tableIdent,
+        LockType.WRITE,
+        () ->
+            operateOnEntity(
+                tableIdent,
+                id ->
+                    store.update(
+                        id,
+                        TableEntity.class,
+                        TABLE,
+                        entity ->
+                            TableEntity.builder()
+                                .withId(entity.id())
+                                .withName(entity.name())
+                                .withNamespace(entity.namespace())
+                                .withColumns(columnsUpdateResult.getRight())
+                                .withAuditInfo(
+                                    AuditInfo.builder()
+                                        
.withCreator(entity.auditInfo().creator())
+                                        
.withCreateTime(entity.auditInfo().createTime())
+                                        .withLastModifier(
+                                            
PrincipalUtils.getCurrentPrincipal().getName())
+                                        .withLastModifiedTime(Instant.now())
+                                        .build())
+                                .build()),
+                "UPDATE",
+                combinedTable.tableFromGravitino().id()));
+  }
 }
diff --git a/core/src/main/java/org/apache/gravitino/meta/ColumnEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/ColumnEntity.java
index 5b89bfce5..379044260 100644
--- a/core/src/main/java/org/apache/gravitino/meta/ColumnEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/ColumnEntity.java
@@ -150,6 +150,19 @@ public class ColumnEntity implements Entity, Auditable {
     return new Builder();
   }
 
+  public static ColumnEntity toColumnEntity(Column column, long uid, AuditInfo 
audit) {
+    return builder()
+        .withId(uid)
+        .withName(column.name())
+        .withComment(column.comment())
+        .withDataType(column.dataType())
+        .withNullable(column.nullable())
+        .withAutoIncrement(column.autoIncrement())
+        .withDefaultValue(column.defaultValue())
+        .withAuditInfo(audit)
+        .build();
+  }
+
   public static class Builder {
     private final ColumnEntity columnEntity;
 
diff --git a/core/src/test/java/org/apache/gravitino/TestColumn.java 
b/core/src/test/java/org/apache/gravitino/TestColumn.java
index 93af75c05..7085da6d3 100644
--- a/core/src/test/java/org/apache/gravitino/TestColumn.java
+++ b/core/src/test/java/org/apache/gravitino/TestColumn.java
@@ -40,6 +40,7 @@ public class TestColumn extends BaseColumn {
       column.comment = comment;
       column.dataType = dataType;
       column.nullable = nullable;
+      column.autoIncrement = autoIncrement;
       column.defaultValue = defaultValue;
 
       return column;
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
index 2c8938edc..c45f5cab2 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
@@ -22,6 +22,8 @@ import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.gravitino.MetadataObjects;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
@@ -178,8 +180,11 @@ public class TestTableNormalizeDispatcher extends 
TestOperationDispatcher {
   private void assertTableCaseInsensitive(
       NameIdentifier tableIdent, Column[] expectedColumns, Table table) {
     Assertions.assertEquals(tableIdent.name().toLowerCase(), table.name());
-    Assertions.assertEquals(expectedColumns[0].name().toLowerCase(), 
table.columns()[0].name());
-    Assertions.assertEquals(expectedColumns[1].name().toLowerCase(), 
table.columns()[1].name());
+    Set<String> expectedColumnNames =
+        Arrays.stream(expectedColumns).map(c -> 
c.name().toLowerCase()).collect(Collectors.toSet());
+    Set<String> actualColumnNames =
+        
Arrays.stream(table.columns()).map(Column::name).collect(Collectors.toSet());
+    Assertions.assertEquals(expectedColumnNames, actualColumnNames);
     Assertions.assertEquals(
         expectedColumns[0].name().toLowerCase(),
         table.partitioning()[0].references()[0].fieldName()[0]);
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
index 1dd31fd33..f31b95e1e 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
@@ -38,8 +38,11 @@ import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.GravitinoEnv;
@@ -52,10 +55,12 @@ import org.apache.gravitino.connector.TestCatalogOperations;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
+import org.apache.gravitino.rel.expressions.literals.Literals;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.types.Types;
 import org.junit.jupiter.api.Assertions;
@@ -345,4 +350,463 @@ public class TestTableOperationDispatcher extends 
TestOperationDispatcher {
     
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(tableNs.levels()), 
SCHEMA));
     Assertions.assertTrue(entityStore.exists(tableIdent, TABLE));
   }
+
+  @Test
+  public void testCreateAndLoadTableWithColumn() throws IOException {
+    Namespace tableNs = Namespace.of(metalake, catalog, "schema91");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    
schemaOperationDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), 
"comment", props);
+
+    NameIdentifier tableIdent = NameIdentifier.of(tableNs, "table41");
+    Column[] columns =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col1")
+              .withType(Types.StringType.get())
+              .withComment("comment1")
+              .withNullable(true)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("1"))
+              .build(),
+          TestColumn.builder()
+              .withName("col2")
+              .withType(Types.StringType.get())
+              .withComment("comment2")
+              .withNullable(false)
+              .withAutoIncrement(false)
+              .withDefaultValue(Literals.stringLiteral("2"))
+              .build()
+        };
+
+    Table table1 =
+        tableOperationDispatcher.createTable(
+            tableIdent, columns, "comment", props, new Transform[0]);
+
+    Table loadedTable1 = tableOperationDispatcher.loadTable(tableIdent);
+    Assertions.assertEquals(table1.name(), loadedTable1.name());
+    Assertions.assertEquals(table1.comment(), loadedTable1.comment());
+    testProperties(table1.properties(), loadedTable1.properties());
+    testColumns(columns, loadedTable1.columns());
+
+    // The columns from table and table entity should be the same after 
creating.
+    TableEntity tableEntity = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    Assertions.assertNotNull(tableEntity);
+    Assertions.assertEquals("table41", tableEntity.name());
+    testColumnAndColumnEntities(columns, tableEntity.columns());
+
+    // Test if the column from table is not matched with the column from table 
entity
+    TestCatalog testCatalog =
+        (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, 
catalog));
+    TestCatalogOperations testCatalogOperations = (TestCatalogOperations) 
testCatalog.ops();
+
+    // 1. Update the existing column
+    Table alteredTable2 =
+        testCatalogOperations.alterTable(
+            tableIdent, TableChange.renameColumn(new String[] {"col1"}, 
"col3"));
+    Table loadedTable2 = tableOperationDispatcher.loadTable(tableIdent);
+    testColumns(alteredTable2.columns(), loadedTable2.columns());
+
+    // columns in table entity should be updated to match the columns in table
+    TableEntity tableEntity2 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(alteredTable2.columns(), 
tableEntity2.columns());
+
+    // 2. Add a new column
+    Table alteredTable3 =
+        testCatalogOperations.alterTable(
+            tableIdent,
+            TableChange.addColumn(
+                new String[] {"col4"},
+                Types.StringType.get(),
+                "comment4",
+                TableChange.ColumnPosition.first(),
+                true,
+                true,
+                Literals.stringLiteral("4")));
+
+    Table loadedTable3 = tableOperationDispatcher.loadTable(tableIdent);
+    testColumns(alteredTable3.columns(), loadedTable3.columns());
+
+    TableEntity tableEntity3 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(alteredTable3.columns(), 
tableEntity3.columns());
+
+    // 3. Drop a column
+    Table alteredTable4 =
+        testCatalogOperations.alterTable(
+            tableIdent, TableChange.deleteColumn(new String[] {"col2"}, true));
+    Table loadedTable4 = tableOperationDispatcher.loadTable(tableIdent);
+    testColumns(alteredTable4.columns(), loadedTable4.columns());
+
+    TableEntity tableEntity4 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(alteredTable4.columns(), 
tableEntity4.columns());
+
+    // No column for the table
+    Table alteredTable5 =
+        testCatalogOperations.alterTable(
+            tableIdent,
+            TableChange.deleteColumn(new String[] {"col3"}, true),
+            TableChange.deleteColumn(new String[] {"col4"}, true));
+    Table loadedTable5 = tableOperationDispatcher.loadTable(tableIdent);
+    Assertions.assertEquals(0, alteredTable5.columns().length);
+    Assertions.assertEquals(0, loadedTable5.columns().length);
+
+    TableEntity tableEntity5 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    Assertions.assertEquals(0, tableEntity5.columns().size());
+
+    // Re-add columns to the table
+    Table alteredTable6 =
+        testCatalogOperations.alterTable(
+            tableIdent,
+            TableChange.addColumn(
+                new String[] {"col5"},
+                Types.StringType.get(),
+                "comment5",
+                TableChange.ColumnPosition.first(),
+                true,
+                true,
+                Literals.stringLiteral("5")),
+            TableChange.addColumn(
+                new String[] {"col6"},
+                Types.StringType.get(),
+                "comment6",
+                TableChange.ColumnPosition.first(),
+                false,
+                false,
+                Literals.stringLiteral("2")));
+    Table loadedTable6 = tableOperationDispatcher.loadTable(tableIdent);
+    testColumns(alteredTable6.columns(), loadedTable6.columns());
+
+    TableEntity tableEntity6 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(alteredTable6.columns(), 
tableEntity6.columns());
+  }
+
+  @Test
+  public void testCreateAndAlterTableWithColumn() throws IOException {
+    Namespace tableNs = Namespace.of(metalake, catalog, "schema101");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    
schemaOperationDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), 
"comment", props);
+
+    NameIdentifier tableIdent = NameIdentifier.of(tableNs, "table51");
+    Column[] columns =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col1")
+              .withType(Types.StringType.get())
+              .withComment("comment1")
+              .withNullable(true)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("1"))
+              .build(),
+          TestColumn.builder()
+              .withName("col2")
+              .withType(Types.StringType.get())
+              .withComment("comment2")
+              .withNullable(false)
+              .withAutoIncrement(false)
+              .withDefaultValue(Literals.stringLiteral("2"))
+              .build()
+        };
+
+    Table table1 =
+        tableOperationDispatcher.createTable(
+            tableIdent, columns, "comment", props, new Transform[0]);
+    testColumns(columns, table1.columns());
+
+    // 1. Rename the column
+    Table alteredTable1 =
+        tableOperationDispatcher.alterTable(
+            tableIdent, TableChange.renameColumn(new String[] {"col1"}, 
"col3"));
+    Column[] expectedColumns =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col3")
+              .withType(Types.StringType.get())
+              .withComment("comment1")
+              .withNullable(true)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("1"))
+              .build(),
+          TestColumn.builder()
+              .withName("col2")
+              .withType(Types.StringType.get())
+              .withComment("comment2")
+              .withNullable(false)
+              .withAutoIncrement(false)
+              .withDefaultValue(Literals.stringLiteral("2"))
+              .build()
+        };
+    testColumns(expectedColumns, alteredTable1.columns());
+
+    TableEntity tableEntity1 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(expectedColumns, tableEntity1.columns());
+
+    // 2. Add a new column
+    Table alteredTable2 =
+        tableOperationDispatcher.alterTable(
+            tableIdent,
+            TableChange.addColumn(
+                new String[] {"col4"},
+                Types.StringType.get(),
+                "comment4",
+                TableChange.ColumnPosition.first(),
+                true,
+                true,
+                Literals.stringLiteral("4")));
+    Column[] expectedColumns2 =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col4")
+              .withType(Types.StringType.get())
+              .withComment("comment4")
+              .withNullable(true)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("4"))
+              .build(),
+          TestColumn.builder()
+              .withName("col3")
+              .withType(Types.StringType.get())
+              .withComment("comment1")
+              .withNullable(true)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("1"))
+              .build(),
+          TestColumn.builder()
+              .withName("col2")
+              .withType(Types.StringType.get())
+              .withComment("comment2")
+              .withNullable(false)
+              .withAutoIncrement(false)
+              .withDefaultValue(Literals.stringLiteral("2"))
+              .build()
+        };
+
+    testColumns(expectedColumns2, alteredTable2.columns());
+
+    TableEntity tableEntity2 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(expectedColumns2, tableEntity2.columns());
+
+    // 3. Drop a column
+    Table alteredTable3 =
+        tableOperationDispatcher.alterTable(
+            tableIdent,
+            TableChange.deleteColumn(new String[] {"col2"}, true),
+            TableChange.deleteColumn(new String[] {"col3"}, true));
+    Column[] expectedColumns3 =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col4")
+              .withType(Types.StringType.get())
+              .withComment("comment4")
+              .withNullable(true)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("4"))
+              .build()
+        };
+    testColumns(expectedColumns3, alteredTable3.columns());
+
+    TableEntity tableEntity3 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(expectedColumns3, tableEntity3.columns());
+
+    // 4. Update column default value
+    Table alteredTable4 =
+        tableOperationDispatcher.alterTable(
+            tableIdent,
+            TableChange.updateColumnDefaultValue(
+                new String[] {"col4"}, Literals.stringLiteral("5")));
+
+    Column[] expectedColumns4 =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col4")
+              .withType(Types.StringType.get())
+              .withComment("comment4")
+              .withNullable(true)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("5"))
+              .build()
+        };
+    testColumns(expectedColumns4, alteredTable4.columns());
+
+    TableEntity tableEntity4 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(expectedColumns4, tableEntity4.columns());
+
+    // 5. Update column type
+    Table alteredTable5 =
+        tableOperationDispatcher.alterTable(
+            tableIdent,
+            TableChange.updateColumnType(new String[] {"col4"}, 
Types.IntegerType.get()));
+
+    Column[] expectedColumns5 =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col4")
+              .withType(Types.IntegerType.get())
+              .withComment("comment4")
+              .withNullable(true)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("5"))
+              .build()
+        };
+
+    testColumns(expectedColumns5, alteredTable5.columns());
+
+    TableEntity tableEntity5 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(expectedColumns5, tableEntity5.columns());
+
+    // 6. Update column comment
+    Table alteredTable6 =
+        tableOperationDispatcher.alterTable(
+            tableIdent, TableChange.updateColumnComment(new String[] {"col4"}, 
"new comment"));
+
+    Column[] expectedColumns6 =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col4")
+              .withType(Types.IntegerType.get())
+              .withComment("new comment")
+              .withNullable(true)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("5"))
+              .build()
+        };
+
+    testColumns(expectedColumns6, alteredTable6.columns());
+
+    TableEntity tableEntity6 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(expectedColumns6, tableEntity6.columns());
+
+    // 7. Update column nullable
+    Table alteredTable7 =
+        tableOperationDispatcher.alterTable(
+            tableIdent, TableChange.updateColumnNullability(new String[] 
{"col4"}, false));
+
+    Column[] expectedColumns7 =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col4")
+              .withType(Types.IntegerType.get())
+              .withComment("new comment")
+              .withNullable(false)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("5"))
+              .build()
+        };
+
+    testColumns(expectedColumns7, alteredTable7.columns());
+
+    TableEntity tableEntity7 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(expectedColumns7, tableEntity7.columns());
+
+    // 8. Update column auto increment
+    Table alteredTable8 =
+        tableOperationDispatcher.alterTable(
+            tableIdent, TableChange.updateColumnAutoIncrement(new String[] 
{"col4"}, false));
+
+    Column[] expectedColumns8 =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col4")
+              .withType(Types.IntegerType.get())
+              .withComment("new comment")
+              .withNullable(false)
+              .withAutoIncrement(false)
+              .withDefaultValue(Literals.stringLiteral("5"))
+              .build()
+        };
+
+    testColumns(expectedColumns8, alteredTable8.columns());
+
+    TableEntity tableEntity8 = entityStore.get(tableIdent, TABLE, 
TableEntity.class);
+    testColumnAndColumnEntities(expectedColumns8, tableEntity8.columns());
+  }
+
+  @Test
+  public void testCreateAndDropTableWithColumn() throws IOException {
+    Namespace tableNs = Namespace.of(metalake, catalog, "schema111");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    
schemaOperationDispatcher.createSchema(NameIdentifier.of(tableNs.levels()), 
"comment", props);
+
+    NameIdentifier tableIdent = NameIdentifier.of(tableNs, "table61");
+    Column[] columns =
+        new Column[] {
+          TestColumn.builder()
+              .withName("col1")
+              .withType(Types.StringType.get())
+              .withComment("comment1")
+              .withNullable(true)
+              .withAutoIncrement(true)
+              .withDefaultValue(Literals.stringLiteral("1"))
+              .build(),
+          TestColumn.builder()
+              .withName("col2")
+              .withType(Types.StringType.get())
+              .withComment("comment2")
+              .withNullable(false)
+              .withAutoIncrement(false)
+              .withDefaultValue(Literals.stringLiteral("2"))
+              .build()
+        };
+
+    Table table1 =
+        tableOperationDispatcher.createTable(
+            tableIdent, columns, "comment", props, new Transform[0]);
+    testColumns(columns, table1.columns());
+
+    // Delete table
+    boolean dropped = tableOperationDispatcher.dropTable(tableIdent);
+    Assertions.assertTrue(dropped);
+    Assertions.assertFalse(entityStore.exists(tableIdent, TABLE));
+  }
+
+  private static void testColumns(Column[] expectedColumns, Column[] 
actualColumns) {
+    Map<String, Column> expectedColumnMap =
+        expectedColumns == null
+            ? Collections.emptyMap()
+            : Arrays.stream(expectedColumns)
+                .collect(Collectors.toMap(c -> c.name().toLowerCase(), 
Function.identity()));
+    Map<String, Column> actualColumnMap =
+        actualColumns == null
+            ? Collections.emptyMap()
+            : Arrays.stream(actualColumns)
+                .collect(Collectors.toMap(Column::name, Function.identity()));
+
+    Assertions.assertEquals(expectedColumnMap.size(), actualColumnMap.size());
+    expectedColumnMap.forEach(
+        (name, expectedColumn) -> {
+          Column actualColumn = actualColumnMap.get(name);
+          Assertions.assertNotNull(actualColumn);
+          Assertions.assertEquals(expectedColumn.name().toLowerCase(), 
actualColumn.name());
+          Assertions.assertEquals(expectedColumn.dataType(), 
actualColumn.dataType());
+          Assertions.assertEquals(expectedColumn.comment(), 
actualColumn.comment());
+          Assertions.assertEquals(expectedColumn.nullable(), 
actualColumn.nullable());
+          Assertions.assertEquals(expectedColumn.autoIncrement(), 
actualColumn.autoIncrement());
+          Assertions.assertEquals(expectedColumn.defaultValue(), 
actualColumn.defaultValue());
+        });
+  }
+
+  private static void testColumnAndColumnEntities(
+      Column[] expectedColumns, List<ColumnEntity> ColumnEntities) {
+    Map<String, Column> expectedColumnMap =
+        expectedColumns == null
+            ? Collections.emptyMap()
+            : Arrays.stream(expectedColumns)
+                .collect(Collectors.toMap(Column::name, Function.identity()));
+    Map<String, ColumnEntity> actualColumnMap =
+        ColumnEntities == null
+            ? Collections.emptyMap()
+            : ColumnEntities.stream()
+                .collect(Collectors.toMap(ColumnEntity::name, 
Function.identity()));
+
+    Assertions.assertEquals(expectedColumnMap.size(), actualColumnMap.size());
+    expectedColumnMap.forEach(
+        (name, expectedColumn) -> {
+          ColumnEntity actualColumn = actualColumnMap.get(name);
+          Assertions.assertNotNull(actualColumn);
+          Assertions.assertEquals(expectedColumn.name(), actualColumn.name());
+          Assertions.assertEquals(expectedColumn.dataType(), 
actualColumn.dataType());
+          Assertions.assertEquals(expectedColumn.comment(), 
actualColumn.comment());
+          Assertions.assertEquals(expectedColumn.nullable(), 
actualColumn.nullable());
+          Assertions.assertEquals(expectedColumn.autoIncrement(), 
actualColumn.autoIncrement());
+          Assertions.assertEquals(expectedColumn.defaultValue(), 
actualColumn.defaultValue());
+        });
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java 
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
index d6bbd81c3..13c465205 100644
--- 
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
+++ 
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
@@ -23,14 +23,18 @@ import com.google.common.collect.Maps;
 import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.TestColumn;
 import org.apache.gravitino.TestFileset;
 import org.apache.gravitino.TestSchema;
 import org.apache.gravitino.TestTable;
@@ -198,17 +202,24 @@ public class TestCatalogOperations
           throw new TableAlreadyExistsException("Table %s already exists", 
ident);
         }
       } else {
-        throw new IllegalArgumentException("Unsupported table change: " + 
change);
+        // do nothing
       }
     }
 
+    TableChange.ColumnChange[] columnChanges =
+        Arrays.stream(changes)
+            .filter(change -> change instanceof TableChange.ColumnChange)
+            .map(change -> (TableChange.ColumnChange) change)
+            .toArray(TableChange.ColumnChange[]::new);
+    Column[] newColumns = updateColumns(table.columns(), columnChanges);
+
     TestTable updatedTable =
         TestTable.builder()
             .withName(newIdent.name())
             .withComment(table.comment())
             .withProperties(new HashMap<>(newProps))
             .withAuditInfo(updatedAuditInfo)
-            .withColumns(table.columns())
+            .withColumns(newColumns)
             .withPartitioning(table.partitioning())
             .withDistribution(table.distribution())
             .withSortOrders(table.sortOrder())
@@ -634,4 +645,126 @@ public class TestCatalogOperations
       return false;
     }
   }
+
+  private Column[] updateColumns(Column[] columns, TableChange.ColumnChange[] 
columnChanges) {
+    Map<String, Column> columnMap =
+        Arrays.stream(columns).collect(Collectors.toMap(Column::name, 
Function.identity()));
+
+    for (TableChange.ColumnChange columnChange : columnChanges) {
+      if (columnChange instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) columnChange;
+        TestColumn column =
+            TestColumn.builder()
+                .withName(String.join(".", addColumn.fieldName()))
+                .withComment(addColumn.getComment())
+                .withType(addColumn.getDataType())
+                .withNullable(addColumn.isNullable())
+                .withAutoIncrement(addColumn.isAutoIncrement())
+                .withDefaultValue(addColumn.getDefaultValue())
+                .build();
+        columnMap.put(column.name(), column);
+
+      } else if (columnChange instanceof TableChange.DeleteColumn) {
+        columnMap.remove(String.join(".", columnChange.fieldName()));
+
+      } else if (columnChange instanceof TableChange.RenameColumn) {
+        String oldName = String.join(".", columnChange.fieldName());
+        String newName = ((TableChange.RenameColumn) 
columnChange).getNewName();
+        Column column = columnMap.remove(oldName);
+        TestColumn newColumn =
+            TestColumn.builder()
+                .withName(newName)
+                .withComment(column.comment())
+                .withType(column.dataType())
+                .withNullable(column.nullable())
+                .withAutoIncrement(column.autoIncrement())
+                .withDefaultValue(column.defaultValue())
+                .build();
+        columnMap.put(newName, newColumn);
+
+      } else if (columnChange instanceof TableChange.UpdateColumnDefaultValue) 
{
+        String columnName = String.join(".", columnChange.fieldName());
+        TableChange.UpdateColumnDefaultValue updateColumnDefaultValue =
+            (TableChange.UpdateColumnDefaultValue) columnChange;
+        Column oldColumn = columnMap.get(columnName);
+        TestColumn newColumn =
+            TestColumn.builder()
+                .withName(columnName)
+                .withComment(oldColumn.comment())
+                .withType(oldColumn.dataType())
+                .withNullable(oldColumn.nullable())
+                .withAutoIncrement(oldColumn.autoIncrement())
+                
.withDefaultValue(updateColumnDefaultValue.getNewDefaultValue())
+                .build();
+        columnMap.put(columnName, newColumn);
+
+      } else if (columnChange instanceof TableChange.UpdateColumnType) {
+        String columnName = String.join(".", columnChange.fieldName());
+        TableChange.UpdateColumnType updateColumnType = 
(TableChange.UpdateColumnType) columnChange;
+        Column oldColumn = columnMap.get(columnName);
+        TestColumn newColumn =
+            TestColumn.builder()
+                .withName(columnName)
+                .withComment(oldColumn.comment())
+                .withType(updateColumnType.getNewDataType())
+                .withNullable(oldColumn.nullable())
+                .withAutoIncrement(oldColumn.autoIncrement())
+                .withDefaultValue(oldColumn.defaultValue())
+                .build();
+        columnMap.put(columnName, newColumn);
+
+      } else if (columnChange instanceof TableChange.UpdateColumnComment) {
+        String columnName = String.join(".", columnChange.fieldName());
+        TableChange.UpdateColumnComment updateColumnComment =
+            (TableChange.UpdateColumnComment) columnChange;
+        Column oldColumn = columnMap.get(columnName);
+        TestColumn newColumn =
+            TestColumn.builder()
+                .withName(columnName)
+                .withComment(updateColumnComment.getNewComment())
+                .withType(oldColumn.dataType())
+                .withNullable(oldColumn.nullable())
+                .withAutoIncrement(oldColumn.autoIncrement())
+                .withDefaultValue(oldColumn.defaultValue())
+                .build();
+        columnMap.put(columnName, newColumn);
+
+      } else if (columnChange instanceof TableChange.UpdateColumnNullability) {
+        String columnName = String.join(".", columnChange.fieldName());
+        TableChange.UpdateColumnNullability updateColumnNullable =
+            (TableChange.UpdateColumnNullability) columnChange;
+        Column oldColumn = columnMap.get(columnName);
+        TestColumn newColumn =
+            TestColumn.builder()
+                .withName(columnName)
+                .withComment(oldColumn.comment())
+                .withType(oldColumn.dataType())
+                .withNullable(updateColumnNullable.nullable())
+                .withAutoIncrement(oldColumn.autoIncrement())
+                .withDefaultValue(oldColumn.defaultValue())
+                .build();
+        columnMap.put(columnName, newColumn);
+
+      } else if (columnChange instanceof 
TableChange.UpdateColumnAutoIncrement) {
+        String columnName = String.join(".", columnChange.fieldName());
+        TableChange.UpdateColumnAutoIncrement updateColumnAutoIncrement =
+            (TableChange.UpdateColumnAutoIncrement) columnChange;
+        Column oldColumn = columnMap.get(columnName);
+        TestColumn newColumn =
+            TestColumn.builder()
+                .withName(columnName)
+                .withComment(oldColumn.comment())
+                .withType(oldColumn.dataType())
+                .withNullable(oldColumn.nullable())
+                .withAutoIncrement(updateColumnAutoIncrement.isAutoIncrement())
+                .withDefaultValue(oldColumn.defaultValue())
+                .build();
+        columnMap.put(columnName, newColumn);
+
+      } else {
+        // do nothing
+      }
+    }
+    return columnMap.values().toArray(new Column[0]);
+  }
 }
diff --git a/scripts/h2/schema-0.7.0-h2.sql b/scripts/h2/schema-0.7.0-h2.sql
index 8a76f1e6a..bada37abc 100644
--- a/scripts/h2/schema-0.7.0-h2.sql
+++ b/scripts/h2/schema-0.7.0-h2.sql
@@ -93,11 +93,11 @@ CREATE TABLE IF NOT EXISTS `table_column_version_info` (
     `table_version` INT UNSIGNED NOT NULL COMMENT 'table version',
     `column_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'column id',
     `column_name` VARCHAR(128) NOT NULL COMMENT 'column name',
-    `column_type` VARCHAR(128) NOT NULL COMMENT 'column type',
+    `column_type` TEXT NOT NULL COMMENT 'column type',
     `column_comment` VARCHAR(256) DEFAULT '' COMMENT 'column comment',
     `column_nullable` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'column nullable, 
0 is not nullable, 1 is nullable',
     `column_auto_increment` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'column auto 
increment, 0 is not auto increment, 1 is auto increment',
-    `column_default_value` VARCHAR(256) DEFAULT NULL COMMENT 'column default 
value',
+    `column_default_value` TEXT DEFAULT NULL COMMENT 'column default value',
     `column_op_type` TINYINT(1) NOT NULL COMMENT 'column operation type, 1 is 
create, 2 is update, 3 is delete',
     `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'column 
deleted at',
     `audit_info` MEDIUMTEXT NOT NULL COMMENT 'column audit info',
diff --git a/scripts/h2/upgrade-0.6.0-to-0.7.0-h2.sql 
b/scripts/h2/upgrade-0.6.0-to-0.7.0-h2.sql
index fba1075f5..cdf1bbdc4 100644
--- a/scripts/h2/upgrade-0.6.0-to-0.7.0-h2.sql
+++ b/scripts/h2/upgrade-0.6.0-to-0.7.0-h2.sql
@@ -25,11 +25,11 @@ CREATE TABLE IF NOT EXISTS `table_column_version_info` (
     `table_version` INT UNSIGNED NOT NULL COMMENT 'table version',
     `column_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'column id',
     `column_name` VARCHAR(128) NOT NULL COMMENT 'column name',
-    `column_type` VARCHAR(128) NOT NULL COMMENT 'column type',
+    `column_type` TEXT NOT NULL COMMENT 'column type',
     `column_comment` VARCHAR(256) DEFAULT '' COMMENT 'column comment',
     `column_nullable` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'column nullable, 
0 is not nullable, 1 is nullable',
     `column_auto_increment` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'column auto 
increment, 0 is not auto increment, 1 is auto increment',
-    `column_default_value` VARCHAR(256) DEFAULT NULL COMMENT 'column default 
value',
+    `column_default_value` TEXT DEFAULT NULL COMMENT 'column default value',
     `column_op_type` TINYINT(1) NOT NULL COMMENT 'column operation type, 1 is 
create, 2 is update, 3 is delete',
     `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'column 
deleted at',
     `audit_info` MEDIUMTEXT NOT NULL COMMENT 'column audit info',
diff --git a/scripts/mysql/schema-0.7.0-mysql.sql 
b/scripts/mysql/schema-0.7.0-mysql.sql
index 858d47d49..13f46debc 100644
--- a/scripts/mysql/schema-0.7.0-mysql.sql
+++ b/scripts/mysql/schema-0.7.0-mysql.sql
@@ -88,11 +88,11 @@ CREATE TABLE IF NOT EXISTS `table_column_version_info` (
     `table_version` INT UNSIGNED NOT NULL COMMENT 'table version',
     `column_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'column id',
     `column_name` VARCHAR(128) NOT NULL COMMENT 'column name',
-    `column_type` VARCHAR(128) NOT NULL COMMENT 'column type',
+    `column_type` TEXT NOT NULL COMMENT 'column type',
     `column_comment` VARCHAR(256) DEFAULT '' COMMENT 'column comment',
     `column_nullable` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'column nullable, 
0 is not nullable, 1 is nullable',
     `column_auto_increment` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'column auto 
increment, 0 is not auto increment, 1 is auto increment',
-    `column_default_value` VARCHAR(256) DEFAULT NULL COMMENT 'column default 
value',
+    `column_default_value` TEXT DEFAULT NULL COMMENT 'column default value',
     `column_op_type` TINYINT(1) NOT NULL COMMENT 'column operation type, 1 is 
create, 2 is update, 3 is delete',
     `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'column 
deleted at',
     `audit_info` MEDIUMTEXT NOT NULL COMMENT 'column audit info',
diff --git a/scripts/mysql/upgrade-0.6.0-to-0.7.0-mysql.sql 
b/scripts/mysql/upgrade-0.6.0-to-0.7.0-mysql.sql
index 007fc0a8d..0afe56078 100644
--- a/scripts/mysql/upgrade-0.6.0-to-0.7.0-mysql.sql
+++ b/scripts/mysql/upgrade-0.6.0-to-0.7.0-mysql.sql
@@ -25,11 +25,11 @@ CREATE TABLE IF NOT EXISTS `table_column_version_info` (
     `table_version` INT UNSIGNED NOT NULL COMMENT 'table version',
     `column_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'column id',
     `column_name` VARCHAR(128) NOT NULL COMMENT 'column name',
-    `column_type` VARCHAR(128) NOT NULL COMMENT 'column type',
+    `column_type` TEXT NOT NULL COMMENT 'column type',
     `column_comment` VARCHAR(256) DEFAULT '' COMMENT 'column comment',
     `column_nullable` TINYINT(1) NOT NULL DEFAULT 1 COMMENT 'column nullable, 
0 is not nullable, 1 is nullable',
     `column_auto_increment` TINYINT(1) NOT NULL DEFAULT 0 COMMENT 'column auto 
increment, 0 is not auto increment, 1 is auto increment',
-    `column_default_value` VARCHAR(256) DEFAULT NULL COMMENT 'column default 
value',
+    `column_default_value` TEXT DEFAULT NULL COMMENT 'column default value',
     `column_op_type` TINYINT(1) NOT NULL COMMENT 'column operation type, 1 is 
create, 2 is update, 3 is delete',
     `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'column 
deleted at',
     `audit_info` MEDIUMTEXT NOT NULL COMMENT 'column audit info',
diff --git a/scripts/postgresql/schema-0.7.0-postgresql.sql 
b/scripts/postgresql/schema-0.7.0-postgresql.sql
index e2ad46cf5..d377c57b5 100644
--- a/scripts/postgresql/schema-0.7.0-postgresql.sql
+++ b/scripts/postgresql/schema-0.7.0-postgresql.sql
@@ -149,11 +149,11 @@ CREATE TABLE IF NOT EXISTS table_column_version_info (
     table_version INT NOT NULL,
     column_id BIGINT NOT NULL,
     column_name VARCHAR(128) NOT NULL,
-    column_type VARCHAR(128) NOT NULL,
+    column_type TEXT NOT NULL,
     column_comment VARCHAR(256) DEFAULT '',
     column_nullable SMALLINT NOT NULL DEFAULT 1,
     column_auto_increment SMALLINT NOT NULL DEFAULT 0,
-    column_default_value VARCHAR(256) DEFAULT NULL,
+    column_default_value TEXT DEFAULT NULL,
     column_op_type SMALLINT NOT NULL,
     deleted_at BIGINT NOT NULL DEFAULT 0,
     audit_info TEXT NOT NULL,

Reply via email to