zhilinli123 commented on code in PR #8936: URL: https://github.com/apache/seatunnel/pull/8936#discussion_r1986566983
########## seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java: ########## @@ -259,4 +277,278 @@ public Object queryNextChunkMax( } } } + + @Override + public TypeConverter<BasicTypeDefine> getTypeConverter() { + return SqlServerTypeConverter.INSTANCE; + } + + @Override + public void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableAddColumnEvent event) + throws SQLException { + List<String> ddlSQL = new ArrayList<>(); + Column column = event.getColumn(); + String sourceDialectName = event.getSourceDialectName(); + boolean sameCatalog = StringUtils.equals(dialectName(), sourceDialectName); + BasicTypeDefine typeDefine = getTypeConverter().reconvert(column); + String columnType = sameCatalog ? column.getSourceType() : typeDefine.getColumnType(); + + // Build the SQL statement that add the column + StringBuilder sqlBuilder = + new StringBuilder() + .append("ALTER TABLE ") + .append(tableIdentifier(tablePath)) + .append(" ADD ") + .append(quoteIdentifier(column.getName())) + .append(" ") + .append(columnType) + .append(" "); + + if (column.getDefaultValue() != null) { + // Handle default values + String defaultValueClause = sqlClauseWithDefaultValue(typeDefine, sourceDialectName); + sqlBuilder.append(defaultValueClause); + } + + if (!column.isNullable()) { + // Handle null constraints + sqlBuilder.append(" NOT NULL"); + } + + ddlSQL.add(sqlBuilder.toString()); + // Process column comment + if (column.getComment() != null) { + ddlSQL.add(buildColumnCommentSQL(tablePath, column)); + } + + // Execute the DDL statement + executeDDL(connection, ddlSQL); + } + + @Override + public void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableChangeColumnEvent event) + throws SQLException { + List<String> ddlSQL = new ArrayList<>(); + if (event.getOldColumn() != null + && !(event.getColumn().getName().equals(event.getOldColumn()))) { + StringBuilder sqlBuilder = + new StringBuilder() + .append("EXEC sp_rename ") + .append( + String.format( + "'%s.%s.%s.%s', ", + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName(), + event.getOldColumn())) + .append(String.format("'%s', 'COLUMN';", event.getColumn().getName())); + ddlSQL.add(sqlBuilder.toString()); + } + + executeDDL(connection, ddlSQL); + + if (event.getColumn().getDataType() != null) { + applySchemaChange( + connection, + tablePath, + AlterTableModifyColumnEvent.modify(event.tableIdentifier(), event.getColumn())); + } + } + + @Override + public void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableModifyColumnEvent event) + throws SQLException { + Column column = event.getColumn(); + String sourceDialectName = event.getSourceDialectName(); + boolean sameCatalog = StringUtils.equals(dialectName(), sourceDialectName); + BasicTypeDefine typeDefine = getTypeConverter().reconvert(column); + String columnType = sameCatalog ? column.getSourceType() : typeDefine.getColumnType(); + if (event.getTypeChanged() != null + && event.getTypeChanged() + && SQLSERVER_TEXT.equals(typeDefine.getColumnType())) { + log.warn( + "SqlServer does not support modifying the TEXT type directly. " + + "Please use ALTER TABLE MODIFY COLUMN to change the column type."); + } + + List<String> ddlSQL = new ArrayList<>(); + // Handle field default constraints. + if (column.getDefaultValue() != null) { + if (sameCatalog + || !isSpecialDefaultValue(typeDefine.getDefaultValue(), sourceDialectName)) { + String constraintQuery = + String.format( + "SELECT dc.name AS constraint_name\n" + + "FROM sys.default_constraints dc \n" + + "JOIN sys.columns c ON dc.parent_object_id = c.object_id AND dc.parent_column_id = c.column_id \n" + + "JOIN sys.tables t ON c.object_id = t.object_id \n" + + "JOIN sys.schemas s ON t.schema_id = s.schema_id \n" + + "WHERE t.name = '%s' AND s.name = '%s' AND c.name = '%s';", + tablePath.getTableName(), + tablePath.getSchemaName(), + event.getColumn().getName()); + + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(constraintQuery)) { + while (rs.next()) { + String constraintName = rs.getString(1); + if (StringUtils.isBlank(constraintName)) { + continue; + } + String dropConstraintSQL = + String.format( + "ALTER TABLE %s DROP CONSTRAINT %s", + tableIdentifier(tablePath), + quoteIdentifier(constraintName)); + ddlSQL.add(dropConstraintSQL); + } + } + + // Process column default + String defaultValueClause = + sqlClauseWithDefaultValue(typeDefine, sourceDialectName); + if (StringUtils.isNotBlank(defaultValueClause)) { + String defaultSqlBuilder = + "ALTER TABLE " + + tableIdentifier(tablePath) + + " ADD " + + defaultValueClause + + " FOR " + + quoteIdentifier(column.getName()); + ddlSQL.add(defaultSqlBuilder); + } + } else { + log.warn( + "Skipping unsupported default value for column {} in table {}.", + column.getName(), + tablePath.getFullName()); + } + } + + // Process column comment + if (column.getComment() != null) { + ddlSQL.add(buildColumnCommentSQL(tablePath, column)); + } + + // Build the SQL statement that modifies the column + StringBuilder sqlBuilder = + new StringBuilder("ALTER TABLE ") + .append(tableIdentifier(tablePath)) + .append(" ALTER COLUMN ") Review Comment: With StringBuilder repeated redundancy above, it is possible to encapsulate a common writing method -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org