yunqingmoswu commented on code in PR #6202:
URL: https://github.com/apache/inlong/pull/6202#discussion_r999388571


##########
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##########
@@ -162,29 +166,58 @@ public static KafkaLoadNode createLoadNode(KafkaSink 
kafkaSink, List<FieldInfo>
                 format = new JsonFormat();
                 break;
             case CANAL:
-                format = new CanalJsonFormat();
+                format = new RawFormat();
+                innerFormat = new CanalJsonFormat();
                 break;
             case DEBEZIUM_JSON:
-                format = new DebeziumJsonFormat();
+                format = new RawFormat();
+                innerFormat = new DebeziumJsonFormat();
                 break;
             default:
                 throw new IllegalArgumentException(String.format("Unsupported 
dataType=%s for Kafka", dataType));
         }
 
+        String sinkPartitioner = null;
+        if (dataType == DataTypeEnum.CANAL || dataType == 
DataTypeEnum.DEBEZIUM_JSON) {
+            sinkPartitioner = kafkaSink.getPartitionStrategy() == null ? null 
: RAW_HASH;
+        }
+
+        if (StringUtils.isNotEmpty(kafkaSink.getTopicName())) {
+            return new KafkaLoadNode(
+                    kafkaSink.getSinkName(),
+                    kafkaSink.getSinkName(),
+                    fieldInfos,
+                    fieldRelations,
+                    Lists.newArrayList(),
+                    null,
+                    kafkaSink.getTopicName(),
+                    kafkaSink.getBootstrapServers(),
+                    format,
+                    sinkParallelism,
+                    properties,
+                    kafkaSink.getPrimaryKey()
+            );
+        }
+
         return new KafkaLoadNode(
                 kafkaSink.getSinkName(),
                 kafkaSink.getSinkName(),
                 fieldInfos,
                 fieldRelations,
                 Lists.newArrayList(),
                 null,
-                kafkaSink.getTopicName(),
+                "mock_topic",

Review Comment:
   You can do it in upstream services for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to