atharvalade commented on code in PR #3151:
URL: https://github.com/apache/iggy/pull/3151#discussion_r3139490421


##########
core/simulator/src/client.rs:
##########
@@ -181,6 +181,44 @@ impl SimClient {
         self.build_request_with_namespace(Operation::DeleteConsumerOffset, &payload, namespace)
     }
 
+    /// v2 of `store_consumer_offset` carrying an explicit `AckLevel` byte.
+    ///
+    /// Only the simulator emits this opcode today; the partitions plane
+    /// accepts it alongside v1. The ack byte is reserved for future
+    /// cluster-side commit-timing semantics.
+    pub fn store_consumer_offset_v2(
+        &self,
+        namespace: IggyNamespace,
+        consumer_kind: u8,
+        consumer_id: u32,
+        offset: u64,
+        ack: AckLevel,
+    ) -> Message<RequestHeader> {
+        let mut payload = Vec::with_capacity(14);
+        payload.push(consumer_kind);
+        payload.extend_from_slice(&consumer_id.to_le_bytes());
+        payload.extend_from_slice(&offset.to_le_bytes());
+        payload.push(ack.as_u8());
+
+        self.build_request_with_namespace(Operation::StoreConsumerOffset2, &payload, namespace)
+    }
+
+    /// v2 of `delete_consumer_offset` carrying an explicit `AckLevel` byte.
+    pub fn delete_consumer_offset_v2(
+        &self,
+        namespace: IggyNamespace,
+        consumer_kind: u8,
+        consumer_id: u32,
+        ack: AckLevel,
+    ) -> Message<RequestHeader> {
+        let mut payload = Vec::with_capacity(6);
+        payload.push(consumer_kind);
+        payload.extend_from_slice(&consumer_id.to_le_bytes());
+        payload.push(ack.as_u8());
+
+        self.build_request_with_namespace(Operation::DeleteConsumerOffset2, &payload, namespace)
+    }

Review Comment:
   `store_consumer_offset_v2` and `delete_consumer_offset_v2` are defined here 
but never called anywhere in the codebase. Similarly, 
`StoreConsumerOffset2Request` / `DeleteConsumerOffset2Request` in 
binary_protocol are only exercised in their own unit tests.
   
   This means the v2 path has zero end-to-end coverage. The wire types have 
round-trip tests, which is great, but nothing proves that a v2 message built 
by the simulator actually flows through the consensus pipeline and gets parsed 
correctly by `parse_consumer_offset_payload`. I think at minimum there should 
be a simulator-level test (or even just a unit test in `iggy_partition.rs`) 
that builds a v2 store/delete message and runs it through the partition parser 
to confirm the bytes land in the right places. Otherwise this is a lot of 
scaffolding with no integration confidence.
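
   A minimal sketch of the kind of check meant here, assuming the payload layout from `store_consumer_offset_v2` above. `AckLevel` (its variants and codes), `encode_store_v2`, and `parse_store_body` are hypothetical stand-ins, not the real simulator or `parse_consumer_offset_payload` code; a real test would call those directly.

```rust
/// Hypothetical stand-in for the PR's `AckLevel`; the real variants and
/// their wire codes may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckLevel {
    Leader = 0,
    Quorum = 1,
}

impl AckLevel {
    pub fn as_u8(self) -> u8 {
        self as u8
    }
}

/// Mirrors the layout built by `store_consumer_offset_v2`:
/// kind (1) | consumer_id (4, LE) | offset (8, LE) | ack (1) = 14 bytes.
pub fn encode_store_v2(kind: u8, consumer_id: u32, offset: u64, ack: AckLevel) -> Vec<u8> {
    let mut payload = Vec::with_capacity(14);
    payload.push(kind);
    payload.extend_from_slice(&consumer_id.to_le_bytes());
    payload.extend_from_slice(&offset.to_le_bytes());
    payload.push(ack.as_u8());
    payload
}

/// Simplified stand-in for the partition-side parser. It reads the same
/// byte ranges the v1 branch uses and ignores anything after byte 12.
pub fn parse_store_body(body: &[u8]) -> Option<(u8, u32, u64)> {
    let kind = *body.first()?;

    let mut id = [0u8; 4];
    id.copy_from_slice(body.get(1..5)?);

    let mut offset = [0u8; 8];
    offset.copy_from_slice(body.get(5..13)?);

    Some((kind, u32::from_le_bytes(id), u64::from_le_bytes(offset)))
}
```

   A test along these lines would build a payload, parse it, and assert that `kind`, `consumer_id`, and `offset` come back unchanged and that byte 13 still holds the ack code.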



##########
core/partitions/src/iggy_partition.rs:
##########
@@ -1421,7 +1430,9 @@ where
                         })?;
                 Ok((kind, consumer_id, Some(offset)))
             }
-            Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)),
+            Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 => {
+                Ok((kind, consumer_id, None))
+            }
             _ => Err(IggyError::InvalidCommand),
         }
     }

Review Comment:
   This is the only place that actually processes v2 commands (both 
`parse_consumer_offset_request` and `parse_staged_consumer_offset_commit` 
delegate here), and it handles `StoreConsumerOffset2` in the same branch as v1 
without ever reading the trailing ack byte. For store, parsing stops at 
`body[5..13]` (byte 12). For delete, it stops at `body[1..5]` (byte 4). The ack 
byte (index 13 for store, index 5 for delete) is never read.
   
   Two concerns here. First, `AckLevel` has literally no effect on behavior 
right now, but the PR description says it "selects the policy." I think the 
description should be explicit that the ack byte is wire-reserved only and 
behavioral branching comes later. Otherwise future contributors might assume 
this already works.
   
   Second, and more importantly from a protocol correctness angle: an invalid 
ack value (e.g. `0xFF`) would be silently accepted with no validation error. 
Wouldn't it be better to at least validate the byte here, even if you don't use 
it yet? Something like reading the ack byte and calling `AckLevel::from_code()` 
to reject garbage early. That way you get fail-fast behavior for malformed 
messages instead of silently swallowing bad data through the pipeline.
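
   Roughly what I have in mind, as a sketch only. The `AckLevel` variants and codes, the `from_code` signature (returning `Option`), `AckParseError`, and `read_ack` are all assumptions for illustration; the real code would presumably map failures to an `IggyError`.

```rust
/// Hypothetical stand-in for the PR's `AckLevel`; real variants may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckLevel {
    Leader,
    Quorum,
}

impl AckLevel {
    /// Assumed signature: returns `None` for any unknown code.
    pub fn from_code(code: u8) -> Option<Self> {
        match code {
            0 => Some(Self::Leader),
            1 => Some(Self::Quorum),
            _ => None,
        }
    }
}

/// Hypothetical error type used only by this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum AckParseError {
    /// The body ends before the ack byte.
    Missing,
    /// The ack byte is present but is not a known `AckLevel` code.
    Invalid(u8),
}

/// Ack byte positions implied by the v2 payload layouts:
/// store = kind (1) + consumer_id (4) + offset (8), delete = kind (1) + consumer_id (4).
pub const STORE_V2_ACK_INDEX: usize = 13;
pub const DELETE_V2_ACK_INDEX: usize = 5;

/// Reads and validates the trailing ack byte so malformed v2 messages
/// are rejected at parse time, even though the value is not used yet.
pub fn read_ack(body: &[u8], ack_index: usize) -> Result<AckLevel, AckParseError> {
    let code = *body.get(ack_index).ok_or(AckParseError::Missing)?;
    AckLevel::from_code(code).ok_or(AckParseError::Invalid(code))
}
```

   The v2 arms of the parser could call something like `read_ack` after extracting the existing fields and discard the result for now.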



##########
core/integration/tests/server/scenarios/authentication_scenario.rs:
##########
@@ -139,6 +139,16 @@ async fn test_all_commands_require_auth(client: &IggyClient) {
         ) {
             continue;
         }
+        // v2 consumer-offset ops are registered in the dispatch table for the
+        // consensus/simulator pathway but are not wired into the legacy binary
+        // server's dispatch. They'll move into server-ng alongside the rest of
+        // the v2 surface; re-enable these codes here once that lands.
+        if matches!(
+            code,
+            STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE
+        ) {
+            continue;
+        }

Review Comment:
   The comment says "re-enable these codes here once that lands" but there's no 
issue/ticket reference to track it. In my experience these `continue` skips 
tend to get forgotten once the PR is merged. Would it be worth adding a 
`// TODO(#XXXX)` with an issue number so this shows up in tracking? Another 
option is a `compile_error!` behind a feature flag that fires when server-ng 
v2 dispatch lands, forcing someone to come back here.
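
   A sketch of the second option, under stated assumptions: the feature name `server_ng_v2_dispatch` is made up, the constant values are placeholders rather than the real command codes, `u32` is an assumed code type, and `skips_auth_check` is a hypothetical helper wrapping the `matches!` from the diff.

```rust
// Placeholder values for illustration only; the real command codes are
// defined elsewhere in the project and are not reproduced here.
pub const STORE_CONSUMER_OFFSET_2_CODE: u32 = 0xF001;
pub const DELETE_CONSUMER_OFFSET_2_CODE: u32 = 0xF002;

// Hypothetical feature name. Once the crate is built with this feature
// enabled, compilation fails until the skip below is removed.
#[cfg(feature = "server_ng_v2_dispatch")]
compile_error!(
    "server-ng v2 dispatch is enabled: remove the v2 consumer-offset skip \
     from test_all_commands_require_auth"
);

/// TODO(#XXXX): drop this skip once the v2 consumer-offset ops are wired
/// into server dispatch.
pub fn skips_auth_check(code: u32) -> bool {
    matches!(
        code,
        STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE
    )
}
```

   The `TODO(#XXXX)` alone is the cheaper choice; the `compile_error!` only pays off if a feature flag like this actually gates the server-ng work.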



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
