AndrewJSchofield commented on code in PR #15836: URL: https://github.com/apache/kafka/pull/15836#discussion_r1589348685
########## core/src/main/scala/kafka/server/FetchSession.scala: ########## @@ -395,19 +396,27 @@ object FullFetchContext { * The fetch context for a full fetch request. * * @param time The clock to use. - * @param cache The fetch session cache. + * @param caches The fetch session cache shards. * @param reqMetadata The request metadata. * @param fetchData The partition data from the fetch request. * @param usesTopicIds True if this session should use topic IDs. * @param isFromFollower True if this fetch request came from a follower. */ class FullFetchContext(private val time: Time, - private val cache: FetchSessionCache, + private val caches: Seq[FetchSessionCache], Review Comment: `cacheShards`? ########## core/src/main/scala/kafka/server/FetchSession.scala: ########## @@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara * * @param maxEntries The maximum number of entries that can be in the cache. * @param evictionMs The minimum time that an entry must be unused in order to be evictable. - */ + * @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange). Review Comment: I know the `[ , )` notation means what you intend for inclusive/exclusive bounds, but the presence of the parentheses makes it a bit hard to read I think. Maybe using >= and < would be clearer. ########## core/src/main/scala/kafka/server/FetchSession.scala: ########## @@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara * * @param maxEntries The maximum number of entries that can be in the cache. * @param evictionMs The minimum time that an entry must be unused in order to be evictable. - */ + * @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange). 
+ * @param shardNum Identifier for this shard. + */ class FetchSessionCache(private val maxEntries: Int, Review Comment: Maybe the class name ought to be `FetchSessionCacheShard` too. The cache really is the whole set of shards. ########## core/src/main/scala/kafka/server/BrokerServer.scala: ########## @@ -389,9 +389,17 @@ class BrokerServer( authorizer = config.createNewAuthorizer() authorizer.foreach(_.configure(config.originals)) - val fetchManager = new FetchManager(Time.SYSTEM, - new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, - KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) + // The FetchSessionCache is divided into config.numIoThreads shards, each responsible + // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange) + val sessionIdRange = Int.MaxValue / config.numIoThreads Review Comment: I would tend not to add a configuration for this. The value you're talking about sounds like it's doing the job on a busy workload, and it's small enough that there's negligible benefit of configuring it smaller for a tiny cluster. Having a configuration kind of crystallizes this aspect of the internal design of Kafka, and you might have an even better idea in the future that would make this configuration pointless. ########## core/src/main/scala/kafka/server/FetchSession.scala: ########## @@ -789,7 +807,15 @@ class FetchSessionCache(private val maxEntries: Int, } class FetchManager(private val time: Time, - private val cache: FetchSessionCache) extends Logging { + private val caches: Seq[FetchSessionCache]) extends Logging { + + def this(time: Time, cache: FetchSessionCache) = this(time, Seq(cache)) + + def getShardedCache(sessionId: Int): FetchSessionCache = { Review Comment: `getCacheShard`? 
########## core/src/main/scala/kafka/server/FetchSession.scala: ########## @@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int, // A map containing sessions which can be evicted by privileged sessions. private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession] + private val metricTag = Map("shard" -> s"$shardNum").asJava + // Set up metrics. - metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS) - metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size) - metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED) - metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions) - metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC) + metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, metricTag) + metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size, metricTag) Review Comment: Yes, the metric would be KIP-able. But you have decided you don't need the tag :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org