hubcio commented on code in PR #3135:
URL: https://github.com/apache/iggy/pull/3135#discussion_r3241285453
##########
core/server/src/streaming/partitions/storage.rs:
##########
@@ -181,9 +181,16 @@ pub fn load_consumer_offsets(path: &str) -> Result<Vec<ConsumerOffset>, IggyErro
}
let name = dir_entry.file_name().into_string().unwrap();
Review Comment:
this fix is incomplete: `into_string().unwrap()` on this line still panics
on a non-utf-8 filename, which is perfectly legal on linux filesystems. the
pr's whole rationale is that an operator dropping a stray file into the
directory shouldn't crash startup, yet a stray file with a non-utf-8 name
crashes here before it ever reaches the parse you patched. handle the `Err`
the same way: either fall back to `to_string_lossy()` (the mangled name then
fails the `u32` parse and takes your new skip path) or `warn!` and `continue`.

the same loop has two more startup-panic paths that the pr leaves untouched:
`dir_entry.unwrap()` a few lines up panics on a per-entry io error, and the
`metadata.is_err()` branch does `break` instead of `continue`, so a single
metadata error bails out of the whole loop and silently drops every remaining
offset file, including valid ones.
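a rough std-only sketch of the loop with all three paths made panic-free (per-entry io errors skipped, `continue` instead of `break` on metadata errors, no `unwrap()` on the file name). this is not the iggy code: `load_offsets_sketch` is a hypothetical name, it returns plain `(consumer_id, offset)` pairs instead of `ConsumerOffset`, `eprintln!` stands in for `warn!`, and the final sort is an assumption added because `read_dir` order is unspecified. the existing `read_exact` error propagation is left as-is.

```rust
use std::fs;
use std::io::{self, Read};
use std::path::Path;

/// Hypothetical sketch: loads `<consumer_id>` files containing one
/// little-endian u64 offset each, skipping anything it can't interpret.
fn load_offsets_sketch(path: &Path) -> io::Result<Vec<(u32, u64)>> {
    let mut offsets = Vec::new();
    for entry in fs::read_dir(path)? {
        // A per-entry I/O error skips that entry instead of panicking.
        let entry = match entry {
            Ok(entry) => entry,
            Err(err) => {
                eprintln!("skipping unreadable directory entry: {}", err);
                continue;
            }
        };
        // A metadata error skips only this entry (`continue`, not `break`).
        let metadata = match entry.metadata() {
            Ok(metadata) => metadata,
            Err(err) => {
                eprintln!("skipping {:?}: cannot read metadata: {}", entry.path(), err);
                continue;
            }
        };
        if metadata.is_dir() {
            continue;
        }
        // Non-UTF-8 names are skipped instead of unwrapped.
        let name = match entry.file_name().into_string() {
            Ok(name) => name,
            Err(raw) => {
                eprintln!("skipping non-UTF-8 file name: {:?}", raw);
                continue;
            }
        };
        // Non-numeric names (.DS_Store, backups, ...) are skipped as in the PR.
        let consumer_id = match name.parse::<u32>() {
            Ok(id) => id,
            Err(_) => {
                eprintln!("skipping non-numeric offset file: '{}'", name);
                continue;
            }
        };
        // Unchanged behavior: a short or unreadable file still aborts the load.
        let mut buf = [0u8; 8];
        fs::File::open(entry.path())?.read_exact(&mut buf)?;
        offsets.push((consumer_id, u64::from_le_bytes(buf)));
    }
    // read_dir order is platform-dependent; sort for deterministic output.
    offsets.sort_by_key(|&(id, _)| id);
    Ok(offsets)
}
```

the same shape would apply to `load_consumer_group_offsets`.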
##########
core/server/src/streaming/partitions/storage.rs:
##########
@@ -181,9 +181,16 @@ pub fn load_consumer_offsets(path: &str) -> Result<Vec<ConsumerOffset>, IggyErro
}
let name = dir_entry.file_name().into_string().unwrap();
- let consumer_id = name.parse::<u32>().unwrap_or_else(|_| {
- panic!("Invalid consumer ID file with name: '{}'.", name);
- });
+ let consumer_id = match name.parse::<u32>() {
Review Comment:
behavior tradeoff worth calling out in the pr description: a genuinely
corrupt or renamed offset file that should be numeric but isn't is now skipped
instead of failing startup, so that consumer loses its committed offset and
re-consumes on restart. the `warn!` keeps it observable, so this is probably
acceptable, but it swaps a loud crash for a quiet skip of offset data; it's
not a pure robustness win.
##########
core/integration/tests/storage/consumer_offsets.rs:
##########
@@ -0,0 +1,145 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use iggy_common::{ConsumerKind, IggyError};
+use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets};
+use std::path::Path;
+use std::sync::atomic::Ordering;
+
+fn write_offset_file(dir: &Path, name: &str, offset: u64) {
+ std::fs::write(dir.join(name), offset.to_le_bytes()).unwrap();
+}
+
+#[test]
+fn load_consumer_offsets_valid_files() {
+ let dir = tempfile::tempdir().unwrap();
+ write_offset_file(dir.path(), "1", 100);
+ write_offset_file(dir.path(), "2", 200);
+ write_offset_file(dir.path(), "3", 300);
+
+ let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert_eq!(offsets.len(), 3);
+ assert_eq!(offsets[0].consumer_id, 1);
+ assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 100);
+ assert_eq!(offsets[0].kind, ConsumerKind::Consumer);
+ assert_eq!(offsets[1].consumer_id, 2);
+ assert_eq!(offsets[1].offset.load(Ordering::Relaxed), 200);
+ assert_eq!(offsets[2].consumer_id, 3);
+ assert_eq!(offsets[2].offset.load(Ordering::Relaxed), 300);
+}
+
+#[test]
+fn load_consumer_offsets_skips_non_numeric_files() {
+ let dir = tempfile::tempdir().unwrap();
+ write_offset_file(dir.path(), ".DS_Store", 0);
+ write_offset_file(dir.path(), "backup.bak", 0);
+ write_offset_file(dir.path(), "1", 42);
+
+ let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert_eq!(offsets.len(), 1);
+ assert_eq!(offsets[0].consumer_id, 1);
+ assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 42);
+}
Review Comment:
missing test case: a numeric-named file with fewer than 8 bytes of content.
`load_consumer_offsets` does `read_exact(&mut [0u8; 8])` on each file, so a
truncated or partially written offset file makes the whole function return
`Err` and aborts the entire load. that's the same "one bad file kills startup"
class this pr targets, and it's more likely in practice than a non-numeric name
(partial write, disk full). worth adding alongside the non-numeric test here.
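the failure mode is easy to reproduce: `read_exact` on a file shorter than its 8-byte buffer fails with `ErrorKind::UnexpectedEof`. the hypothetical helper below (`read_offset_file` is an illustrative name, not part of iggy) reads a single offset file and turns that one case into `Ok(None)`, so a caller could warn and skip it. whether to skip or fail is the same crash-versus-skip tradeoff raised above for non-numeric names.

```rust
use std::fs::File;
use std::io::{self, ErrorKind, Read};
use std::path::Path;

/// Hypothetical helper: reads one little-endian u64 offset from `path`.
/// Returns Ok(None) if the file holds fewer than 8 bytes (truncated or
/// partially written); every other I/O error is still propagated.
fn read_offset_file(path: &Path) -> io::Result<Option<u64>> {
    let mut buf = [0u8; 8];
    match File::open(path)?.read_exact(&mut buf) {
        Ok(()) => Ok(Some(u64::from_le_bytes(buf))),
        // read_exact reports a short file as UnexpectedEof.
        Err(err) if err.kind() == ErrorKind::UnexpectedEof => Ok(None),
        Err(err) => Err(err),
    }
}
```

in the pr's test file, the matching case would write fewer than 8 bytes under a numeric name, e.g. `std::fs::write(dir.path().join("1"), [0u8; 4])`, and then assert whichever behavior is chosen.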
##########
core/server/src/streaming/partitions/storage.rs:
##########
@@ -244,12 +251,16 @@ pub fn load_consumer_group_offsets(
let name = dir_entry.file_name().into_string().unwrap();
Review Comment:
same non-utf-8 gap as in `load_consumer_offsets`: `into_string().unwrap()`
here still panics on a non-utf-8 filename before the parse you patched is
reached. and as in the other loop, `dir_entry.unwrap()` is still a
startup-panic path, and `metadata.is_err()` -> `break` is still a data-loss
path (it drops all remaining files on a metadata error).
##########
core/integration/tests/storage/consumer_offsets.rs:
##########
@@ -0,0 +1,145 @@
+
+#[test]
+fn load_consumer_offsets_empty_dir() {
+ let dir = tempfile::tempdir().unwrap();
+
+ let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert!(offsets.is_empty());
+}
+
+#[test]
+fn load_consumer_offsets_skips_directories() {
Review Comment:
this test doesn't cover anything this pr changes. the directory skip it
asserts happens in the `metadata.unwrap().is_dir()` check in
`load_consumer_offsets`, which this pr doesn't touch, and `123` is a numeric
name, so it would only reach the patched code if that `is_dir` check were
removed. fine to keep as a regression test, but it pins pre-existing
behavior, not this change.
##########
core/integration/tests/storage/consumer_offsets.rs:
##########
@@ -0,0 +1,145 @@
+
+#[test]
+fn load_consumer_offsets_skips_directories() {
+ let dir = tempfile::tempdir().unwrap();
+ std::fs::create_dir(dir.path().join("123")).unwrap();
+ write_offset_file(dir.path(), "1", 77);
+
+ let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
+
+ assert_eq!(offsets.len(), 1);
+ assert_eq!(offsets[0].consumer_id, 1);
+ assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 77);
+}
+
+#[test]
+fn load_consumer_offsets_nonexistent_dir() {
+ let result = load_consumer_offsets("/tmp/nonexistent_iggy_test_dir_12345");
Review Comment:
same problem at line 138 in `load_consumer_group_offsets_nonexistent_dir`:
both tests hardcode `/tmp/nonexistent_iggy_test_dir_12345`, so if that path
ever exists (left over from a crashed run, or created by a parallel run) the
tests break. instead, create a `tempfile::tempdir()`, grab its path, drop it,
then use that path: it's unique and effectively guaranteed not to exist.
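since the test file already calls `tempfile::tempdir()`, the recipe can be a single statement: `let missing = tempfile::tempdir().unwrap().path().to_path_buf();`. the `TempDir` temporary is dropped at the end of that statement, which deletes the directory and leaves a unique path that no longer exists. to keep the sketch runnable without `tempfile`, the version below reproduces the same idea with std only; the helper name and the pid/timestamp/counter naming scheme are assumptions, not iggy or `tempfile` code.

```rust
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

// Per-process counter so two calls never share a name, even if the
// clock returns the same nanosecond value twice.
static COUNTER: AtomicU64 = AtomicU64::new(0);

/// Hypothetical std-only stand-in for "create a tempdir, keep its path,
/// drop it": returns a unique path under the system temp dir that existed
/// briefly and has since been removed.
fn nonexistent_unique_dir(prefix: &str) -> PathBuf {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the UNIX epoch")
        .as_nanos();
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let name = format!("{}_{}_{}_{}", prefix, std::process::id(), nanos, seq);
    let path = std::env::temp_dir().join(name);
    // create_dir fails if the name is taken, so success proves it was free;
    // removing it leaves a path nothing else is expected to reuse.
    std::fs::create_dir(&path).expect("failed to create temp dir");
    std::fs::remove_dir(&path).expect("failed to remove temp dir");
    path
}
```

the returned path would then be passed to `load_consumer_offsets` in place of the hardcoded string. an alternative with no drop-then-reuse window at all is a child path of a still-live tempdir that the test never creates, e.g. `dir.path().join("missing")`.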