This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 93443b179 fix(test): Fix Elasticsearch container readiness check to 
use HTTP health check (#3079)
93443b179 is described below

commit 93443b17988731066c4ad6b715f05cd58056cdab
Author: Krishna Vishal <[email protected]>
AuthorDate: Mon Apr 6 16:42:26 2026 +0530

    fix(test): Fix Elasticsearch container readiness check to use HTTP health check (#3079)
    
    Closes #3078
    
    ## Summary
    - Replace unreliable `WaitFor::message_on_stdout("started")` with an
    `HttpWaitStrategy` on `/_cluster/health`, matching the pattern from
    #3077 (Iceberg fix)
    - Remove redundant fixture health check loops in both sink and source
    fixtures, since the container startup now guarantees ES is healthy
    - Reduce excessive sleep durations (500 ms -> 100 ms) and poll attempts
    (`POLL_ATTEMPTS * 2` -> `POLL_ATTEMPTS`) in source tests
---
 .../elasticsearch/elasticsearch_source.rs          |  6 ++---
 .../connectors/fixtures/elasticsearch/container.rs | 12 +++++----
 .../connectors/fixtures/elasticsearch/sink.rs      | 26 +++----------------
 .../connectors/fixtures/elasticsearch/source.rs    | 29 +++-------------------
 4 files changed, 18 insertions(+), 55 deletions(-)

diff --git 
a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs 
b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
index 5001bf0cc..2beaad75c 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
@@ -123,7 +123,7 @@ async fn elasticsearch_source_handles_empty_index(
     let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
     let consumer_id: Identifier = "test_consumer".try_into().unwrap();
 
-    sleep(Duration::from_millis(500)).await;
+    sleep(Duration::from_millis(100)).await;
 
     let polled = client
         .poll_messages(
@@ -164,7 +164,7 @@ async fn elasticsearch_source_produces_bulk_messages(
     let consumer_id: Identifier = "test_consumer".try_into().unwrap();
 
     let mut received: Vec<serde_json::Value> = Vec::new();
-    for _ in 0..POLL_ATTEMPTS * 2 {
+    for _ in 0..POLL_ATTEMPTS {
         if let Ok(polled) = client
             .poll_messages(
                 &stream_id,
@@ -270,7 +270,7 @@ async fn state_persists_across_connector_restart(
         .start_dependents()
         .await
         .expect("Failed to restart connectors");
-    sleep(Duration::from_millis(500)).await;
+    sleep(Duration::from_millis(100)).await;
 
     let mut received_after: Vec<serde_json::Value> = Vec::new();
     for _ in 0..POLL_ATTEMPTS {
diff --git 
a/core/integration/tests/connectors/fixtures/elasticsearch/container.rs 
b/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
index 1117ac77b..3eacc3a0a 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
@@ -22,6 +22,7 @@ use reqwest_middleware::ClientWithMiddleware as HttpClient;
 use reqwest_retry::RetryTransientMiddleware;
 use reqwest_retry::policies::ExponentialBackoff;
 use serde::Deserialize;
+use testcontainers_modules::testcontainers::core::wait::HttpWaitStrategy;
 use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
 use testcontainers_modules::testcontainers::runners::AsyncRunner;
 use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage, 
ImageExt};
@@ -31,10 +32,7 @@ use uuid::Uuid;
 const ELASTICSEARCH_IMAGE: &str = "elasticsearch";
 const ELASTICSEARCH_TAG: &str = "9.3.0";
 const ELASTICSEARCH_PORT: u16 = 9200;
-const ELASTICSEARCH_READY_MSG: &str = "started";
-
-pub const HEALTH_CHECK_ATTEMPTS: usize = 30;
-pub const HEALTH_CHECK_INTERVAL_MS: u64 = 500;
+const ELASTICSEARCH_HEALTH_ENDPOINT: &str = "/_cluster/health";
 
 pub const DEFAULT_TEST_STREAM: &str = "test_stream";
 pub const DEFAULT_TEST_TOPIC: &str = "test_topic";
@@ -101,7 +99,11 @@ impl ElasticsearchContainer {
 
         let container = GenericImage::new(ELASTICSEARCH_IMAGE, 
ELASTICSEARCH_TAG)
             .with_exposed_port(ELASTICSEARCH_PORT.tcp())
-            .with_wait_for(WaitFor::message_on_stdout(ELASTICSEARCH_READY_MSG))
+            .with_wait_for(WaitFor::http(
+                HttpWaitStrategy::new(ELASTICSEARCH_HEALTH_ENDPOINT)
+                    .with_port(ELASTICSEARCH_PORT.tcp())
+                    .with_expected_status_code(200u16),
+            ))
             .with_network(unique_network)
             .with_env_var("discovery.type", "single-node")
             .with_env_var("xpack.security.enabled", "false")
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs 
b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
index 155250544..334dc72f1 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
@@ -21,8 +21,7 @@ use super::container::{
     DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SINK_INDEX, ENV_SINK_PATH,
     ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA, 
ENV_SINK_STREAMS_0_STREAM,
     ENV_SINK_STREAMS_0_TOPICS, ENV_SINK_URL, ElasticsearchContainer, 
ElasticsearchOps,
-    ElasticsearchSearchResponse, HEALTH_CHECK_ATTEMPTS, 
HEALTH_CHECK_INTERVAL_MS,
-    create_http_client,
+    ElasticsearchSearchResponse, create_http_client,
 };
 use async_trait::async_trait;
 use integration::harness::{TestBinaryError, TestFixture};
@@ -95,28 +94,11 @@ impl TestFixture for ElasticsearchSinkFixture {
         let container = ElasticsearchContainer::start().await?;
         let http_client = create_http_client();
 
-        let fixture = Self {
+        // Container startup already waits for /_cluster/health to return 200
+        // via HttpWaitStrategy, so no additional health check is needed.
+        Ok(Self {
             container,
             http_client,
-        };
-
-        for _ in 0..HEALTH_CHECK_ATTEMPTS {
-            let url = format!("{}/_cluster/health", 
fixture.container.base_url);
-            if let Ok(response) = fixture.http_client.get(&url).send().await
-                && response.status().is_success()
-            {
-                info!("Elasticsearch cluster is healthy");
-                return Ok(fixture);
-            }
-            sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
-        }
-
-        Err(TestBinaryError::FixtureSetup {
-            fixture_type: "ElasticsearchSink".to_string(),
-            message: format!(
-                "Failed to confirm Elasticsearch cluster health after {} 
attempts",
-                HEALTH_CHECK_ATTEMPTS
-            ),
         })
     }
 
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs 
b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
index a70b9bd10..16bdaaeae 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
@@ -21,17 +21,13 @@ use super::container::{
     DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SOURCE_BATCH_SIZE, 
ENV_SOURCE_INDEX,
     ENV_SOURCE_PATH, ENV_SOURCE_POLLING_INTERVAL, ENV_SOURCE_STREAMS_0_SCHEMA,
     ENV_SOURCE_STREAMS_0_STREAM, ENV_SOURCE_STREAMS_0_TOPIC, 
ENV_SOURCE_TIMESTAMP_FIELD,
-    ENV_SOURCE_URL, ElasticsearchContainer, ElasticsearchOps, 
HEALTH_CHECK_ATTEMPTS,
-    HEALTH_CHECK_INTERVAL_MS, create_http_client,
+    ENV_SOURCE_URL, ElasticsearchContainer, ElasticsearchOps, 
create_http_client,
 };
 use async_trait::async_trait;
 use iggy_common::IggyTimestamp;
 use integration::harness::{TestBinaryError, TestFixture};
 use reqwest_middleware::ClientWithMiddleware as HttpClient;
 use std::collections::HashMap;
-use std::time::Duration;
-use tokio::time::sleep;
-use tracing::info;
 
 const TEST_INDEX: &str = "test_documents";
 
@@ -102,28 +98,11 @@ impl TestFixture for ElasticsearchSourceFixture {
         let container = ElasticsearchContainer::start().await?;
         let http_client = create_http_client();
 
-        let fixture = Self {
+        // Container startup already waits for /_cluster/health to return 200
+        // via HttpWaitStrategy, so no additional health check is needed.
+        Ok(Self {
             container,
             http_client,
-        };
-
-        for _ in 0..HEALTH_CHECK_ATTEMPTS {
-            let url = format!("{}/_cluster/health", 
fixture.container.base_url);
-            if let Ok(response) = fixture.http_client.get(&url).send().await
-                && response.status().is_success()
-            {
-                info!("Elasticsearch cluster is healthy");
-                return Ok(fixture);
-            }
-            sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
-        }
-
-        Err(TestBinaryError::FixtureSetup {
-            fixture_type: "ElasticsearchSource".to_string(),
-            message: format!(
-                "Failed to confirm Elasticsearch cluster health after {} 
attempts",
-                HEALTH_CHECK_ATTEMPTS
-            ),
         })
     }
 

Reply via email to