This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new e0c55e6e4 fix(test): remove race condition in postgres source mark test (#3052)
e0c55e6e4 is described below
commit e0c55e6e491082cf72a1193d86c519291147ebfc
Author: Krishna Vishal <[email protected]>
AuthorDate: Mon Mar 30 14:36:19 2026 +0530
fix(test): remove race condition in postgres source mark test (#3052)
Closes #3050
## Rationale
The `processed_column_source_marks_rows_after_producing` test fails
intermittently on CI because the postgres source connector (polling
every 10 ms) can process rows between the insert and the initial
`count_unprocessed` check.
The strict assertions `unprocessed == 3` and `processed == 0` are
replaced with a single `unprocessed + processed == 3` check that
verifies all rows exist without being timing-sensitive. The final
polling loop already asserts the actual behavior under test:
`processed == 3`, `unprocessed == 0`, and `total == 3`.
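
As a rough illustration of the reasoning above, the sketch below separates the timing-insensitive invariant (all rows exist in some state) from the final condition that the polling loop waits for. `RowCounts`, `all_rows_present`, `fully_processed`, and `poll_until` are hypothetical names invented for this example and are not part of the test fixture; the sketch also assumes both counts are read from one consistent snapshot, and it omits the sleep a real polling loop would have between attempts.

```rust
/// Hypothetical snapshot of the two row counts (not a type from the repository).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct RowCounts {
    unprocessed: i64,
    processed: i64,
}

impl RowCounts {
    /// Total number of rows, regardless of processing state.
    fn total(self) -> i64 {
        self.unprocessed + self.processed
    }
}

/// Timing-insensitive invariant: every inserted row is present, whether or
/// not the connector has already marked it as processed.
fn all_rows_present(counts: RowCounts, expected: i64) -> bool {
    counts.total() == expected
}

/// Final condition under test: every row is marked processed and none remain.
fn fully_processed(counts: RowCounts, expected: i64) -> bool {
    counts.processed == expected && counts.unprocessed == 0
}

/// Re-reads the counts up to `max_attempts` times and returns the first
/// snapshot that satisfies `fully_processed`, or `None` if none does.
/// A real test would sleep between attempts; this sketch does not.
fn poll_until<F>(mut read: F, expected: i64, max_attempts: usize) -> Option<RowCounts>
where
    F: FnMut() -> RowCounts,
{
    for _ in 0..max_attempts {
        let counts = read();
        if fully_processed(counts, expected) {
            return Some(counts);
        }
    }
    None
}
```

With three inserted rows, a snapshot of 1 unprocessed and 2 processed rows satisfies `all_rows_present` but not `fully_processed`, which is why the initial check can pass even if the connector has already started working.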
Co-authored-by: Piotr Gankiewicz <[email protected]>
---
core/integration/tests/connectors/postgres/postgres_source.rs | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/core/integration/tests/connectors/postgres/postgres_source.rs b/core/integration/tests/connectors/postgres/postgres_source.rs
index bbfd0307b..606f2d1f5 100644
--- a/core/integration/tests/connectors/postgres/postgres_source.rs
+++ b/core/integration/tests/connectors/postgres/postgres_source.rs
@@ -339,12 +339,11 @@ async fn processed_column_source_marks_rows_after_producing(
let initial_unprocessed = fixture.count_unprocessed(&pool).await;
let initial_processed = fixture.count_processed(&pool).await;
assert_eq!(
- initial_unprocessed, TEST_MESSAGE_COUNT as i64,
- "Expected {TEST_MESSAGE_COUNT} unprocessed rows before processing"
- );
- assert_eq!(
- initial_processed, 0,
- "Expected 0 processed rows before processing"
+ initial_unprocessed + initial_processed,
+ TEST_MESSAGE_COUNT as i64,
+ "Expected {TEST_MESSAGE_COUNT} total rows before processing, got {} unprocessed + {} processed",
+ initial_unprocessed,
+ initial_processed
);
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();