This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 30ce46c39b [INLONG-10748][Bug] Fix some null pointer dereference risks 
in the code. (#10753)
30ce46c39b is described below

commit 30ce46c39bb581a30c0d2610d467983b385db7ea
Author: LeeWY <61183968+yfsn...@users.noreply.github.com>
AuthorDate: Tue Aug 6 19:02:55 2024 +0800

    [INLONG-10748][Bug] Fix some null pointer dereference risks in the code. 
(#10753)
    
    * [INLONG-10748] Fix some null pointer dereference risks in the code.
    
    * [INLONG-10748] Optimize the processing logic of `StringUtils.java`.
    
    ---------
    
    Co-authored-by: jameswyli <jamesw...@tencent.com>
---
 .../org/apache/inlong/agent/plugin/sources/PulsarSource.java |  2 +-
 .../inlong/agent/plugin/sources/file/AbstractSource.java     | 12 +++++++-----
 .../apache/inlong/audit/service/AuditMsgConsumerServer.java  |  4 +++-
 .../manager/service/source/StreamSourceServiceImpl.java      |  2 +-
 .../inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java     |  2 +-
 .../org/apache/inlong/sdk/commons/protocol/ProxyEvent.java   |  8 +++++---
 .../apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java    |  1 +
 .../inlong/sort/base/format/JsonDynamicSchemaFormat.java     |  2 +-
 .../inlong/sort/pulsar/table/PulsarRowDataConverter.java     |  6 ++++--
 .../sort/pulsar/table/source/PulsarRowDataConverter.java     |  6 ++++--
 .../org/apache/inlong/sort/formats/util/StringUtils.java     |  4 ++++
 11 files changed, 32 insertions(+), 17 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index f7c63acba4..9ca7a5aaf4 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -132,7 +132,7 @@ public class PulsarSource extends AbstractSource {
             }
             return consumer;
         } catch (PulsarClientException | IllegalArgumentException e) {
-            if (consumer == null) {
+            if (consumer != null) {
                 try {
                     consumer.close();
                 } catch (PulsarClientException ex) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index a477d84093..1fb96f7ff7 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -178,12 +178,14 @@ public abstract class AbstractSource implements Source {
                 continue;
             }
             emptyCount = 0;
-            for (int i = 0; i < lines.size(); i++) {
-                boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
-                if (!suc4Queue) {
-                    break;
+            if (lines != null) {
+                for (int i = 0; i < lines.size(); i++) {
+                    boolean suc4Queue = 
waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length);
+                    if (!suc4Queue) {
+                        break;
+                    }
+                    putIntoQueue(lines.get(i));
                 }
-                putIntoQueue(lines.get(i));
             }
             
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
BATCH_READ_LINE_TOTAL_LEN);
             if (AgentUtils.getCurrentTime() - lastPrintTime > 
CORE_THREAD_PRINT_INTERVAL_MS) {
diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
index acd43a7ed6..51ed0caf60 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java
@@ -97,7 +97,9 @@ public class AuditMsgConsumerServer implements 
InitializingBean {
         if (storeConfig.isJdbc()) {
             jdbcService.start();
         }
-        mqConsume.start();
+        if (mqConsume != null) {
+            mqConsume.start();
+        }
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index ca6865bdbb..62ccd8f2ca 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -294,7 +294,7 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         InlongGroupEntity groupEntity = 
groupCheckService.checkGroupStatus(groupId, operator);
         if (groupEntity == null) {
             throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND,
-                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", groupEntity.getInlongGroupId()));
+                    String.format("InlongGroup does not exist with 
InlongGroupId=%s", groupId));
         }
         StreamSourceOperator sourceOperator = 
operatorFactory.getInstance(request.getSourceType());
         // Remove id in sourceField when save
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java
index 1e3938dd54..f97e1197a5 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java
@@ -255,8 +255,8 @@ public class TcpChannelGroup {
                 ChannelFuture future = 
client.connect(tcpChannel.getIpPort().addr).await();
                 Channel newChannel = future.getChannel();
                 tcpChannel.setChannel(newChannel);
-                newChannel.setAttachment(oldChannel.getAttachment());
                 if (oldChannel != null) {
+                    newChannel.setAttachment(oldChannel.getAttachment());
                     oldChannel.disconnect();
                     oldChannel.close();
                 }
diff --git 
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
 
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
index dfe27d2a2b..94f374a0a7 100644
--- 
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
+++ 
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
@@ -76,10 +76,12 @@ public class ProxyEvent extends SdkEvent {
     public ProxyEvent(String inlongGroupId, String inlongStreamId, MessageObj 
obj) {
         this.inlongGroupId = inlongGroupId;
         this.inlongStreamId = inlongStreamId;
-        super.setBody(obj.getBody().toByteArray());
         this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
-        this.msgTime = obj.getMsgTime();
-        this.sourceIp = obj.getSourceIp();
+        if (obj != null) {
+            super.setBody(obj.getBody().toByteArray());
+            this.msgTime = obj.getMsgTime();
+            this.sourceIp = obj.getSourceIp();
+        }
         Map<String, String> headers = super.getHeaders();
         headers.put(EventConstants.INLONG_GROUP_ID, inlongGroupId);
         headers.put(EventConstants.INLONG_STREAM_ID, inlongStreamId);
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java
index 1f6caf70d1..02722355f6 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java
@@ -95,6 +95,7 @@ public class KafkaSeeker implements Seeker {
         if (offsetAndTimestamp == null) {
             LOGGER.info("tp {} has null offsetAndTimestamp, reset to end", tp);
             endOffsetsTopicPartitions.add(tp);
+            return;
         }
         long expected = offsetAndTimestamp.offset();
         long last = consumer.position(tp);
diff --git 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index 5833719e48..ce1797a426 100644
--- 
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ 
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -103,7 +103,7 @@ public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaForma
             return new ArrayList<>();
         }
         JsonNode physicalNode = getPhysicalData(root);
-        if (physicalNode.isArray()) {
+        if (physicalNode != null && physicalNode.isArray()) {
             // Extract from the first value when the physicalNode is array
             physicalNode = physicalNode.get(FIRST);
         }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
index c66e25c225..82b73dfae0 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java
@@ -113,8 +113,10 @@ public class PulsarRowDataConverter implements 
Serializable {
                 new GenericRowData(
                         rowKind, physicalArity + 
readableMetadata.getConnectorMetadataArity());
 
-        for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
-            producedRow.setField(valueProjection[valuePos], 
physicalValueRow.getField(valuePos));
+        if (physicalValueRow != null) {
+            for (int valuePos = 0; valuePos < valueProjection.length; 
valuePos++) {
+                producedRow.setField(valueProjection[valuePos], 
physicalValueRow.getField(valuePos));
+            }
         }
 
         for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
index 44abbaf7c4..661fad696c 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java
@@ -113,8 +113,10 @@ public class PulsarRowDataConverter implements 
Serializable {
                 new GenericRowData(
                         rowKind, physicalArity + 
readableMetadata.getConnectorMetadataArity());
 
-        for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
-            producedRow.setField(valueProjection[valuePos], 
physicalValueRow.getField(valuePos));
+        if (physicalValueRow != null) {
+            for (int valuePos = 0; valuePos < valueProjection.length; 
valuePos++) {
+                producedRow.setField(valueProjection[valuePos], 
physicalValueRow.getField(valuePos));
+            }
         }
 
         for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
index 3d06625212..f33ad8e825 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
@@ -81,6 +81,10 @@ public class StringUtils {
         Map<String, String> fields = new HashMap<>();
         List<Map<String, String>> lines = new ArrayList<>();
 
+        if (text == null) {
+            return lines;
+        }
+
         StringBuilder stringBuilder = new StringBuilder();
 
         String key = "";

Reply via email to