This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 09fa669fcf [INLONG-11803][Sort] Solve the NPE problem when parsing 
InLongMsg (#11804)
09fa669fcf is described below

commit 09fa669fcfceca2b7305b6a1b4f1c88730023078
Author: Mingyu Bao <baomingy...@163.com>
AuthorDate: Mon Mar 17 16:11:45 2025 +0800

    [INLONG-11803][Sort] Solve the NPE problem when parsing InLongMsg (#11804)
---
 .../AbstractInLongMsgFormatDeserializer.java         | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
index 9f63185102..b9611e9a17 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
@@ -20,24 +20,31 @@ package org.apache.inlong.sort.formats.inlongmsg;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.sort.formats.metrics.FormatMetricGroup;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * The base for all InLongMsg format deserializers.
  */
 public abstract class AbstractInLongMsgFormatDeserializer implements 
ResultTypeQueryable<RowData>, Serializable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class);
+
     protected FailureHandler failureHandler;
 
     /**
@@ -81,6 +88,19 @@ public abstract class AbstractInLongMsgFormatDeserializer 
implements ResultTypeQ
         final List<InLongMsgWrap> result = new ArrayList<>();
 
         InLongMsg inLongMsg = InLongMsg.parseFrom(bytes);
+        if (inLongMsg == null) {
+            failureHandler.onParsingMsgFailure(bytes, new IOException(
+                    String.format("Could not parse InLongMsg from bytes. 
bytes={}.",
+                            StringUtils.join(bytes))));
+            return result;
+        }
+        try {
+            Set<String> set = inLongMsg.getAttrs();
+        } catch (Exception e) {
+            failureHandler.onParsingMsgFailure(bytes,
+                    new IOException("Parse InLongMsg from bytes has 
exception.", e));
+            return result;
+        }
         for (String attr : inLongMsg.getAttrs()) {
             Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
             if (iterator == null) {

Reply via email to