This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 051475a76 revert(test): remove shared_server, add nextest CI retries
(#3088)
051475a76 is described below
commit 051475a765e5631e12e9323a0f08046dbd5205fe
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Apr 7 16:47:56 2026 +0200
revert(test): remove shared_server, add nextest CI retries (#3088)
---
.config/nextest.toml | 15 +-
.github/actions/rust/pre-merge/action.yml | 2 +-
.github/workflows/coverage-baseline.yml | 2 +-
Cargo.lock | 1 -
core/harness_derive/src/attrs.rs | 23 ---
core/harness_derive/src/codegen.rs | 173 +-----------------
core/integration/Cargo.toml | 1 -
core/integration/src/harness/handle/mod.rs | 2 +-
core/integration/src/harness/mod.rs | 2 -
.../src/harness/orchestrator/builder.rs | 2 -
.../src/harness/orchestrator/harness.rs | 198 ++-------------------
core/integration/src/harness/seeds.rs | 71 +-------
core/integration/src/harness/shared.rs | 159 -----------------
core/integration/src/lib.rs | 3 +-
core/integration/tests/connectors/api/endpoints.rs | 24 +--
.../connectors/elasticsearch/elasticsearch_sink.rs | 9 +-
.../elasticsearch/elasticsearch_source.rs | 9 +-
.../integration/tests/connectors/http/http_sink.rs | 26 ++-
.../http_config_provider/direct_responses.rs | 30 ++--
.../http_config_provider/wrapped_responses.rs | 30 ++--
.../tests/connectors/iceberg/iceberg_sink.rs | 9 +-
.../tests/connectors/influxdb/influxdb_sink.rs | 18 +-
.../connectors/influxdb/influxdb_sink_formats.rs | 36 ++--
.../tests/connectors/influxdb/influxdb_source.rs | 12 +-
.../connectors/influxdb/influxdb_source_formats.rs | 27 +--
.../tests/connectors/mongodb/mongodb_sink.rs | 31 ++--
.../tests/connectors/postgres/postgres_sink.rs | 9 +-
.../tests/connectors/postgres/postgres_source.rs | 15 +-
.../tests/connectors/postgres/restart.rs | 6 +-
.../tests/connectors/quickwit/quickwit_sink.rs | 12 +-
.../tests/connectors/random/random_source.rs | 3 +-
.../tests/connectors/stdout/stdout_sink.rs | 9 +-
32 files changed, 141 insertions(+), 828 deletions(-)
diff --git a/.config/nextest.toml b/.config/nextest.toml
index 2dd295fb8..fb488b694 100644
--- a/.config/nextest.toml
+++ b/.config/nextest.toml
@@ -15,21 +15,14 @@
# specific language governing permissions and limitations
# under the License.
-[profile.default]
-slow-timeout = { period = "30s", terminate-after = 4 }
-
[[profile.default.overrides]]
# This is a solution (or actually a workaround) for the problem that nextest
does not support
# #[serial] macro which shall enforce sequential execution of the test case.
filter = 'package(integration) and
test(cli::system::test_cli_session_scenario::should_be_successful)'
threads-required = "num-cpus"
-# shared_server tests: the iggy_harness macro prefixes test names with
-# shared_server_ enabling a single generic nextest filter. Bounded to limit
-# resource contention from parallel fixture containers without fully
serializing.
-[test-groups.shared-server]
-max-threads = 8
+[profile.default]
+slow-timeout = { period = "30s", terminate-after = 4 }
-[[profile.default.overrides]]
-filter = 'package(integration) and test(/shared_server_/)'
-test-group = 'shared-server'
+[profile.ci]
+retries = 3
diff --git a/.github/actions/rust/pre-merge/action.yml
b/.github/actions/rust/pre-merge/action.yml
index d1d07138d..0daa91b5d 100644
--- a/.github/actions/rust/pre-merge/action.yml
+++ b/.github/actions/rust/pre-merge/action.yml
@@ -144,7 +144,7 @@ runs:
test_start=$(date +%s)
if command -v cargo-nextest &> /dev/null; then
- cargo nextest run --locked --no-fail-fast $PARTITION_FLAG
+ cargo nextest run --locked --no-fail-fast --profile ci
$PARTITION_FLAG
else
if [[ -n "$PARTITION_FLAG" ]]; then
echo "::error::cargo-nextest not found, falling back to cargo test
without partitioning (all tests will run on every partition)"
diff --git a/.github/workflows/coverage-baseline.yml
b/.github/workflows/coverage-baseline.yml
index 8fe98cde7..e6bc0bd14 100644
--- a/.github/workflows/coverage-baseline.yml
+++ b/.github/workflows/coverage-baseline.yml
@@ -78,7 +78,7 @@ jobs:
eval $(echo -n "test" | gnome-keyring-daemon --unlock
--components=secrets)
echo -n "warmup" | secret-tool store --label="ci-warmup" ci-test
warmup
- cargo test --locked --no-fail-fast
+ cargo nextest run --locked --no-fail-fast --profile ci
shell: bash
- name: Generate coverage report
diff --git a/Cargo.lock b/Cargo.lock
index 26c32044b..cb11aa7ff 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6151,7 +6151,6 @@ dependencies = [
"configs",
"configs_derive",
"ctor",
- "dashmap",
"figment",
"futures",
"harness_derive",
diff --git a/core/harness_derive/src/attrs.rs b/core/harness_derive/src/attrs.rs
index fa67e9239..2e4389faa 100644
--- a/core/harness_derive/src/attrs.rs
+++ b/core/harness_derive/src/attrs.rs
@@ -48,12 +48,6 @@ impl ClusterNodesValue {
}
}
-/// Shared server configuration.
-#[derive(Debug, Clone)]
-pub struct SharedServerAttrs {
- pub key: String,
-}
-
/// Parsed `#[iggy_harness(...)]` attributes.
#[derive(Debug, Default)]
pub struct IggyTestAttrs {
@@ -63,7 +57,6 @@ pub struct IggyTestAttrs {
pub server: ServerAttrs,
pub seed_fn: Option<syn::Path>,
pub cluster_nodes: ClusterNodesValue,
- pub shared_server: Option<SharedServerAttrs>,
}
/// MCP configuration attributes.
@@ -89,7 +82,6 @@ impl IggyTestAttrs {
server: ServerAttrs::default(),
seed_fn: None,
cluster_nodes: ClusterNodesValue::None,
- shared_server: None,
}
}
}
@@ -256,9 +248,6 @@ impl Parse for IggyTestAttrs {
AttrItem::ClusterNodes(cluster) => {
attrs.cluster_nodes = cluster;
}
- AttrItem::SharedServer(shared) => {
- attrs.shared_server = Some(shared);
- }
}
}
@@ -275,7 +264,6 @@ enum AttrItem {
Server(Box<ServerAttrs>),
Seed(syn::Path),
ClusterNodes(ClusterNodesValue),
- SharedServer(SharedServerAttrs),
}
impl Parse for AttrItem {
@@ -305,10 +293,6 @@ impl Parse for AttrItem {
let path: syn::Path = input.parse()?;
Ok(AttrItem::Seed(path))
}
- "shared_server" => {
- let shared = parse_shared_server_attrs(input)?;
- Ok(AttrItem::SharedServer(shared))
- }
_ => Err(syn::Error::new(
ident.span(),
format!("unknown attribute: {ident_str}"),
@@ -489,13 +473,6 @@ fn parse_mcp_attrs(input: ParseStream) ->
syn::Result<McpAttrs> {
Ok(mcp)
}
-/// Parse `shared_server = "key"`.
-fn parse_shared_server_attrs(input: ParseStream) ->
syn::Result<SharedServerAttrs> {
- input.parse::<Token![=]>()?;
- let lit: LitStr = input.parse()?;
- Ok(SharedServerAttrs { key: lit.value() })
-}
-
fn parse_connectors_runtime_attrs(input: ParseStream) ->
syn::Result<ConnectorsRuntimeAttrs> {
let mut attrs = ConnectorsRuntimeAttrs::default();
diff --git a/core/harness_derive/src/codegen.rs
b/core/harness_derive/src/codegen.rs
index ba3b2b679..2abd54368 100644
--- a/core/harness_derive/src/codegen.rs
+++ b/core/harness_derive/src/codegen.rs
@@ -138,12 +138,6 @@ pub fn generate_tests(attrs: &IggyTestAttrs, input:
&ItemFn) -> syn::Result<Toke
.collect();
let params = analyze_signature(&input.sig)?;
-
- // Validate shared_server constraints
- if attrs.shared_server.is_some() {
- validate_shared_server(attrs, &params, fn_name)?;
- }
-
let matrix_params_list = matrix_params(&params);
let has_matrix_params = !matrix_params_list.is_empty();
@@ -178,52 +172,6 @@ pub fn generate_tests(attrs: &IggyTestAttrs, input:
&ItemFn) -> syn::Result<Toke
)
}
-fn validate_shared_server(
- attrs: &IggyTestAttrs,
- params: &[DetectedParam],
- fn_name: &Ident,
-) -> syn::Result<()> {
- let has_mut_harness = params
- .iter()
- .any(|p| matches!(p, DetectedParam::HarnessMut { .. }));
- if has_mut_harness {
- return Err(syn::Error::new(
- fn_name.span(),
- "shared_server is incompatible with &mut TestHarness",
- ));
- }
-
- if !attrs.server.config_overrides.is_empty() {
- return Err(syn::Error::new(
- fn_name.span(),
- "shared_server is incompatible with server config overrides",
- ));
- }
-
- if !matches!(attrs.cluster_nodes, crate::attrs::ClusterNodesValue::None) {
- return Err(syn::Error::new(
- fn_name.span(),
- "shared_server is incompatible with cluster_nodes",
- ));
- }
-
- if attrs.server.tls.is_some() || attrs.server.websocket_tls.is_some() {
- return Err(syn::Error::new(
- fn_name.span(),
- "shared_server is incompatible with TLS configuration",
- ));
- }
-
- if attrs.transports.iter().any(|t| t.tls_mode().is_some()) {
- return Err(syn::Error::new(
- fn_name.span(),
- "shared_server is incompatible with TLS transports",
- ));
- }
-
- Ok(())
-}
-
fn generate_single_test(
fn_name: &Ident,
fn_vis: &syn::Visibility,
@@ -237,33 +185,10 @@ fn generate_single_test(
let fixture_setup = generate_fixture_setup(params);
let fixture_envs = generate_fixture_envs_collection(params);
let fixture_param_bindings = generate_fixture_param_bindings(params);
- let fixture_seed = generate_fixture_seed(params);
- let harness_param_bindings = generate_harness_param_bindings(params);
-
- if let Some(ref shared) = attrs.shared_server {
- let shared_harness_setup = generate_shared_harness_setup(variant,
has_fixtures, attrs);
- let shared_start_and_seed = generate_shared_start_and_seed(attrs,
fixture_seed);
- let key_ident = Ident::new(&shared.key, Span::call_site());
- let shared_fn_name = format_ident!("shared_server_{}", fn_name);
-
- return Ok(quote! {
- #(#other_attrs)*
- #[::tokio::test]
- #[::serial_test::serial(#key_ident)]
- #fn_vis async fn #shared_fn_name() {
- #fixture_setup
- #fixture_envs
- #fixture_param_bindings
- #shared_harness_setup
- #shared_start_and_seed
- #harness_param_bindings
- #fn_body
- }
- });
- }
-
let harness_setup = generate_harness_setup(variant, has_fixtures, attrs);
+ let fixture_seed = generate_fixture_seed(params);
let start_and_seed = generate_start_and_seed(attrs, fixture_seed);
+ let harness_param_bindings = generate_harness_param_bindings(params);
Ok(quote! {
#(#other_attrs)*
@@ -620,99 +545,6 @@ fn generate_start_and_seed(attrs: &IggyTestAttrs,
fixture_seed: TokenStream) ->
}
}
-// ============================================================================
-// Shared server code generation helpers
-// ============================================================================
-
-fn generate_shared_harness_setup(
- variant: &TestVariant,
- has_fixtures: bool,
- attrs: &IggyTestAttrs,
-) -> TokenStream {
- let shared = attrs
- .shared_server
- .as_ref()
- .expect("shared_server required");
- let key = &shared.key;
- let transport = variant.transport.variant_ident();
- let client_config_method =
- Ident::new(variant.transport.client_config_method(),
Span::call_site());
-
- let cr_config = if let Some(ref cr) = attrs.server.connectors_runtime {
- let config_path = cr.config_path.as_deref().unwrap_or("");
- if has_fixtures {
- quote! {
-
Some(::integration::__macro_support::ConnectorsRuntimeConfig::builder()
- .config_path(::std::path::PathBuf::from(#config_path))
- .extra_envs(__fixture_envs.clone())
- .build())
- }
- } else {
- quote! {
-
Some(::integration::__macro_support::ConnectorsRuntimeConfig::builder()
- .config_path(::std::path::PathBuf::from(#config_path))
- .build())
- }
- }
- } else {
- quote! { None }
- };
-
- quote! {
- let __shared =
::integration::__macro_support::SharedServerRegistry::get_or_start(
- #key,
- ::integration::__macro_support::TestServerConfig::default(),
- ).await.unwrap_or_else(|e| panic!("failed to get shared server: {e}"));
-
- let mut __harness =
::integration::__macro_support::TestHarness::from_shared(
- __shared,
- #cr_config,
-
Some(::integration::__macro_support::ClientConfig::#client_config_method()),
- ).await.unwrap_or_else(|e| panic!("failed to build shared harness:
{e}"));
- let _ = ::integration::__macro_support::TransportProtocol::#transport;
- }
-}
-
-fn generate_shared_start_and_seed(attrs: &IggyTestAttrs, fixture_seed:
TokenStream) -> TokenStream {
- let has_fixture_seed = !fixture_seed.is_empty();
-
- match (&attrs.seed_fn, has_fixture_seed) {
- (Some(seed_fn), true) => {
- quote! {
- __harness.start_shared_with_seed(|__seed_client| async move {
- #seed_fn(&__seed_client).await?;
- #fixture_seed
- Ok(())
- }).await.unwrap_or_else(|e| panic!("failed to start shared
harness: {e}"));
- }
- }
- (Some(seed_fn), false) => {
- quote! {
- __harness.start_shared_with_seed(|__seed_client| async move {
- #seed_fn(&__seed_client).await
- }).await.unwrap_or_else(|e| panic!("failed to start shared
harness: {e}"));
- }
- }
- (None, true) => {
- quote! {
- __harness.start_shared_with_seed(|__seed_client| async move {
- #fixture_seed
- Ok(())
- }).await.unwrap_or_else(|e| panic!("failed to start shared
harness: {e}"));
- }
- }
- (None, false) => {
- quote! {
- __harness.start_shared().await.unwrap_or_else(|e|
panic!("failed to start shared harness: {e}"));
- }
- }
- }
-}
-
-// ============================================================================
-// Standard (non-shared) code generation helpers
-// ============================================================================
-
fn fixture_var_ident(name: &syn::Ident) -> syn::Ident {
let name_str = name.to_string();
let clean_name = name_str.trim_start_matches('_');
@@ -963,7 +795,6 @@ mod tests {
},
seed_fn: None,
cluster_nodes: crate::attrs::ClusterNodesValue::None,
- shared_server: None,
};
let variants = generate_variants(&attrs);
// 2 transports * 2 segment sizes * 2 cache modes = 8 variants
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index 513dc92d9..c8830b19f 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -36,7 +36,6 @@ compio = { workspace = true }
configs = { workspace = true }
configs_derive = { workspace = true }
ctor = { workspace = true }
-dashmap = { workspace = true }
figment = { workspace = true }
futures = { workspace = true }
harness_derive = { workspace = true }
diff --git a/core/integration/src/harness/handle/mod.rs
b/core/integration/src/harness/handle/mod.rs
index a31ad7535..da746d175 100644
--- a/core/integration/src/harness/handle/mod.rs
+++ b/core/integration/src/harness/handle/mod.rs
@@ -25,7 +25,7 @@ mod mcp;
mod server;
pub use client::ClientHandle;
-pub use client_builder::{ClientBuilder, ServerConnection};
+pub use client_builder::ClientBuilder;
pub use connectors_runtime::ConnectorsRuntimeHandle;
pub use mcp::{McpClient, McpHandle};
pub use server::{ServerHandle, ServerLogs};
diff --git a/core/integration/src/harness/mod.rs
b/core/integration/src/harness/mod.rs
index c8d3757ae..13793ab05 100644
--- a/core/integration/src/harness/mod.rs
+++ b/core/integration/src/harness/mod.rs
@@ -51,7 +51,6 @@ mod helpers;
mod orchestrator;
mod port_reserver;
pub mod seeds;
-mod shared;
mod traits;
pub use config::{
@@ -72,4 +71,3 @@ pub use helpers::{
};
pub use fixture::TestFixture;
-pub use shared::{SharedServerInfo, SharedServerRegistry};
diff --git a/core/integration/src/harness/orchestrator/builder.rs
b/core/integration/src/harness/orchestrator/builder.rs
index 668d9a193..46afddd3b 100644
--- a/core/integration/src/harness/orchestrator/builder.rs
+++ b/core/integration/src/harness/orchestrator/builder.rs
@@ -226,8 +226,6 @@ impl TestHarnessBuilder {
primary_transport,
primary_client_config: self.primary_client_config,
started: false,
- shared_server: None,
- shared_connectors_runtime: None,
})
}
}
diff --git a/core/integration/src/harness/orchestrator/harness.rs
b/core/integration/src/harness/orchestrator/harness.rs
index ca99731c2..6897b31b4 100644
--- a/core/integration/src/harness/orchestrator/harness.rs
+++ b/core/integration/src/harness/orchestrator/harness.rs
@@ -18,15 +18,14 @@
*/
use super::builder::TestHarnessBuilder;
-use crate::harness::config::{ClientConfig, ConnectorsRuntimeConfig};
+use crate::harness::config::ClientConfig;
use crate::harness::context::TestContext;
use crate::harness::error::TestBinaryError;
use crate::harness::handle::{
- ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient,
McpHandle, ServerConnection,
- ServerHandle, ServerLogs,
+ ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient,
McpHandle, ServerHandle,
+ ServerLogs,
};
-use crate::harness::shared::SharedServerInfo;
-use crate::harness::traits::{IggyServerDependent, Restartable, TestBinary};
+use crate::harness::traits::{Restartable, TestBinary};
use futures::executor::block_on;
use iggy::prelude::{ClientWrapper, IggyClient};
use iggy_common::TransportProtocol;
@@ -48,8 +47,6 @@ pub struct TestHarness {
pub(super) primary_transport: Option<TransportProtocol>,
pub(super) primary_client_config: Option<ClientConfig>,
pub(super) started: bool,
- pub(super) shared_server: Option<Arc<SharedServerInfo>>,
- pub(super) shared_connectors_runtime: Option<ConnectorsRuntimeHandle>,
}
impl std::fmt::Debug for TestHarness {
@@ -75,39 +72,6 @@ impl TestHarness {
TestHarnessBuilder::default()
}
- /// Create a harness that uses a shared server (no ownership of server
process).
- ///
- /// Each test still gets its own `TestContext` (for logs), connector
runtime,
- /// and clients. Only the iggy-server process is shared.
- pub async fn from_shared(
- shared: Arc<SharedServerInfo>,
- connectors_runtime_config: Option<ConnectorsRuntimeConfig>,
- primary_client_config: Option<ClientConfig>,
- ) -> Result<Self, TestBinaryError> {
- let mut context = TestContext::new(None, true)?;
- context.ensure_created()?;
- let context = Arc::new(context);
-
- let shared_cr = connectors_runtime_config
- .map(|cfg| ConnectorsRuntimeHandle::with_server_id(cfg,
context.clone(), 0));
-
- let primary_transport = primary_client_config.as_ref().map(|c|
c.transport);
-
- shared.acquire();
-
- Ok(TestHarness {
- context,
- servers: Vec::new(),
- clients: Vec::new(),
- client_configs: Vec::new(),
- primary_transport,
- primary_client_config,
- started: false,
- shared_server: Some(shared),
- shared_connectors_runtime: shared_cr,
- })
- }
-
/// Start all configured binaries and create clients.
pub async fn start(&mut self) -> Result<(), TestBinaryError> {
self.start_internal(
@@ -133,76 +97,6 @@ impl TestHarness {
self.start_internal(Some(seed)).await
}
- /// Start a shared-server harness without a seed function.
- pub async fn start_shared(&mut self) -> Result<(), TestBinaryError> {
- self.start_shared_internal(
- None::<
- fn(
- IggyClient,
- )
- -> std::future::Ready<Result<(), Box<dyn std::error::Error
+ Send + Sync>>>,
- >,
- )
- .await
- }
-
- /// Start a shared-server harness with a seed function.
- ///
- /// The seed runs BEFORE the connector runtime starts, allowing it to
- /// create streams/topics that the connector expects to find.
- pub async fn start_shared_with_seed<F, Fut>(&mut self, seed: F) ->
Result<(), TestBinaryError>
- where
- F: FnOnce(IggyClient) -> Fut,
- Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error
+ Send + Sync>>>,
- {
- self.start_shared_internal(Some(seed)).await
- }
-
- async fn start_shared_internal<F, Fut>(
- &mut self,
- seed: Option<F>,
- ) -> Result<(), TestBinaryError>
- where
- F: FnOnce(IggyClient) -> Fut,
- Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error
+ Send + Sync>>>,
- {
- if self.started {
- return Err(TestBinaryError::AlreadyStarted);
- }
-
- let shared = self
- .shared_server
- .as_ref()
- .ok_or_else(|| TestBinaryError::InvalidState {
- message: "start_shared called without shared
server".to_string(),
- })?;
-
- let tcp_addr = shared
- .tcp_addr()
- .ok_or_else(|| TestBinaryError::InvalidState {
- message: "Shared server has no TCP address".to_string(),
- })?;
-
- // Seed runs before connector runtime - creates streams/topics
- if let Some(seed_fn) = seed {
- let client = self.tcp_root_client().await?;
- seed_fn(client)
- .await
- .map_err(|e| TestBinaryError::SeedFailed(e.to_string()))?;
- }
-
- // Connector runtime starts after seed - streams/topics exist now
- if let Some(ref mut cr) = self.shared_connectors_runtime {
- cr.set_iggy_address(tcp_addr);
- cr.start()?;
- cr.wait_ready().await?;
- }
-
- self.create_clients().await?;
- self.started = true;
- Ok(())
- }
-
async fn start_internal<F, Fut>(&mut self, seed: Option<F>) -> Result<(),
TestBinaryError>
where
F: FnOnce(IggyClient) -> Fut,
@@ -244,20 +138,6 @@ impl TestHarness {
}
self.clients.clear();
- // Stop per-test connector runtime (shared server mode)
- if let Some(ref mut cr) = self.shared_connectors_runtime {
- cr.stop()?;
- }
-
- // Release shared server ref - last test stops the server and cleans up
- if let Some(shared) = self.shared_server.take() {
- if std::thread::panicking() {
- shared.mark_failed();
- }
- shared.release();
- }
-
- // Stop owned servers (non-shared mode only)
for server in self.servers.iter_mut().rev() {
server.stop_dependents()?;
server.stop()?;
@@ -288,24 +168,12 @@ impl TestHarness {
}
/// Get reference to the first (primary) server handle.
- ///
- /// Not available in `shared_server` mode - the server is managed by
`SharedServerRegistry`.
pub fn server(&self) -> &ServerHandle {
- assert!(
- self.shared_server.is_none(),
- "server() is not available in shared_server mode"
- );
self.servers.first().expect("No servers configured")
}
/// Get mutable reference to the first (primary) server handle.
- ///
- /// Not available in `shared_server` mode - the server is managed by
`SharedServerRegistry`.
pub fn server_mut(&mut self) -> &mut ServerHandle {
- assert!(
- self.shared_server.is_none(),
- "server_mut() is not available in shared_server mode"
- );
self.servers.first_mut().expect("No servers configured")
}
@@ -382,15 +250,9 @@ impl TestHarness {
/// Get the connectors runtime handle from the primary server if
configured.
///
- /// Returns the per-test connector runtime for shared-server harnesses,
- /// or the server-owned one for standard harnesses.
- ///
/// # Panics
/// Panics if called on a cluster (multiple servers). Use
`node(i).connectors_runtime()` instead.
pub fn connectors_runtime(&self) -> Option<&ConnectorsRuntimeHandle> {
- if self.shared_connectors_runtime.is_some() {
- return self.shared_connectors_runtime.as_ref();
- }
assert!(
self.servers.len() <= 1,
"connectors_runtime() is only available for single-server setups.
Use node(i).connectors_runtime() for clusters."
@@ -425,18 +287,6 @@ impl TestHarness {
&self,
transport: TransportProtocol,
) -> Result<ClientBuilder, TestBinaryError> {
- if let Some(ref shared) = self.shared_server {
- let connection = ServerConnection {
- tcp_addr: shared.tcp_addr(),
- http_addr: shared.http_addr(),
- quic_addr: shared.quic_addr(),
- websocket_addr: shared.websocket_addr(),
- tls: None,
- websocket_tls: None,
- tls_ca_cert_path: shared.tls_ca_cert_path().cloned(),
- };
- return Ok(ClientBuilder::new(transport, connection));
- }
let server =
self.servers.first().ok_or(TestBinaryError::MissingServer)?;
match transport {
TransportProtocol::Tcp => server.tcp_client(),
@@ -464,7 +314,14 @@ impl TestHarness {
&self,
transport: TransportProtocol,
) -> Result<IggyClient, TestBinaryError> {
- self.client_builder_for(transport)?.connect().await
+ let server =
self.servers.first().ok_or(TestBinaryError::MissingServer)?;
+ let builder = match transport {
+ TransportProtocol::Tcp => server.tcp_client()?,
+ TransportProtocol::Http => server.http_client()?,
+ TransportProtocol::Quic => server.quic_client()?,
+ TransportProtocol::WebSocket => server.websocket_client()?,
+ };
+ builder.connect().await
}
pub async fn tcp_root_client(&self) -> Result<IggyClient, TestBinaryError>
{
@@ -564,33 +421,16 @@ impl TestHarness {
}
pub(super) async fn create_clients(&mut self) -> Result<(),
TestBinaryError> {
- // Resolve addresses from either owned server or shared server
- let (tcp, http, quic, ws, ca_cert) = if let Some(ref shared) =
self.shared_server {
- (
- shared.tcp_addr(),
- shared.http_addr(),
- shared.quic_addr(),
- shared.websocket_addr(),
- shared.tls_ca_cert_path().cloned(),
- )
- } else if let Some(server) = self.servers.first() {
- (
- server.tcp_addr(),
- server.http_addr(),
- server.quic_addr(),
- server.websocket_addr(),
- server.tls_ca_cert_path(),
- )
- } else {
+ let Some(server) = self.servers.first() else {
return Ok(());
};
for config in &self.client_configs {
let address = match config.transport {
- TransportProtocol::Tcp => tcp,
- TransportProtocol::Http => http,
- TransportProtocol::Quic => quic,
- TransportProtocol::WebSocket => ws,
+ TransportProtocol::Tcp => server.tcp_addr(),
+ TransportProtocol::Http => server.http_addr(),
+ TransportProtocol::Quic => server.quic_addr(),
+ TransportProtocol::WebSocket => server.websocket_addr(),
};
let Some(address) = address else {
@@ -601,9 +441,9 @@ impl TestHarness {
let mut config = config.clone();
if config.tls_enabled
- && let Some(ref path) = ca_cert
+ && let Some(ca_cert_path) = server.tls_ca_cert_path()
{
- config.tls_ca_file = Some(path.clone());
+ config.tls_ca_file = Some(ca_cert_path);
}
let mut client = ClientHandle::new(config, address);
diff --git a/core/integration/src/harness/seeds.rs
b/core/integration/src/harness/seeds.rs
index c29ec25cf..aa5554c70 100644
--- a/core/integration/src/harness/seeds.rs
+++ b/core/integration/src/harness/seeds.rs
@@ -19,7 +19,7 @@
use iggy::prelude::{IggyClient, StreamClient, TopicClient};
use iggy_common::{
- CompressionAlgorithm, Consumer, Identifier, IggyError, IggyExpiry,
IggyMessage, MaxTopicSize,
+ CompressionAlgorithm, Consumer, Identifier, IggyExpiry, IggyMessage,
MaxTopicSize,
Partitioning, PersonalAccessTokenExpiry, UserStatus,
};
use iggy_common::{
@@ -123,75 +123,6 @@ pub async fn connector_multi_topic_stream(client:
&IggyClient) -> Result<(), See
Ok(())
}
-/// Idempotent seed for serial shared_server tests: creates stream and topic,
-/// silently skipping if they already exist from a previous test in the group.
-pub async fn connector_stream_idempotent(client: &IggyClient) -> Result<(),
SeedError> {
- let stream_id: Identifier = names::STREAM.try_into()?;
-
- match client.create_stream(names::STREAM).await {
- Ok(_) => {}
- Err(e) if e.as_code() ==
IggyError::StreamNameAlreadyExists(String::new()).as_code() => {}
- Err(e) => return Err(e.into()),
- }
-
- match client
- .create_topic(
- &stream_id,
- names::TOPIC,
- 1,
- CompressionAlgorithm::None,
- None,
- IggyExpiry::ServerDefault,
- MaxTopicSize::ServerDefault,
- )
- .await
- {
- Ok(_) => {}
- Err(e)
- if e.as_code()
- == IggyError::TopicNameAlreadyExists(String::new(),
Identifier::default())
- .as_code() => {}
- Err(e) => return Err(e.into()),
- }
-
- Ok(())
-}
-
-/// Idempotent multi-topic seed for serial shared_server tests.
-pub async fn connector_multi_topic_stream_idempotent(client: &IggyClient) ->
Result<(), SeedError> {
- let stream_id: Identifier = names::STREAM.try_into()?;
-
- match client.create_stream(names::STREAM).await {
- Ok(_) => {}
- Err(e) if e.as_code() ==
IggyError::StreamNameAlreadyExists(String::new()).as_code() => {}
- Err(e) => return Err(e.into()),
- }
-
- for topic_name in [names::TOPIC, names::TOPIC_2] {
- match client
- .create_topic(
- &stream_id,
- topic_name,
- 1,
- CompressionAlgorithm::None,
- None,
- IggyExpiry::ServerDefault,
- MaxTopicSize::ServerDefault,
- )
- .await
- {
- Ok(_) => {}
- Err(e)
- if e.as_code()
- == IggyError::TopicNameAlreadyExists(String::new(),
Identifier::default())
- .as_code() => {}
- Err(e) => return Err(e.into()),
- }
- }
-
- Ok(())
-}
-
/// Standard MCP test data: stream, topic, message, consumer group, consumer
offset, user, PAT.
pub async fn mcp_standard(client: &IggyClient) -> Result<(), SeedError> {
let stream_id: Identifier = names::STREAM.try_into()?;
diff --git a/core/integration/src/harness/shared.rs
b/core/integration/src/harness/shared.rs
deleted file mode 100644
index 82f717ea0..000000000
--- a/core/integration/src/harness/shared.rs
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::harness::config::TestServerConfig;
-use crate::harness::context::TestContext;
-use crate::harness::error::TestBinaryError;
-use crate::harness::handle::ServerHandle;
-use crate::harness::traits::TestBinary;
-use dashmap::DashMap;
-use std::net::SocketAddr;
-use std::path::PathBuf;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::{Arc, LazyLock};
-use tokio::sync::OnceCell;
-
-/// Shared server state: addresses, handles, and active user count.
-///
-/// Rust statics are not dropped at process exit, so we use an atomic counter
-/// to track active tests. The last test to finish stops the server and
-/// cleans up the test directory.
-pub struct SharedServerInfo {
- tcp_addr: Option<SocketAddr>,
- http_addr: Option<SocketAddr>,
- quic_addr: Option<SocketAddr>,
- websocket_addr: Option<SocketAddr>,
- tls_ca_cert_path: Option<PathBuf>,
- handle: std::sync::Mutex<Option<ServerHandle>>,
- context: Arc<TestContext>,
- active_count: AtomicUsize,
- any_test_failed: AtomicBool,
-}
-
-impl SharedServerInfo {
- pub fn tcp_addr(&self) -> Option<SocketAddr> {
- self.tcp_addr
- }
-
- pub fn http_addr(&self) -> Option<SocketAddr> {
- self.http_addr
- }
-
- pub fn quic_addr(&self) -> Option<SocketAddr> {
- self.quic_addr
- }
-
- pub fn websocket_addr(&self) -> Option<SocketAddr> {
- self.websocket_addr
- }
-
- pub fn tls_ca_cert_path(&self) -> Option<&PathBuf> {
- self.tls_ca_cert_path.as_ref()
- }
-
- /// Increment the active test count. Called when a test acquires this
server.
- pub fn acquire(&self) {
- self.active_count.fetch_add(1, Ordering::AcqRel);
- }
-
- /// Mark that a test using this server has failed.
- /// Preserves the test directory for debugging.
- pub fn mark_failed(&self) {
- self.any_test_failed.store(true, Ordering::Release);
- }
-
- /// Decrement the active test count. When it reaches zero, stops the server
- /// and cleans up the test directory (unless any test failed).
- pub fn release(&self) {
- let prev = self.active_count.fetch_sub(1, Ordering::AcqRel);
- if prev == 1 {
- match self.handle.lock() {
- Ok(mut guard) => {
- if let Some(ref mut server) = *guard
- && let Err(e) = server.stop()
- {
- eprintln!("[SharedServer] failed to stop server: {e}");
- }
- *guard = None;
- }
- Err(e) => {
- eprintln!("[SharedServer] mutex poisoned, server process
may leak: {e}");
- }
- }
- if !self.any_test_failed.load(Ordering::Acquire) {
- self.context.cleanup();
- }
- }
- }
-}
-
-static REGISTRY: LazyLock<DashMap<String,
Arc<OnceCell<Arc<SharedServerInfo>>>>> =
- LazyLock::new(DashMap::new);
-
-/// Global registry for shared server instances.
-///
-/// Tests with the same `shared_server` key share a single iggy-server process.
-/// The first test to request a key starts the server; all subsequent tests
-/// get a reference to the already-running server. The last test to finish
-/// stops the server and cleans up the test directory.
-pub struct SharedServerRegistry;
-
-impl SharedServerRegistry {
- /// Get or start a shared server for the given key.
- ///
- /// Thread-safe: concurrent callers for the same key will block until the
- /// first caller finishes initialization, then all receive the same `Arc`.
- ///
- /// Callers must call `SharedServerInfo::acquire()` after obtaining the
- /// reference and `SharedServerInfo::release()` when done (typically in
- /// `TestHarness::stop()`).
- pub async fn get_or_start(
- key: &str,
- config: TestServerConfig,
- ) -> Result<Arc<SharedServerInfo>, TestBinaryError> {
- let cell = REGISTRY
- .entry(key.to_string())
- .or_insert_with(|| Arc::new(OnceCell::new()))
- .value()
- .clone();
-
- cell.get_or_try_init(|| async move {
- let mut context =
TestContext::new(Some(format!("shared_server_{key}")), true)?;
- context.ensure_created()?;
- let context = Arc::new(context);
-
- let mut server = ServerHandle::with_config(config,
context.clone());
- server.start()?;
-
- Ok(Arc::new(SharedServerInfo {
- tcp_addr: server.tcp_addr(),
- http_addr: server.http_addr(),
- quic_addr: server.quic_addr(),
- websocket_addr: server.websocket_addr(),
- tls_ca_cert_path: server.tls_ca_cert_path(),
- handle: std::sync::Mutex::new(Some(server)),
- context,
- active_count: AtomicUsize::new(0),
- any_test_failed: AtomicBool::new(false),
- }))
- })
- .await
- .cloned()
- }
-}
diff --git a/core/integration/src/lib.rs b/core/integration/src/lib.rs
index 41ec34555..fc03d02d7 100644
--- a/core/integration/src/lib.rs
+++ b/core/integration/src/lib.rs
@@ -24,8 +24,7 @@ pub use harness_derive::iggy_harness;
#[doc(hidden)]
pub mod __macro_support {
pub use crate::harness::{
- ClientConfig, ConnectorsRuntimeConfig, McpClient, McpConfig,
SharedServerRegistry,
- TestHarness, TestServerConfig, TlsConfig,
+ ClientConfig, McpClient, McpConfig, TestHarness, TestServerConfig,
TlsConfig,
};
pub use iggy::prelude::ClientWrapper;
pub use iggy_common::TransportProtocol;
diff --git a/core/integration/tests/connectors/api/endpoints.rs
b/core/integration/tests/connectors/api/endpoints.rs
index 5fe987a29..69f6cdbfb 100644
--- a/core/integration/tests/connectors/api/endpoints.rs
+++ b/core/integration/tests/connectors/api/endpoints.rs
@@ -27,9 +27,8 @@ use reqwest::Client;
const API_KEY: &str = "test-api-key";
#[iggy_harness(
- shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn root_endpoint_returns_welcome_message(harness: &TestHarness) {
let api_address = harness
@@ -50,9 +49,8 @@ async fn root_endpoint_returns_welcome_message(harness:
&TestHarness) {
}
#[iggy_harness(
- shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn health_endpoint_returns_healthy(harness: &TestHarness) {
let api_address = harness
@@ -73,9 +71,8 @@ async fn health_endpoint_returns_healthy(harness:
&TestHarness) {
}
#[iggy_harness(
- shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn stats_endpoint_returns_runtime_stats(harness: &TestHarness) {
let api_address = harness
@@ -103,9 +100,8 @@ async fn stats_endpoint_returns_runtime_stats(harness:
&TestHarness) {
}
#[iggy_harness(
- shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn metrics_endpoint_returns_prometheus_format(harness: &TestHarness) {
let api_address = harness
@@ -131,9 +127,8 @@ async fn
metrics_endpoint_returns_prometheus_format(harness: &TestHarness) {
}
#[iggy_harness(
- shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn sources_endpoint_returns_list(harness: &TestHarness) {
let api_address = harness
@@ -155,9 +150,8 @@ async fn sources_endpoint_returns_list(harness:
&TestHarness) {
}
#[iggy_harness(
- shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn sinks_endpoint_returns_list(harness: &TestHarness) {
let api_address = harness
@@ -179,9 +173,8 @@ async fn sinks_endpoint_returns_list(harness: &TestHarness)
{
}
#[iggy_harness(
- shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn api_key_authentication_required(harness: &TestHarness) {
let api_address = harness
@@ -224,9 +217,8 @@ async fn api_key_authentication_required(harness:
&TestHarness) {
}
#[iggy_harness(
- shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn api_key_authentication_rejected_with_invalid_key(harness:
&TestHarness) {
let api_address = harness
diff --git
a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
index 13190f9b5..ad6ea1c86 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
@@ -28,9 +28,8 @@ use integration::harness::seeds;
use integration::iggy_harness;
#[iggy_harness(
- shared_server = "elasticsearch_sink",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn elasticsearch_sink_stores_json_messages(
harness: &TestHarness,
@@ -87,9 +86,8 @@ async fn elasticsearch_sink_stores_json_messages(
}
#[iggy_harness(
- shared_server = "elasticsearch_sink",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn elasticsearch_sink_handles_bulk_messages(
harness: &TestHarness,
@@ -142,9 +140,8 @@ async fn elasticsearch_sink_handles_bulk_messages(
}
#[iggy_harness(
- shared_server = "elasticsearch_sink",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn elasticsearch_sink_preserves_json_structure(
harness: &TestHarness,
diff --git
a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
index f945f4889..2beaad75c 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
@@ -27,9 +27,8 @@ use std::time::Duration;
use tokio::time::sleep;
#[iggy_harness(
- shared_server = "elasticsearch_source",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/source.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn elasticsearch_source_produces_messages_to_iggy(
harness: &TestHarness,
@@ -105,9 +104,8 @@ async fn elasticsearch_source_produces_messages_to_iggy(
}
#[iggy_harness(
- shared_server = "elasticsearch_source",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/source.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn elasticsearch_source_handles_empty_index(
harness: &TestHarness,
@@ -146,9 +144,8 @@ async fn elasticsearch_source_handles_empty_index(
}
#[iggy_harness(
- shared_server = "elasticsearch_source",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/source.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn elasticsearch_source_produces_bulk_messages(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/http/http_sink.rs
b/core/integration/tests/connectors/http/http_sink.rs
index 3056d69e3..e41c59ab6 100644
--- a/core/integration/tests/connectors/http/http_sink.rs
+++ b/core/integration/tests/connectors/http/http_sink.rs
@@ -196,9 +196,8 @@ use integration::iggy_harness;
/// Validates `batch_mode=individual`: one HTTP POST per message, each with
metadata envelope.
/// Checks request count = message count, envelope structure, and
`application/json` content type.
#[iggy_harness(
- shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn individual_json_messages_delivered_as_separate_posts(
harness: &TestHarness,
@@ -304,9 +303,8 @@ async fn
individual_json_messages_delivered_as_separate_posts(
/// Validates `batch_mode=ndjson`: all messages in one request as
newline-delimited JSON.
/// Checks single request, line count = message count, per-line envelope,
`application/x-ndjson`.
#[iggy_harness(
- shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn ndjson_messages_delivered_as_single_request(
harness: &TestHarness,
@@ -397,9 +395,8 @@ async fn ndjson_messages_delivered_as_single_request(
/// Validates `batch_mode=json_array`: all messages as a single JSON array in
one request.
/// Checks single request, array length = message count, per-item envelope,
`application/json`.
#[iggy_harness(
- shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn json_array_messages_delivered_as_single_request(
harness: &TestHarness,
@@ -491,9 +488,8 @@ async fn json_array_messages_delivered_as_single_request(
/// Validates `batch_mode=raw`: each message as raw bytes without metadata
envelope.
/// Checks request count = message count, no envelope wrapper,
`application/octet-stream`.
#[iggy_harness(
- shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn raw_binary_messages_delivered_without_envelope(
harness: &TestHarness,
@@ -579,9 +575,8 @@ async fn raw_binary_messages_delivered_without_envelope(
/// Validates `include_metadata=false`: bare payload without `{metadata,
payload}` envelope.
/// Checks no metadata field in body, original payload fields at top level.
#[iggy_harness(
- shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn metadata_disabled_sends_bare_payload(
harness: &TestHarness,
@@ -656,9 +651,8 @@ async fn metadata_disabled_sends_bare_payload(
/// Validates sequential offset integrity: `iggy_offset` values are contiguous
across
/// 5 delivered messages. Sorts by offset and checks each = previous + 1.
#[iggy_harness(
- shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn individual_messages_have_sequential_offsets(
harness: &TestHarness,
@@ -744,9 +738,8 @@ async fn individual_messages_have_sequential_offsets(
/// Validates multi-topic delivery: one connector consuming two topics on the
same stream.
/// Sends 2 messages to topic 1, 1 to topic 2, verifies `iggy_topic` metadata
matches source.
#[iggy_harness(
- shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_multi_topic_stream_idempotent
+ seed = seeds::connector_multi_topic_stream
)]
async fn multi_topic_messages_delivered_with_correct_topic_metadata(
harness: &TestHarness,
@@ -756,8 +749,8 @@ async fn
multi_topic_messages_delivered_with_correct_topic_metadata(
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
let topic_1_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
- // Both topics created by connector_multi_topic_stream_idempotent seed
(runs before
- // connector runtime starts - runtime health check requires all configured
topics).
+ // Step 1: Both topics created by connector_multi_topic_stream seed (runs
before
+ // connector runtime starts — runtime health check requires all configured
topics).
let topic_2_id: Identifier = seeds::names::TOPIC_2.try_into().unwrap();
// Step 2: Send 2 messages to topic 1 with source identifier in payload
@@ -829,6 +822,7 @@ async fn
multi_topic_messages_delivered_with_correct_topic_metadata(
)
});
+ // Match against constants — not magic strings (code review M9)
match iggy_topic {
t if t == seeds::names::TOPIC => {
topic_1_count += 1;
diff --git
a/core/integration/tests/connectors/http_config_provider/direct_responses.rs
b/core/integration/tests/connectors/http_config_provider/direct_responses.rs
index 17ce1279d..eeebddf4b 100644
--- a/core/integration/tests/connectors/http_config_provider/direct_responses.rs
+++ b/core/integration/tests/connectors/http_config_provider/direct_responses.rs
@@ -23,9 +23,8 @@ use integration::iggy_harness;
use reqwest::StatusCode;
#[iggy_harness(
- shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn source_configs_list_returns_all_versions(
harness: &TestHarness,
@@ -66,8 +65,8 @@ async fn source_configs_list_returns_all_versions(
assert_eq!(streams.len(), 1, "Should have 1 stream config");
let stream = &streams[0];
- assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
- assert_eq!(stream["topic"].as_str().unwrap(), seeds::names::TOPIC);
+ assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
+ assert_eq!(stream["topic"].as_str().unwrap(), "test_topic");
assert_eq!(stream["schema"].as_str().unwrap(), "json");
assert!(stream["batch_length"].as_u64().is_some());
assert!(stream["linger_time"].as_str().is_some());
@@ -81,9 +80,8 @@ async fn source_configs_list_returns_all_versions(
}
#[iggy_harness(
- shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn source_config_by_version_returns_specific_version(
harness: &TestHarness,
@@ -113,13 +111,12 @@ async fn
source_config_by_version_returns_specific_version(
let streams = body["streams"].as_array().unwrap();
assert_eq!(streams.len(), 1);
- assert_eq!(streams[0]["topic"].as_str().unwrap(), seeds::names::TOPIC);
+ assert_eq!(streams[0]["topic"].as_str().unwrap(), "test_topic");
}
#[iggy_harness(
- shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn source_active_config_returns_current_version(
harness: &TestHarness,
@@ -154,9 +151,8 @@ async fn source_active_config_returns_current_version(
}
#[iggy_harness(
- shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn sink_configs_list_returns_all_versions(
harness: &TestHarness,
@@ -197,10 +193,10 @@ async fn sink_configs_list_returns_all_versions(
assert_eq!(streams.len(), 1, "Should have 1 stream config");
let stream = &streams[0];
- assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
+ assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
let topics = stream["topics"].as_array().unwrap();
assert_eq!(topics.len(), 1);
- assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
+ assert_eq!(topics[0].as_str().unwrap(), "test_topic");
assert_eq!(stream["schema"].as_str().unwrap(), "json");
assert!(stream["batch_length"].as_u64().is_some());
assert!(stream["poll_interval"].as_str().is_some());
@@ -211,9 +207,8 @@ async fn sink_configs_list_returns_all_versions(
}
#[iggy_harness(
- shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn sink_config_by_version_returns_specific_version(
harness: &TestHarness,
@@ -244,13 +239,12 @@ async fn sink_config_by_version_returns_specific_version(
let streams = body["streams"].as_array().unwrap();
assert_eq!(streams.len(), 1);
let topics = streams[0]["topics"].as_array().unwrap();
- assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
+ assert_eq!(topics[0].as_str().unwrap(), "test_topic");
}
#[iggy_harness(
- shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn sink_active_config_returns_current_version(
harness: &TestHarness,
diff --git
a/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
b/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
index aab8df975..1ef5f99ae 100644
---
a/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
+++
b/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
@@ -23,9 +23,8 @@ use integration::iggy_harness;
use reqwest::StatusCode;
#[iggy_harness(
- shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn source_configs_list_returns_all_versions(
harness: &TestHarness,
@@ -66,8 +65,8 @@ async fn source_configs_list_returns_all_versions(
assert_eq!(streams.len(), 1, "Should have 1 stream config");
let stream = &streams[0];
- assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
- assert_eq!(stream["topic"].as_str().unwrap(), seeds::names::TOPIC);
+ assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
+ assert_eq!(stream["topic"].as_str().unwrap(), "test_topic");
assert_eq!(stream["schema"].as_str().unwrap(), "json");
assert!(stream["batch_length"].as_u64().is_some());
assert!(stream["linger_time"].as_str().is_some());
@@ -81,9 +80,8 @@ async fn source_configs_list_returns_all_versions(
}
#[iggy_harness(
- shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn source_config_by_version_returns_specific_version(
harness: &TestHarness,
@@ -113,13 +111,12 @@ async fn
source_config_by_version_returns_specific_version(
let streams = body["streams"].as_array().unwrap();
assert_eq!(streams.len(), 1);
- assert_eq!(streams[0]["topic"].as_str().unwrap(), seeds::names::TOPIC);
+ assert_eq!(streams[0]["topic"].as_str().unwrap(), "test_topic");
}
#[iggy_harness(
- shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn source_active_config_returns_current_version(
harness: &TestHarness,
@@ -154,9 +151,8 @@ async fn source_active_config_returns_current_version(
}
#[iggy_harness(
- shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn sink_configs_list_returns_all_versions(
harness: &TestHarness,
@@ -197,10 +193,10 @@ async fn sink_configs_list_returns_all_versions(
assert_eq!(streams.len(), 1, "Should have 1 stream config");
let stream = &streams[0];
- assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
+ assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
let topics = stream["topics"].as_array().unwrap();
assert_eq!(topics.len(), 1);
- assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
+ assert_eq!(topics[0].as_str().unwrap(), "test_topic");
assert_eq!(stream["schema"].as_str().unwrap(), "json");
assert!(stream["batch_length"].as_u64().is_some());
assert!(stream["poll_interval"].as_str().is_some());
@@ -211,9 +207,8 @@ async fn sink_configs_list_returns_all_versions(
}
#[iggy_harness(
- shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn sink_config_by_version_returns_specific_version(
harness: &TestHarness,
@@ -244,13 +239,12 @@ async fn sink_config_by_version_returns_specific_version(
let streams = body["streams"].as_array().unwrap();
assert_eq!(streams.len(), 1);
let topics = streams[0]["topics"].as_array().unwrap();
- assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
+ assert_eq!(topics[0].as_str().unwrap(), "test_topic");
}
#[iggy_harness(
- shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn sink_active_config_returns_current_version(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
index ad031dd80..777214116 100644
--- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
+++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
@@ -37,9 +37,8 @@ const SNAPSHOT_POLL_INTERVAL_MS: u64 = 500;
const BULK_SNAPSHOT_POLL_ATTEMPTS: usize = 60;
#[iggy_harness(
- shared_server = "iceberg_sink",
server(connectors_runtime(config_path =
"tests/connectors/iceberg/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn iceberg_sink_initializes_and_runs(
harness: &TestHarness,
@@ -69,9 +68,8 @@ async fn iceberg_sink_initializes_and_runs(
}
#[iggy_harness(
- shared_server = "iceberg_sink",
server(connectors_runtime(config_path =
"tests/connectors/iceberg/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn iceberg_sink_consumes_json_messages(
harness: &TestHarness,
@@ -141,9 +139,8 @@ async fn iceberg_sink_consumes_json_messages(
}
#[iggy_harness(
- shared_server = "iceberg_sink",
server(connectors_runtime(config_path =
"tests/connectors/iceberg/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn iceberg_sink_handles_bulk_messages(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/influxdb/influxdb_sink.rs
b/core/integration/tests/connectors/influxdb/influxdb_sink.rs
index 6b82d6428..cda484cfc 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_sink.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_sink.rs
@@ -33,8 +33,7 @@ use serde_json::json;
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_writes_messages_to_bucket(
harness: &TestHarness,
@@ -75,8 +74,7 @@ async fn influxdb_sink_writes_messages_to_bucket(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_handles_bulk_messages(harness: &TestHarness, fixture:
InfluxDbSinkFixture) {
let client = harness.root_client().await.unwrap();
@@ -114,8 +112,7 @@ async fn influxdb_sink_handles_bulk_messages(harness:
&TestHarness, fixture: Inf
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_payload_fields_stored_correctly(
harness: &TestHarness,
@@ -153,8 +150,7 @@ async fn influxdb_sink_payload_fields_stored_correctly(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_large_batch(harness: &TestHarness, fixture:
InfluxDbSinkFixture) {
let client = harness.root_client().await.unwrap();
@@ -188,8 +184,7 @@ async fn influxdb_sink_large_batch(harness: &TestHarness,
fixture: InfluxDbSinkF
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_recovers_backlogged_messages(
harness: &TestHarness,
@@ -229,8 +224,7 @@ async fn influxdb_sink_recovers_backlogged_messages(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_multiple_partitions(harness: &TestHarness, fixture:
InfluxDbSinkFixture) {
let client = harness.root_client().await.unwrap();
diff --git
a/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
b/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
index e68e78b55..d1225c179 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
@@ -52,8 +52,7 @@ use integration::iggy_harness;
/// with printable ASCII.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_text.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_text_writes_points(harness: &TestHarness, fixture:
InfluxDbSinkTextFixture) {
let client = harness.root_client().await.unwrap();
@@ -92,8 +91,7 @@ async fn influxdb_sink_text_writes_points(harness:
&TestHarness, fixture: Influx
/// Covers: `write_field_string` quote-escape branch.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_text.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_text_with_quote_characters(
harness: &TestHarness,
@@ -133,8 +131,7 @@ async fn influxdb_sink_text_with_quote_characters(
/// Covers: `write_field_string` backslash-escape branch.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_text.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_text_with_backslash(
harness: &TestHarness,
@@ -173,8 +170,7 @@ async fn influxdb_sink_text_with_backslash(
/// path under the Text format.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_text.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_text_bulk(harness: &TestHarness, fixture:
InfluxDbSinkTextFixture) {
let client = harness.root_client().await.unwrap();
@@ -216,8 +212,7 @@ async fn influxdb_sink_text_bulk(harness: &TestHarness,
fixture: InfluxDbSinkTex
/// `general_purpose::STANDARD.encode`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_base64.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_base64_writes_points(
harness: &TestHarness,
@@ -262,8 +257,7 @@ async fn influxdb_sink_base64_writes_points(
/// needed for base64 output, but exercises the full field-write path).
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_base64.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_base64_roundtrip(harness: &TestHarness, fixture:
InfluxDbSinkBase64Fixture) {
let client = harness.root_client().await.unwrap();
@@ -308,8 +302,7 @@ async fn influxdb_sink_base64_roundtrip(harness:
&TestHarness, fixture: InfluxDb
/// test covers the next-closest case (1-byte payload → 4-char base64 string).
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_base64.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_base64_single_byte_payload(
harness: &TestHarness,
@@ -353,8 +346,7 @@ async fn influxdb_sink_base64_single_byte_payload(
/// branches inside `append_line`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_no_metadata.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_no_metadata_writes_points(
harness: &TestHarness,
@@ -395,8 +387,7 @@ async fn influxdb_sink_no_metadata_writes_points(
/// with the no-metadata tag configuration.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_no_metadata.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_no_metadata_bulk(
harness: &TestHarness,
@@ -441,8 +432,7 @@ async fn influxdb_sink_no_metadata_bulk(
/// Covers: `"ns"` arm of `to_precision_timestamp`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_ns_precision_writes_points(
harness: &TestHarness,
@@ -485,8 +475,7 @@ async fn influxdb_sink_ns_precision_writes_points(
/// in `PayloadFormat::Json` and `write_field_string` with `{`, `}`, `:`, `"`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_json_nested_payload(harness: &TestHarness, fixture:
InfluxDbSinkFixture) {
let client = harness.root_client().await.unwrap();
@@ -534,8 +523,7 @@ async fn influxdb_sink_json_nested_payload(harness:
&TestHarness, fixture: Influ
/// Covers: `'\n' => buf.push_str("\\n")` branch in `write_field_string`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_text.toml")),
- shared_server = "influxdb_sink",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_sink_text_with_newline(harness: &TestHarness, fixture:
InfluxDbSinkTextFixture) {
let client = harness.root_client().await.unwrap();
diff --git a/core/integration/tests/connectors/influxdb/influxdb_source.rs
b/core/integration/tests/connectors/influxdb/influxdb_source.rs
index e2716aeb9..f6eab572e 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_source.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_source.rs
@@ -26,8 +26,7 @@ use tracing::info;
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_polls_and_produces_messages(
harness: &TestHarness,
@@ -85,8 +84,7 @@ async fn influxdb_source_polls_and_produces_messages(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_message_payload_structure(
harness: &TestHarness,
@@ -142,8 +140,7 @@ async fn influxdb_source_message_payload_structure(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_empty_bucket_produces_no_messages(
harness: &TestHarness,
@@ -182,8 +179,7 @@ async fn influxdb_source_empty_bucket_produces_no_messages(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_multiple_measurements(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs b/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs
index 089637ce5..98cede6db 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs
@@ -51,8 +51,7 @@ use tracing::info;
/// path), `schema()` → `Schema::Text`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source_text.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_text_payload_column(
harness: &TestHarness,
@@ -111,8 +110,7 @@ async fn influxdb_source_text_payload_column(
/// Covers: `PayloadFormat::Text` → `Ok(raw_value.into_bytes())`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source_text.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_text_whitespace_value(
harness: &TestHarness,
@@ -174,8 +172,7 @@ async fn influxdb_source_text_whitespace_value(
/// STANDARD.decode`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source_raw.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_raw_payload_column(
harness: &TestHarness,
@@ -232,8 +229,7 @@ async fn influxdb_source_raw_payload_column(
/// Covers: the loop in `poll_messages` with `PayloadFormat::Raw`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source_raw.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_raw_multiple_rows(
harness: &TestHarness,
@@ -294,8 +290,7 @@ async fn influxdb_source_raw_multiple_rows(
/// Covers: `parse_scalar` for `i64`, `f64`, `bool`, and string branches.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_scalar_type_integer(
harness: &TestHarness,
@@ -355,8 +350,7 @@ async fn influxdb_source_scalar_type_integer(
/// Covers `parse_scalar` for `f64` values.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_scalar_type_float(harness: &TestHarness, fixture:
InfluxDbSourceFixture) {
let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64;
@@ -407,8 +401,7 @@ async fn influxdb_source_scalar_type_float(harness: &TestHarness, fixture: Influ
/// Covers `parse_scalar` for boolean values (`true`/`false`).
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_scalar_type_bool(harness: &TestHarness, fixture:
InfluxDbSourceFixture) {
let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64;
@@ -481,8 +474,7 @@ async fn influxdb_source_scalar_type_bool(harness: &TestHarness, fixture: Influx
/// cursor state persisted via `state.last_timestamp`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_cursor_advances_between_batches(
harness: &TestHarness,
@@ -573,8 +565,7 @@ async fn influxdb_source_cursor_advances_between_batches(
/// `build_payload` (whole-row path).
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- shared_server = "influxdb_source",
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn influxdb_source_no_metadata_core_fields_present(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/mongodb/mongodb_sink.rs b/core/integration/tests/connectors/mongodb/mongodb_sink.rs
index 09c400eb3..474b809ed 100644
--- a/core/integration/tests/connectors/mongodb/mongodb_sink.rs
+++ b/core/integration/tests/connectors/mongodb/mongodb_sink.rs
@@ -48,9 +48,8 @@ fn build_expected_document_id_for_topic(topic_name: &str, message_id: u128) -> S
}
#[iggy_harness(
- shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn json_messages_sink_to_mongodb(harness: &TestHarness, fixture:
MongoDbSinkJsonFixture) {
let client = harness.root_client().await.unwrap();
@@ -137,9 +136,8 @@ async fn json_messages_sink_to_mongodb(harness: &TestHarness, fixture: MongoDbSi
}
#[iggy_harness(
- shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn binary_messages_sink_as_bson_binary(harness: &TestHarness, fixture:
MongoDbSinkFixture) {
let client = harness.root_client().await.unwrap();
@@ -206,9 +204,8 @@ async fn binary_messages_sink_as_bson_binary(harness: &TestHarness, fixture: Mon
}
#[iggy_harness(
- shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn large_batch_processed_correctly(harness: &TestHarness, fixture:
MongoDbSinkBatchFixture) {
let client = harness.root_client().await.unwrap();
@@ -261,9 +258,8 @@ async fn large_batch_processed_correctly(harness: &TestHarness, fixture: MongoDb
}
#[iggy_harness(
- shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn duplicate_key_is_idempotent_replay_not_sink_error(
harness: &TestHarness,
@@ -445,9 +441,8 @@ async fn duplicate_key_is_idempotent_replay_not_sink_error(
}
#[iggy_harness(
- shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn ordered_duplicate_partial_insert_has_exact_accounting(
harness: &TestHarness,
@@ -622,9 +617,8 @@ async fn ordered_duplicate_partial_insert_has_exact_accounting(
}
#[iggy_harness(
- shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix(
harness: &TestHarness,
@@ -735,9 +729,8 @@ async fn schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix(
}
#[iggy_harness(
- shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn write_concern_timeout_does_not_report_full_success(
harness: &TestHarness,
@@ -827,9 +820,8 @@ async fn write_concern_timeout_does_not_report_full_success(
}
#[iggy_harness(
- shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn retryable_write_failover_keeps_single_doc_per_id(
harness: &TestHarness,
@@ -975,9 +967,8 @@ async fn retryable_write_failover_keeps_single_doc_per_id(
}
#[iggy_harness(
- shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn no_writes_performed_label_path_preserves_state_accuracy(
harness: &TestHarness,
@@ -1124,9 +1115,8 @@ async fn no_writes_performed_label_path_preserves_state_accuracy(
}
#[iggy_harness(
- shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn auto_create_collection_on_open(
harness: &TestHarness,
@@ -1167,5 +1157,6 @@ async fn auto_create_collection_on_open(
"Collection should be empty after open() with no messages"
);
+ // Suppress unused harness warning.
let _ = harness;
}
diff --git a/core/integration/tests/connectors/postgres/postgres_sink.rs b/core/integration/tests/connectors/postgres/postgres_sink.rs
index 6b5b87022..2032c025e 100644
--- a/core/integration/tests/connectors/postgres/postgres_sink.rs
+++ b/core/integration/tests/connectors/postgres/postgres_sink.rs
@@ -35,9 +35,8 @@ type SinkRow = (i64, String, String, Vec<u8>);
type SinkJsonRow = (i64, serde_json::Value);
#[iggy_harness(
- shared_server = "postgres_sink",
server(connectors_runtime(config_path =
"tests/connectors/postgres/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn json_messages_sink_stores_as_bytea(harness: &TestHarness, fixture:
PostgresSinkFixture) {
let client = harness.root_client().await.unwrap();
@@ -98,9 +97,8 @@ async fn json_messages_sink_stores_as_bytea(harness: &TestHarness, fixture: Post
}
#[iggy_harness(
- shared_server = "postgres_sink",
server(connectors_runtime(config_path =
"tests/connectors/postgres/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn binary_messages_sink_stores_as_bytea(
harness: &TestHarness,
@@ -163,9 +161,8 @@ async fn binary_messages_sink_stores_as_bytea(
}
#[iggy_harness(
- shared_server = "postgres_sink",
server(connectors_runtime(config_path =
"tests/connectors/postgres/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn json_messages_sink_stores_as_jsonb(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/postgres/postgres_source.rs b/core/integration/tests/connectors/postgres/postgres_source.rs
index ce5cb99e2..5f4a79af8 100644
--- a/core/integration/tests/connectors/postgres/postgres_source.rs
+++ b/core/integration/tests/connectors/postgres/postgres_source.rs
@@ -32,9 +32,8 @@ use std::time::Duration;
use tokio::time::sleep;
#[iggy_harness(
- shared_server = "postgres_source",
server(connectors_runtime(config_path =
"tests/connectors/postgres/source.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn json_rows_source_produces_messages_to_iggy(
harness: &TestHarness,
@@ -114,9 +113,8 @@ async fn json_rows_source_produces_messages_to_iggy(
}
#[iggy_harness(
- shared_server = "postgres_source",
server(connectors_runtime(config_path =
"tests/connectors/postgres/source.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn bytea_rows_source_produces_raw_messages_to_iggy(
harness: &TestHarness,
@@ -178,9 +176,8 @@ async fn bytea_rows_source_produces_raw_messages_to_iggy(
}
#[iggy_harness(
- shared_server = "postgres_source",
server(connectors_runtime(config_path =
"tests/connectors/postgres/source.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn jsonb_rows_source_produces_json_messages_to_iggy(
harness: &TestHarness,
@@ -246,9 +243,8 @@ async fn jsonb_rows_source_produces_json_messages_to_iggy(
}
#[iggy_harness(
- shared_server = "postgres_source",
server(connectors_runtime(config_path =
"tests/connectors/postgres/source.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn delete_after_read_source_removes_rows_after_producing(
harness: &TestHarness,
@@ -317,9 +313,8 @@ async fn delete_after_read_source_removes_rows_after_producing(
}
#[iggy_harness(
- shared_server = "postgres_source",
server(connectors_runtime(config_path =
"tests/connectors/postgres/source.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn processed_column_source_marks_rows_after_producing(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/postgres/restart.rs b/core/integration/tests/connectors/postgres/restart.rs
index cdba541ee..e6e5f203b 100644
--- a/core/integration/tests/connectors/postgres/restart.rs
+++ b/core/integration/tests/connectors/postgres/restart.rs
@@ -38,9 +38,8 @@ const SINK_KEY: &str = "postgres";
type SinkRow = (i64, String, String, Vec<u8>);
#[iggy_harness(
- shared_server = "postgres_sink",
server(connectors_runtime(config_path =
"tests/connectors/postgres/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn restart_sink_connector_continues_processing(
harness: &TestHarness,
@@ -128,9 +127,8 @@ async fn restart_sink_connector_continues_processing(
}
#[iggy_harness(
- shared_server = "postgres_sink",
server(connectors_runtime(config_path =
"tests/connectors/postgres/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn parallel_restart_requests_should_not_break_connector(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/quickwit/quickwit_sink.rs b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
index b7fe69b10..ce3abd364 100644
--- a/core/integration/tests/connectors/quickwit/quickwit_sink.rs
+++ b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
@@ -28,9 +28,8 @@ use integration::iggy_harness;
use serde::{Deserialize, Serialize};
#[iggy_harness(
- shared_server = "quickwit_sink",
server(connectors_runtime(config_path =
"tests/connectors/quickwit/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn given_existent_quickwit_index_should_store(
harness: &TestHarness,
@@ -84,9 +83,8 @@ async fn given_existent_quickwit_index_should_store(
}
#[iggy_harness(
- shared_server = "quickwit_sink",
server(connectors_runtime(config_path =
"tests/connectors/quickwit/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn given_nonexistent_quickwit_index_should_create_and_store(
harness: &TestHarness,
@@ -140,9 +138,8 @@ async fn given_nonexistent_quickwit_index_should_create_and_store(
}
#[iggy_harness(
- shared_server = "quickwit_sink",
server(connectors_runtime(config_path =
"tests/connectors/quickwit/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn given_bulk_message_send_should_store(
harness: &TestHarness,
@@ -196,9 +193,8 @@ async fn given_bulk_message_send_should_store(
}
#[iggy_harness(
- shared_server = "quickwit_sink",
server(connectors_runtime(config_path =
"tests/connectors/quickwit/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn given_invalid_messages_should_not_store(harness: &TestHarness,
fixture: QuickwitFixture) {
let client = harness.root_client().await.unwrap();
diff --git a/core/integration/tests/connectors/random/random_source.rs b/core/integration/tests/connectors/random/random_source.rs
index 4d44d1bfe..5efb6fd08 100644
--- a/core/integration/tests/connectors/random/random_source.rs
+++ b/core/integration/tests/connectors/random/random_source.rs
@@ -25,9 +25,8 @@ use std::time::Duration;
use tokio::time::sleep;
#[iggy_harness(
- shared_server = "random_source",
server(connectors_runtime(config_path =
"tests/connectors/random/source.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn random_source_produces_messages(harness: &TestHarness) {
sleep(Duration::from_secs(1)).await;
diff --git a/core/integration/tests/connectors/stdout/stdout_sink.rs b/core/integration/tests/connectors/stdout/stdout_sink.rs
index 2dfe8758e..be247588c 100644
--- a/core/integration/tests/connectors/stdout/stdout_sink.rs
+++ b/core/integration/tests/connectors/stdout/stdout_sink.rs
@@ -32,9 +32,8 @@ use tokio::time::sleep;
const API_KEY: &str = "test-api-key";
#[iggy_harness(
- shared_server = "stdout_sink",
server(connectors_runtime(config_path =
"tests/connectors/stdout/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn stdout_sink_consumes_messages(harness: &TestHarness) {
let client = harness.root_client().await.unwrap();
@@ -91,9 +90,8 @@ async fn stdout_sink_consumes_messages(harness: &TestHarness) {
}
#[iggy_harness(
- shared_server = "stdout_sink",
server(connectors_runtime(config_path =
"tests/connectors/stdout/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn stdout_sink_reports_metrics(harness: &TestHarness) {
let client = harness.root_client().await.unwrap();
@@ -152,9 +150,8 @@ async fn stdout_sink_reports_metrics(harness: &TestHarness) {
}
#[iggy_harness(
- shared_server = "stdout_sink",
server(connectors_runtime(config_path =
"tests/connectors/stdout/sink.toml")),
- seed = seeds::connector_stream_idempotent
+ seed = seeds::connector_stream
)]
async fn stdout_sink_handles_bulk_messages(harness: &TestHarness) {
let client = harness.root_client().await.unwrap();