AndrewJSchofield commented on code in PR #15836:
URL: https://github.com/apache/kafka/pull/15836#discussion_r1589348685


##########
core/src/main/scala/kafka/server/FetchSession.scala:
##########
@@ -395,19 +396,27 @@ object FullFetchContext {
   * The fetch context for a full fetch request.
   *
   * @param time               The clock to use.
-  * @param cache              The fetch session cache.
+  * @param caches             The fetch session cache shards.
   * @param reqMetadata        The request metadata.
   * @param fetchData          The partition data from the fetch request.
   * @param usesTopicIds       True if this session should use topic IDs.
   * @param isFromFollower     True if this fetch request came from a follower.
   */
 class FullFetchContext(private val time: Time,
-                       private val cache: FetchSessionCache,
+                       private val caches: Seq[FetchSessionCache],

Review Comment:
   `cacheShards`?



##########
core/src/main/scala/kafka/server/FetchSession.scala:
##########
@@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara
   *
   * @param maxEntries The maximum number of entries that can be in the cache.
   * @param evictionMs The minimum time that an entry must be unused in order to be evictable.
-  */
+  * @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange).

Review Comment:
   I know the `[ , )` notation expresses the inclusive/exclusive bounds you
intend, but I think the parentheses inside the expressions make it a bit hard
to read. Using `>=` and `<` might be clearer.
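
To illustrate the suggestion, here is a minimal sketch of the same bounds written with `>=` and `<`. `ShardRange`, `minInclusive`, `maxExclusive` and `contains` are hypothetical names used only for illustration; only `shardNum` and `sessionIdRange` come from the Scaladoc in the diff.

```scala
// Hypothetical helper, not part of the PR: the sessionIds owned by one shard.
final case class ShardRange(shardNum: Int, sessionIdRange: Int) {
  // Lowest sessionId owned by this shard (never below 1, per the Scaladoc).
  val minInclusive: Int = math.max(1, shardNum * sessionIdRange)
  // First sessionId that belongs to the next shard.
  val maxExclusive: Int = (shardNum + 1) * sessionIdRange

  // sessionId >= minInclusive and sessionId < maxExclusive
  def contains(sessionId: Int): Boolean =
    sessionId >= minInclusive && sessionId < maxExclusive
}
```

With this shape the Scaladoc could read "a shard handles sessionIds where sessionId >= Math.max(1, shardNum * sessionIdRange) and sessionId < (shardNum + 1) * sessionIdRange".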



##########
core/src/main/scala/kafka/server/FetchSession.scala:
##########
@@ -583,9 +595,13 @@ case class EvictableKey(privileged: Boolean, size: Int, id: Int) extends Compara
   *
   * @param maxEntries The maximum number of entries that can be in the cache.
   * @param evictionMs The minimum time that an entry must be unused in order to be evictable.
-  */
+  * @param sessionIdRange The number of sessionIds each cache shard handles. The range for a given shard is [Math.max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange).
+  * @param shardNum Identifier for this shard.
+ */
 class FetchSessionCache(private val maxEntries: Int,

Review Comment:
   Maybe the class name ought to be `FetchSessionCacheShard` too. The cache
really is the whole set of shards.



##########
core/src/main/scala/kafka/server/BrokerServer.scala:
##########
@@ -389,9 +389,17 @@ class BrokerServer(
       authorizer = config.createNewAuthorizer()
       authorizer.foreach(_.configure(config.originals))
 
-      val fetchManager = new FetchManager(Time.SYSTEM,
-        new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
-          KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+      // The FetchSessionCache is divided into config.numIoThreads shards, each responsible
+      // for sessionIds falling in [Max(1, shardNum * sessionIdRange), (shardNum + 1) * sessionIdRange)
+      val sessionIdRange = Int.MaxValue / config.numIoThreads

Review Comment:
   I would tend not to add a configuration for this. The value you're talking 
about sounds like it's doing the job on a busy workload, and it's small enough 
that there's negligible benefit of configuring it smaller for a tiny cluster. 
Having a configuration kind of crystallizes this aspect of the internal design 
of Kafka, and you might have an even better idea in the future that would make 
this configuration pointless.



##########
core/src/main/scala/kafka/server/FetchSession.scala:
##########
@@ -789,7 +807,15 @@ class FetchSessionCache(private val maxEntries: Int,
 }
 
 class FetchManager(private val time: Time,
-                   private val cache: FetchSessionCache) extends Logging {
+                   private val caches: Seq[FetchSessionCache]) extends Logging {
+
+  def this(time: Time, cache: FetchSessionCache) = this(time, Seq(cache))
+
+  def getShardedCache(sessionId: Int): FetchSessionCache = {

Review Comment:
   `getCacheShard`?
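
For context, a rough sketch of what a lookup under the suggested name might look like, assuming each shard covers `sessionIdRange` consecutive sessionIds as described in the diff. `ShardLookup` and `shardIndex` are hypothetical, the generic `A` stands in for the shard type, and the clamp to the last shard is an assumption of this sketch rather than something shown in the PR.

```scala
// Hypothetical stand-in for the PR's shard lookup; names are illustrative.
object ShardLookup {
  // Index of the shard owning sessionId, assuming each shard covers
  // sessionIdRange consecutive ids starting from shard 0.
  def shardIndex(sessionId: Int, sessionIdRange: Int, numShards: Int): Int = {
    require(sessionId >= 1, "sessionId must be at least 1")
    require(sessionIdRange > 0 && numShards > 0, "range and shard count must be positive")
    // Assumption: clamp to the last shard, since ids at or above
    // numShards * sessionIdRange can exist when Int.MaxValue is not
    // a multiple of numShards.
    math.min(sessionId / sessionIdRange, numShards - 1)
  }

  // Returns the shard responsible for sessionId.
  def getCacheShard[A](shards: Seq[A], sessionIdRange: Int, sessionId: Int): A =
    shards(shardIndex(sessionId, sessionIdRange, shards.size))
}
```

Integer division keeps the lookup constant-time and avoids searching the shards for a matching range.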



##########
core/src/main/scala/kafka/server/FetchSession.scala:
##########
@@ -603,14 +619,16 @@ class FetchSessionCache(private val maxEntries: Int,
   // A map containing sessions which can be evicted by privileged sessions.
   private val evictableByPrivileged = new util.TreeMap[EvictableKey, FetchSession]
 
+  private val metricTag = Map("shard" -> s"$shardNum").asJava
+
   // Set up metrics.
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size)
-  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED)
-  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_PARTITIONS_CACHED, () => FetchSessionCache.this.totalPartitions)
-  metricsGroup.removeMetric(FetchSession.INCREMENTAL_FETCH_SESSIONS_EVICTIONS_PER_SEC)
+  metricsGroup.removeMetric(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, metricTag)
+  metricsGroup.newGauge(FetchSession.NUM_INCREMENTAL_FETCH_SESSIONS, () => FetchSessionCache.this.size, metricTag)

Review Comment:
   Yes, the metric would be KIP-able. But you have decided you don't need the 
tag :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.