yunqingmoswu commented on code in PR #6750: URL: https://github.com/apache/inlong/pull/6750#discussion_r1053020028
########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java: ########## @@ -164,6 +170,9 @@ public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable L final byte[] valueSerialized = serializeWithDirtyHandle(consumedRow, DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization); if (valueSerialized != null) { + if (metricData != null) { Review Comment: Why report dirty data metric here? ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java: ########## @@ -875,7 +902,12 @@ public void onCompletion(RecordMetadata metadata, Exception e) { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null && asyncException == null) { asyncException = exception; - sendDirtyMetrics(rowSize, dataSize); + } else if (metadata != null) { Review Comment: What should do if the metadata is null? ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java: ########## @@ -214,6 +217,7 @@ public KafkaDynamicSink( this.topicPattern = topicPattern; this.dirtyOptions = dirtyOptions; this.dirtySink = dirtySink; + this.migrateAll = migrateAll; Review Comment: migrateall -> multipleSink ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java: ########## @@ -199,6 +211,9 @@ public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable L } else { valueSerialized = serializeWithDirtyHandle(valueRow, DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization); mayDirtyData = mayDirtyData || valueSerialized == null; + if (metricData != null) { Review Comment: Why report dirty data metric here? ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java: ########## @@ -525,7 +527,8 @@ protected KafkaDynamicSink createKafkaTableSink( @Nullable String sinkMultipleFormat, @Nullable String topicPattern, DirtyOptions dirtyOptions, - @Nullable DirtySink<Object> dirtySink) { + @Nullable DirtySink<Object> dirtySink, + boolean migrateAll) { Review Comment: migrateall -> multipleSink ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java: ########## @@ -859,10 +883,13 @@ public void open(Configuration configuration) throws Exception { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { - sendDirtyMetrics(rowSize, dataSize); - LOG.error( - "Error while sending record to Kafka: " + e.getMessage(), - e); + LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); + } else if (metadata != null) { Review Comment: What should do if the metadata is null? ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java: ########## @@ -179,6 +188,9 @@ public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable L } else { final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters); keySerialized = serializeWithDirtyHandle(keyRow, DirtyType.KEY_SERIALIZE_ERROR, keySerialization); + if (metricData != null) { Review Comment: Why report dirty data metric here? ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java: ########## @@ -181,7 +183,8 @@ public KafkaDynamicSink( @Nullable String sinkMultipleFormat, @Nullable String topicPattern, DirtyOptions dirtyOptions, - @Nullable DirtySink<Object> dirtySink) { + @Nullable DirtySink<Object> dirtySink, + boolean migrateAll) { Review Comment: migrateall -> multipleSink ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java: ########## @@ -440,7 +441,8 @@ public DynamicTableSink createDynamicTableSink(Context context) { sinkMultipleFormat, tableOptions.getOptional(TOPIC_PATTERN).orElse(null), dirtyOptions, - dirtySink); + dirtySink, + migrateAll); Review Comment: migrateall -> multipleSink ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java: ########## @@ -319,7 +323,8 @@ public DynamicTableSink copy() { sinkMultipleFormat, topicPattern, dirtyOptions, - dirtySink); + dirtySink, + migrateAll); Review Comment: migrateall -> multipleSink ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java: ########## @@ -294,7 +296,8 @@ public DynamicTableSink createDynamicTableSink(Context context) { null, null, dirtyOptions, - dirtySink); + dirtySink, + migrateAll); Review Comment: migrateall -> multipleSink ########## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java: ########## @@ -547,6 +550,7 @@ protected KafkaDynamicSink createKafkaTableSink( sinkMultipleFormat, topicPattern, dirtyOptions, - dirtySink); + dirtySink, + migrateAll); Review Comment: migrateall -> multipleSink -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org