yunqingmoswu commented on code in PR #6750:
URL: https://github.com/apache/inlong/pull/6750#discussion_r1053020028


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -164,6 +170,9 @@ public ProducerRecord<byte[], byte[]> serialize(RowData 
consumedRow, @Nullable L
             final byte[] valueSerialized = 
serializeWithDirtyHandle(consumedRow,
                     DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
             if (valueSerialized != null) {
+                if (metricData != null) {

Review Comment:
   Why is the dirty data metric reported here?



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java:
##########
@@ -875,7 +902,12 @@ public void onCompletion(RecordMetadata metadata, 
Exception e) {
                         public void onCompletion(RecordMetadata metadata, 
Exception exception) {
                             if (exception != null && asyncException == null) {
                                 asyncException = exception;
-                                sendDirtyMetrics(rowSize, dataSize);
+                            } else if (metadata != null) {

Review Comment:
   What should we do if the metadata is null?



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##########
@@ -214,6 +217,7 @@ public KafkaDynamicSink(
         this.topicPattern = topicPattern;
         this.dirtyOptions = dirtyOptions;
         this.dirtySink = dirtySink;
+        this.migrateAll = migrateAll;

Review Comment:
   Rename: migrateAll -> multipleSink



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -199,6 +211,9 @@ public ProducerRecord<byte[], byte[]> serialize(RowData 
consumedRow, @Nullable L
         } else {
             valueSerialized = serializeWithDirtyHandle(valueRow, 
DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
             mayDirtyData = mayDirtyData || valueSerialized == null;
+            if (metricData != null) {

Review Comment:
   Why is the dirty data metric reported here?



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##########
@@ -525,7 +527,8 @@ protected KafkaDynamicSink createKafkaTableSink(
             @Nullable String sinkMultipleFormat,
             @Nullable String topicPattern,
             DirtyOptions dirtyOptions,
-            @Nullable DirtySink<Object> dirtySink) {
+            @Nullable DirtySink<Object> dirtySink,
+            boolean migrateAll) {

Review Comment:
   Rename: migrateAll -> multipleSink



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java:
##########
@@ -859,10 +883,13 @@ public void open(Configuration configuration) throws 
Exception {
                         @Override
                         public void onCompletion(RecordMetadata metadata, 
Exception e) {
                             if (e != null) {
-                                sendDirtyMetrics(rowSize, dataSize);
-                                LOG.error(
-                                        "Error while sending record to Kafka: 
" + e.getMessage(),
-                                        e);
+                                LOG.error("Error while sending record to 
Kafka: " + e.getMessage(), e);
+                            } else if (metadata != null) {

Review Comment:
   What should we do if the metadata is null?



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -179,6 +188,9 @@ public ProducerRecord<byte[], byte[]> serialize(RowData 
consumedRow, @Nullable L
         } else {
             final RowData keyRow = createProjectedRow(consumedRow, 
RowKind.INSERT, keyFieldGetters);
             keySerialized = serializeWithDirtyHandle(keyRow, 
DirtyType.KEY_SERIALIZE_ERROR, keySerialization);
+            if (metricData != null) {

Review Comment:
   Why is the dirty data metric reported here?



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##########
@@ -181,7 +183,8 @@ public KafkaDynamicSink(
             @Nullable String sinkMultipleFormat,
             @Nullable String topicPattern,
             DirtyOptions dirtyOptions,
-            @Nullable DirtySink<Object> dirtySink) {
+            @Nullable DirtySink<Object> dirtySink,
+            boolean migrateAll) {

Review Comment:
   Rename: migrateAll -> multipleSink



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##########
@@ -440,7 +441,8 @@ public DynamicTableSink createDynamicTableSink(Context 
context) {
                 sinkMultipleFormat,
                 tableOptions.getOptional(TOPIC_PATTERN).orElse(null),
                 dirtyOptions,
-                dirtySink);
+                dirtySink,
+                migrateAll);

Review Comment:
   Rename: migrateAll -> multipleSink



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##########
@@ -319,7 +323,8 @@ public DynamicTableSink copy() {
                         sinkMultipleFormat,
                         topicPattern,
                         dirtyOptions,
-                        dirtySink);
+                        dirtySink,
+                        migrateAll);

Review Comment:
   Rename: migrateAll -> multipleSink



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java:
##########
@@ -294,7 +296,8 @@ public DynamicTableSink createDynamicTableSink(Context 
context) {
                 null,
                 null,
                 dirtyOptions,
-                dirtySink);
+                dirtySink,
+                migrateAll);

Review Comment:
   Rename: migrateAll -> multipleSink



##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##########
@@ -547,6 +550,7 @@ protected KafkaDynamicSink createKafkaTableSink(
                 sinkMultipleFormat,
                 topicPattern,
                 dirtyOptions,
-                dirtySink);
+                dirtySink,
+                migrateAll);

Review Comment:
   Rename: migrateAll -> multipleSink



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to