zhilinli123 commented on code in PR #8936: URL: https://github.com/apache/seatunnel/pull/8936#discussion_r1986568151
########## seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java: ########## @@ -259,4 +277,278 @@ public Object queryNextChunkMax( } } } + + @Override + public TypeConverter<BasicTypeDefine> getTypeConverter() { + return SqlServerTypeConverter.INSTANCE; + } + + @Override + public void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableAddColumnEvent event) + throws SQLException { + List<String> ddlSQL = new ArrayList<>(); + Column column = event.getColumn(); + String sourceDialectName = event.getSourceDialectName(); + boolean sameCatalog = StringUtils.equals(dialectName(), sourceDialectName); + BasicTypeDefine typeDefine = getTypeConverter().reconvert(column); + String columnType = sameCatalog ? column.getSourceType() : typeDefine.getColumnType(); + + // Build the SQL statement that add the column + StringBuilder sqlBuilder = + new StringBuilder() + .append("ALTER TABLE ") + .append(tableIdentifier(tablePath)) + .append(" ADD ") + .append(quoteIdentifier(column.getName())) + .append(" ") + .append(columnType) + .append(" "); + + if (column.getDefaultValue() != null) { + // Handle default values + String defaultValueClause = sqlClauseWithDefaultValue(typeDefine, sourceDialectName); + sqlBuilder.append(defaultValueClause); + } + + if (!column.isNullable()) { + // Handle null constraints + sqlBuilder.append(" NOT NULL"); + } + + ddlSQL.add(sqlBuilder.toString()); + // Process column comment + if (column.getComment() != null) { + ddlSQL.add(buildColumnCommentSQL(tablePath, column)); + } + + // Execute the DDL statement + executeDDL(connection, ddlSQL); + } + + @Override + public void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableChangeColumnEvent event) + throws SQLException { + List<String> ddlSQL = new ArrayList<>(); + if (event.getOldColumn() != null + && !(event.getColumn().getName().equals(event.getOldColumn()))) { + 
StringBuilder sqlBuilder = + new StringBuilder() + .append("EXEC sp_rename ") + .append( + String.format( + "'%s.%s.%s.%s', ", + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName(), + event.getOldColumn())) + .append(String.format("'%s', 'COLUMN';", event.getColumn().getName())); + ddlSQL.add(sqlBuilder.toString()); + } + + executeDDL(connection, ddlSQL); + + if (event.getColumn().getDataType() != null) { + applySchemaChange( + connection, + tablePath, + AlterTableModifyColumnEvent.modify(event.tableIdentifier(), event.getColumn())); + } + } + + @Override + public void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableModifyColumnEvent event) + throws SQLException { + Column column = event.getColumn(); + String sourceDialectName = event.getSourceDialectName(); + boolean sameCatalog = StringUtils.equals(dialectName(), sourceDialectName); + BasicTypeDefine typeDefine = getTypeConverter().reconvert(column); + String columnType = sameCatalog ? column.getSourceType() : typeDefine.getColumnType(); + if (event.getTypeChanged() != null + && event.getTypeChanged() + && SQLSERVER_TEXT.equals(typeDefine.getColumnType())) { + log.warn( + "SqlServer does not support modifying the TEXT type directly. " + + "Please use ALTER TABLE MODIFY COLUMN to change the column type."); + } + + List<String> ddlSQL = new ArrayList<>(); + // Handle field default constraints. 
+ if (column.getDefaultValue() != null) { + if (sameCatalog + || !isSpecialDefaultValue(typeDefine.getDefaultValue(), sourceDialectName)) { + String constraintQuery = + String.format( + "SELECT dc.name AS constraint_name\n" + + "FROM sys.default_constraints dc \n" + + "JOIN sys.columns c ON dc.parent_object_id = c.object_id AND dc.parent_column_id = c.column_id \n" + + "JOIN sys.tables t ON c.object_id = t.object_id \n" + + "JOIN sys.schemas s ON t.schema_id = s.schema_id \n" + + "WHERE t.name = '%s' AND s.name = '%s' AND c.name = '%s';", + tablePath.getTableName(), + tablePath.getSchemaName(), + event.getColumn().getName()); + + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(constraintQuery)) { + while (rs.next()) { + String constraintName = rs.getString(1); + if (StringUtils.isBlank(constraintName)) { + continue; + } + String dropConstraintSQL = + String.format( + "ALTER TABLE %s DROP CONSTRAINT %s", + tableIdentifier(tablePath), + quoteIdentifier(constraintName)); + ddlSQL.add(dropConstraintSQL); + } + } + + // Process column default + String defaultValueClause = + sqlClauseWithDefaultValue(typeDefine, sourceDialectName); + if (StringUtils.isNotBlank(defaultValueClause)) { + String defaultSqlBuilder = + "ALTER TABLE " + + tableIdentifier(tablePath) + + " ADD " + + defaultValueClause + + " FOR " + + quoteIdentifier(column.getName()); + ddlSQL.add(defaultSqlBuilder); + } + } else { + log.warn( + "Skipping unsupported default value for column {} in table {}.", + column.getName(), + tablePath.getFullName()); + } + } + + // Process column comment + if (column.getComment() != null) { + ddlSQL.add(buildColumnCommentSQL(tablePath, column)); + } + + // Build the SQL statement that modifies the column + StringBuilder sqlBuilder = + new StringBuilder("ALTER TABLE ") + .append(tableIdentifier(tablePath)) + .append(" ALTER COLUMN ") + .append(quoteIdentifier(column.getName())) + .append(" ") + .append(columnType); + boolean 
targetColumnNullable = columnIsNullable(connection, tablePath, column.getName()); + if (column.isNullable() != targetColumnNullable && !targetColumnNullable) { + sqlBuilder.append(" NULL "); + } + ddlSQL.add(sqlBuilder.toString()); + + // Execute the DDL statement + executeDDL(connection, ddlSQL); + } + + @Override + public void applySchemaChange( + Connection connection, TablePath tablePath, AlterTableDropColumnEvent event) + throws SQLException { + // Handle field`s constraints. + String constraintQuery = + String.format( + "SELECT dc.name AS constraint_name\n" + + "FROM sys.default_constraints dc \n" + + "JOIN sys.columns c ON dc.parent_object_id = c.object_id AND dc.parent_column_id = c.column_id \n" + + "JOIN sys.tables t ON c.object_id = t.object_id \n" + + "JOIN sys.schemas s ON t.schema_id = s.schema_id \n" + + "WHERE t.name = '%s' AND c.name = '%s' and s.name = '%s';", + tablePath.getTableName(), event.getColumn(), tablePath.getSchemaName()); + + try (Statement stmt = connection.createStatement(); + ResultSet rs = stmt.executeQuery(constraintQuery)) { + while (rs.next()) { + String constraintName = rs.getString(1); + String dropConstraintSQL = + String.format( + "ALTER TABLE %s DROP CONSTRAINT %s", + tableIdentifier(tablePath), quoteIdentifier(constraintName)); + try (Statement dropStmt = connection.createStatement()) { + log.info("Executing drop constraint SQL: {}", dropConstraintSQL); + dropStmt.execute(dropConstraintSQL); + } + } + } + + String dropColumnSQL = + String.format( + "ALTER TABLE %s DROP COLUMN %s", + tableIdentifier(tablePath), quoteIdentifier(event.getColumn())); + try (Statement statement = connection.createStatement()) { + log.info("Executing drop column SQL: {}", dropColumnSQL); + statement.execute(dropColumnSQL); + } + } + + @Override + public boolean needsQuotesWithDefaultValue(BasicTypeDefine columnDefine) { + String sqlServerType = columnDefine.getDataType(); + switch (sqlServerType) { + case SQLSERVER_CHAR: + case 
SQLSERVER_VARCHAR: + case SQLSERVER_NCHAR: + case SQLSERVER_NVARCHAR: + case SQLSERVER_TEXT: + case SQLSERVER_NTEXT: + case SQLSERVER_XML: + case SQLSERVER_UNIQUEIDENTIFIER: + case SQLSERVER_SQLVARIANT: + return true; + default: + return false; + } + } + + private void executeDDL(Connection connection, List<String> ddlSQL) throws SQLException { + try (Statement statement = connection.createStatement()) { + for (String sql : ddlSQL) { + log.info("Executing SqlServer SQL: {}", sql); + statement.execute(sql); + } + } catch (SQLException e) { + throw new SQLException("Error executing SqlServer SQL: " + ddlSQL, e.getSQLState(), e); + } + } + + private String buildColumnCommentSQL(TablePath tablePath, Column column) { + return String.format( + "EXEC %s.sys.sp_updateextendedproperty 'MS_Description', N'%s', 'schema', N'%s', " + + "'table', N'%s', 'column', N'%s';", + tablePath.getDatabaseName(), + column.getComment(), + tablePath.getSchemaName(), + tablePath.getTableName(), + column.getName()); + } + + private boolean columnIsNullable(Connection connection, TablePath tablePath, String column) + throws SQLException { + String selectColumnSQL = + String.format( + "SELECT IS_NULLABLE FROM information_schema.COLUMNS WHERE TABLE_CATALOG = '%s' " + + "AND TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_NAME = '%s';", + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName(), + column); + try (Statement statement = connection.createStatement()) { + ResultSet rs = statement.executeQuery(selectColumnSQL); + if (rs.next()) { + return rs.getString("IS_NULLABLE").equals("YES"); + } else { + throw new SQLException("Column not found: " + column); Review Comment: Print detail log -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org
For queries about this service, please contact Infrastructure at: users@infra.apache.org