drpmma commented on code in PR #8430: URL: https://github.com/apache/rocketmq/pull/8430#discussion_r1687534860
########## client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java: ########## @@ -258,146 +247,96 @@ public void removeShutdownHook() { } } - class AsyncRunnable implements Runnable { - private boolean stopped; + private long lastFlushTime = System.currentTimeMillis(); + class AsyncRunnable implements Runnable { @Override public void run() { - while (!stopped) { - synchronized (traceContextQueue) { - long endTime = System.currentTimeMillis() + pollingTimeMil; - while (System.currentTimeMillis() < endTime) { - try { - TraceContext traceContext = traceContextQueue.poll( - endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS - ); - - if (traceContext != null && !traceContext.getTraceBeans().isEmpty()) { - // get the topic which the trace message will send to - String traceTopicName = this.getTraceTopicName(traceContext.getRegionId()); - - // get the traceDataSegment which will save this trace message, create if null - TraceDataSegment traceDataSegment = taskQueueByTopic.get(traceTopicName); - if (traceDataSegment == null) { - traceDataSegment = new TraceDataSegment(traceTopicName, traceContext.getRegionId()); - taskQueueByTopic.put(traceTopicName, traceDataSegment); - } - - // encode traceContext and save it into traceDataSegment - // NOTE if data size in traceDataSegment more than maxMsgSize, - // a AsyncDataSendTask will be created and submitted - TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(traceContext); - traceDataSegment.addTraceTransferBean(traceTransferBean); - } - } catch (InterruptedException ignore) { - log.debug("traceContextQueue#poll exception"); - } - } - - // NOTE send the data in traceDataSegment which the first TraceTransferBean - // is longer than waitTimeThreshold - sendDataByTimeThreshold(); - - if (AsyncTraceDispatcher.this.stopped) { - this.stopped = true; - } + while (true) { + try { + flushTraceContext(); + } catch (Throwable e) { + log.error("flushTraceContext error", e); } } - } + } - private void sendDataByTimeThreshold() { - long now = System.currentTimeMillis(); - for (TraceDataSegment taskInfo : taskQueueByTopic.values()) { - if (now - taskInfo.firstBeanAddTime >= waitTimeThresholdMil) { - taskInfo.sendAllData(); + private void flushTraceContext() { + List<TraceContext> contextList = new ArrayList<>(batchSize); + int size = traceContextQueue.size(); + if (size != 0 && (size >= batchSize || System.currentTimeMillis() - lastFlushTime > 5000)) { Review Comment: It's better to use a var rather than magic number 5000. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org