This is an automated email from the ASF dual-hosted git repository.

wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f52c5c821 [INLONG-11298][Agent] Fix bug for pulsar source with empty data process and specified time consumption (#11299)
3f52c5c821 is described below

commit 3f52c5c821e7fc391cd4d13f12b60325c6f8d127
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Oct 9 23:38:19 2024 +0800

    [INLONG-11298][Agent] Fix bug for pulsar source with empty data process and specified time consumption (#11299)
    
    * [INLONG-11298][Agent] Fix bug for pulsar source
    
    * [INLONG-11298][Agent] Fix the issue of inconsistent logic between the page and backend code
---
 .../inlong/agent/constant/CommonConstants.java     |  2 +-
 .../agent/message/file/ProxyMessageCache.java      | 19 ---------
 .../agent/core/instance/InstanceManager.java       |  2 +-
 .../inlong/agent/plugin/sources/PulsarSource.java  | 47 +++++++++++++---------
 .../agent/plugin/sources/file/AbstractSource.java  |  8 ++--
 5 files changed, 34 insertions(+), 44 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
index 45320406ef..53a5bd976c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java
@@ -59,7 +59,7 @@ public class CommonConstants {
     public static final int DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000;
 
     public static final String PROXY_BATCH_FLUSH_INTERVAL = 
"proxy.batch.flush.interval";
-    public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 100;
+    public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 1;
 
     public static final String PROXY_SENDER_MAX_TIMEOUT = 
"proxy.sender.maxTimeout";
     // max timeout in seconds.
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
index c7b151a26c..7e2aa28034 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java
@@ -32,14 +32,11 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
 import static 
org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
 import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION;
@@ -56,11 +53,8 @@ public class ProxyMessageCache {
     private final int maxPackSize;
     private final int maxQueueNumber;
     private final String groupId;
-    // ms
-    private final int cacheTimeout;
     // streamId -> list of proxyMessage
     private final ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> 
messageQueueMap;
-    private final AtomicLong cacheSize = new AtomicLong(0);
     private long lastPrintTime = 0;
     private long dataTime;
     private boolean isRealTime = false;
@@ -76,7 +70,6 @@ public class ProxyMessageCache {
         this.maxPackSize = instanceProfile.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
         this.maxQueueNumber = 
instanceProfile.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
                 DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
-        this.cacheTimeout = 
instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, 
DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
         messageQueueMap = new ConcurrentHashMap<>();
         dataTime = instanceProfile.getSinkDataTime();
         extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
@@ -109,7 +102,6 @@ public class ProxyMessageCache {
                 return false;
             }
             messageQueue.put(message);
-            cacheSize.addAndGet(message.getBody().length);
             return true;
         } catch (Exception ex) {
             LOGGER.error("exception caught", ex);
@@ -159,13 +151,11 @@ public class ProxyMessageCache {
             if (peekMessageLength > maxPackSize) {
                 LOGGER.warn("message size is {}, greater than max pack size 
{}, drop it!",
                         peekMessage.getBody().length, maxPackSize);
-                cacheSize.addAndGet(-bodySize);
                 messageQueue.remove();
                 break;
             }
             resultBatchSize += bodySize;
             // decrease queue size.
-            cacheSize.addAndGet(-bodySize);
             bodyList.add(message.getBody());
             offsetList.add(message.getAckInfo());
         }
@@ -183,13 +173,4 @@ public class ProxyMessageCache {
         }
         return null;
     }
-
-    public Map<String, String> getExtraMap() {
-        return extraMap;
-    }
-
-    public long getCacheSize() {
-        return cacheSize.get();
-    }
-
 }
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
index 48aedfd09d..06dd20a99e 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java
@@ -317,7 +317,7 @@ public class InstanceManager extends AbstractDaemon {
     }
 
     private void addInstance(InstanceProfile profile) {
-        if (instanceMap.size() >= instanceLimit) {
+        if (instanceMap.size() > instanceLimit) {
             LOGGER.error("instanceMap size {} over limit {}", 
instanceMap.size(), instanceLimit);
             actionQueue.offer(new InstanceAction(ActionType.ADD, profile));
             AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index 56dec44d55..949cf5a4c7 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -50,10 +50,10 @@ public class PulsarSource extends AbstractSource {
     private String serviceUrl;
     private String subscription;
     private String subscriptionType;
-    private String subscriptionPosition;
     private PulsarClient pulsarClient;
     private Long timestamp;
     private final static String PULSAR_SUBSCRIPTION_PREFIX = "inlong-agent-";
+    private final static String SUBSCRIPTION_CUSTOM = "Custom";
     private boolean isRestoreFromDB = false;
     private Consumer<byte[]> consumer;
     private long offset = 0L;
@@ -68,8 +68,6 @@ public class PulsarSource extends AbstractSource {
             topic = profile.getInstanceId();
             serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL);
             subscription = profile.get(TASK_PULSAR_SUBSCRIPTION, 
PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId);
-            subscriptionPosition = 
profile.get(TASK_PULSAR_SUBSCRIPTION_POSITION,
-                    SubscriptionInitialPosition.Latest.name());
             subscriptionType = profile.get(TASK_PULSAR_SUBSCRIPTION_TYPE, 
SubscriptionType.Shared.name());
             timestamp = profile.getLong(TASK_PULSAR_RESET_TIME, 0);
             pulsarClient = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
@@ -97,35 +95,48 @@ public class PulsarSource extends AbstractSource {
         org.apache.pulsar.client.api.Message<byte[]> message = null;
         try {
             message = consumer.receive(0, TimeUnit.MILLISECONDS);
-            offset = message.getSequenceId();
         } catch (PulsarClientException e) {
             LOGGER.error("read from pulsar error", e);
         }
         if (!ObjectUtils.isEmpty(message)) {
+            offset = message.getSequenceId();
             dataList.add(new SourceData(message.getValue(), new 
String(message.getMessageId().toByteArray(),
                     StandardCharsets.UTF_8)));
+            try {
+                consumer.acknowledge(message);
+            } catch (PulsarClientException e) {
+                LOGGER.error("ack pulsar error", e);
+            }
         }
-        try {
-            consumer.acknowledge(message);
-        } catch (PulsarClientException e) {
-            LOGGER.error("ack pulsar error", e);
-        }
+
         return dataList;
     }
 
     private Consumer<byte[]> getConsumer() {
         Consumer<byte[]> consumer = null;
         try {
-            consumer = pulsarClient.newConsumer(Schema.BYTES)
-                    .topic(topic)
-                    .subscriptionName(subscription)
-                    
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition))
-                    
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
-                    .subscribe();
-            if (!isRestoreFromDB && timestamp != 0L) {
-                consumer.seek(timestamp);
-                LOGGER.info("Reset consume from {}", timestamp);
+            String position = profile.get(TASK_PULSAR_SUBSCRIPTION_POSITION, 
SubscriptionInitialPosition.Latest.name());
+            if (position.equals(SUBSCRIPTION_CUSTOM)) {
+                consumer = pulsarClient.newConsumer(Schema.BYTES)
+                        .topic(topic)
+                        .subscriptionName(subscription)
+                        
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
+                        .subscribe();
+                if (!isRestoreFromDB) {
+                    if (timestamp == 0L) {
+                        LOGGER.error("Reset consume but timestamp is 0L");
+                    } else {
+                        consumer.seek(timestamp);
+                        LOGGER.info("Reset consume from {}", timestamp);
+                    }
+                }
             } else {
+                consumer = pulsarClient.newConsumer(Schema.BYTES)
+                        .topic(topic)
+                        .subscriptionName(subscription)
+                        
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(position))
+                        
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
+                        .subscribe();
                 LOGGER.info("Skip to reset consume");
             }
             return consumer;
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
index f1fb8b5570..299c3829a0 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java
@@ -38,7 +38,6 @@ import org.apache.inlong.common.metric.MetricRegister;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import org.apache.commons.codec.digest.DigestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,10 +54,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
 import static 
org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
@@ -356,9 +353,7 @@ public abstract class AbstractSource implements Source {
     }
 
     private Message createMessage(SourceData sourceData) {
-        String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, 
DigestUtils.md5Hex(inlongGroupId));
         Map<String, String> header = new HashMap<>();
-        header.put(PROXY_KEY_DATA, proxyPartitionKey);
         header.put(OFFSET, sourceData.getOffset());
         header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
         if (extendedHandler != null) {
@@ -424,6 +419,9 @@ public abstract class AbstractSource implements Source {
 
     @Override
     public boolean sourceFinish() {
+        if (isRealTime) {
+            return false;
+        }
         return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
     }
 }

Reply via email to