This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new e0c55e6e4 fix(test): remove race condition in postgres source mark test (#3052)
e0c55e6e4 is described below

commit e0c55e6e491082cf72a1193d86c519291147ebfc
Author: Krishna Vishal <[email protected]>
AuthorDate: Mon Mar 30 14:36:19 2026 +0530

    fix(test): remove race condition in postgres source mark test (#3052)
    
    Closes #3050
    
    ## Rationale
    
    The `processed_column_source_marks_rows_after_producing` test fails
    intermittently on CI because the postgres source connector (polling
    every 10 ms) can process rows between the insert and the initial
    `count_unprocessed` check.
    
    The strict assertions `unprocessed == 3` and `processed == 0` are
    replaced with a single `unprocessed + processed == 3` check that
    verifies all rows exist without being timing-sensitive. The final
    polling loop already asserts the actual behavior under test:
    `processed == 3`, `unprocessed == 0`, and `total == 3`.
    
    Co-authored-by: Piotr Gankiewicz <[email protected]>
---
 core/integration/tests/connectors/postgres/postgres_source.rs | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/core/integration/tests/connectors/postgres/postgres_source.rs b/core/integration/tests/connectors/postgres/postgres_source.rs
index bbfd0307b..606f2d1f5 100644
--- a/core/integration/tests/connectors/postgres/postgres_source.rs
+++ b/core/integration/tests/connectors/postgres/postgres_source.rs
@@ -339,12 +339,11 @@ async fn processed_column_source_marks_rows_after_producing(
     let initial_unprocessed = fixture.count_unprocessed(&pool).await;
     let initial_processed = fixture.count_processed(&pool).await;
     assert_eq!(
-        initial_unprocessed, TEST_MESSAGE_COUNT as i64,
-        "Expected {TEST_MESSAGE_COUNT} unprocessed rows before processing"
-    );
-    assert_eq!(
-        initial_processed, 0,
-        "Expected 0 processed rows before processing"
+        initial_unprocessed + initial_processed,
+        TEST_MESSAGE_COUNT as i64,
+        "Expected {TEST_MESSAGE_COUNT} total rows before processing, got {} 
unprocessed + {} processed",
+        initial_unprocessed,
+        initial_processed
     );
 
     let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();

Reply via email to