hantmac commented on code in PR #9661:
URL: https://github.com/apache/seatunnel/pull/9661#discussion_r2302669246
##########
seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java:
##########
@@ -110,46 +143,142 @@ public DatabendSinkWriter(
e);
}
} else {
- // use the catalog table schema to create the target table
- SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
- if (rowType == null || rowType.getFieldNames().length == 0) {
- throw new DatabendConnectorException(
- DatabendConnectorErrorCode.SCHEMA_NOT_FOUND,
- "Source table schema is empty or null");
- }
-
try {
- if (!tableExists(database, table)) {
+ if (isCdcMode) {
+ // In CDC mode, we don't create tables here, it's done in
AggregatedCommitter
+ // We'll get the raw table and stream names from the
committer via prepareCommit
log.info(
- "Target table {}.{} does not exist, creating with
source schema",
- database,
- table);
- createTable(database, table, rowType);
+ "CDC mode enabled, table creation will be handled
by AggregatedCommitter");
} else {
- log.info("Target table {}.{} exists, verifying schema",
database, table);
- verifyTableSchema(database, table, rowType);
+ // Traditional mode
+ initTraditionalMode(database, table);
}
} catch (SQLException e) {
throw new DatabendConnectorException(
DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
- "Failed to verify/create target table: " +
e.getMessage(),
+ "Failed to initialize sink writer: " + e.getMessage(),
e);
}
+ }
+ }
- this.insertSql = generateInsertSql(database, table, rowType);
- log.info("Generated insert SQL: {}", insertSql);
- try {
- this.schemaChangeManager = new
SchemaChangeManager(databendSinkConfig);
- this.preparedStatement =
connection.prepareStatement(insertSql);
- this.preparedStatement.setQueryTimeout(executeTimeoutSec);
- log.info("PreparedStatement created successfully");
- } catch (SQLException e) {
- throw new DatabendConnectorException(
- DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
- "Failed to prepare statement: " + e.getMessage(),
- e);
- }
+ private String getCurrentTimestamp() {
+ return
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"));
+ }
+
+ private void initializeCdcPreparedStatement() throws SQLException {
+ log.info("Initializing CDC PreparedStatement");
+
+ // In CDC mode, the rawTableName should be set by the
AggregatedCommitter
+ // If it's not set yet, we can't proceed with CDC operations
+ if (rawTableName == null || rawTableName.isEmpty()) {
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.SQL_OPERATION_FAILED,
+ "Raw table name not set by AggregatedCommitter. Cannot
initialize CDC PreparedStatement.");
+ }
+
+ // Generate insert SQL for raw table
+ String insertRawSql =
generateInsertRawSql(sinkTablePath.getDatabaseName());
+
+ // Create the PreparedStatement
+ this.cdcPreparedStatement = connection.prepareStatement(insertRawSql);
+ this.cdcPreparedStatement.setQueryTimeout(executeTimeoutSec);
+
+ log.info("CDC PreparedStatement created successfully with SQL: {}",
insertRawSql);
+ }
+
+ private void initTraditionalMode(String database, String table) throws
SQLException {
+ // use the catalog table schema to create the target table
+ SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+ if (rowType == null || rowType.getFieldNames().length == 0) {
+ throw new DatabendConnectorException(
+ DatabendConnectorErrorCode.SCHEMA_NOT_FOUND,
+ "Source table schema is empty or null");
+ }
+
+ if (!tableExists(database, table)) {
+ log.info(
+ "Target table {}.{} does not exist, creating with source
schema",
+ database,
+ table);
+ createTable(database, table, rowType);
+ } else {
+ log.info("Target table {}.{} exists, verifying schema", database,
table);
+ verifyTableSchema(database, table, rowType);
+ }
Review Comment:
Thanks, this part is for testing, I forgot to remove it. Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]