This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 2afeaa3ff feat(test): add shared_server to iggy_harness for server
reuse across tests (#3085)
2afeaa3ff is described below
commit 2afeaa3ff19bee422a4831713caaf97ed5be0243
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Apr 7 13:32:26 2026 +0200
feat(test): add shared_server to iggy_harness for server reuse across tests
(#3085)
Connector integration tests each spawn their own iggy-server,
causing resource contention and timing-sensitive CI failures
(e.g. WireMock not receiving requests within retry limit).
Add a `shared_server = "key"` attribute to #[iggy_harness]
that lets tests with the same key share a single iggy-server
process. Tests within a group run serially via
#[serial_test::serial(key)], groups run in parallel with
each other. Each test still gets its own connector runtime,
fixtures, and clients - only the server is shared.
Idempotent seed functions (connector_stream_idempotent)
skip creation if stream/topic already exists, allowing
serial tests to safely reuse the same server state.
Infrastructure:
- SharedServerRegistry (DashMap + tokio::sync::OnceCell)
for thread-safe lazy server initialization
- Atomic refcounting for lifecycle: last test stops server
and cleans up (preserved on failure for debugging)
- TestHarness::from_shared() + start_shared_with_seed()
- Compile-time validation rejects incompatible combinations
(&mut TestHarness, config overrides, cluster, TLS)
Migrates 113 connector tests across 18 files. Three restart
tests using &mut TestHarness remain on dedicated servers.
---
.config/nextest.toml | 14 +-
Cargo.lock | 1 +
core/harness_derive/src/attrs.rs | 23 +++
core/harness_derive/src/codegen.rs | 173 +++++++++++++++++-
core/integration/Cargo.toml | 1 +
core/integration/src/harness/handle/mod.rs | 2 +-
core/integration/src/harness/mod.rs | 2 +
.../src/harness/orchestrator/builder.rs | 2 +
.../src/harness/orchestrator/harness.rs | 198 +++++++++++++++++++--
core/integration/src/harness/seeds.rs | 71 +++++++-
core/integration/src/harness/shared.rs | 159 +++++++++++++++++
core/integration/src/lib.rs | 3 +-
core/integration/tests/connectors/api/endpoints.rs | 24 ++-
.../connectors/elasticsearch/elasticsearch_sink.rs | 9 +-
.../elasticsearch/elasticsearch_source.rs | 9 +-
.../integration/tests/connectors/http/http_sink.rs | 26 +--
.../http_config_provider/direct_responses.rs | 30 ++--
.../http_config_provider/wrapped_responses.rs | 30 ++--
.../tests/connectors/iceberg/iceberg_sink.rs | 9 +-
.../tests/connectors/influxdb/influxdb_sink.rs | 18 +-
.../connectors/influxdb/influxdb_sink_formats.rs | 36 ++--
.../tests/connectors/influxdb/influxdb_source.rs | 12 +-
.../connectors/influxdb/influxdb_source_formats.rs | 27 ++-
.../tests/connectors/mongodb/mongodb_sink.rs | 31 ++--
.../tests/connectors/postgres/postgres_sink.rs | 9 +-
.../tests/connectors/postgres/postgres_source.rs | 15 +-
.../tests/connectors/postgres/restart.rs | 6 +-
.../tests/connectors/quickwit/quickwit_sink.rs | 12 +-
.../tests/connectors/random/random_source.rs | 3 +-
.../tests/connectors/stdout/stdout_sink.rs | 9 +-
30 files changed, 827 insertions(+), 137 deletions(-)
diff --git a/.config/nextest.toml b/.config/nextest.toml
index bb29d9651..2dd295fb8 100644
--- a/.config/nextest.toml
+++ b/.config/nextest.toml
@@ -15,11 +15,21 @@
# specific language governing permissions and limitations
# under the License.
+[profile.default]
+slow-timeout = { period = "30s", terminate-after = 4 }
+
[[profile.default.overrides]]
# This is a solution (or actually a workaround) for the problem that nextest
does not support
# #[serial] macro which shall enforce sequential execution of the test case.
filter = 'package(integration) and
test(cli::system::test_cli_session_scenario::should_be_successful)'
threads-required = "num-cpus"
-[profile.default]
-slow-timeout = { period = "30s", terminate-after = 4 }
+# shared_server tests: the iggy_harness macro prefixes test names with
+# shared_server_ enabling a single generic nextest filter. Bounded to limit
+# resource contention from parallel fixture containers without fully
serializing.
+[test-groups.shared-server]
+max-threads = 8
+
+[[profile.default.overrides]]
+filter = 'package(integration) and test(/shared_server_/)'
+test-group = 'shared-server'
diff --git a/Cargo.lock b/Cargo.lock
index cb11aa7ff..26c32044b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6151,6 +6151,7 @@ dependencies = [
"configs",
"configs_derive",
"ctor",
+ "dashmap",
"figment",
"futures",
"harness_derive",
diff --git a/core/harness_derive/src/attrs.rs b/core/harness_derive/src/attrs.rs
index 2e4389faa..fa67e9239 100644
--- a/core/harness_derive/src/attrs.rs
+++ b/core/harness_derive/src/attrs.rs
@@ -48,6 +48,12 @@ impl ClusterNodesValue {
}
}
+/// Shared server configuration.
+#[derive(Debug, Clone)]
+pub struct SharedServerAttrs {
+ pub key: String,
+}
+
/// Parsed `#[iggy_harness(...)]` attributes.
#[derive(Debug, Default)]
pub struct IggyTestAttrs {
@@ -57,6 +63,7 @@ pub struct IggyTestAttrs {
pub server: ServerAttrs,
pub seed_fn: Option<syn::Path>,
pub cluster_nodes: ClusterNodesValue,
+ pub shared_server: Option<SharedServerAttrs>,
}
/// MCP configuration attributes.
@@ -82,6 +89,7 @@ impl IggyTestAttrs {
server: ServerAttrs::default(),
seed_fn: None,
cluster_nodes: ClusterNodesValue::None,
+ shared_server: None,
}
}
}
@@ -248,6 +256,9 @@ impl Parse for IggyTestAttrs {
AttrItem::ClusterNodes(cluster) => {
attrs.cluster_nodes = cluster;
}
+ AttrItem::SharedServer(shared) => {
+ attrs.shared_server = Some(shared);
+ }
}
}
@@ -264,6 +275,7 @@ enum AttrItem {
Server(Box<ServerAttrs>),
Seed(syn::Path),
ClusterNodes(ClusterNodesValue),
+ SharedServer(SharedServerAttrs),
}
impl Parse for AttrItem {
@@ -293,6 +305,10 @@ impl Parse for AttrItem {
let path: syn::Path = input.parse()?;
Ok(AttrItem::Seed(path))
}
+ "shared_server" => {
+ let shared = parse_shared_server_attrs(input)?;
+ Ok(AttrItem::SharedServer(shared))
+ }
_ => Err(syn::Error::new(
ident.span(),
format!("unknown attribute: {ident_str}"),
@@ -473,6 +489,13 @@ fn parse_mcp_attrs(input: ParseStream) ->
syn::Result<McpAttrs> {
Ok(mcp)
}
+/// Parse `shared_server = "key"`.
+fn parse_shared_server_attrs(input: ParseStream) ->
syn::Result<SharedServerAttrs> {
+ input.parse::<Token![=]>()?;
+ let lit: LitStr = input.parse()?;
+ Ok(SharedServerAttrs { key: lit.value() })
+}
+
fn parse_connectors_runtime_attrs(input: ParseStream) ->
syn::Result<ConnectorsRuntimeAttrs> {
let mut attrs = ConnectorsRuntimeAttrs::default();
diff --git a/core/harness_derive/src/codegen.rs
b/core/harness_derive/src/codegen.rs
index 2abd54368..ba3b2b679 100644
--- a/core/harness_derive/src/codegen.rs
+++ b/core/harness_derive/src/codegen.rs
@@ -138,6 +138,12 @@ pub fn generate_tests(attrs: &IggyTestAttrs, input:
&ItemFn) -> syn::Result<Toke
.collect();
let params = analyze_signature(&input.sig)?;
+
+ // Validate shared_server constraints
+ if attrs.shared_server.is_some() {
+ validate_shared_server(attrs, &params, fn_name)?;
+ }
+
let matrix_params_list = matrix_params(&params);
let has_matrix_params = !matrix_params_list.is_empty();
@@ -172,6 +178,52 @@ pub fn generate_tests(attrs: &IggyTestAttrs, input:
&ItemFn) -> syn::Result<Toke
)
}
+fn validate_shared_server(
+ attrs: &IggyTestAttrs,
+ params: &[DetectedParam],
+ fn_name: &Ident,
+) -> syn::Result<()> {
+ let has_mut_harness = params
+ .iter()
+ .any(|p| matches!(p, DetectedParam::HarnessMut { .. }));
+ if has_mut_harness {
+ return Err(syn::Error::new(
+ fn_name.span(),
+ "shared_server is incompatible with &mut TestHarness",
+ ));
+ }
+
+ if !attrs.server.config_overrides.is_empty() {
+ return Err(syn::Error::new(
+ fn_name.span(),
+ "shared_server is incompatible with server config overrides",
+ ));
+ }
+
+ if !matches!(attrs.cluster_nodes, crate::attrs::ClusterNodesValue::None) {
+ return Err(syn::Error::new(
+ fn_name.span(),
+ "shared_server is incompatible with cluster_nodes",
+ ));
+ }
+
+ if attrs.server.tls.is_some() || attrs.server.websocket_tls.is_some() {
+ return Err(syn::Error::new(
+ fn_name.span(),
+ "shared_server is incompatible with TLS configuration",
+ ));
+ }
+
+ if attrs.transports.iter().any(|t| t.tls_mode().is_some()) {
+ return Err(syn::Error::new(
+ fn_name.span(),
+ "shared_server is incompatible with TLS transports",
+ ));
+ }
+
+ Ok(())
+}
+
fn generate_single_test(
fn_name: &Ident,
fn_vis: &syn::Visibility,
@@ -185,11 +237,34 @@ fn generate_single_test(
let fixture_setup = generate_fixture_setup(params);
let fixture_envs = generate_fixture_envs_collection(params);
let fixture_param_bindings = generate_fixture_param_bindings(params);
- let harness_setup = generate_harness_setup(variant, has_fixtures, attrs);
let fixture_seed = generate_fixture_seed(params);
- let start_and_seed = generate_start_and_seed(attrs, fixture_seed);
let harness_param_bindings = generate_harness_param_bindings(params);
+ if let Some(ref shared) = attrs.shared_server {
+ let shared_harness_setup = generate_shared_harness_setup(variant,
has_fixtures, attrs);
+ let shared_start_and_seed = generate_shared_start_and_seed(attrs,
fixture_seed);
+ let key_ident = Ident::new(&shared.key, Span::call_site());
+ let shared_fn_name = format_ident!("shared_server_{}", fn_name);
+
+ return Ok(quote! {
+ #(#other_attrs)*
+ #[::tokio::test]
+ #[::serial_test::serial(#key_ident)]
+ #fn_vis async fn #shared_fn_name() {
+ #fixture_setup
+ #fixture_envs
+ #fixture_param_bindings
+ #shared_harness_setup
+ #shared_start_and_seed
+ #harness_param_bindings
+ #fn_body
+ }
+ });
+ }
+
+ let harness_setup = generate_harness_setup(variant, has_fixtures, attrs);
+ let start_and_seed = generate_start_and_seed(attrs, fixture_seed);
+
Ok(quote! {
#(#other_attrs)*
#[::tokio::test]
@@ -545,6 +620,99 @@ fn generate_start_and_seed(attrs: &IggyTestAttrs,
fixture_seed: TokenStream) ->
}
}
+// ============================================================================
+// Shared server code generation helpers
+// ============================================================================
+
+fn generate_shared_harness_setup(
+ variant: &TestVariant,
+ has_fixtures: bool,
+ attrs: &IggyTestAttrs,
+) -> TokenStream {
+ let shared = attrs
+ .shared_server
+ .as_ref()
+ .expect("shared_server required");
+ let key = &shared.key;
+ let transport = variant.transport.variant_ident();
+ let client_config_method =
+ Ident::new(variant.transport.client_config_method(),
Span::call_site());
+
+ let cr_config = if let Some(ref cr) = attrs.server.connectors_runtime {
+ let config_path = cr.config_path.as_deref().unwrap_or("");
+ if has_fixtures {
+ quote! {
+
Some(::integration::__macro_support::ConnectorsRuntimeConfig::builder()
+ .config_path(::std::path::PathBuf::from(#config_path))
+ .extra_envs(__fixture_envs.clone())
+ .build())
+ }
+ } else {
+ quote! {
+
Some(::integration::__macro_support::ConnectorsRuntimeConfig::builder()
+ .config_path(::std::path::PathBuf::from(#config_path))
+ .build())
+ }
+ }
+ } else {
+ quote! { None }
+ };
+
+ quote! {
+ let __shared =
::integration::__macro_support::SharedServerRegistry::get_or_start(
+ #key,
+ ::integration::__macro_support::TestServerConfig::default(),
+ ).await.unwrap_or_else(|e| panic!("failed to get shared server: {e}"));
+
+ let mut __harness =
::integration::__macro_support::TestHarness::from_shared(
+ __shared,
+ #cr_config,
+
Some(::integration::__macro_support::ClientConfig::#client_config_method()),
+ ).await.unwrap_or_else(|e| panic!("failed to build shared harness:
{e}"));
+ let _ = ::integration::__macro_support::TransportProtocol::#transport;
+ }
+}
+
+fn generate_shared_start_and_seed(attrs: &IggyTestAttrs, fixture_seed:
TokenStream) -> TokenStream {
+ let has_fixture_seed = !fixture_seed.is_empty();
+
+ match (&attrs.seed_fn, has_fixture_seed) {
+ (Some(seed_fn), true) => {
+ quote! {
+ __harness.start_shared_with_seed(|__seed_client| async move {
+ #seed_fn(&__seed_client).await?;
+ #fixture_seed
+ Ok(())
+ }).await.unwrap_or_else(|e| panic!("failed to start shared
harness: {e}"));
+ }
+ }
+ (Some(seed_fn), false) => {
+ quote! {
+ __harness.start_shared_with_seed(|__seed_client| async move {
+ #seed_fn(&__seed_client).await
+ }).await.unwrap_or_else(|e| panic!("failed to start shared
harness: {e}"));
+ }
+ }
+ (None, true) => {
+ quote! {
+ __harness.start_shared_with_seed(|__seed_client| async move {
+ #fixture_seed
+ Ok(())
+ }).await.unwrap_or_else(|e| panic!("failed to start shared
harness: {e}"));
+ }
+ }
+ (None, false) => {
+ quote! {
+ __harness.start_shared().await.unwrap_or_else(|e|
panic!("failed to start shared harness: {e}"));
+ }
+ }
+ }
+}
+
+// ============================================================================
+// Standard (non-shared) code generation helpers
+// ============================================================================
+
fn fixture_var_ident(name: &syn::Ident) -> syn::Ident {
let name_str = name.to_string();
let clean_name = name_str.trim_start_matches('_');
@@ -795,6 +963,7 @@ mod tests {
},
seed_fn: None,
cluster_nodes: crate::attrs::ClusterNodesValue::None,
+ shared_server: None,
};
let variants = generate_variants(&attrs);
// 2 transports * 2 segment sizes * 2 cache modes = 8 variants
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index c8830b19f..513dc92d9 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -36,6 +36,7 @@ compio = { workspace = true }
configs = { workspace = true }
configs_derive = { workspace = true }
ctor = { workspace = true }
+dashmap = { workspace = true }
figment = { workspace = true }
futures = { workspace = true }
harness_derive = { workspace = true }
diff --git a/core/integration/src/harness/handle/mod.rs
b/core/integration/src/harness/handle/mod.rs
index da746d175..a31ad7535 100644
--- a/core/integration/src/harness/handle/mod.rs
+++ b/core/integration/src/harness/handle/mod.rs
@@ -25,7 +25,7 @@ mod mcp;
mod server;
pub use client::ClientHandle;
-pub use client_builder::ClientBuilder;
+pub use client_builder::{ClientBuilder, ServerConnection};
pub use connectors_runtime::ConnectorsRuntimeHandle;
pub use mcp::{McpClient, McpHandle};
pub use server::{ServerHandle, ServerLogs};
diff --git a/core/integration/src/harness/mod.rs
b/core/integration/src/harness/mod.rs
index 13793ab05..c8d3757ae 100644
--- a/core/integration/src/harness/mod.rs
+++ b/core/integration/src/harness/mod.rs
@@ -51,6 +51,7 @@ mod helpers;
mod orchestrator;
mod port_reserver;
pub mod seeds;
+mod shared;
mod traits;
pub use config::{
@@ -71,3 +72,4 @@ pub use helpers::{
};
pub use fixture::TestFixture;
+pub use shared::{SharedServerInfo, SharedServerRegistry};
diff --git a/core/integration/src/harness/orchestrator/builder.rs
b/core/integration/src/harness/orchestrator/builder.rs
index 46afddd3b..668d9a193 100644
--- a/core/integration/src/harness/orchestrator/builder.rs
+++ b/core/integration/src/harness/orchestrator/builder.rs
@@ -226,6 +226,8 @@ impl TestHarnessBuilder {
primary_transport,
primary_client_config: self.primary_client_config,
started: false,
+ shared_server: None,
+ shared_connectors_runtime: None,
})
}
}
diff --git a/core/integration/src/harness/orchestrator/harness.rs
b/core/integration/src/harness/orchestrator/harness.rs
index 6897b31b4..ca99731c2 100644
--- a/core/integration/src/harness/orchestrator/harness.rs
+++ b/core/integration/src/harness/orchestrator/harness.rs
@@ -18,14 +18,15 @@
*/
use super::builder::TestHarnessBuilder;
-use crate::harness::config::ClientConfig;
+use crate::harness::config::{ClientConfig, ConnectorsRuntimeConfig};
use crate::harness::context::TestContext;
use crate::harness::error::TestBinaryError;
use crate::harness::handle::{
- ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient,
McpHandle, ServerHandle,
- ServerLogs,
+ ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient,
McpHandle, ServerConnection,
+ ServerHandle, ServerLogs,
};
-use crate::harness::traits::{Restartable, TestBinary};
+use crate::harness::shared::SharedServerInfo;
+use crate::harness::traits::{IggyServerDependent, Restartable, TestBinary};
use futures::executor::block_on;
use iggy::prelude::{ClientWrapper, IggyClient};
use iggy_common::TransportProtocol;
@@ -47,6 +48,8 @@ pub struct TestHarness {
pub(super) primary_transport: Option<TransportProtocol>,
pub(super) primary_client_config: Option<ClientConfig>,
pub(super) started: bool,
+ pub(super) shared_server: Option<Arc<SharedServerInfo>>,
+ pub(super) shared_connectors_runtime: Option<ConnectorsRuntimeHandle>,
}
impl std::fmt::Debug for TestHarness {
@@ -72,6 +75,39 @@ impl TestHarness {
TestHarnessBuilder::default()
}
+ /// Create a harness that uses a shared server (no ownership of server
process).
+ ///
+ /// Each test still gets its own `TestContext` (for logs), connector
runtime,
+ /// and clients. Only the iggy-server process is shared.
+ pub async fn from_shared(
+ shared: Arc<SharedServerInfo>,
+ connectors_runtime_config: Option<ConnectorsRuntimeConfig>,
+ primary_client_config: Option<ClientConfig>,
+ ) -> Result<Self, TestBinaryError> {
+ let mut context = TestContext::new(None, true)?;
+ context.ensure_created()?;
+ let context = Arc::new(context);
+
+ let shared_cr = connectors_runtime_config
+ .map(|cfg| ConnectorsRuntimeHandle::with_server_id(cfg,
context.clone(), 0));
+
+ let primary_transport = primary_client_config.as_ref().map(|c|
c.transport);
+
+ shared.acquire();
+
+ Ok(TestHarness {
+ context,
+ servers: Vec::new(),
+ clients: Vec::new(),
+ client_configs: Vec::new(),
+ primary_transport,
+ primary_client_config,
+ started: false,
+ shared_server: Some(shared),
+ shared_connectors_runtime: shared_cr,
+ })
+ }
+
/// Start all configured binaries and create clients.
pub async fn start(&mut self) -> Result<(), TestBinaryError> {
self.start_internal(
@@ -97,6 +133,76 @@ impl TestHarness {
self.start_internal(Some(seed)).await
}
+ /// Start a shared-server harness without a seed function.
+ pub async fn start_shared(&mut self) -> Result<(), TestBinaryError> {
+ self.start_shared_internal(
+ None::<
+ fn(
+ IggyClient,
+ )
+ -> std::future::Ready<Result<(), Box<dyn std::error::Error
+ Send + Sync>>>,
+ >,
+ )
+ .await
+ }
+
+ /// Start a shared-server harness with a seed function.
+ ///
+ /// The seed runs BEFORE the connector runtime starts, allowing it to
+ /// create streams/topics that the connector expects to find.
+ pub async fn start_shared_with_seed<F, Fut>(&mut self, seed: F) ->
Result<(), TestBinaryError>
+ where
+ F: FnOnce(IggyClient) -> Fut,
+ Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error
+ Send + Sync>>>,
+ {
+ self.start_shared_internal(Some(seed)).await
+ }
+
+ async fn start_shared_internal<F, Fut>(
+ &mut self,
+ seed: Option<F>,
+ ) -> Result<(), TestBinaryError>
+ where
+ F: FnOnce(IggyClient) -> Fut,
+ Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error
+ Send + Sync>>>,
+ {
+ if self.started {
+ return Err(TestBinaryError::AlreadyStarted);
+ }
+
+ let shared = self
+ .shared_server
+ .as_ref()
+ .ok_or_else(|| TestBinaryError::InvalidState {
+ message: "start_shared called without shared
server".to_string(),
+ })?;
+
+ let tcp_addr = shared
+ .tcp_addr()
+ .ok_or_else(|| TestBinaryError::InvalidState {
+ message: "Shared server has no TCP address".to_string(),
+ })?;
+
+ // Seed runs before connector runtime - creates streams/topics
+ if let Some(seed_fn) = seed {
+ let client = self.tcp_root_client().await?;
+ seed_fn(client)
+ .await
+ .map_err(|e| TestBinaryError::SeedFailed(e.to_string()))?;
+ }
+
+ // Connector runtime starts after seed - streams/topics exist now
+ if let Some(ref mut cr) = self.shared_connectors_runtime {
+ cr.set_iggy_address(tcp_addr);
+ cr.start()?;
+ cr.wait_ready().await?;
+ }
+
+ self.create_clients().await?;
+ self.started = true;
+ Ok(())
+ }
+
async fn start_internal<F, Fut>(&mut self, seed: Option<F>) -> Result<(),
TestBinaryError>
where
F: FnOnce(IggyClient) -> Fut,
@@ -138,6 +244,20 @@ impl TestHarness {
}
self.clients.clear();
+ // Stop per-test connector runtime (shared server mode)
+ if let Some(ref mut cr) = self.shared_connectors_runtime {
+ cr.stop()?;
+ }
+
+ // Release shared server ref - last test stops the server and cleans up
+ if let Some(shared) = self.shared_server.take() {
+ if std::thread::panicking() {
+ shared.mark_failed();
+ }
+ shared.release();
+ }
+
+ // Stop owned servers (non-shared mode only)
for server in self.servers.iter_mut().rev() {
server.stop_dependents()?;
server.stop()?;
@@ -168,12 +288,24 @@ impl TestHarness {
}
/// Get reference to the first (primary) server handle.
+ ///
+ /// Not available in `shared_server` mode - the server is managed by
`SharedServerRegistry`.
pub fn server(&self) -> &ServerHandle {
+ assert!(
+ self.shared_server.is_none(),
+ "server() is not available in shared_server mode"
+ );
self.servers.first().expect("No servers configured")
}
/// Get mutable reference to the first (primary) server handle.
+ ///
+ /// Not available in `shared_server` mode - the server is managed by
`SharedServerRegistry`.
pub fn server_mut(&mut self) -> &mut ServerHandle {
+ assert!(
+ self.shared_server.is_none(),
+ "server_mut() is not available in shared_server mode"
+ );
self.servers.first_mut().expect("No servers configured")
}
@@ -250,9 +382,15 @@ impl TestHarness {
/// Get the connectors runtime handle from the primary server if
configured.
///
+ /// Returns the per-test connector runtime for shared-server harnesses,
+ /// or the server-owned one for standard harnesses.
+ ///
/// # Panics
/// Panics if called on a cluster (multiple servers). Use
`node(i).connectors_runtime()` instead.
pub fn connectors_runtime(&self) -> Option<&ConnectorsRuntimeHandle> {
+ if self.shared_connectors_runtime.is_some() {
+ return self.shared_connectors_runtime.as_ref();
+ }
assert!(
self.servers.len() <= 1,
"connectors_runtime() is only available for single-server setups.
Use node(i).connectors_runtime() for clusters."
@@ -287,6 +425,18 @@ impl TestHarness {
&self,
transport: TransportProtocol,
) -> Result<ClientBuilder, TestBinaryError> {
+ if let Some(ref shared) = self.shared_server {
+ let connection = ServerConnection {
+ tcp_addr: shared.tcp_addr(),
+ http_addr: shared.http_addr(),
+ quic_addr: shared.quic_addr(),
+ websocket_addr: shared.websocket_addr(),
+ tls: None,
+ websocket_tls: None,
+ tls_ca_cert_path: shared.tls_ca_cert_path().cloned(),
+ };
+ return Ok(ClientBuilder::new(transport, connection));
+ }
let server =
self.servers.first().ok_or(TestBinaryError::MissingServer)?;
match transport {
TransportProtocol::Tcp => server.tcp_client(),
@@ -314,14 +464,7 @@ impl TestHarness {
&self,
transport: TransportProtocol,
) -> Result<IggyClient, TestBinaryError> {
- let server =
self.servers.first().ok_or(TestBinaryError::MissingServer)?;
- let builder = match transport {
- TransportProtocol::Tcp => server.tcp_client()?,
- TransportProtocol::Http => server.http_client()?,
- TransportProtocol::Quic => server.quic_client()?,
- TransportProtocol::WebSocket => server.websocket_client()?,
- };
- builder.connect().await
+ self.client_builder_for(transport)?.connect().await
}
pub async fn tcp_root_client(&self) -> Result<IggyClient, TestBinaryError>
{
@@ -421,16 +564,33 @@ impl TestHarness {
}
pub(super) async fn create_clients(&mut self) -> Result<(),
TestBinaryError> {
- let Some(server) = self.servers.first() else {
+ // Resolve addresses from either owned server or shared server
+ let (tcp, http, quic, ws, ca_cert) = if let Some(ref shared) =
self.shared_server {
+ (
+ shared.tcp_addr(),
+ shared.http_addr(),
+ shared.quic_addr(),
+ shared.websocket_addr(),
+ shared.tls_ca_cert_path().cloned(),
+ )
+ } else if let Some(server) = self.servers.first() {
+ (
+ server.tcp_addr(),
+ server.http_addr(),
+ server.quic_addr(),
+ server.websocket_addr(),
+ server.tls_ca_cert_path(),
+ )
+ } else {
return Ok(());
};
for config in &self.client_configs {
let address = match config.transport {
- TransportProtocol::Tcp => server.tcp_addr(),
- TransportProtocol::Http => server.http_addr(),
- TransportProtocol::Quic => server.quic_addr(),
- TransportProtocol::WebSocket => server.websocket_addr(),
+ TransportProtocol::Tcp => tcp,
+ TransportProtocol::Http => http,
+ TransportProtocol::Quic => quic,
+ TransportProtocol::WebSocket => ws,
};
let Some(address) = address else {
@@ -441,9 +601,9 @@ impl TestHarness {
let mut config = config.clone();
if config.tls_enabled
- && let Some(ca_cert_path) = server.tls_ca_cert_path()
+ && let Some(ref path) = ca_cert
{
- config.tls_ca_file = Some(ca_cert_path);
+ config.tls_ca_file = Some(path.clone());
}
let mut client = ClientHandle::new(config, address);
diff --git a/core/integration/src/harness/seeds.rs
b/core/integration/src/harness/seeds.rs
index aa5554c70..c29ec25cf 100644
--- a/core/integration/src/harness/seeds.rs
+++ b/core/integration/src/harness/seeds.rs
@@ -19,7 +19,7 @@
use iggy::prelude::{IggyClient, StreamClient, TopicClient};
use iggy_common::{
- CompressionAlgorithm, Consumer, Identifier, IggyExpiry, IggyMessage,
MaxTopicSize,
+ CompressionAlgorithm, Consumer, Identifier, IggyError, IggyExpiry,
IggyMessage, MaxTopicSize,
Partitioning, PersonalAccessTokenExpiry, UserStatus,
};
use iggy_common::{
@@ -123,6 +123,75 @@ pub async fn connector_multi_topic_stream(client:
&IggyClient) -> Result<(), See
Ok(())
}
+/// Idempotent seed for serial shared_server tests: creates stream and topic,
+/// silently skipping if they already exist from a previous test in the group.
+pub async fn connector_stream_idempotent(client: &IggyClient) -> Result<(),
SeedError> {
+ let stream_id: Identifier = names::STREAM.try_into()?;
+
+ match client.create_stream(names::STREAM).await {
+ Ok(_) => {}
+ Err(e) if e.as_code() ==
IggyError::StreamNameAlreadyExists(String::new()).as_code() => {}
+ Err(e) => return Err(e.into()),
+ }
+
+ match client
+ .create_topic(
+ &stream_id,
+ names::TOPIC,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::ServerDefault,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ {
+ Ok(_) => {}
+ Err(e)
+ if e.as_code()
+ == IggyError::TopicNameAlreadyExists(String::new(),
Identifier::default())
+ .as_code() => {}
+ Err(e) => return Err(e.into()),
+ }
+
+ Ok(())
+}
+
+/// Idempotent multi-topic seed for serial shared_server tests.
+pub async fn connector_multi_topic_stream_idempotent(client: &IggyClient) ->
Result<(), SeedError> {
+ let stream_id: Identifier = names::STREAM.try_into()?;
+
+ match client.create_stream(names::STREAM).await {
+ Ok(_) => {}
+ Err(e) if e.as_code() ==
IggyError::StreamNameAlreadyExists(String::new()).as_code() => {}
+ Err(e) => return Err(e.into()),
+ }
+
+ for topic_name in [names::TOPIC, names::TOPIC_2] {
+ match client
+ .create_topic(
+ &stream_id,
+ topic_name,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::ServerDefault,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ {
+ Ok(_) => {}
+ Err(e)
+ if e.as_code()
+ == IggyError::TopicNameAlreadyExists(String::new(),
Identifier::default())
+ .as_code() => {}
+ Err(e) => return Err(e.into()),
+ }
+ }
+
+ Ok(())
+}
+
/// Standard MCP test data: stream, topic, message, consumer group, consumer
offset, user, PAT.
pub async fn mcp_standard(client: &IggyClient) -> Result<(), SeedError> {
let stream_id: Identifier = names::STREAM.try_into()?;
diff --git a/core/integration/src/harness/shared.rs
b/core/integration/src/harness/shared.rs
new file mode 100644
index 000000000..82f717ea0
--- /dev/null
+++ b/core/integration/src/harness/shared.rs
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::harness::config::TestServerConfig;
+use crate::harness::context::TestContext;
+use crate::harness::error::TestBinaryError;
+use crate::harness::handle::ServerHandle;
+use crate::harness::traits::TestBinary;
+use dashmap::DashMap;
+use std::net::SocketAddr;
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::{Arc, LazyLock};
+use tokio::sync::OnceCell;
+
+/// Shared server state: addresses, handles, and active user count.
+///
+/// Rust statics are not dropped at process exit, so we use an atomic counter
+/// to track active tests. The last test to finish stops the server and
+/// cleans up the test directory.
+pub struct SharedServerInfo {
+ tcp_addr: Option<SocketAddr>,
+ http_addr: Option<SocketAddr>,
+ quic_addr: Option<SocketAddr>,
+ websocket_addr: Option<SocketAddr>,
+ tls_ca_cert_path: Option<PathBuf>,
+ handle: std::sync::Mutex<Option<ServerHandle>>,
+ context: Arc<TestContext>,
+ active_count: AtomicUsize,
+ any_test_failed: AtomicBool,
+}
+
+impl SharedServerInfo {
+ pub fn tcp_addr(&self) -> Option<SocketAddr> {
+ self.tcp_addr
+ }
+
+ pub fn http_addr(&self) -> Option<SocketAddr> {
+ self.http_addr
+ }
+
+ pub fn quic_addr(&self) -> Option<SocketAddr> {
+ self.quic_addr
+ }
+
+ pub fn websocket_addr(&self) -> Option<SocketAddr> {
+ self.websocket_addr
+ }
+
+ pub fn tls_ca_cert_path(&self) -> Option<&PathBuf> {
+ self.tls_ca_cert_path.as_ref()
+ }
+
+ /// Increment the active test count. Called when a test acquires this
server.
+ pub fn acquire(&self) {
+ self.active_count.fetch_add(1, Ordering::AcqRel);
+ }
+
+ /// Mark that a test using this server has failed.
+ /// Preserves the test directory for debugging.
+ pub fn mark_failed(&self) {
+ self.any_test_failed.store(true, Ordering::Release);
+ }
+
+ /// Decrement the active test count. When it reaches zero, stops the server
+ /// and cleans up the test directory (unless any test failed).
+ pub fn release(&self) {
+ let prev = self.active_count.fetch_sub(1, Ordering::AcqRel);
+ if prev == 1 {
+ match self.handle.lock() {
+ Ok(mut guard) => {
+ if let Some(ref mut server) = *guard
+ && let Err(e) = server.stop()
+ {
+ eprintln!("[SharedServer] failed to stop server: {e}");
+ }
+ *guard = None;
+ }
+ Err(e) => {
+ eprintln!("[SharedServer] mutex poisoned, server process
may leak: {e}");
+ }
+ }
+ if !self.any_test_failed.load(Ordering::Acquire) {
+ self.context.cleanup();
+ }
+ }
+ }
+}
+
+static REGISTRY: LazyLock<DashMap<String,
Arc<OnceCell<Arc<SharedServerInfo>>>>> =
+ LazyLock::new(DashMap::new);
+
+/// Global registry for shared server instances.
+///
+/// Tests with the same `shared_server` key share a single iggy-server process.
+/// The first test to request a key starts the server; all subsequent tests
+/// get a reference to the already-running server. The last test to finish
+/// stops the server and cleans up the test directory.
+pub struct SharedServerRegistry;
+
+impl SharedServerRegistry {
+ /// Get or start a shared server for the given key.
+ ///
+ /// Thread-safe: concurrent callers for the same key will block until the
+ /// first caller finishes initialization, then all receive the same `Arc`.
+ ///
+ /// Callers must call `SharedServerInfo::acquire()` after obtaining the
+ /// reference and `SharedServerInfo::release()` when done (typically in
+ /// `TestHarness::stop()`).
+ pub async fn get_or_start(
+ key: &str,
+ config: TestServerConfig,
+ ) -> Result<Arc<SharedServerInfo>, TestBinaryError> {
+ let cell = REGISTRY
+ .entry(key.to_string())
+ .or_insert_with(|| Arc::new(OnceCell::new()))
+ .value()
+ .clone();
+
+ cell.get_or_try_init(|| async move {
+ let mut context =
TestContext::new(Some(format!("shared_server_{key}")), true)?;
+ context.ensure_created()?;
+ let context = Arc::new(context);
+
+ let mut server = ServerHandle::with_config(config,
context.clone());
+ server.start()?;
+
+ Ok(Arc::new(SharedServerInfo {
+ tcp_addr: server.tcp_addr(),
+ http_addr: server.http_addr(),
+ quic_addr: server.quic_addr(),
+ websocket_addr: server.websocket_addr(),
+ tls_ca_cert_path: server.tls_ca_cert_path(),
+ handle: std::sync::Mutex::new(Some(server)),
+ context,
+ active_count: AtomicUsize::new(0),
+ any_test_failed: AtomicBool::new(false),
+ }))
+ })
+ .await
+ .cloned()
+ }
+}
diff --git a/core/integration/src/lib.rs b/core/integration/src/lib.rs
index fc03d02d7..41ec34555 100644
--- a/core/integration/src/lib.rs
+++ b/core/integration/src/lib.rs
@@ -24,7 +24,8 @@ pub use harness_derive::iggy_harness;
#[doc(hidden)]
pub mod __macro_support {
pub use crate::harness::{
- ClientConfig, McpClient, McpConfig, TestHarness, TestServerConfig,
TlsConfig,
+ ClientConfig, ConnectorsRuntimeConfig, McpClient, McpConfig,
SharedServerRegistry,
+ TestHarness, TestServerConfig, TlsConfig,
};
pub use iggy::prelude::ClientWrapper;
pub use iggy_common::TransportProtocol;
diff --git a/core/integration/tests/connectors/api/endpoints.rs
b/core/integration/tests/connectors/api/endpoints.rs
index 69f6cdbfb..5fe987a29 100644
--- a/core/integration/tests/connectors/api/endpoints.rs
+++ b/core/integration/tests/connectors/api/endpoints.rs
@@ -27,8 +27,9 @@ use reqwest::Client;
const API_KEY: &str = "test-api-key";
#[iggy_harness(
+ shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn root_endpoint_returns_welcome_message(harness: &TestHarness) {
let api_address = harness
@@ -49,8 +50,9 @@ async fn root_endpoint_returns_welcome_message(harness:
&TestHarness) {
}
#[iggy_harness(
+ shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn health_endpoint_returns_healthy(harness: &TestHarness) {
let api_address = harness
@@ -71,8 +73,9 @@ async fn health_endpoint_returns_healthy(harness:
&TestHarness) {
}
#[iggy_harness(
+ shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn stats_endpoint_returns_runtime_stats(harness: &TestHarness) {
let api_address = harness
@@ -100,8 +103,9 @@ async fn stats_endpoint_returns_runtime_stats(harness:
&TestHarness) {
}
#[iggy_harness(
+ shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn metrics_endpoint_returns_prometheus_format(harness: &TestHarness) {
let api_address = harness
@@ -127,8 +131,9 @@ async fn
metrics_endpoint_returns_prometheus_format(harness: &TestHarness) {
}
#[iggy_harness(
+ shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn sources_endpoint_returns_list(harness: &TestHarness) {
let api_address = harness
@@ -150,8 +155,9 @@ async fn sources_endpoint_returns_list(harness:
&TestHarness) {
}
#[iggy_harness(
+ shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn sinks_endpoint_returns_list(harness: &TestHarness) {
let api_address = harness
@@ -173,8 +179,9 @@ async fn sinks_endpoint_returns_list(harness: &TestHarness)
{
}
#[iggy_harness(
+ shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn api_key_authentication_required(harness: &TestHarness) {
let api_address = harness
@@ -217,8 +224,9 @@ async fn api_key_authentication_required(harness:
&TestHarness) {
}
#[iggy_harness(
+ shared_server = "connector_api",
server(connectors_runtime(config_path =
"tests/connectors/api/config.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn api_key_authentication_rejected_with_invalid_key(harness:
&TestHarness) {
let api_address = harness
diff --git
a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
index ad6ea1c86..13190f9b5 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
@@ -28,8 +28,9 @@ use integration::harness::seeds;
use integration::iggy_harness;
#[iggy_harness(
+ shared_server = "elasticsearch_sink",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn elasticsearch_sink_stores_json_messages(
harness: &TestHarness,
@@ -86,8 +87,9 @@ async fn elasticsearch_sink_stores_json_messages(
}
#[iggy_harness(
+ shared_server = "elasticsearch_sink",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn elasticsearch_sink_handles_bulk_messages(
harness: &TestHarness,
@@ -140,8 +142,9 @@ async fn elasticsearch_sink_handles_bulk_messages(
}
#[iggy_harness(
+ shared_server = "elasticsearch_sink",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn elasticsearch_sink_preserves_json_structure(
harness: &TestHarness,
diff --git
a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
index 2beaad75c..f945f4889 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
@@ -27,8 +27,9 @@ use std::time::Duration;
use tokio::time::sleep;
#[iggy_harness(
+ shared_server = "elasticsearch_source",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/source.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn elasticsearch_source_produces_messages_to_iggy(
harness: &TestHarness,
@@ -104,8 +105,9 @@ async fn elasticsearch_source_produces_messages_to_iggy(
}
#[iggy_harness(
+ shared_server = "elasticsearch_source",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/source.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn elasticsearch_source_handles_empty_index(
harness: &TestHarness,
@@ -144,8 +146,9 @@ async fn elasticsearch_source_handles_empty_index(
}
#[iggy_harness(
+ shared_server = "elasticsearch_source",
server(connectors_runtime(config_path =
"tests/connectors/elasticsearch/source.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn elasticsearch_source_produces_bulk_messages(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/http/http_sink.rs
b/core/integration/tests/connectors/http/http_sink.rs
index e41c59ab6..3056d69e3 100644
--- a/core/integration/tests/connectors/http/http_sink.rs
+++ b/core/integration/tests/connectors/http/http_sink.rs
@@ -196,8 +196,9 @@ use integration::iggy_harness;
/// Validates `batch_mode=individual`: one HTTP POST per message, each with
metadata envelope.
/// Checks request count = message count, envelope structure, and
`application/json` content type.
#[iggy_harness(
+ shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn individual_json_messages_delivered_as_separate_posts(
harness: &TestHarness,
@@ -303,8 +304,9 @@ async fn
individual_json_messages_delivered_as_separate_posts(
/// Validates `batch_mode=ndjson`: all messages in one request as
newline-delimited JSON.
/// Checks single request, line count = message count, per-line envelope,
`application/x-ndjson`.
#[iggy_harness(
+ shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn ndjson_messages_delivered_as_single_request(
harness: &TestHarness,
@@ -395,8 +397,9 @@ async fn ndjson_messages_delivered_as_single_request(
/// Validates `batch_mode=json_array`: all messages as a single JSON array in
one request.
/// Checks single request, array length = message count, per-item envelope,
`application/json`.
#[iggy_harness(
+ shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn json_array_messages_delivered_as_single_request(
harness: &TestHarness,
@@ -488,8 +491,9 @@ async fn json_array_messages_delivered_as_single_request(
/// Validates `batch_mode=raw`: each message as raw bytes without metadata
envelope.
/// Checks request count = message count, no envelope wrapper,
`application/octet-stream`.
#[iggy_harness(
+ shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn raw_binary_messages_delivered_without_envelope(
harness: &TestHarness,
@@ -575,8 +579,9 @@ async fn raw_binary_messages_delivered_without_envelope(
/// Validates `include_metadata=false`: bare payload without `{metadata,
payload}` envelope.
/// Checks no metadata field in body, original payload fields at top level.
#[iggy_harness(
+ shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn metadata_disabled_sends_bare_payload(
harness: &TestHarness,
@@ -651,8 +656,9 @@ async fn metadata_disabled_sends_bare_payload(
/// Validates sequential offset integrity: `iggy_offset` values are contiguous
across
/// 5 delivered messages. Sorts by offset and checks each = previous + 1.
#[iggy_harness(
+ shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn individual_messages_have_sequential_offsets(
harness: &TestHarness,
@@ -738,8 +744,9 @@ async fn individual_messages_have_sequential_offsets(
/// Validates multi-topic delivery: one connector consuming two topics on the
same stream.
/// Sends 2 messages to topic 1, 1 to topic 2, verifies `iggy_topic` metadata
matches source.
#[iggy_harness(
+ shared_server = "http_sink",
server(connectors_runtime(config_path =
"tests/connectors/http/sink.toml")),
- seed = seeds::connector_multi_topic_stream
+ seed = seeds::connector_multi_topic_stream_idempotent
)]
async fn multi_topic_messages_delivered_with_correct_topic_metadata(
harness: &TestHarness,
@@ -749,8 +756,8 @@ async fn
multi_topic_messages_delivered_with_correct_topic_metadata(
let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
let topic_1_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
- // Step 1: Both topics created by connector_multi_topic_stream seed (runs
before
- // connector runtime starts — runtime health check requires all configured
topics).
+ // Both topics created by connector_multi_topic_stream_idempotent seed
(runs before
+ // connector runtime starts - runtime health check requires all configured
topics).
let topic_2_id: Identifier = seeds::names::TOPIC_2.try_into().unwrap();
// Step 2: Send 2 messages to topic 1 with source identifier in payload
@@ -822,7 +829,6 @@ async fn
multi_topic_messages_delivered_with_correct_topic_metadata(
)
});
- // Match against constants — not magic strings (code review M9)
match iggy_topic {
t if t == seeds::names::TOPIC => {
topic_1_count += 1;
diff --git
a/core/integration/tests/connectors/http_config_provider/direct_responses.rs
b/core/integration/tests/connectors/http_config_provider/direct_responses.rs
index eeebddf4b..17ce1279d 100644
--- a/core/integration/tests/connectors/http_config_provider/direct_responses.rs
+++ b/core/integration/tests/connectors/http_config_provider/direct_responses.rs
@@ -23,8 +23,9 @@ use integration::iggy_harness;
use reqwest::StatusCode;
#[iggy_harness(
+ shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn source_configs_list_returns_all_versions(
harness: &TestHarness,
@@ -65,8 +66,8 @@ async fn source_configs_list_returns_all_versions(
assert_eq!(streams.len(), 1, "Should have 1 stream config");
let stream = &streams[0];
- assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
- assert_eq!(stream["topic"].as_str().unwrap(), "test_topic");
+ assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
+ assert_eq!(stream["topic"].as_str().unwrap(), seeds::names::TOPIC);
assert_eq!(stream["schema"].as_str().unwrap(), "json");
assert!(stream["batch_length"].as_u64().is_some());
assert!(stream["linger_time"].as_str().is_some());
@@ -80,8 +81,9 @@ async fn source_configs_list_returns_all_versions(
}
#[iggy_harness(
+ shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn source_config_by_version_returns_specific_version(
harness: &TestHarness,
@@ -111,12 +113,13 @@ async fn
source_config_by_version_returns_specific_version(
let streams = body["streams"].as_array().unwrap();
assert_eq!(streams.len(), 1);
- assert_eq!(streams[0]["topic"].as_str().unwrap(), "test_topic");
+ assert_eq!(streams[0]["topic"].as_str().unwrap(), seeds::names::TOPIC);
}
#[iggy_harness(
+ shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn source_active_config_returns_current_version(
harness: &TestHarness,
@@ -151,8 +154,9 @@ async fn source_active_config_returns_current_version(
}
#[iggy_harness(
+ shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn sink_configs_list_returns_all_versions(
harness: &TestHarness,
@@ -193,10 +197,10 @@ async fn sink_configs_list_returns_all_versions(
assert_eq!(streams.len(), 1, "Should have 1 stream config");
let stream = &streams[0];
- assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
+ assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
let topics = stream["topics"].as_array().unwrap();
assert_eq!(topics.len(), 1);
- assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+ assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
assert_eq!(stream["schema"].as_str().unwrap(), "json");
assert!(stream["batch_length"].as_u64().is_some());
assert!(stream["poll_interval"].as_str().is_some());
@@ -207,8 +211,9 @@ async fn sink_configs_list_returns_all_versions(
}
#[iggy_harness(
+ shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn sink_config_by_version_returns_specific_version(
harness: &TestHarness,
@@ -239,12 +244,13 @@ async fn sink_config_by_version_returns_specific_version(
let streams = body["streams"].as_array().unwrap();
assert_eq!(streams.len(), 1);
let topics = streams[0]["topics"].as_array().unwrap();
- assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+ assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
}
#[iggy_harness(
+ shared_server = "http_config_direct",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_direct.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn sink_active_config_returns_current_version(
harness: &TestHarness,
diff --git
a/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
b/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
index 1ef5f99ae..aab8df975 100644
---
a/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
+++
b/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
@@ -23,8 +23,9 @@ use integration::iggy_harness;
use reqwest::StatusCode;
#[iggy_harness(
+ shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn source_configs_list_returns_all_versions(
harness: &TestHarness,
@@ -65,8 +66,8 @@ async fn source_configs_list_returns_all_versions(
assert_eq!(streams.len(), 1, "Should have 1 stream config");
let stream = &streams[0];
- assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
- assert_eq!(stream["topic"].as_str().unwrap(), "test_topic");
+ assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
+ assert_eq!(stream["topic"].as_str().unwrap(), seeds::names::TOPIC);
assert_eq!(stream["schema"].as_str().unwrap(), "json");
assert!(stream["batch_length"].as_u64().is_some());
assert!(stream["linger_time"].as_str().is_some());
@@ -80,8 +81,9 @@ async fn source_configs_list_returns_all_versions(
}
#[iggy_harness(
+ shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn source_config_by_version_returns_specific_version(
harness: &TestHarness,
@@ -111,12 +113,13 @@ async fn
source_config_by_version_returns_specific_version(
let streams = body["streams"].as_array().unwrap();
assert_eq!(streams.len(), 1);
- assert_eq!(streams[0]["topic"].as_str().unwrap(), "test_topic");
+ assert_eq!(streams[0]["topic"].as_str().unwrap(), seeds::names::TOPIC);
}
#[iggy_harness(
+ shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn source_active_config_returns_current_version(
harness: &TestHarness,
@@ -151,8 +154,9 @@ async fn source_active_config_returns_current_version(
}
#[iggy_harness(
+ shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn sink_configs_list_returns_all_versions(
harness: &TestHarness,
@@ -193,10 +197,10 @@ async fn sink_configs_list_returns_all_versions(
assert_eq!(streams.len(), 1, "Should have 1 stream config");
let stream = &streams[0];
- assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
+ assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
let topics = stream["topics"].as_array().unwrap();
assert_eq!(topics.len(), 1);
- assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+ assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
assert_eq!(stream["schema"].as_str().unwrap(), "json");
assert!(stream["batch_length"].as_u64().is_some());
assert!(stream["poll_interval"].as_str().is_some());
@@ -207,8 +211,9 @@ async fn sink_configs_list_returns_all_versions(
}
#[iggy_harness(
+ shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn sink_config_by_version_returns_specific_version(
harness: &TestHarness,
@@ -239,12 +244,13 @@ async fn sink_config_by_version_returns_specific_version(
let streams = body["streams"].as_array().unwrap();
assert_eq!(streams.len(), 1);
let topics = streams[0]["topics"].as_array().unwrap();
- assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+ assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
}
#[iggy_harness(
+ shared_server = "http_config_wrapped",
server(connectors_runtime(config_path =
"tests/connectors/http_config_provider/config_wrapped.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn sink_active_config_returns_current_version(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
index 777214116..ad031dd80 100644
--- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
+++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
@@ -37,8 +37,9 @@ const SNAPSHOT_POLL_INTERVAL_MS: u64 = 500;
const BULK_SNAPSHOT_POLL_ATTEMPTS: usize = 60;
#[iggy_harness(
+ shared_server = "iceberg_sink",
server(connectors_runtime(config_path =
"tests/connectors/iceberg/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn iceberg_sink_initializes_and_runs(
harness: &TestHarness,
@@ -68,8 +69,9 @@ async fn iceberg_sink_initializes_and_runs(
}
#[iggy_harness(
+ shared_server = "iceberg_sink",
server(connectors_runtime(config_path =
"tests/connectors/iceberg/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn iceberg_sink_consumes_json_messages(
harness: &TestHarness,
@@ -139,8 +141,9 @@ async fn iceberg_sink_consumes_json_messages(
}
#[iggy_harness(
+ shared_server = "iceberg_sink",
server(connectors_runtime(config_path =
"tests/connectors/iceberg/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn iceberg_sink_handles_bulk_messages(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/influxdb/influxdb_sink.rs
b/core/integration/tests/connectors/influxdb/influxdb_sink.rs
index cda484cfc..6b82d6428 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_sink.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_sink.rs
@@ -33,7 +33,8 @@ use serde_json::json;
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_writes_messages_to_bucket(
harness: &TestHarness,
@@ -74,7 +75,8 @@ async fn influxdb_sink_writes_messages_to_bucket(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_handles_bulk_messages(harness: &TestHarness, fixture:
InfluxDbSinkFixture) {
let client = harness.root_client().await.unwrap();
@@ -112,7 +114,8 @@ async fn influxdb_sink_handles_bulk_messages(harness:
&TestHarness, fixture: Inf
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_payload_fields_stored_correctly(
harness: &TestHarness,
@@ -150,7 +153,8 @@ async fn influxdb_sink_payload_fields_stored_correctly(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_large_batch(harness: &TestHarness, fixture:
InfluxDbSinkFixture) {
let client = harness.root_client().await.unwrap();
@@ -184,7 +188,8 @@ async fn influxdb_sink_large_batch(harness: &TestHarness,
fixture: InfluxDbSinkF
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_recovers_backlogged_messages(
harness: &TestHarness,
@@ -224,7 +229,8 @@ async fn influxdb_sink_recovers_backlogged_messages(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_multiple_partitions(harness: &TestHarness, fixture:
InfluxDbSinkFixture) {
let client = harness.root_client().await.unwrap();
diff --git
a/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
b/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
index d1225c179..e68e78b55 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
@@ -52,7 +52,8 @@ use integration::iggy_harness;
/// with printable ASCII.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_text.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_text_writes_points(harness: &TestHarness, fixture:
InfluxDbSinkTextFixture) {
let client = harness.root_client().await.unwrap();
@@ -91,7 +92,8 @@ async fn influxdb_sink_text_writes_points(harness:
&TestHarness, fixture: Influx
/// Covers: `write_field_string` quote-escape branch.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_text.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_text_with_quote_characters(
harness: &TestHarness,
@@ -131,7 +133,8 @@ async fn influxdb_sink_text_with_quote_characters(
/// Covers: `write_field_string` backslash-escape branch.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_text.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_text_with_backslash(
harness: &TestHarness,
@@ -170,7 +173,8 @@ async fn influxdb_sink_text_with_backslash(
/// path under the Text format.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_text.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_text_bulk(harness: &TestHarness, fixture:
InfluxDbSinkTextFixture) {
let client = harness.root_client().await.unwrap();
@@ -212,7 +216,8 @@ async fn influxdb_sink_text_bulk(harness: &TestHarness,
fixture: InfluxDbSinkTex
/// `general_purpose::STANDARD.encode`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_base64.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_base64_writes_points(
harness: &TestHarness,
@@ -257,7 +262,8 @@ async fn influxdb_sink_base64_writes_points(
/// needed for base64 output, but exercises the full field-write path).
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_base64.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_base64_roundtrip(harness: &TestHarness, fixture:
InfluxDbSinkBase64Fixture) {
let client = harness.root_client().await.unwrap();
@@ -302,7 +308,8 @@ async fn influxdb_sink_base64_roundtrip(harness:
&TestHarness, fixture: InfluxDb
/// test covers the next-closest case (1-byte payload → 4-char base64 string).
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_base64.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_base64_single_byte_payload(
harness: &TestHarness,
@@ -346,7 +353,8 @@ async fn influxdb_sink_base64_single_byte_payload(
/// branches inside `append_line`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_no_metadata.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_no_metadata_writes_points(
harness: &TestHarness,
@@ -387,7 +395,8 @@ async fn influxdb_sink_no_metadata_writes_points(
/// with the no-metadata tag configuration.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_no_metadata.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_no_metadata_bulk(
harness: &TestHarness,
@@ -432,7 +441,8 @@ async fn influxdb_sink_no_metadata_bulk(
/// Covers: `"ns"` arm of `to_precision_timestamp`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_ns_precision_writes_points(
harness: &TestHarness,
@@ -475,7 +485,8 @@ async fn influxdb_sink_ns_precision_writes_points(
/// in `PayloadFormat::Json` and `write_field_string` with `{`, `}`, `:`, `"`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_json_nested_payload(harness: &TestHarness, fixture:
InfluxDbSinkFixture) {
let client = harness.root_client().await.unwrap();
@@ -523,7 +534,8 @@ async fn influxdb_sink_json_nested_payload(harness:
&TestHarness, fixture: Influ
/// Covers: `'\n' => buf.push_str("\\n")` branch in `write_field_string`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/sink_text.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_sink",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_sink_text_with_newline(harness: &TestHarness, fixture:
InfluxDbSinkTextFixture) {
let client = harness.root_client().await.unwrap();
diff --git a/core/integration/tests/connectors/influxdb/influxdb_source.rs
b/core/integration/tests/connectors/influxdb/influxdb_source.rs
index f6eab572e..e2716aeb9 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_source.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_source.rs
@@ -26,7 +26,8 @@ use tracing::info;
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_polls_and_produces_messages(
harness: &TestHarness,
@@ -84,7 +85,8 @@ async fn influxdb_source_polls_and_produces_messages(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_message_payload_structure(
harness: &TestHarness,
@@ -140,7 +142,8 @@ async fn influxdb_source_message_payload_structure(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_empty_bucket_produces_no_messages(
harness: &TestHarness,
@@ -179,7 +182,8 @@ async fn influxdb_source_empty_bucket_produces_no_messages(
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_multiple_measurements(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs b/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs
index 98cede6db..089637ce5 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs
@@ -51,7 +51,8 @@ use tracing::info;
/// path), `schema()` → `Schema::Text`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source_text.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_text_payload_column(
harness: &TestHarness,
@@ -110,7 +111,8 @@ async fn influxdb_source_text_payload_column(
/// Covers: `PayloadFormat::Text` → `Ok(raw_value.into_bytes())`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source_text.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_text_whitespace_value(
harness: &TestHarness,
@@ -172,7 +174,8 @@ async fn influxdb_source_text_whitespace_value(
/// STANDARD.decode`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source_raw.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_raw_payload_column(
harness: &TestHarness,
@@ -229,7 +232,8 @@ async fn influxdb_source_raw_payload_column(
/// Covers: the loop in `poll_messages` with `PayloadFormat::Raw`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source_raw.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_raw_multiple_rows(
harness: &TestHarness,
@@ -290,7 +294,8 @@ async fn influxdb_source_raw_multiple_rows(
/// Covers: `parse_scalar` for `i64`, `f64`, `bool`, and string branches.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_scalar_type_integer(
harness: &TestHarness,
@@ -350,7 +355,8 @@ async fn influxdb_source_scalar_type_integer(
/// Covers `parse_scalar` for `f64` values.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_scalar_type_float(harness: &TestHarness, fixture:
InfluxDbSourceFixture) {
let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64;
@@ -401,7 +407,8 @@ async fn influxdb_source_scalar_type_float(harness: &TestHarness, fixture: Influ
/// Covers `parse_scalar` for boolean values (`true`/`false`).
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_scalar_type_bool(harness: &TestHarness, fixture:
InfluxDbSourceFixture) {
let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64;
@@ -474,7 +481,8 @@ async fn influxdb_source_scalar_type_bool(harness: &TestHarness, fixture: Influx
/// cursor state persisted via `state.last_timestamp`.
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_cursor_advances_between_batches(
harness: &TestHarness,
@@ -565,7 +573,8 @@ async fn influxdb_source_cursor_advances_between_batches(
/// `build_payload` (whole-row path).
#[iggy_harness(
server(connectors_runtime(config_path =
"tests/connectors/influxdb/source.toml")),
- seed = seeds::connector_stream
+ shared_server = "influxdb_source",
+ seed = seeds::connector_stream_idempotent
)]
async fn influxdb_source_no_metadata_core_fields_present(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/mongodb/mongodb_sink.rs b/core/integration/tests/connectors/mongodb/mongodb_sink.rs
index 474b809ed..09c400eb3 100644
--- a/core/integration/tests/connectors/mongodb/mongodb_sink.rs
+++ b/core/integration/tests/connectors/mongodb/mongodb_sink.rs
@@ -48,8 +48,9 @@ fn build_expected_document_id_for_topic(topic_name: &str, message_id: u128) -> S
}
#[iggy_harness(
+ shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn json_messages_sink_to_mongodb(harness: &TestHarness, fixture:
MongoDbSinkJsonFixture) {
let client = harness.root_client().await.unwrap();
@@ -136,8 +137,9 @@ async fn json_messages_sink_to_mongodb(harness: &TestHarness, fixture: MongoDbSi
}
#[iggy_harness(
+ shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn binary_messages_sink_as_bson_binary(harness: &TestHarness, fixture:
MongoDbSinkFixture) {
let client = harness.root_client().await.unwrap();
@@ -204,8 +206,9 @@ async fn binary_messages_sink_as_bson_binary(harness: &TestHarness, fixture: Mon
}
#[iggy_harness(
+ shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn large_batch_processed_correctly(harness: &TestHarness, fixture:
MongoDbSinkBatchFixture) {
let client = harness.root_client().await.unwrap();
@@ -258,8 +261,9 @@ async fn large_batch_processed_correctly(harness: &TestHarness, fixture: MongoDb
}
#[iggy_harness(
+ shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn duplicate_key_is_idempotent_replay_not_sink_error(
harness: &TestHarness,
@@ -441,8 +445,9 @@ async fn duplicate_key_is_idempotent_replay_not_sink_error(
}
#[iggy_harness(
+ shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn ordered_duplicate_partial_insert_has_exact_accounting(
harness: &TestHarness,
@@ -617,8 +622,9 @@ async fn ordered_duplicate_partial_insert_has_exact_accounting(
}
#[iggy_harness(
+ shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix(
harness: &TestHarness,
@@ -729,8 +735,9 @@ async fn schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix(
}
#[iggy_harness(
+ shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn write_concern_timeout_does_not_report_full_success(
harness: &TestHarness,
@@ -820,8 +827,9 @@ async fn write_concern_timeout_does_not_report_full_success(
}
#[iggy_harness(
+ shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn retryable_write_failover_keeps_single_doc_per_id(
harness: &TestHarness,
@@ -967,8 +975,9 @@ async fn retryable_write_failover_keeps_single_doc_per_id(
}
#[iggy_harness(
+ shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn no_writes_performed_label_path_preserves_state_accuracy(
harness: &TestHarness,
@@ -1115,8 +1124,9 @@ async fn no_writes_performed_label_path_preserves_state_accuracy(
}
#[iggy_harness(
+ shared_server = "mongodb_sink",
server(connectors_runtime(config_path =
"tests/connectors/mongodb/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn auto_create_collection_on_open(
harness: &TestHarness,
@@ -1157,6 +1167,5 @@ async fn auto_create_collection_on_open(
"Collection should be empty after open() with no messages"
);
- // Suppress unused harness warning.
let _ = harness;
}
diff --git a/core/integration/tests/connectors/postgres/postgres_sink.rs b/core/integration/tests/connectors/postgres/postgres_sink.rs
index 2032c025e..6b5b87022 100644
--- a/core/integration/tests/connectors/postgres/postgres_sink.rs
+++ b/core/integration/tests/connectors/postgres/postgres_sink.rs
@@ -35,8 +35,9 @@ type SinkRow = (i64, String, String, Vec<u8>);
type SinkJsonRow = (i64, serde_json::Value);
#[iggy_harness(
+ shared_server = "postgres_sink",
server(connectors_runtime(config_path =
"tests/connectors/postgres/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn json_messages_sink_stores_as_bytea(harness: &TestHarness, fixture:
PostgresSinkFixture) {
let client = harness.root_client().await.unwrap();
@@ -97,8 +98,9 @@ async fn json_messages_sink_stores_as_bytea(harness: &TestHarness, fixture: Post
}
#[iggy_harness(
+ shared_server = "postgres_sink",
server(connectors_runtime(config_path =
"tests/connectors/postgres/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn binary_messages_sink_stores_as_bytea(
harness: &TestHarness,
@@ -161,8 +163,9 @@ async fn binary_messages_sink_stores_as_bytea(
}
#[iggy_harness(
+ shared_server = "postgres_sink",
server(connectors_runtime(config_path =
"tests/connectors/postgres/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn json_messages_sink_stores_as_jsonb(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/postgres/postgres_source.rs b/core/integration/tests/connectors/postgres/postgres_source.rs
index 5f4a79af8..ce5cb99e2 100644
--- a/core/integration/tests/connectors/postgres/postgres_source.rs
+++ b/core/integration/tests/connectors/postgres/postgres_source.rs
@@ -32,8 +32,9 @@ use std::time::Duration;
use tokio::time::sleep;
#[iggy_harness(
+ shared_server = "postgres_source",
server(connectors_runtime(config_path =
"tests/connectors/postgres/source.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn json_rows_source_produces_messages_to_iggy(
harness: &TestHarness,
@@ -113,8 +114,9 @@ async fn json_rows_source_produces_messages_to_iggy(
}
#[iggy_harness(
+ shared_server = "postgres_source",
server(connectors_runtime(config_path =
"tests/connectors/postgres/source.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn bytea_rows_source_produces_raw_messages_to_iggy(
harness: &TestHarness,
@@ -176,8 +178,9 @@ async fn bytea_rows_source_produces_raw_messages_to_iggy(
}
#[iggy_harness(
+ shared_server = "postgres_source",
server(connectors_runtime(config_path =
"tests/connectors/postgres/source.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn jsonb_rows_source_produces_json_messages_to_iggy(
harness: &TestHarness,
@@ -243,8 +246,9 @@ async fn jsonb_rows_source_produces_json_messages_to_iggy(
}
#[iggy_harness(
+ shared_server = "postgres_source",
server(connectors_runtime(config_path =
"tests/connectors/postgres/source.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn delete_after_read_source_removes_rows_after_producing(
harness: &TestHarness,
@@ -313,8 +317,9 @@ async fn delete_after_read_source_removes_rows_after_producing(
}
#[iggy_harness(
+ shared_server = "postgres_source",
server(connectors_runtime(config_path =
"tests/connectors/postgres/source.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn processed_column_source_marks_rows_after_producing(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/postgres/restart.rs b/core/integration/tests/connectors/postgres/restart.rs
index e6e5f203b..cdba541ee 100644
--- a/core/integration/tests/connectors/postgres/restart.rs
+++ b/core/integration/tests/connectors/postgres/restart.rs
@@ -38,8 +38,9 @@ const SINK_KEY: &str = "postgres";
type SinkRow = (i64, String, String, Vec<u8>);
#[iggy_harness(
+ shared_server = "postgres_sink",
server(connectors_runtime(config_path =
"tests/connectors/postgres/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn restart_sink_connector_continues_processing(
harness: &TestHarness,
@@ -127,8 +128,9 @@ async fn restart_sink_connector_continues_processing(
}
#[iggy_harness(
+ shared_server = "postgres_sink",
server(connectors_runtime(config_path =
"tests/connectors/postgres/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn parallel_restart_requests_should_not_break_connector(
harness: &TestHarness,
diff --git a/core/integration/tests/connectors/quickwit/quickwit_sink.rs b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
index ce3abd364..b7fe69b10 100644
--- a/core/integration/tests/connectors/quickwit/quickwit_sink.rs
+++ b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
@@ -28,8 +28,9 @@ use integration::iggy_harness;
use serde::{Deserialize, Serialize};
#[iggy_harness(
+ shared_server = "quickwit_sink",
server(connectors_runtime(config_path =
"tests/connectors/quickwit/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn given_existent_quickwit_index_should_store(
harness: &TestHarness,
@@ -83,8 +84,9 @@ async fn given_existent_quickwit_index_should_store(
}
#[iggy_harness(
+ shared_server = "quickwit_sink",
server(connectors_runtime(config_path =
"tests/connectors/quickwit/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn given_nonexistent_quickwit_index_should_create_and_store(
harness: &TestHarness,
@@ -138,8 +140,9 @@ async fn given_nonexistent_quickwit_index_should_create_and_store(
}
#[iggy_harness(
+ shared_server = "quickwit_sink",
server(connectors_runtime(config_path =
"tests/connectors/quickwit/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn given_bulk_message_send_should_store(
harness: &TestHarness,
@@ -193,8 +196,9 @@ async fn given_bulk_message_send_should_store(
}
#[iggy_harness(
+ shared_server = "quickwit_sink",
server(connectors_runtime(config_path =
"tests/connectors/quickwit/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn given_invalid_messages_should_not_store(harness: &TestHarness,
fixture: QuickwitFixture) {
let client = harness.root_client().await.unwrap();
diff --git a/core/integration/tests/connectors/random/random_source.rs b/core/integration/tests/connectors/random/random_source.rs
index 5efb6fd08..4d44d1bfe 100644
--- a/core/integration/tests/connectors/random/random_source.rs
+++ b/core/integration/tests/connectors/random/random_source.rs
@@ -25,8 +25,9 @@ use std::time::Duration;
use tokio::time::sleep;
#[iggy_harness(
+ shared_server = "random_source",
server(connectors_runtime(config_path =
"tests/connectors/random/source.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn random_source_produces_messages(harness: &TestHarness) {
sleep(Duration::from_secs(1)).await;
diff --git a/core/integration/tests/connectors/stdout/stdout_sink.rs b/core/integration/tests/connectors/stdout/stdout_sink.rs
index be247588c..2dfe8758e 100644
--- a/core/integration/tests/connectors/stdout/stdout_sink.rs
+++ b/core/integration/tests/connectors/stdout/stdout_sink.rs
@@ -32,8 +32,9 @@ use tokio::time::sleep;
const API_KEY: &str = "test-api-key";
#[iggy_harness(
+ shared_server = "stdout_sink",
server(connectors_runtime(config_path =
"tests/connectors/stdout/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn stdout_sink_consumes_messages(harness: &TestHarness) {
let client = harness.root_client().await.unwrap();
@@ -90,8 +91,9 @@ async fn stdout_sink_consumes_messages(harness: &TestHarness) {
}
#[iggy_harness(
+ shared_server = "stdout_sink",
server(connectors_runtime(config_path =
"tests/connectors/stdout/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn stdout_sink_reports_metrics(harness: &TestHarness) {
let client = harness.root_client().await.unwrap();
@@ -150,8 +152,9 @@ async fn stdout_sink_reports_metrics(harness: &TestHarness) {
}
#[iggy_harness(
+ shared_server = "stdout_sink",
server(connectors_runtime(config_path =
"tests/connectors/stdout/sink.toml")),
- seed = seeds::connector_stream
+ seed = seeds::connector_stream_idempotent
)]
async fn stdout_sink_handles_bulk_messages(harness: &TestHarness) {
let client = harness.root_client().await.unwrap();