This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a5bacfa1d [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158) a5bacfa1d is described below commit a5bacfa1dfc9f616704e99446b943a60e08f601d Author: vernedeng <deng...@pku.edu.cn> AuthorDate: Thu Jan 5 14:58:31 2023 +0800 [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158) --- .../apache/inlong/sdk/sort/api/ClientContext.java | 2 +- .../fetcher/pulsar/PulsarSingleTopicFetcher.java | 138 +++++++++++---------- 2 files changed, 71 insertions(+), 69 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java index b22d7dbf4..fb600cfcc 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java @@ -144,7 +144,7 @@ public abstract class ClientContext implements Cleanable { private SortSdkMetricItem getMetricItem(InLongTopic topic, int partitionId) { Map<String, String> dimensions = new HashMap<>(); dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId); - if (topic != null || config.isTopicStaticsEnabled()) { + if (topic != null && config.isTopicStaticsEnabled()) { dimensions.put(SortSdkMetricItem.KEY_CLUSTER_ID, topic.getInLongCluster().getClusterId()); dimensions.put(SortSdkMetricItem.KEY_TOPIC_ID, topic.getTopic()); } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java index 4c9b41a9b..f8d39c361 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java @@ -111,8 +111,8 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { consumer.acknowledgeAsync(messageId) .thenAccept(consumer -> ackSucc(msgOffset)) .exceptionally(exception -> { - LOGGER.error("ack fail:{} {},error:{}", - topic, msgOffset, exception.getMessage(), exception); + LOGGER.error("ack fail:{} {}", + topic, msgOffset, exception); context.addAckFail(topic, -1); return null; }); @@ -162,9 +162,10 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { String threadName = String.format("sort_sdk_pulsar_single_topic_fetch_thread_%s_%s_%d", this.topic.getInLongCluster().getClusterId(), topic.getTopic(), this.hashCode()); this.fetchThread = new Thread(new PulsarSingleTopicFetcher.Fetcher(), threadName); + this.fetchThread.setDaemon(true); this.fetchThread.start(); } catch (Exception e) { - LOGGER.error(e.getMessage(), e); + LOGGER.error("fail to create consumer", e); return false; } return true; @@ -203,9 +204,6 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { if (consumer != null) { consumer.close(); } - if (fetchThread != null) { - fetchThread.interrupt(); - } } catch (PulsarClientException e) { LOGGER.warn(e.getMessage(), e); } @@ -239,7 +237,7 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { } catch (Exception e) { context.addCallBackFail(topic, -1, messageRecords.size(), System.currentTimeMillis() - start); - LOGGER.error("failed to callback {}", e.getMessage(), e); + LOGGER.error("failed to callback", e); } } @@ -251,78 +249,82 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { public void run() { boolean hasPermit; while (true) { - hasPermit = false; - long fetchTimeCost = -1; try { - if (context.getConfig().isStopConsume() || stopConsume) { - TimeUnit.MILLISECONDS.sleep(50); - continue; - } + hasPermit = false; + long fetchTimeCost = -1; + try { + if (context.getConfig().isStopConsume() || stopConsume) { + TimeUnit.MILLISECONDS.sleep(50); + continue; + } - if (sleepTime > 0) { - TimeUnit.MILLISECONDS.sleep(sleepTime); - } + if (sleepTime > 0) { + TimeUnit.MILLISECONDS.sleep(sleepTime); + } - context.acquireRequestPermit(); - hasPermit = true; - context.addConsumeTime(topic, -1); + context.acquireRequestPermit(); + hasPermit = true; + context.addConsumeTime(topic, -1); - long startFetchTime = System.currentTimeMillis(); - Messages<byte[]> messages = consumer.batchReceive(); - fetchTimeCost = System.currentTimeMillis() - startFetchTime; - if (null != messages && messages.size() != 0) { - for (Message<byte[]> msg : messages) { - // if need seek - if (msg.getPublishTime() < seeker.getSeekTime()) { - seeker.seek(); - break; - } + long startFetchTime = System.currentTimeMillis(); + Messages<byte[]> messages = consumer.batchReceive(); + fetchTimeCost = System.currentTimeMillis() - startFetchTime; + if (null != messages && messages.size() != 0) { + for (Message<byte[]> msg : messages) { + // if need seek + if (msg.getPublishTime() < seeker.getSeekTime()) { + seeker.seek(); + break; + } - String offsetKey = getOffset(msg.getMessageId()); - offsetCache.put(offsetKey, msg.getMessageId()); + String offsetKey = getOffset(msg.getMessageId()); + offsetCache.put(offsetKey, msg.getMessageId()); - // deserialize - List<InLongMessage> inLongMessages = deserializer - .deserialize(context, topic, msg.getProperties(), msg.getData()); - context.addConsumeSuccess(topic, -1, inLongMessages.size(), msg.getData().length, - fetchTimeCost); - int originSize = inLongMessages.size(); - // intercept - inLongMessages = interceptor.intercept(inLongMessages); - if (inLongMessages.isEmpty()) { - ack(offsetKey); - continue; - } - int filterSize = originSize - inLongMessages.size(); - context.addConsumeFilter(topic, -1, filterSize); + // deserialize + List<InLongMessage> inLongMessages = deserializer + .deserialize(context, topic, msg.getProperties(), msg.getData()); + context.addConsumeSuccess(topic, -1, inLongMessages.size(), msg.getData().length, + fetchTimeCost); + int originSize = inLongMessages.size(); + // intercept + inLongMessages = interceptor.intercept(inLongMessages); + if (inLongMessages.isEmpty()) { + ack(offsetKey); + continue; + } + int filterSize = originSize - inLongMessages.size(); + context.addConsumeFilter(topic, -1, filterSize); - List<MessageRecord> msgs = new ArrayList<>(); - msgs.add(new MessageRecord(topic.getTopicKey(), - inLongMessages, - offsetKey, System.currentTimeMillis())); - handleAndCallbackMsg(msgs); + List<MessageRecord> msgs = new ArrayList<>(); + msgs.add(new MessageRecord(topic.getTopicKey(), + inLongMessages, + offsetKey, System.currentTimeMillis())); + handleAndCallbackMsg(msgs); + } + sleepTime = 0L; + } else { + context.addConsumeEmpty(topic, -1, fetchTimeCost); + emptyFetchTimes++; + if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) { + sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()), + context.getConfig().getMaxEmptyPollSleepMs()); + emptyFetchTimes = 0; + } } - sleepTime = 0L; - } else { - context.addConsumeEmpty(topic, -1, fetchTimeCost); - emptyFetchTimes++; - if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) { - sleepTime = Math.min((sleepTime += context.getConfig().getEmptyPollSleepStepMs()), - context.getConfig().getMaxEmptyPollSleepMs()); - emptyFetchTimes = 0; + } catch (Exception e) { + context.addConsumeError(topic, -1, fetchTimeCost); + LOGGER.error("failed to fetch msg", e); + } finally { + if (hasPermit) { + context.releaseRequestPermit(); } } - } catch (Exception e) { - context.addConsumeError(topic, -1, fetchTimeCost); - LOGGER.error("failed to fetch msg: {}", e.getMessage(), e); - } finally { - if (hasPermit) { - context.releaseRequestPermit(); - } - } - if (closed) { - break; + if (closed) { + break; + } + } catch (Throwable t) { + LOGGER.error("got exception while process fetching", t); } } }