This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 93443b179 fix(test): Fix Elasticsearch container readiness check to
use HTTP health check (#3079)
93443b179 is described below
commit 93443b17988731066c4ad6b715f05cd58056cdab
Author: Krishna Vishal <[email protected]>
AuthorDate: Mon Apr 6 16:42:26 2026 +0530
fix(test): Fix Elasticsearch container readiness check to use HTTP health
check (#3079)
Closes #3078
## Summary
- Replace unreliable `WaitFor::message_on_stdout("started")` with
`HttpWaitStrategy` `on /_cluster/health`, matching the pattern from
#3077 (Iceberg fix)
- Remove redundant fixture health check loops in both sink and source
fixtures, since the container startup now guarantees ES is healthy
- Reduce excessive sleep durations and poll attempts in source tests
---
.../elasticsearch/elasticsearch_source.rs | 6 ++---
.../connectors/fixtures/elasticsearch/container.rs | 12 +++++----
.../connectors/fixtures/elasticsearch/sink.rs | 26 +++----------------
.../connectors/fixtures/elasticsearch/source.rs | 29 +++-------------------
4 files changed, 18 insertions(+), 55 deletions(-)
diff --git
a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
index 5001bf0cc..2beaad75c 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
@@ -123,7 +123,7 @@ async fn elasticsearch_source_handles_empty_index(
let topic_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
let consumer_id: Identifier = "test_consumer".try_into().unwrap();
- sleep(Duration::from_millis(500)).await;
+ sleep(Duration::from_millis(100)).await;
let polled = client
.poll_messages(
@@ -164,7 +164,7 @@ async fn elasticsearch_source_produces_bulk_messages(
let consumer_id: Identifier = "test_consumer".try_into().unwrap();
let mut received: Vec<serde_json::Value> = Vec::new();
- for _ in 0..POLL_ATTEMPTS * 2 {
+ for _ in 0..POLL_ATTEMPTS {
if let Ok(polled) = client
.poll_messages(
&stream_id,
@@ -270,7 +270,7 @@ async fn state_persists_across_connector_restart(
.start_dependents()
.await
.expect("Failed to restart connectors");
- sleep(Duration::from_millis(500)).await;
+ sleep(Duration::from_millis(100)).await;
let mut received_after: Vec<serde_json::Value> = Vec::new();
for _ in 0..POLL_ATTEMPTS {
diff --git
a/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
b/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
index 1117ac77b..3eacc3a0a 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/container.rs
@@ -22,6 +22,7 @@ use reqwest_middleware::ClientWithMiddleware as HttpClient;
use reqwest_retry::RetryTransientMiddleware;
use reqwest_retry::policies::ExponentialBackoff;
use serde::Deserialize;
+use testcontainers_modules::testcontainers::core::wait::HttpWaitStrategy;
use testcontainers_modules::testcontainers::core::{IntoContainerPort, WaitFor};
use testcontainers_modules::testcontainers::runners::AsyncRunner;
use testcontainers_modules::testcontainers::{ContainerAsync, GenericImage,
ImageExt};
@@ -31,10 +32,7 @@ use uuid::Uuid;
const ELASTICSEARCH_IMAGE: &str = "elasticsearch";
const ELASTICSEARCH_TAG: &str = "9.3.0";
const ELASTICSEARCH_PORT: u16 = 9200;
-const ELASTICSEARCH_READY_MSG: &str = "started";
-
-pub const HEALTH_CHECK_ATTEMPTS: usize = 30;
-pub const HEALTH_CHECK_INTERVAL_MS: u64 = 500;
+const ELASTICSEARCH_HEALTH_ENDPOINT: &str = "/_cluster/health";
pub const DEFAULT_TEST_STREAM: &str = "test_stream";
pub const DEFAULT_TEST_TOPIC: &str = "test_topic";
@@ -101,7 +99,11 @@ impl ElasticsearchContainer {
let container = GenericImage::new(ELASTICSEARCH_IMAGE,
ELASTICSEARCH_TAG)
.with_exposed_port(ELASTICSEARCH_PORT.tcp())
- .with_wait_for(WaitFor::message_on_stdout(ELASTICSEARCH_READY_MSG))
+ .with_wait_for(WaitFor::http(
+ HttpWaitStrategy::new(ELASTICSEARCH_HEALTH_ENDPOINT)
+ .with_port(ELASTICSEARCH_PORT.tcp())
+ .with_expected_status_code(200u16),
+ ))
.with_network(unique_network)
.with_env_var("discovery.type", "single-node")
.with_env_var("xpack.security.enabled", "false")
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
index 155250544..334dc72f1 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/sink.rs
@@ -21,8 +21,7 @@ use super::container::{
DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SINK_INDEX, ENV_SINK_PATH,
ENV_SINK_STREAMS_0_CONSUMER_GROUP, ENV_SINK_STREAMS_0_SCHEMA,
ENV_SINK_STREAMS_0_STREAM,
ENV_SINK_STREAMS_0_TOPICS, ENV_SINK_URL, ElasticsearchContainer,
ElasticsearchOps,
- ElasticsearchSearchResponse, HEALTH_CHECK_ATTEMPTS,
HEALTH_CHECK_INTERVAL_MS,
- create_http_client,
+ ElasticsearchSearchResponse, create_http_client,
};
use async_trait::async_trait;
use integration::harness::{TestBinaryError, TestFixture};
@@ -95,28 +94,11 @@ impl TestFixture for ElasticsearchSinkFixture {
let container = ElasticsearchContainer::start().await?;
let http_client = create_http_client();
- let fixture = Self {
+ // Container startup already waits for /_cluster/health to return 200
+ // via HttpWaitStrategy, so no additional health check is needed.
+ Ok(Self {
container,
http_client,
- };
-
- for _ in 0..HEALTH_CHECK_ATTEMPTS {
- let url = format!("{}/_cluster/health",
fixture.container.base_url);
- if let Ok(response) = fixture.http_client.get(&url).send().await
- && response.status().is_success()
- {
- info!("Elasticsearch cluster is healthy");
- return Ok(fixture);
- }
- sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
- }
-
- Err(TestBinaryError::FixtureSetup {
- fixture_type: "ElasticsearchSink".to_string(),
- message: format!(
- "Failed to confirm Elasticsearch cluster health after {}
attempts",
- HEALTH_CHECK_ATTEMPTS
- ),
})
}
diff --git a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
index a70b9bd10..16bdaaeae 100644
--- a/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
+++ b/core/integration/tests/connectors/fixtures/elasticsearch/source.rs
@@ -21,17 +21,13 @@ use super::container::{
DEFAULT_TEST_STREAM, DEFAULT_TEST_TOPIC, ENV_SOURCE_BATCH_SIZE,
ENV_SOURCE_INDEX,
ENV_SOURCE_PATH, ENV_SOURCE_POLLING_INTERVAL, ENV_SOURCE_STREAMS_0_SCHEMA,
ENV_SOURCE_STREAMS_0_STREAM, ENV_SOURCE_STREAMS_0_TOPIC,
ENV_SOURCE_TIMESTAMP_FIELD,
- ENV_SOURCE_URL, ElasticsearchContainer, ElasticsearchOps,
HEALTH_CHECK_ATTEMPTS,
- HEALTH_CHECK_INTERVAL_MS, create_http_client,
+ ENV_SOURCE_URL, ElasticsearchContainer, ElasticsearchOps,
create_http_client,
};
use async_trait::async_trait;
use iggy_common::IggyTimestamp;
use integration::harness::{TestBinaryError, TestFixture};
use reqwest_middleware::ClientWithMiddleware as HttpClient;
use std::collections::HashMap;
-use std::time::Duration;
-use tokio::time::sleep;
-use tracing::info;
const TEST_INDEX: &str = "test_documents";
@@ -102,28 +98,11 @@ impl TestFixture for ElasticsearchSourceFixture {
let container = ElasticsearchContainer::start().await?;
let http_client = create_http_client();
- let fixture = Self {
+ // Container startup already waits for /_cluster/health to return 200
+ // via HttpWaitStrategy, so no additional health check is needed.
+ Ok(Self {
container,
http_client,
- };
-
- for _ in 0..HEALTH_CHECK_ATTEMPTS {
- let url = format!("{}/_cluster/health",
fixture.container.base_url);
- if let Ok(response) = fixture.http_client.get(&url).send().await
- && response.status().is_success()
- {
- info!("Elasticsearch cluster is healthy");
- return Ok(fixture);
- }
- sleep(Duration::from_millis(HEALTH_CHECK_INTERVAL_MS)).await;
- }
-
- Err(TestBinaryError::FixtureSetup {
- fixture_type: "ElasticsearchSource".to_string(),
- message: format!(
- "Failed to confirm Elasticsearch cluster health after {}
attempts",
- HEALTH_CHECK_ATTEMPTS
- ),
})
}