This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 09fa669fcf [INLONG-11803][Sort] Solve the NPE problem when parsing InLongMsg (#11804) 09fa669fcf is described below commit 09fa669fcfceca2b7305b6a1b4f1c88730023078 Author: Mingyu Bao <baomingy...@163.com> AuthorDate: Mon Mar 17 16:11:45 2025 +0800 [INLONG-11803][Sort] Solve the NPE problem when parsing InLongMsg (#11804) --- .../AbstractInLongMsgFormatDeserializer.java | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java index 9f63185102..b9611e9a17 100644 --- a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java +++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java @@ -20,24 +20,31 @@ package org.apache.inlong.sort.formats.inlongmsg; import org.apache.inlong.common.msg.InLongMsg; import org.apache.inlong.sort.formats.metrics.FormatMetricGroup; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Set; /** * The base for all InLongMsg format deserializers. */ public abstract class AbstractInLongMsgFormatDeserializer implements ResultTypeQueryable<RowData>, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class); + protected FailureHandler failureHandler; /** @@ -81,6 +88,19 @@ public abstract class AbstractInLongMsgFormatDeserializer implements ResultTypeQ final List<InLongMsgWrap> result = new ArrayList<>(); InLongMsg inLongMsg = InLongMsg.parseFrom(bytes); + if (inLongMsg == null) { + failureHandler.onParsingMsgFailure(bytes, new IOException( + String.format("Could not parse InLongMsg from bytes. bytes={}.", + StringUtils.join(bytes)))); + return result; + } + try { + Set<String> set = inLongMsg.getAttrs(); + } catch (Exception e) { + failureHandler.onParsingMsgFailure(bytes, + new IOException("Parse InLongMsg from bytes has exception.", e)); + return result; + } for (String attr : inLongMsg.getAttrs()) { Iterator<byte[]> iterator = inLongMsg.getIterator(attr); if (iterator == null) {