apoorvmittal10 commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2012532595


##########
server-common/src/main/java/org/apache/kafka/server/storage/log/FetchIsolation.java:
##########
@@ -25,11 +25,11 @@ public enum FetchIsolation {
     TXN_COMMITTED;
 
     public static FetchIsolation of(FetchRequest request) {
-        return of(request.replicaId(), request.isolationLevel());
+        return of(request.replicaId(), request.isolationLevel(), false);
     }
 
-    public static FetchIsolation of(int replicaId, IsolationLevel isolationLevel) {
-        if (!FetchRequest.isConsumer(replicaId)) {
+    public static FetchIsolation of(int replicaId, IsolationLevel isolationLevel, boolean isShareFetchRequest) {
+        if (!FetchRequest.isConsumer(replicaId) && !isShareFetchRequest) {

Review Comment:
   Hmmm, I am not sure we should have `isShareFetchRequest` as a param. Should there be another replicaId for share consumers?
   
   cc: @junrao 
   
   ```
       public static final int ORDINARY_CONSUMER_ID = -1;
       public static final int DEBUGGING_CONSUMER_ID = -2;
       public static final int FUTURE_LOCAL_REPLICA_ID = -3;
   ```
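   The alternative the reviewer hints at could be sketched like this. Purely illustrative: `SHARE_CONSUMER_REPLICA_ID` and the simplified `isConsumer` are assumptions for the sketch, not code from the PR or from Kafka.

   ```java
   // Sketch of the reviewer's idea: a dedicated sentinel id for share
   // consumers, following the existing negative-id convention, instead of
   // threading an isShareFetchRequest flag through FetchIsolation.of.
   // The constant name and this simplified isConsumer are assumptions.
   public class ReplicaIds {
       public static final int ORDINARY_CONSUMER_ID = -1;
       public static final int DEBUGGING_CONSUMER_ID = -2;
       public static final int FUTURE_LOCAL_REPLICA_ID = -3;
       // Hypothetical new sentinel for share-group consumers (not in the PR):
       public static final int SHARE_CONSUMER_REPLICA_ID = -4;

       // Stand-in for FetchRequest.isConsumer; with a dedicated id,
       // no extra boolean parameter would be needed.
       public static boolean isConsumer(int replicaId) {
           return replicaId == ORDINARY_CONSUMER_ID
               || replicaId == DEBUGGING_CONSUMER_ID
               || replicaId == SHARE_CONSUMER_REPLICA_ID;
       }
   }
   ```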


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -83,6 +92,8 @@ public final class GroupConfig extends AbstractConfig {
 
     public final int streamsNumStandbyReplicas;
 
+    public final int shareIsolationLevel;

Review Comment:
   KIP says `Valid values "read_committed" and "read_uncommitted" (default)`, so why do we have an int value?

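   What the reviewer describes, a string config restricted to the KIP's two values rather than an int id, might look roughly like this. The class name and the `validate` helper are illustrative stand-ins, not Kafka's `ConfigDef` machinery.

   ```java
   import java.util.Arrays;
   import java.util.List;

   // Sketch: share.isolation.level as a validated string config, mirroring
   // the KIP's "read_committed" / "read_uncommitted" (default) wording.
   public class ShareIsolationLevelConfig {
       public static final String CONFIG_NAME = "share.isolation.level";
       public static final String DEFAULT = "read_uncommitted";
       private static final List<String> VALID_VALUES =
           Arrays.asList("read_committed", "read_uncommitted");

       // Rejects anything outside the two values the KIP allows.
       public static String validate(String value) {
           if (!VALID_VALUES.contains(value)) {
               throw new IllegalArgumentException(
                   "Invalid value '" + value + "' for " + CONFIG_NAME
                   + "; valid values are " + VALID_VALUES);
           }
           return value;
       }
   }
   ```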


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -104,7 +104,8 @@ class KafkaApis(val requestChannel: RequestChannel,
                 time: Time,
                 val tokenManager: DelegationTokenManager,
                 val apiVersionManager: ApiVersionManager,
-                val clientMetricsManager: ClientMetricsManager
+                val clientMetricsManager: ClientMetricsManager,
+                val groupConfigManager: GroupConfigManager

Review Comment:
   Though it's fine to have another manager in KafkaApis, would it be better to have `groupConfigManager` in `SharePartitionManager`? KafkaApis can pass the default IsolationLevel using `FetchIsolation.of(-1, GroupConfig.defaultShareIsolationLevel, true)`, and SharePartitionManager can check for an updated IsolationLevel using the code below. The final isolation level can go in the `ShareFetch` class? This approach requires less plumbing in KafkaApis but adds an isolation level param in `ShareFetch`, which might be different from `FetchParams`. What do you think?
   
   ```
   val isolationLevel: IsolationLevel = IsolationLevel.forId(groupConfigManager.groupConfig(groupId).get().shareIsolationLevel().toByte)
   FetchIsolation.of(-1, isolationLevel, true)
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -65,6 +67,13 @@ public final class GroupConfig extends AbstractConfig {
 
     public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG = "group.streams.num.standby.replicas";
 
+    public static final String SHARE_ISOLATION_LEVEL_CONFIG = "share.isolation.level";
+    public static final int SHARE_ISOLATION_LEVEL_DEFAULT = IsolationLevel.READ_UNCOMMITTED.id();
+    public static final String SHARE_ISOLATION_LEVEL_DOC = "Controls how to read records written transactionally. " +
+        "If set to \"read_committed\", the share group will only deliver transactional records which have been committed. " +
+        "If set to \"read_uncommitted\", the share group will return all messages, even transactional messages which have been aborted. " +
+        "Non-transactional records will be returned unconditionally in either mode.";

Review Comment:
   nit: would be good to align with share group configs above.



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -847,6 +859,7 @@ public ShareAcquiredRecords acquire(
             }

Review Comment:
   Don't we need the filtering here as well? Do we cover this scenario in a unit test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
