yunqingmoswu commented on code in PR #6202: URL: https://github.com/apache/inlong/pull/6202#discussion_r999388571
########## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java: ########## @@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink kafkaSink, List<FieldInfo> format = new JsonFormat(); break; case CANAL: - format = new CanalJsonFormat(); + format = new RawFormat(); + innerFormat = new CanalJsonFormat(); break; case DEBEZIUM_JSON: - format = new DebeziumJsonFormat(); + format = new RawFormat(); + innerFormat = new DebeziumJsonFormat(); break; default: throw new IllegalArgumentException(String.format("Unsupported dataType=%s for Kafka", dataType)); } + String sinkPartitioner = null; + if (dataType == DataTypeEnum.CANAL || dataType == DataTypeEnum.DEBEZIUM_JSON) { + sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null : RAW_HASH; + } + + if (StringUtils.isNotEmpty(kafkaSink.getTopicName())) { + return new KafkaLoadNode( + kafkaSink.getSinkName(), + kafkaSink.getSinkName(), + fieldInfos, + fieldRelations, + Lists.newArrayList(), + null, + kafkaSink.getTopicName(), + kafkaSink.getBootstrapServers(), + format, + sinkParallelism, + properties, + kafkaSink.getPrimaryKey() + ); + } + return new KafkaLoadNode( kafkaSink.getSinkName(), kafkaSink.getSinkName(), fieldInfos, fieldRelations, Lists.newArrayList(), null, - kafkaSink.getTopicName(), + "mock_topic", Review Comment: You can do it in upstream services for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org