vernedeng commented on code in PR #6954: URL: https://github.com/apache/inlong/pull/6954#discussion_r1056939832
########## inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java: ########## @@ -38,29 +46,118 @@ public SortClientConfig getConfig() { @Override public boolean clean() { - statManager.clean(); return true; } - public StatManager getStatManager() { - return statManager; + public void addConsumeTime(InLongTopic topic, int partitionId) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.consumeTimes.incrementAndGet(); } - public void acquireRequestPermit() throws InterruptedException { - config.getGlobalInProgressRequest().acquireUninterruptibly(); + public void addConsumeSuccess(InLongTopic topic, int partitionId, int size, int count, long time) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.consumeSize.addAndGet(size); + metricItem.consumeMsgCount.addAndGet(count); + metricItem.consumeTimeCost.addAndGet(time); } - public void releaseRequestPermit() { - config.getGlobalInProgressRequest().release(); + public void addConsumeFilter(InLongTopic topic, int partitionId, int count) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.filterCount.addAndGet(count); + } + + public void addConsumeEmpty(InLongTopic topic, int partitionId, long time) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.consumeEmptyCount.incrementAndGet(); + metricItem.consumeTimeCost.addAndGet(time); + } + + public void addConsumeError(InLongTopic topic, int partitionId, long time) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.consumeErrorCount.incrementAndGet(); + metricItem.consumeTimeCost.addAndGet(time); + } + + public void addCallBack(InLongTopic topic, int partitionId) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.callbackCount.incrementAndGet(); + } + + public void addCallBackSuccess(InLongTopic topic, int partitionId, int count, long time) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.callbackDoneCount.addAndGet(count); + metricItem.callbackTimeCost.addAndGet(time); + } + + public void addCallBackFail(InLongTopic topic, int partitionId, int count, long time) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.callbackFailCount.addAndGet(count); + metricItem.callbackTimeCost.addAndGet(time); + } + + public void addAckSuccess(InLongTopic topic, int partitionId) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.ackSuccCount.incrementAndGet(); + } + + public void addAckFail(InLongTopic topic, int partitionId) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.ackFailCount.incrementAndGet(); } - public SortClientStateCounter getStateCounterByTopic(InLongTopic topic) { - return statManager.getStatistics(config.getSortTaskId(), - topic.getInLongCluster().getClusterId(), topic.getTopic()); + public void addTopicOnlineCount(int count) { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.topicOnlineCount.addAndGet(count); } - public SortClientStateCounter getDefaultStateCounter() { - return statManager.getStatistics(config.getSortTaskId()); + public void addTopicOfflineCount(int count) { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.topicOfflineCount.addAndGet(count); + } + + public void addRequestManager() { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.requestManagerCount.incrementAndGet(); + } + + public void addRequestManagerFail(long time) { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.requestManagerFailCount.incrementAndGet(); + metricItem.requestManagerTimeCost.addAndGet(time); + } + + public void addRequestManagerConfChange() { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.requestManagerConfChangedCount.incrementAndGet(); + } + + public void addRequestManagerCommonError() { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.requestManagerCommonErrorCount.incrementAndGet(); + } + + public void addRequestManagerParamError() { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.requestManagerParamErrorCount.incrementAndGet(); + } + + private SortSdkMetricItem getMetricItem(InLongTopic topic, int partitionId) { + Map<String, String> dimensions = new HashMap<>(); + dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId); + if (topic != null) { + dimensions.put(SortSdkMetricItem.KEY_CLUSTER_ID, topic.getInLongCluster().getClusterId()); + dimensions.put(SortSdkMetricItem.KEY_TOPIC_ID, topic.getTopic()); Review Comment: fixed, thx ########## inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java: ########## @@ -38,29 +46,118 @@ public SortClientConfig getConfig() { @Override public boolean clean() { - statManager.clean(); return true; } - public StatManager getStatManager() { - return statManager; + public void addConsumeTime(InLongTopic topic, int partitionId) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.consumeTimes.incrementAndGet(); } - public void acquireRequestPermit() throws InterruptedException { - config.getGlobalInProgressRequest().acquireUninterruptibly(); + public void addConsumeSuccess(InLongTopic topic, int partitionId, int size, int count, long time) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.consumeSize.addAndGet(size); + metricItem.consumeMsgCount.addAndGet(count); + metricItem.consumeTimeCost.addAndGet(time); } - public void releaseRequestPermit() { - config.getGlobalInProgressRequest().release(); + public void addConsumeFilter(InLongTopic topic, int partitionId, int count) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.filterCount.addAndGet(count); + } + + public void addConsumeEmpty(InLongTopic topic, int partitionId, long time) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.consumeEmptyCount.incrementAndGet(); + metricItem.consumeTimeCost.addAndGet(time); + } + + public void addConsumeError(InLongTopic topic, int partitionId, long time) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.consumeErrorCount.incrementAndGet(); + metricItem.consumeTimeCost.addAndGet(time); + } + + public void addCallBack(InLongTopic topic, int partitionId) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.callbackCount.incrementAndGet(); + } + + public void addCallBackSuccess(InLongTopic topic, int partitionId, int count, long time) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.callbackDoneCount.addAndGet(count); + metricItem.callbackTimeCost.addAndGet(time); + } + + public void addCallBackFail(InLongTopic topic, int partitionId, int count, long time) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.callbackFailCount.addAndGet(count); + metricItem.callbackTimeCost.addAndGet(time); + } + + public void addAckSuccess(InLongTopic topic, int partitionId) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.ackSuccCount.incrementAndGet(); + } + + public void addAckFail(InLongTopic topic, int partitionId) { + SortSdkMetricItem metricItem = this.getMetricItem(topic, partitionId); + metricItem.ackFailCount.incrementAndGet(); } - public SortClientStateCounter getStateCounterByTopic(InLongTopic topic) { - return statManager.getStatistics(config.getSortTaskId(), - topic.getInLongCluster().getClusterId(), topic.getTopic()); + public void addTopicOnlineCount(int count) { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.topicOnlineCount.addAndGet(count); } - public SortClientStateCounter getDefaultStateCounter() { - return statManager.getStatistics(config.getSortTaskId()); + public void addTopicOfflineCount(int count) { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.topicOfflineCount.addAndGet(count); + } + + public void addRequestManager() { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.requestManagerCount.incrementAndGet(); + } + + public void addRequestManagerFail(long time) { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.requestManagerFailCount.incrementAndGet(); + metricItem.requestManagerTimeCost.addAndGet(time); + } + + public void addRequestManagerConfChange() { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.requestManagerConfChangedCount.incrementAndGet(); + } + + public void addRequestManagerCommonError() { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.requestManagerCommonErrorCount.incrementAndGet(); + } + + public void addRequestManagerParamError() { + SortSdkMetricItem metricItem = this.getMetricItem(null, -1); + metricItem.requestManagerParamErrorCount.incrementAndGet(); + } + + private SortSdkMetricItem getMetricItem(InLongTopic topic, int partitionId) { + Map<String, String> dimensions = new HashMap<>(); + dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId); + if (topic != null) { + dimensions.put(SortSdkMetricItem.KEY_CLUSTER_ID, topic.getInLongCluster().getClusterId()); + dimensions.put(SortSdkMetricItem.KEY_TOPIC_ID, topic.getTopic()); + } + dimensions.put(SortSdkMetricItem.KEY_PARTITION_ID, String.valueOf(partitionId)); Review Comment: fixed, thx -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org