This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a5bacfa1d [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
a5bacfa1d is described below

commit a5bacfa1dfc9f616704e99446b943a60e08f601d
Author: vernedeng <deng...@pku.edu.cn>
AuthorDate: Thu Jan 5 14:58:31 2023 +0800

    [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
---
 .../apache/inlong/sdk/sort/api/ClientContext.java  |   2 +-
 .../fetcher/pulsar/PulsarSingleTopicFetcher.java   | 138 +++++++++++----------
 2 files changed, 71 insertions(+), 69 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
index b22d7dbf4..fb600cfcc 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
@@ -144,7 +144,7 @@ public abstract class ClientContext implements Cleanable {
     private SortSdkMetricItem getMetricItem(InLongTopic topic, int 
partitionId) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId);
-        if (topic != null || config.isTopicStaticsEnabled()) {
+        if (topic != null && config.isTopicStaticsEnabled()) {
             dimensions.put(SortSdkMetricItem.KEY_CLUSTER_ID, 
topic.getInLongCluster().getClusterId());
             dimensions.put(SortSdkMetricItem.KEY_TOPIC_ID, topic.getTopic());
         }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
index 4c9b41a9b..f8d39c361 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
@@ -111,8 +111,8 @@ public class PulsarSingleTopicFetcher extends 
SingleTopicFetcher {
                 consumer.acknowledgeAsync(messageId)
                         .thenAccept(consumer -> ackSucc(msgOffset))
                         .exceptionally(exception -> {
-                            LOGGER.error("ack fail:{} {},error:{}",
-                                    topic, msgOffset, exception.getMessage(), 
exception);
+                            LOGGER.error("ack fail:{} {}",
+                                    topic, msgOffset, exception);
                             context.addAckFail(topic, -1);
                             return null;
                         });
@@ -162,9 +162,10 @@ public class PulsarSingleTopicFetcher extends 
SingleTopicFetcher {
             String threadName = 
String.format("sort_sdk_pulsar_single_topic_fetch_thread_%s_%s_%d",
                     this.topic.getInLongCluster().getClusterId(), 
topic.getTopic(), this.hashCode());
             this.fetchThread = new Thread(new 
PulsarSingleTopicFetcher.Fetcher(), threadName);
+            this.fetchThread.setDaemon(true);
             this.fetchThread.start();
         } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
+            LOGGER.error("fail to create consumer", e);
             return false;
         }
         return true;
@@ -203,9 +204,6 @@ public class PulsarSingleTopicFetcher extends 
SingleTopicFetcher {
                 if (consumer != null) {
                     consumer.close();
                 }
-                if (fetchThread != null) {
-                    fetchThread.interrupt();
-                }
             } catch (PulsarClientException e) {
                 LOGGER.warn(e.getMessage(), e);
             }
@@ -239,7 +237,7 @@ public class PulsarSingleTopicFetcher extends 
SingleTopicFetcher {
             } catch (Exception e) {
                 context.addCallBackFail(topic, -1, messageRecords.size(),
                         System.currentTimeMillis() - start);
-                LOGGER.error("failed to callback {}", e.getMessage(), e);
+                LOGGER.error("failed to callback", e);
             }
         }
 
@@ -251,78 +249,82 @@ public class PulsarSingleTopicFetcher extends 
SingleTopicFetcher {
         public void run() {
             boolean hasPermit;
             while (true) {
-                hasPermit = false;
-                long fetchTimeCost = -1;
                 try {
-                    if (context.getConfig().isStopConsume() || stopConsume) {
-                        TimeUnit.MILLISECONDS.sleep(50);
-                        continue;
-                    }
+                    hasPermit = false;
+                    long fetchTimeCost = -1;
+                    try {
+                        if (context.getConfig().isStopConsume() || 
stopConsume) {
+                            TimeUnit.MILLISECONDS.sleep(50);
+                            continue;
+                        }
 
-                    if (sleepTime > 0) {
-                        TimeUnit.MILLISECONDS.sleep(sleepTime);
-                    }
+                        if (sleepTime > 0) {
+                            TimeUnit.MILLISECONDS.sleep(sleepTime);
+                        }
 
-                    context.acquireRequestPermit();
-                    hasPermit = true;
-                    context.addConsumeTime(topic, -1);
+                        context.acquireRequestPermit();
+                        hasPermit = true;
+                        context.addConsumeTime(topic, -1);
 
-                    long startFetchTime = System.currentTimeMillis();
-                    Messages<byte[]> messages = consumer.batchReceive();
-                    fetchTimeCost = System.currentTimeMillis() - 
startFetchTime;
-                    if (null != messages && messages.size() != 0) {
-                        for (Message<byte[]> msg : messages) {
-                            // if need seek
-                            if (msg.getPublishTime() < seeker.getSeekTime()) {
-                                seeker.seek();
-                                break;
-                            }
+                        long startFetchTime = System.currentTimeMillis();
+                        Messages<byte[]> messages = consumer.batchReceive();
+                        fetchTimeCost = System.currentTimeMillis() - 
startFetchTime;
+                        if (null != messages && messages.size() != 0) {
+                            for (Message<byte[]> msg : messages) {
+                                // if need seek
+                                if (msg.getPublishTime() < 
seeker.getSeekTime()) {
+                                    seeker.seek();
+                                    break;
+                                }
 
-                            String offsetKey = getOffset(msg.getMessageId());
-                            offsetCache.put(offsetKey, msg.getMessageId());
+                                String offsetKey = 
getOffset(msg.getMessageId());
+                                offsetCache.put(offsetKey, msg.getMessageId());
 
-                            // deserialize
-                            List<InLongMessage> inLongMessages = deserializer
-                                    .deserialize(context, topic, 
msg.getProperties(), msg.getData());
-                            context.addConsumeSuccess(topic, -1, 
inLongMessages.size(), msg.getData().length,
-                                    fetchTimeCost);
-                            int originSize = inLongMessages.size();
-                            // intercept
-                            inLongMessages = 
interceptor.intercept(inLongMessages);
-                            if (inLongMessages.isEmpty()) {
-                                ack(offsetKey);
-                                continue;
-                            }
-                            int filterSize = originSize - 
inLongMessages.size();
-                            context.addConsumeFilter(topic, -1, filterSize);
+                                // deserialize
+                                List<InLongMessage> inLongMessages = 
deserializer
+                                        .deserialize(context, topic, 
msg.getProperties(), msg.getData());
+                                context.addConsumeSuccess(topic, -1, 
inLongMessages.size(), msg.getData().length,
+                                        fetchTimeCost);
+                                int originSize = inLongMessages.size();
+                                // intercept
+                                inLongMessages = 
interceptor.intercept(inLongMessages);
+                                if (inLongMessages.isEmpty()) {
+                                    ack(offsetKey);
+                                    continue;
+                                }
+                                int filterSize = originSize - 
inLongMessages.size();
+                                context.addConsumeFilter(topic, -1, 
filterSize);
 
-                            List<MessageRecord> msgs = new ArrayList<>();
-                            msgs.add(new MessageRecord(topic.getTopicKey(),
-                                    inLongMessages,
-                                    offsetKey, System.currentTimeMillis()));
-                            handleAndCallbackMsg(msgs);
+                                List<MessageRecord> msgs = new ArrayList<>();
+                                msgs.add(new MessageRecord(topic.getTopicKey(),
+                                        inLongMessages,
+                                        offsetKey, 
System.currentTimeMillis()));
+                                handleAndCallbackMsg(msgs);
+                            }
+                            sleepTime = 0L;
+                        } else {
+                            context.addConsumeEmpty(topic, -1, fetchTimeCost);
+                            emptyFetchTimes++;
+                            if (emptyFetchTimes >= 
context.getConfig().getEmptyPollTimes()) {
+                                sleepTime = Math.min((sleepTime += 
context.getConfig().getEmptyPollSleepStepMs()),
+                                        
context.getConfig().getMaxEmptyPollSleepMs());
+                                emptyFetchTimes = 0;
+                            }
                         }
-                        sleepTime = 0L;
-                    } else {
-                        context.addConsumeEmpty(topic, -1, fetchTimeCost);
-                        emptyFetchTimes++;
-                        if (emptyFetchTimes >= 
context.getConfig().getEmptyPollTimes()) {
-                            sleepTime = Math.min((sleepTime += 
context.getConfig().getEmptyPollSleepStepMs()),
-                                    
context.getConfig().getMaxEmptyPollSleepMs());
-                            emptyFetchTimes = 0;
+                    } catch (Exception e) {
+                        context.addConsumeError(topic, -1, fetchTimeCost);
+                        LOGGER.error("failed to fetch msg", e);
+                    } finally {
+                        if (hasPermit) {
+                            context.releaseRequestPermit();
                         }
                     }
-                } catch (Exception e) {
-                    context.addConsumeError(topic, -1, fetchTimeCost);
-                    LOGGER.error("failed to fetch msg: {}", e.getMessage(), e);
-                } finally {
-                    if (hasPermit) {
-                        context.releaseRequestPermit();
-                    }
-                }
 
-                if (closed) {
-                    break;
+                    if (closed) {
+                        break;
+                    }
+                } catch (Throwable t) {
+                    LOGGER.error("got exception while process fetching", t);
                 }
             }
         }

Reply via email to