This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 34a7e8f50 [INLONG-7411][Sort] Fix the invalid of kafka source meitric due to inlongMetric being null (#7409)
34a7e8f50 is described below

commit 34a7e8f501e9edde01a48b4413fcac3770644536
Author: LinChen <1572139...@qq.com>
AuthorDate: Thu Feb 23 16:21:09 2023 +0800

    [INLONG-7411][Sort] Fix the invalid of kafka source meitric due to inlongMetric being null (#7409)

    Co-authored-by: leolinchen <leolinc...@tencent.com>
---
 .../inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index f41c9a78b..73507aa6c 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -217,6 +217,8 @@ public class UpsertKafkaDynamicTableFactory
         // Build the dirty data side-output
         final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
         final DirtySink<String> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
+        final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
         return new KafkaDynamicSource(
                 schema.toPhysicalRowDataType(),
                 keyDecodingFormat,
@@ -231,8 +233,8 @@ public class UpsertKafkaDynamicTableFactory
                 Collections.emptyMap(),
                 0,
                 true,
-                null,
-                null,
+                inlongMetric,
+                auditHostAndPorts,
                 dirtyOptions,
                 dirtySink);
     }