drpmma commented on code in PR #8430: URL: https://github.com/apache/rocketmq/pull/8430#discussion_r1687528383
########## client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java: ########## @@ -44,55 +44,55 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.RPCHook; import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME; public class AsyncTraceDispatcher implements TraceDispatcher { private final static Logger log = LoggerFactory.getLogger(AsyncTraceDispatcher.class); private final static AtomicInteger COUNTER = new AtomicInteger(); private final static short MAX_MSG_KEY_SIZE = Short.MAX_VALUE - 10000; + private static final AtomicInteger INSTANCE_NUM = new AtomicInteger(0); + private final int traceInstanceId = INSTANCE_NUM.getAndIncrement(); private final int queueSize; private final int batchSize; private final int maxMsgSize; private final long pollingTimeMil; private final long waitTimeThresholdMil; private final DefaultMQProducer traceProducer; - private final ThreadPoolExecutor traceExecutor; - // The last discard number of log private AtomicLong discardCount; private Thread worker; + private final ThreadPoolExecutor traceExecutor; + + private final int threadNum = Math.max(8, Runtime.getRuntime().availableProcessors()); private final ArrayBlockingQueue<TraceContext> traceContextQueue; - private final HashMap<String, TraceDataSegment> taskQueueByTopic; + // private final HashMap<String, TraceDataSegment> taskQueueByTopic; private ArrayBlockingQueue<Runnable> appenderQueue; private volatile Thread shutDownHook; private volatile boolean stopped = false; private DefaultMQProducerImpl hostProducer; private DefaultMQPushConsumerImpl hostConsumer; private volatile ThreadLocalIndex sendWhichQueue = new 
ThreadLocalIndex(); - private String dispatcherId = UUID.randomUUID().toString(); private volatile String traceTopicName; private AtomicBoolean isStarted = new AtomicBoolean(false); private volatile AccessChannel accessChannel = AccessChannel.LOCAL; private String group; private Type type; private String namespaceV2; - public AsyncTraceDispatcher(String group, Type type, String traceTopicName, RPCHook rpcHook) { + public AsyncTraceDispatcher(String group, Type type, int batchSize, String traceTopicName, RPCHook rpcHook) { // queueSize is greater than or equal to the n power of 2 of value this.queueSize = 2048; - this.batchSize = 100; + this.batchSize = Math.min(batchSize, 20); Review Comment: It's recommended to add a code comment explaining the `Math.min` call here, i.e. why the configured batch size is capped at 20. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@rocketmq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org