atharvalade commented on code in PR #3151:
URL: https://github.com/apache/iggy/pull/3151#discussion_r3139490421
##########
core/simulator/src/client.rs:
##########
@@ -181,6 +181,44 @@ impl SimClient {
self.build_request_with_namespace(Operation::DeleteConsumerOffset, &payload, namespace)
}
+ /// v2 of `store_consumer_offset` carrying an explicit `AckLevel` byte.
+ ///
+ /// Only the simulator emits this opcode today; the partitions plane
+ /// accepts it alongside v1. The ack byte is reserved for future
+ /// cluster-side commit-timing semantics.
+ pub fn store_consumer_offset_v2(
+ &self,
+ namespace: IggyNamespace,
+ consumer_kind: u8,
+ consumer_id: u32,
+ offset: u64,
+ ack: AckLevel,
+ ) -> Message<RequestHeader> {
+ let mut payload = Vec::with_capacity(14);
+ payload.push(consumer_kind);
+ payload.extend_from_slice(&consumer_id.to_le_bytes());
+ payload.extend_from_slice(&offset.to_le_bytes());
+ payload.push(ack.as_u8());
+
+ self.build_request_with_namespace(Operation::StoreConsumerOffset2, &payload, namespace)
+ }
+
+ /// v2 of `delete_consumer_offset` carrying an explicit `AckLevel` byte.
+ pub fn delete_consumer_offset_v2(
+ &self,
+ namespace: IggyNamespace,
+ consumer_kind: u8,
+ consumer_id: u32,
+ ack: AckLevel,
+ ) -> Message<RequestHeader> {
+ let mut payload = Vec::with_capacity(6);
+ payload.push(consumer_kind);
+ payload.extend_from_slice(&consumer_id.to_le_bytes());
+ payload.push(ack.as_u8());
+
+ self.build_request_with_namespace(Operation::DeleteConsumerOffset2, &payload, namespace)
+ }
Review Comment:
`store_consumer_offset_v2` and `delete_consumer_offset_v2` are defined here
but never called anywhere in the codebase. Similarly,
`StoreConsumerOffset2Request` / `DeleteConsumerOffset2Request` in
`binary_protocol` are only exercised by their own unit tests.
This means the v2 path has zero end-to-end coverage. The wire types have
round-trip tests, which is great, but nothing proves that a v2 message built
by the simulator actually flows through the consensus pipeline and is parsed
correctly by `parse_consumer_offset_payload`. At minimum, I think there should
be a simulator-level test (or even just a unit test in `iggy_partition.rs`)
that builds a v2 store/delete message and runs it through the partition parser
to confirm the bytes land in the right places. Otherwise this is a lot of
scaffolding with no integration confidence.
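A rough sketch of what such a test could check, assuming only the payload layout the simulator builders above produce (`kind: u8`, then `consumer_id` as little-endian `u32`, then for store the `offset` as little-endian `u64`, then one ack byte). `Op`, `encode_store_v2`, `encode_delete_v2` and `parse_v2` are hypothetical stand-ins for `SimClient` and `parse_consumer_offset_payload`; a real test would call those directly instead of these mirrors:

```rust
/// Hypothetical stand-in for the two v2 opcodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Op {
    StoreConsumerOffset2,
    DeleteConsumerOffset2,
}

/// Mirrors `store_consumer_offset_v2`: kind | consumer_id (LE u32) | offset (LE u64) | ack.
fn encode_store_v2(kind: u8, consumer_id: u32, offset: u64, ack: u8) -> Vec<u8> {
    let mut p = Vec::with_capacity(14);
    p.push(kind);
    p.extend_from_slice(&consumer_id.to_le_bytes());
    p.extend_from_slice(&offset.to_le_bytes());
    p.push(ack);
    p
}

/// Mirrors `delete_consumer_offset_v2`: kind | consumer_id (LE u32) | ack.
fn encode_delete_v2(kind: u8, consumer_id: u32, ack: u8) -> Vec<u8> {
    let mut p = Vec::with_capacity(6);
    p.push(kind);
    p.extend_from_slice(&consumer_id.to_le_bytes());
    p.push(ack);
    p
}

/// Hypothetical parser: returns (kind, consumer_id, offset, ack), or None
/// if the body length does not match the expected v2 layout.
fn parse_v2(op: Op, body: &[u8]) -> Option<(u8, u32, Option<u64>, u8)> {
    let expected_len = match op {
        Op::StoreConsumerOffset2 => 14,
        Op::DeleteConsumerOffset2 => 6,
    };
    if body.len() != expected_len {
        return None;
    }
    let mut id = [0u8; 4];
    id.copy_from_slice(&body[1..5]);
    let offset = match op {
        Op::StoreConsumerOffset2 => {
            let mut off = [0u8; 8];
            off.copy_from_slice(&body[5..13]);
            Some(u64::from_le_bytes(off))
        }
        Op::DeleteConsumerOffset2 => None,
    };
    // The ack byte is always the last byte of a well-formed v2 body.
    Some((body[0], u32::from_le_bytes(id), offset, body[expected_len - 1]))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn v2_store_and_delete_bytes_land_in_place() {
        let store = encode_store_v2(2, 0xDEAD_BEEF, 42, 1);
        assert_eq!(
            parse_v2(Op::StoreConsumerOffset2, &store),
            Some((2, 0xDEAD_BEEF, Some(42), 1))
        );
        let delete = encode_delete_v2(1, 7, 0);
        assert_eq!(parse_v2(Op::DeleteConsumerOffset2, &delete), Some((1, 7, None, 0)));
    }
}
```

Using distinctive field values such as `0xDEAD_BEEF` means an off-by-one in the slice boundaries shows up as a mismatch rather than passing by accident.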
##########
core/partitions/src/iggy_partition.rs:
##########
@@ -1421,7 +1430,9 @@ where
})?;
Ok((kind, consumer_id, Some(offset)))
}
- Operation::DeleteConsumerOffset => Ok((kind, consumer_id, None)),
+ Operation::DeleteConsumerOffset | Operation::DeleteConsumerOffset2 => {
+ Ok((kind, consumer_id, None))
+ }
_ => Err(IggyError::InvalidCommand),
}
}
Review Comment:
This is the only place that actually processes v2 commands (both
`parse_consumer_offset_request` and `parse_staged_consumer_offset_commit`
delegate here), and it handles `StoreConsumerOffset2` in the same branch as v1
without ever reading the trailing ack byte. For store, parsing stops at
`body[5..13]` (last index read: 12); for delete, it stops at `body[1..5]`
(last index read: 4). In both cases the ack byte (index 13 and index 5,
respectively) silently falls off the end.
Two concerns here. First, `AckLevel` has literally no effect on behavior
right now, but the PR description says it "selects the policy." I think the
description should be explicit that the ack byte is wire-reserved only and
behavioral branching comes later. Otherwise future contributors might assume
this already works.
Second, and more importantly from a protocol correctness angle: an invalid
ack value (e.g. `0xFF`) would be silently accepted with no validation error.
Wouldn't it be better to at least validate the byte here, even if you don't use
it yet? Something like reading the ack byte and calling `AckLevel::from_code()`
to reject garbage early. That way you get fail-fast behavior for malformed
messages instead of silently swallowing bad data through the pipeline.
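A minimal sketch of that validation, assuming `from_code` returns `Option<AckLevel>`. The `AckLevel` variants and codes below are hypothetical (the real enum may differ), and `IggyError` is a local stand-in carrying only the `InvalidCommand` variant the diff already uses:

```rust
/// Hypothetical stand-in for the PR's `AckLevel`; real variants and codes may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AckLevel {
    Leader,
    Quorum,
}

impl AckLevel {
    fn as_u8(self) -> u8 {
        match self {
            AckLevel::Leader => 0,
            AckLevel::Quorum => 1,
        }
    }

    /// Assumed shape of `from_code`: `None` for any byte that is not a known level.
    fn from_code(code: u8) -> Option<AckLevel> {
        match code {
            0 => Some(AckLevel::Leader),
            1 => Some(AckLevel::Quorum),
            _ => None,
        }
    }
}

/// Local stand-in for the one `IggyError` variant used in the diff.
#[derive(Debug, PartialEq, Eq)]
enum IggyError {
    InvalidCommand,
}

/// Validates the trailing ack byte of a v2 body whose fixed fields occupy
/// `body[..ack_index]` (13 for store, 5 for delete). As a stricter assumption,
/// it also rejects bodies that are not exactly one byte longer than that.
fn read_ack(body: &[u8], ack_index: usize) -> Result<AckLevel, IggyError> {
    if body.len() != ack_index + 1 {
        return Err(IggyError::InvalidCommand);
    }
    AckLevel::from_code(body[ack_index]).ok_or(IggyError::InvalidCommand)
}
```

The parser could call `read_ack(body, 13)?` for store and `read_ack(body, 5)?` for delete and simply discard the result for now. Well-formed v2 messages behave exactly as today, while `0xFF` or a truncated body fails at parse time instead of travelling through the pipeline.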
##########
core/integration/tests/server/scenarios/authentication_scenario.rs:
##########
@@ -139,6 +139,16 @@ async fn test_all_commands_require_auth(client: &IggyClient) {
) {
continue;
}
+ // v2 consumer-offset ops are registered in the dispatch table for the
+ // consensus/simulator pathway but are not wired into the legacy binary
+ // server's dispatch. They'll move into server-ng alongside the rest of
+ // the v2 surface; re-enable these codes here once that lands.
+ if matches!(
+ code,
+ STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE
+ ) {
+ continue;
+ }
Review Comment:
The comment says "re-enable these codes here once that lands", but there's no
issue/ticket reference to track it. In my experience these `continue` skips
tend to get forgotten once the PR is merged. Would it be worth adding a
`// TODO(#XXXX)` with an issue number so this shows up in tracking? Or even a
`compile_error!` behind a feature flag that fires when server-ng v2 dispatch
lands, forcing someone to come back here.
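A sketch of both options together. The opcode values and the `server-ng-v2-dispatch` feature name are hypothetical, the code type is assumed to be `u32`, and `#XXXX` stays a placeholder for the real issue number:

```rust
// Hypothetical opcode values; the real constants live in the binary protocol crate.
const STORE_CONSUMER_OFFSET_2_CODE: u32 = 0xF001;
const DELETE_CONSUMER_OFFSET_2_CODE: u32 = 0xF002;

// Fails the build as soon as the (hypothetical) `server-ng-v2-dispatch`
// feature is enabled, forcing whoever lands v2 dispatch to revisit the skip.
#[cfg(feature = "server-ng-v2-dispatch")]
compile_error!(
    "v2 consumer-offset dispatch is enabled: remove the skip in test_all_commands_require_auth"
);

// TODO(#XXXX): re-enable these codes once server-ng dispatches the v2 consumer-offset ops.
/// Returns true for codes the auth test must skip for now.
fn skip_in_auth_test(code: u32) -> bool {
    matches!(code, STORE_CONSUMER_OFFSET_2_CODE | DELETE_CONSUMER_OFFSET_2_CODE)
}
```

With the feature off, the `compile_error!` invocation is compiled out entirely; the moment someone enables it, the build fails with a message pointing back at the skip.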
--
This is an automated message from the Apache Git Service.