This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 30ce46c39b [INLONG-10748][Bug] Fix some null pointer dereference risks in the code. (#10753) 30ce46c39b is described below commit 30ce46c39bb581a30c0d2610d467983b385db7ea Author: LeeWY <61183968+yfsn...@users.noreply.github.com> AuthorDate: Tue Aug 6 19:02:55 2024 +0800 [INLONG-10748][Bug] Fix some null pointer dereference risks in the code. (#10753) * [INLONG-10748] Fix some null pointer dereference risks in the code. * [INLONG-10748] Optimize the processing logic of `StringUtils.java`. --------- Co-authored-by: jameswyli <jamesw...@tencent.com> --- .../org/apache/inlong/agent/plugin/sources/PulsarSource.java | 2 +- .../inlong/agent/plugin/sources/file/AbstractSource.java | 12 +++++++----- .../apache/inlong/audit/service/AuditMsgConsumerServer.java | 4 +++- .../manager/service/source/StreamSourceServiceImpl.java | 2 +- .../inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java | 2 +- .../org/apache/inlong/sdk/commons/protocol/ProxyEvent.java | 8 +++++--- .../apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java | 1 + .../inlong/sort/base/format/JsonDynamicSchemaFormat.java | 2 +- .../inlong/sort/pulsar/table/PulsarRowDataConverter.java | 6 ++++-- .../sort/pulsar/table/source/PulsarRowDataConverter.java | 6 ++++-- .../org/apache/inlong/sort/formats/util/StringUtils.java | 4 ++++ 11 files changed, 32 insertions(+), 17 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java index f7c63acba4..9ca7a5aaf4 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java @@ -132,7 +132,7 @@ public class PulsarSource extends AbstractSource { } return consumer; } catch (PulsarClientException | IllegalArgumentException e) { - if (consumer == null) { + if (consumer != null) { try { consumer.close(); } catch (PulsarClientException ex) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index a477d84093..1fb96f7ff7 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -178,12 +178,14 @@ public abstract class AbstractSource implements Source { continue; } emptyCount = 0; - for (int i = 0; i < lines.size(); i++) { - boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length); - if (!suc4Queue) { - break; + if (lines != null) { + for (int i = 0; i < lines.size(); i++) { + boolean suc4Queue = waitForPermit(AGENT_GLOBAL_READER_QUEUE_PERMIT, lines.get(i).getData().length); + if (!suc4Queue) { + break; + } + putIntoQueue(lines.get(i)); } - putIntoQueue(lines.get(i)); } MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN); if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_INTERVAL_MS) { diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java index acd43a7ed6..51ed0caf60 100644 --- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java +++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/AuditMsgConsumerServer.java @@ -97,7 +97,9 @@ public class AuditMsgConsumerServer implements InitializingBean { if (storeConfig.isJdbc()) { jdbcService.start(); } - mqConsume.start(); + if (mqConsume != null) { + mqConsume.start(); + } } /** diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index ca6865bdbb..62ccd8f2ca 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -294,7 +294,7 @@ public class StreamSourceServiceImpl implements StreamSourceService { InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator); if (groupEntity == null) { throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND, - String.format("InlongGroup does not exist with InlongGroupId=%s", groupEntity.getInlongGroupId())); + String.format("InlongGroup does not exist with InlongGroupId=%s", groupId)); } StreamSourceOperator sourceOperator = operatorFactory.getInstance(request.getSourceType()); // Remove id in sourceField when save diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java index 1e3938dd54..f97e1197a5 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.java @@ -255,8 +255,8 @@ public class TcpChannelGroup { ChannelFuture future = client.connect(tcpChannel.getIpPort().addr).await(); Channel newChannel = future.getChannel(); tcpChannel.setChannel(newChannel); - newChannel.setAttachment(oldChannel.getAttachment()); if (oldChannel != null) { + newChannel.setAttachment(oldChannel.getAttachment()); oldChannel.disconnect(); oldChannel.close(); } diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java index dfe27d2a2b..94f374a0a7 100644 --- a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java +++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java @@ -76,10 +76,12 @@ public class ProxyEvent extends SdkEvent { public ProxyEvent(String inlongGroupId, String inlongStreamId, MessageObj obj) { this.inlongGroupId = inlongGroupId; this.inlongStreamId = inlongStreamId; - super.setBody(obj.getBody().toByteArray()); this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId); - this.msgTime = obj.getMsgTime(); - this.sourceIp = obj.getSourceIp(); + if (obj != null) { + super.setBody(obj.getBody().toByteArray()); + this.msgTime = obj.getMsgTime(); + this.sourceIp = obj.getSourceIp(); + } Map<String, String> headers = super.getHeaders(); headers.put(EventConstants.INLONG_GROUP_ID, inlongGroupId); headers.put(EventConstants.INLONG_STREAM_ID, inlongStreamId); diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java index 1f6caf70d1..02722355f6 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSeeker.java @@ -95,6 +95,7 @@ public class KafkaSeeker implements Seeker { if (offsetAndTimestamp == null) { LOGGER.info("tp {} has null offsetAndTimestamp, reset to end", tp); endOffsetsTopicPartitions.add(tp); + return; } long expected = offsetAndTimestamp.offset(); long last = consumer.position(tp); diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java index 5833719e48..ce1797a426 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java @@ -103,7 +103,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma return new ArrayList<>(); } JsonNode physicalNode = getPhysicalData(root); - if (physicalNode.isArray()) { + if (physicalNode != null && physicalNode.isArray()) { // Extract from the first value when the physicalNode is array physicalNode = physicalNode.get(FIRST); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java index c66e25c225..82b73dfae0 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarRowDataConverter.java @@ -113,8 +113,10 @@ public class PulsarRowDataConverter implements Serializable { new GenericRowData( rowKind, physicalArity + readableMetadata.getConnectorMetadataArity()); - for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) { - producedRow.setField(valueProjection[valuePos], physicalValueRow.getField(valuePos)); + if (physicalValueRow != null) { + for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) { + producedRow.setField(valueProjection[valuePos], physicalValueRow.getField(valuePos)); + } } for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) { diff --git a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java index 44abbaf7c4..661fad696c 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java +++ b/inlong-sort/sort-flink/sort-flink-v1.18/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/source/PulsarRowDataConverter.java @@ -113,8 +113,10 @@ public class PulsarRowDataConverter implements Serializable { new GenericRowData( rowKind, physicalArity + readableMetadata.getConnectorMetadataArity()); - for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) { - producedRow.setField(valueProjection[valuePos], physicalValueRow.getField(valuePos)); + if (physicalValueRow != null) { + for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) { + producedRow.setField(valueProjection[valuePos], physicalValueRow.getField(valuePos)); + } } for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) { diff --git a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java index 3d06625212..f33ad8e825 100644 --- a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java +++ b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java @@ -81,6 +81,10 @@ public class StringUtils { Map<String, String> fields = new HashMap<>(); List<Map<String, String>> lines = new ArrayList<>(); + if (text == null) { + return lines; + } + StringBuilder stringBuilder = new StringBuilder(); String key = "";