DongLiang-0 commented on code in PR #248: URL: https://github.com/apache/doris-flink-connector/pull/248#discussion_r1408955427
########## flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java: ########## @@ -405,6 +479,25 @@ protected String extractTable(JsonNode record) { return extractJsonNode(record.get("source"), "table"); } + /** + * Parse event type + */ + protected EventType extractEventType(JsonNode record) throws JsonProcessingException { + JsonNode tableChange = extractTableChange(record); + if(tableChange == null || tableChange.get("type") == null){ + return null; + } + String type = tableChange.get("type").asText(); + if(EventType.ALTER.toString().equalsIgnoreCase(type)){ + return EventType.ALTER; + }else if(EventType.CREATE.toString().equalsIgnoreCase(type)){ + return EventType.CREATE; + }else{ Review Comment: This `else` seems redundant. ########## flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java: ########## @@ -214,44 +216,70 @@ public boolean schemaChangeV2(JsonNode recordRoot) { return false; } - // db,table - Tuple2<String, String> tuple = getDorisTableTuple(recordRoot); - if(tuple == null){ - return false; - } - - List<String> ddlSqlList = extractDDLList(recordRoot); - if (CollectionUtils.isEmpty(ddlSqlList)) { - LOG.info("ddl can not do schema change:{}", recordRoot); + EventType eventType = extractEventType(recordRoot); + if(eventType == null){ return false; } - - List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas(); - for (int i = 0; i < ddlSqlList.size(); i++) { - DDLSchema ddlSchema = ddlSchemas.get(i); - String ddlSql = ddlSqlList.get(i); - boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema); - status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0); - LOG.info("schema change status:{}, ddl:{}", status, ddlSql); + if(eventType.equals(EventType.CREATE)){ + TableSchema tableSchema = extractCreateTableSchema(recordRoot); + status = schemaChangeManager.createTable(tableSchema); + if(status){ + 
String cdcTbl = getCdcTableIdentifier(recordRoot); + String dorisTbl = getCreateTableIdentifier(recordRoot); + tableMapping.put(cdcTbl, dorisTbl); + LOG.info("create table ddl status: {}", status); + } + } else if (eventType.equals(EventType.ALTER)){ + // db,table + Tuple2<String, String> tuple = getDorisTableTuple(recordRoot); + if(tuple == null){ + return false; + } + List<String> ddlSqlList = extractDDLList(recordRoot); + if (CollectionUtils.isEmpty(ddlSqlList)) { + LOG.info("ddl can not do schema change:{}", recordRoot); + return false; + } + List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas(); + for (int i = 0; i < ddlSqlList.size(); i++) { + DDLSchema ddlSchema = ddlSchemas.get(i); + String ddlSql = ddlSqlList.get(i); + boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema); + status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0); + LOG.info("schema change status:{}, ddl:{}", status, ddlSql); + } + } else{ + LOG.info("Unsupported event type {}", eventType); } } catch (Exception ex) { LOG.warn("schema change error :", ex); } return status; } + protected JsonNode extractTableChange(JsonNode record) throws JsonProcessingException { + JsonNode historyRecord = extractHistoryRecord(record); + JsonNode tableChanges = historyRecord.get("tableChanges"); + if(!Objects.isNull(tableChanges)){ + JsonNode tableChange = tableChanges.get(0); + return tableChange; + } + return null; + } + + /** + * Parse Alter Event + */ @VisibleForTesting - public List<String> extractDDLList(JsonNode record) throws JsonProcessingException { + public List<String> extractDDLList(JsonNode record) throws IOException{ String dorisTable = getDorisTableIdentifier(record); JsonNode historyRecord = extractHistoryRecord(record); - JsonNode tableChanges = historyRecord.get("tableChanges"); String ddl = extractJsonNode(historyRecord, "ddl"); - if (Objects.isNull(tableChanges) || Objects.isNull(ddl)) { - return new ArrayList<>(); + JsonNode tableChange = 
extractTableChange(record); + if (Objects.isNull(tableChange) || Objects.isNull(ddl)) { + return null; } - LOG.debug("received debezium ddl :{}", ddl); - JsonNode tableChange = tableChanges.get(0); - if (Objects.isNull(tableChange) || !tableChange.get("type").asText().equals("ALTER")) { + if(!EventType.ALTER.equals(extractEventType(record))){ Review Comment: Is this check redundant? Line 232 already determines whether the event type is `ALTER`. ########## flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java: ########## @@ -624,4 +715,9 @@ private String handleType(String type) { } + enum EventType{ Review Comment: `EventType` seems to be used only for schema changes. Should we add the `private` keyword here to prevent it from being used elsewhere, or extract it into a separate public `EventType` enumeration? ########## flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java: ########## @@ -285,6 +313,46 @@ public List<String> extractDDLList(JsonNode record) throws JsonProcessingExcepti return SchemaChangeHelper.generateDDLSql(dorisTable); } + private TableSchema extractCreateTableSchema(JsonNode record) throws JsonProcessingException { Review Comment: Can you add a unit test here? 
########## flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/JsonDebeziumSchemaSerializer.java: ########## @@ -214,44 +216,70 @@ public boolean schemaChangeV2(JsonNode recordRoot) { return false; } - // db,table - Tuple2<String, String> tuple = getDorisTableTuple(recordRoot); - if(tuple == null){ - return false; - } - - List<String> ddlSqlList = extractDDLList(recordRoot); - if (CollectionUtils.isEmpty(ddlSqlList)) { - LOG.info("ddl can not do schema change:{}", recordRoot); + EventType eventType = extractEventType(recordRoot); + if(eventType == null){ return false; } - - List<DDLSchema> ddlSchemas = SchemaChangeHelper.getDdlSchemas(); - for (int i = 0; i < ddlSqlList.size(); i++) { - DDLSchema ddlSchema = ddlSchemas.get(i); - String ddlSql = ddlSqlList.get(i); - boolean doSchemaChange = checkSchemaChange(tuple.f0, tuple.f1, ddlSchema); - status = doSchemaChange && schemaChangeManager.execute(ddlSql, tuple.f0); - LOG.info("schema change status:{}, ddl:{}", status, ddlSql); + if(eventType.equals(EventType.CREATE)){ + TableSchema tableSchema = extractCreateTableSchema(recordRoot); + status = schemaChangeManager.createTable(tableSchema); + if(status){ + String cdcTbl = getCdcTableIdentifier(recordRoot); + String dorisTbl = getCreateTableIdentifier(recordRoot); + tableMapping.put(cdcTbl, dorisTbl); + LOG.info("create table ddl status: {}", status); Review Comment: Do we also need to record the table information when table creation fails? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org