This is an automated email from the ASF dual-hosted git repository.

piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 2afeaa3ff feat(test): add shared_server to iggy_harness for server 
reuse across tests (#3085)
2afeaa3ff is described below

commit 2afeaa3ff19bee422a4831713caaf97ed5be0243
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Apr 7 13:32:26 2026 +0200

    feat(test): add shared_server to iggy_harness for server reuse across tests 
(#3085)
    
    Connector integration tests each spawn their own iggy-server,
    causing resource contention and timing-sensitive CI failures
    (e.g. WireMock not receiving requests within retry limit).
    
    Add a `shared_server = "key"` attribute to #[iggy_harness]
    that lets tests with the same key share a single iggy-server
    process. Tests within a group run serially via
    #[serial_test::serial(key)]; groups run in parallel with
    each other. Each test still gets its own connector runtime,
    fixtures, and clients — only the server is shared.
    
    Idempotent seed functions (connector_stream_idempotent)
    skip creation if stream/topic already exists, allowing
    serial tests to safely reuse the same server state.
    
    Infrastructure:
    - SharedServerRegistry (DashMap + tokio::sync::OnceCell)
      for thread-safe lazy server initialization
    - Atomic refcounting for lifecycle: last test stops server
      and cleans up (preserved on failure for debugging)
    - TestHarness::from_shared() + start_shared_with_seed()
    - Compile-time validation rejects incompatible combinations
      (&mut TestHarness, config overrides, cluster, TLS)
    
    Migrates 113 connector tests across 18 files. Three restart
    tests using &mut TestHarness remain on dedicated servers.
---
 .config/nextest.toml                               |  14 +-
 Cargo.lock                                         |   1 +
 core/harness_derive/src/attrs.rs                   |  23 +++
 core/harness_derive/src/codegen.rs                 | 173 +++++++++++++++++-
 core/integration/Cargo.toml                        |   1 +
 core/integration/src/harness/handle/mod.rs         |   2 +-
 core/integration/src/harness/mod.rs                |   2 +
 .../src/harness/orchestrator/builder.rs            |   2 +
 .../src/harness/orchestrator/harness.rs            | 198 +++++++++++++++++++--
 core/integration/src/harness/seeds.rs              |  71 +++++++-
 core/integration/src/harness/shared.rs             | 159 +++++++++++++++++
 core/integration/src/lib.rs                        |   3 +-
 core/integration/tests/connectors/api/endpoints.rs |  24 ++-
 .../connectors/elasticsearch/elasticsearch_sink.rs |   9 +-
 .../elasticsearch/elasticsearch_source.rs          |   9 +-
 .../integration/tests/connectors/http/http_sink.rs |  26 +--
 .../http_config_provider/direct_responses.rs       |  30 ++--
 .../http_config_provider/wrapped_responses.rs      |  30 ++--
 .../tests/connectors/iceberg/iceberg_sink.rs       |   9 +-
 .../tests/connectors/influxdb/influxdb_sink.rs     |  18 +-
 .../connectors/influxdb/influxdb_sink_formats.rs   |  36 ++--
 .../tests/connectors/influxdb/influxdb_source.rs   |  12 +-
 .../connectors/influxdb/influxdb_source_formats.rs |  27 ++-
 .../tests/connectors/mongodb/mongodb_sink.rs       |  31 ++--
 .../tests/connectors/postgres/postgres_sink.rs     |   9 +-
 .../tests/connectors/postgres/postgres_source.rs   |  15 +-
 .../tests/connectors/postgres/restart.rs           |   6 +-
 .../tests/connectors/quickwit/quickwit_sink.rs     |  12 +-
 .../tests/connectors/random/random_source.rs       |   3 +-
 .../tests/connectors/stdout/stdout_sink.rs         |   9 +-
 30 files changed, 827 insertions(+), 137 deletions(-)

diff --git a/.config/nextest.toml b/.config/nextest.toml
index bb29d9651..2dd295fb8 100644
--- a/.config/nextest.toml
+++ b/.config/nextest.toml
@@ -15,11 +15,21 @@
 # specific language governing permissions and limitations
 # under the License.
 
+[profile.default]
+slow-timeout = { period = "30s", terminate-after = 4 }
+
 [[profile.default.overrides]]
 # This is a solution (or actually a workaround) for the problem that nextest 
does not support
 # #[serial] macro which shall enforce sequential execution of the test case.
 filter = 'package(integration) and 
test(cli::system::test_cli_session_scenario::should_be_successful)'
 threads-required = "num-cpus"
 
-[profile.default]
-slow-timeout = { period = "30s", terminate-after = 4 }
+# shared_server tests: the iggy_harness macro prefixes test names with
+# shared_server_ enabling a single generic nextest filter. Bounded to limit
+# resource contention from parallel fixture containers without fully 
serializing.
+[test-groups.shared-server]
+max-threads = 8
+
+[[profile.default.overrides]]
+filter = 'package(integration) and test(/shared_server_/)'
+test-group = 'shared-server'
diff --git a/Cargo.lock b/Cargo.lock
index cb11aa7ff..26c32044b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6151,6 +6151,7 @@ dependencies = [
  "configs",
  "configs_derive",
  "ctor",
+ "dashmap",
  "figment",
  "futures",
  "harness_derive",
diff --git a/core/harness_derive/src/attrs.rs b/core/harness_derive/src/attrs.rs
index 2e4389faa..fa67e9239 100644
--- a/core/harness_derive/src/attrs.rs
+++ b/core/harness_derive/src/attrs.rs
@@ -48,6 +48,12 @@ impl ClusterNodesValue {
     }
 }
 
+/// Shared server configuration.
+#[derive(Debug, Clone)]
+pub struct SharedServerAttrs {
+    pub key: String,
+}
+
 /// Parsed `#[iggy_harness(...)]` attributes.
 #[derive(Debug, Default)]
 pub struct IggyTestAttrs {
@@ -57,6 +63,7 @@ pub struct IggyTestAttrs {
     pub server: ServerAttrs,
     pub seed_fn: Option<syn::Path>,
     pub cluster_nodes: ClusterNodesValue,
+    pub shared_server: Option<SharedServerAttrs>,
 }
 
 /// MCP configuration attributes.
@@ -82,6 +89,7 @@ impl IggyTestAttrs {
             server: ServerAttrs::default(),
             seed_fn: None,
             cluster_nodes: ClusterNodesValue::None,
+            shared_server: None,
         }
     }
 }
@@ -248,6 +256,9 @@ impl Parse for IggyTestAttrs {
                 AttrItem::ClusterNodes(cluster) => {
                     attrs.cluster_nodes = cluster;
                 }
+                AttrItem::SharedServer(shared) => {
+                    attrs.shared_server = Some(shared);
+                }
             }
         }
 
@@ -264,6 +275,7 @@ enum AttrItem {
     Server(Box<ServerAttrs>),
     Seed(syn::Path),
     ClusterNodes(ClusterNodesValue),
+    SharedServer(SharedServerAttrs),
 }
 
 impl Parse for AttrItem {
@@ -293,6 +305,10 @@ impl Parse for AttrItem {
                 let path: syn::Path = input.parse()?;
                 Ok(AttrItem::Seed(path))
             }
+            "shared_server" => {
+                let shared = parse_shared_server_attrs(input)?;
+                Ok(AttrItem::SharedServer(shared))
+            }
             _ => Err(syn::Error::new(
                 ident.span(),
                 format!("unknown attribute: {ident_str}"),
@@ -473,6 +489,13 @@ fn parse_mcp_attrs(input: ParseStream) -> 
syn::Result<McpAttrs> {
     Ok(mcp)
 }
 
+/// Parse `shared_server = "key"`.
+fn parse_shared_server_attrs(input: ParseStream) -> 
syn::Result<SharedServerAttrs> {
+    input.parse::<Token![=]>()?;
+    let lit: LitStr = input.parse()?;
+    Ok(SharedServerAttrs { key: lit.value() })
+}
+
 fn parse_connectors_runtime_attrs(input: ParseStream) -> 
syn::Result<ConnectorsRuntimeAttrs> {
     let mut attrs = ConnectorsRuntimeAttrs::default();
 
diff --git a/core/harness_derive/src/codegen.rs 
b/core/harness_derive/src/codegen.rs
index 2abd54368..ba3b2b679 100644
--- a/core/harness_derive/src/codegen.rs
+++ b/core/harness_derive/src/codegen.rs
@@ -138,6 +138,12 @@ pub fn generate_tests(attrs: &IggyTestAttrs, input: 
&ItemFn) -> syn::Result<Toke
         .collect();
 
     let params = analyze_signature(&input.sig)?;
+
+    // Validate shared_server constraints
+    if attrs.shared_server.is_some() {
+        validate_shared_server(attrs, &params, fn_name)?;
+    }
+
     let matrix_params_list = matrix_params(&params);
     let has_matrix_params = !matrix_params_list.is_empty();
 
@@ -172,6 +178,52 @@ pub fn generate_tests(attrs: &IggyTestAttrs, input: 
&ItemFn) -> syn::Result<Toke
     )
 }
 
+fn validate_shared_server(
+    attrs: &IggyTestAttrs,
+    params: &[DetectedParam],
+    fn_name: &Ident,
+) -> syn::Result<()> {
+    let has_mut_harness = params
+        .iter()
+        .any(|p| matches!(p, DetectedParam::HarnessMut { .. }));
+    if has_mut_harness {
+        return Err(syn::Error::new(
+            fn_name.span(),
+            "shared_server is incompatible with &mut TestHarness",
+        ));
+    }
+
+    if !attrs.server.config_overrides.is_empty() {
+        return Err(syn::Error::new(
+            fn_name.span(),
+            "shared_server is incompatible with server config overrides",
+        ));
+    }
+
+    if !matches!(attrs.cluster_nodes, crate::attrs::ClusterNodesValue::None) {
+        return Err(syn::Error::new(
+            fn_name.span(),
+            "shared_server is incompatible with cluster_nodes",
+        ));
+    }
+
+    if attrs.server.tls.is_some() || attrs.server.websocket_tls.is_some() {
+        return Err(syn::Error::new(
+            fn_name.span(),
+            "shared_server is incompatible with TLS configuration",
+        ));
+    }
+
+    if attrs.transports.iter().any(|t| t.tls_mode().is_some()) {
+        return Err(syn::Error::new(
+            fn_name.span(),
+            "shared_server is incompatible with TLS transports",
+        ));
+    }
+
+    Ok(())
+}
+
 fn generate_single_test(
     fn_name: &Ident,
     fn_vis: &syn::Visibility,
@@ -185,11 +237,34 @@ fn generate_single_test(
     let fixture_setup = generate_fixture_setup(params);
     let fixture_envs = generate_fixture_envs_collection(params);
     let fixture_param_bindings = generate_fixture_param_bindings(params);
-    let harness_setup = generate_harness_setup(variant, has_fixtures, attrs);
     let fixture_seed = generate_fixture_seed(params);
-    let start_and_seed = generate_start_and_seed(attrs, fixture_seed);
     let harness_param_bindings = generate_harness_param_bindings(params);
 
+    if let Some(ref shared) = attrs.shared_server {
+        let shared_harness_setup = generate_shared_harness_setup(variant, 
has_fixtures, attrs);
+        let shared_start_and_seed = generate_shared_start_and_seed(attrs, 
fixture_seed);
+        let key_ident = Ident::new(&shared.key, Span::call_site());
+        let shared_fn_name = format_ident!("shared_server_{}", fn_name);
+
+        return Ok(quote! {
+            #(#other_attrs)*
+            #[::tokio::test]
+            #[::serial_test::serial(#key_ident)]
+            #fn_vis async fn #shared_fn_name() {
+                #fixture_setup
+                #fixture_envs
+                #fixture_param_bindings
+                #shared_harness_setup
+                #shared_start_and_seed
+                #harness_param_bindings
+                #fn_body
+            }
+        });
+    }
+
+    let harness_setup = generate_harness_setup(variant, has_fixtures, attrs);
+    let start_and_seed = generate_start_and_seed(attrs, fixture_seed);
+
     Ok(quote! {
         #(#other_attrs)*
         #[::tokio::test]
@@ -545,6 +620,99 @@ fn generate_start_and_seed(attrs: &IggyTestAttrs, 
fixture_seed: TokenStream) ->
     }
 }
 
+// ============================================================================
+// Shared server code generation helpers
+// ============================================================================
+
+fn generate_shared_harness_setup(
+    variant: &TestVariant,
+    has_fixtures: bool,
+    attrs: &IggyTestAttrs,
+) -> TokenStream {
+    let shared = attrs
+        .shared_server
+        .as_ref()
+        .expect("shared_server required");
+    let key = &shared.key;
+    let transport = variant.transport.variant_ident();
+    let client_config_method =
+        Ident::new(variant.transport.client_config_method(), 
Span::call_site());
+
+    let cr_config = if let Some(ref cr) = attrs.server.connectors_runtime {
+        let config_path = cr.config_path.as_deref().unwrap_or("");
+        if has_fixtures {
+            quote! {
+                
Some(::integration::__macro_support::ConnectorsRuntimeConfig::builder()
+                    .config_path(::std::path::PathBuf::from(#config_path))
+                    .extra_envs(__fixture_envs.clone())
+                    .build())
+            }
+        } else {
+            quote! {
+                
Some(::integration::__macro_support::ConnectorsRuntimeConfig::builder()
+                    .config_path(::std::path::PathBuf::from(#config_path))
+                    .build())
+            }
+        }
+    } else {
+        quote! { None }
+    };
+
+    quote! {
+        let __shared = 
::integration::__macro_support::SharedServerRegistry::get_or_start(
+            #key,
+            ::integration::__macro_support::TestServerConfig::default(),
+        ).await.unwrap_or_else(|e| panic!("failed to get shared server: {e}"));
+
+        let mut __harness = 
::integration::__macro_support::TestHarness::from_shared(
+            __shared,
+            #cr_config,
+            
Some(::integration::__macro_support::ClientConfig::#client_config_method()),
+        ).await.unwrap_or_else(|e| panic!("failed to build shared harness: 
{e}"));
+        let _ = ::integration::__macro_support::TransportProtocol::#transport;
+    }
+}
+
+fn generate_shared_start_and_seed(attrs: &IggyTestAttrs, fixture_seed: 
TokenStream) -> TokenStream {
+    let has_fixture_seed = !fixture_seed.is_empty();
+
+    match (&attrs.seed_fn, has_fixture_seed) {
+        (Some(seed_fn), true) => {
+            quote! {
+                __harness.start_shared_with_seed(|__seed_client| async move {
+                    #seed_fn(&__seed_client).await?;
+                    #fixture_seed
+                    Ok(())
+                }).await.unwrap_or_else(|e| panic!("failed to start shared 
harness: {e}"));
+            }
+        }
+        (Some(seed_fn), false) => {
+            quote! {
+                __harness.start_shared_with_seed(|__seed_client| async move {
+                    #seed_fn(&__seed_client).await
+                }).await.unwrap_or_else(|e| panic!("failed to start shared 
harness: {e}"));
+            }
+        }
+        (None, true) => {
+            quote! {
+                __harness.start_shared_with_seed(|__seed_client| async move {
+                    #fixture_seed
+                    Ok(())
+                }).await.unwrap_or_else(|e| panic!("failed to start shared 
harness: {e}"));
+            }
+        }
+        (None, false) => {
+            quote! {
+                __harness.start_shared().await.unwrap_or_else(|e| 
panic!("failed to start shared harness: {e}"));
+            }
+        }
+    }
+}
+
+// ============================================================================
+// Standard (non-shared) code generation helpers
+// ============================================================================
+
 fn fixture_var_ident(name: &syn::Ident) -> syn::Ident {
     let name_str = name.to_string();
     let clean_name = name_str.trim_start_matches('_');
@@ -795,6 +963,7 @@ mod tests {
             },
             seed_fn: None,
             cluster_nodes: crate::attrs::ClusterNodesValue::None,
+            shared_server: None,
         };
         let variants = generate_variants(&attrs);
         // 2 transports * 2 segment sizes * 2 cache modes = 8 variants
diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml
index c8830b19f..513dc92d9 100644
--- a/core/integration/Cargo.toml
+++ b/core/integration/Cargo.toml
@@ -36,6 +36,7 @@ compio = { workspace = true }
 configs = { workspace = true }
 configs_derive = { workspace = true }
 ctor = { workspace = true }
+dashmap = { workspace = true }
 figment = { workspace = true }
 futures = { workspace = true }
 harness_derive = { workspace = true }
diff --git a/core/integration/src/harness/handle/mod.rs 
b/core/integration/src/harness/handle/mod.rs
index da746d175..a31ad7535 100644
--- a/core/integration/src/harness/handle/mod.rs
+++ b/core/integration/src/harness/handle/mod.rs
@@ -25,7 +25,7 @@ mod mcp;
 mod server;
 
 pub use client::ClientHandle;
-pub use client_builder::ClientBuilder;
+pub use client_builder::{ClientBuilder, ServerConnection};
 pub use connectors_runtime::ConnectorsRuntimeHandle;
 pub use mcp::{McpClient, McpHandle};
 pub use server::{ServerHandle, ServerLogs};
diff --git a/core/integration/src/harness/mod.rs 
b/core/integration/src/harness/mod.rs
index 13793ab05..c8d3757ae 100644
--- a/core/integration/src/harness/mod.rs
+++ b/core/integration/src/harness/mod.rs
@@ -51,6 +51,7 @@ mod helpers;
 mod orchestrator;
 mod port_reserver;
 pub mod seeds;
+mod shared;
 mod traits;
 
 pub use config::{
@@ -71,3 +72,4 @@ pub use helpers::{
 };
 
 pub use fixture::TestFixture;
+pub use shared::{SharedServerInfo, SharedServerRegistry};
diff --git a/core/integration/src/harness/orchestrator/builder.rs 
b/core/integration/src/harness/orchestrator/builder.rs
index 46afddd3b..668d9a193 100644
--- a/core/integration/src/harness/orchestrator/builder.rs
+++ b/core/integration/src/harness/orchestrator/builder.rs
@@ -226,6 +226,8 @@ impl TestHarnessBuilder {
             primary_transport,
             primary_client_config: self.primary_client_config,
             started: false,
+            shared_server: None,
+            shared_connectors_runtime: None,
         })
     }
 }
diff --git a/core/integration/src/harness/orchestrator/harness.rs 
b/core/integration/src/harness/orchestrator/harness.rs
index 6897b31b4..ca99731c2 100644
--- a/core/integration/src/harness/orchestrator/harness.rs
+++ b/core/integration/src/harness/orchestrator/harness.rs
@@ -18,14 +18,15 @@
  */
 
 use super::builder::TestHarnessBuilder;
-use crate::harness::config::ClientConfig;
+use crate::harness::config::{ClientConfig, ConnectorsRuntimeConfig};
 use crate::harness::context::TestContext;
 use crate::harness::error::TestBinaryError;
 use crate::harness::handle::{
-    ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient, 
McpHandle, ServerHandle,
-    ServerLogs,
+    ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient, 
McpHandle, ServerConnection,
+    ServerHandle, ServerLogs,
 };
-use crate::harness::traits::{Restartable, TestBinary};
+use crate::harness::shared::SharedServerInfo;
+use crate::harness::traits::{IggyServerDependent, Restartable, TestBinary};
 use futures::executor::block_on;
 use iggy::prelude::{ClientWrapper, IggyClient};
 use iggy_common::TransportProtocol;
@@ -47,6 +48,8 @@ pub struct TestHarness {
     pub(super) primary_transport: Option<TransportProtocol>,
     pub(super) primary_client_config: Option<ClientConfig>,
     pub(super) started: bool,
+    pub(super) shared_server: Option<Arc<SharedServerInfo>>,
+    pub(super) shared_connectors_runtime: Option<ConnectorsRuntimeHandle>,
 }
 
 impl std::fmt::Debug for TestHarness {
@@ -72,6 +75,39 @@ impl TestHarness {
         TestHarnessBuilder::default()
     }
 
+    /// Create a harness that uses a shared server (no ownership of server 
process).
+    ///
+    /// Each test still gets its own `TestContext` (for logs), connector 
runtime,
+    /// and clients. Only the iggy-server process is shared.
+    pub async fn from_shared(
+        shared: Arc<SharedServerInfo>,
+        connectors_runtime_config: Option<ConnectorsRuntimeConfig>,
+        primary_client_config: Option<ClientConfig>,
+    ) -> Result<Self, TestBinaryError> {
+        let mut context = TestContext::new(None, true)?;
+        context.ensure_created()?;
+        let context = Arc::new(context);
+
+        let shared_cr = connectors_runtime_config
+            .map(|cfg| ConnectorsRuntimeHandle::with_server_id(cfg, 
context.clone(), 0));
+
+        let primary_transport = primary_client_config.as_ref().map(|c| 
c.transport);
+
+        shared.acquire();
+
+        Ok(TestHarness {
+            context,
+            servers: Vec::new(),
+            clients: Vec::new(),
+            client_configs: Vec::new(),
+            primary_transport,
+            primary_client_config,
+            started: false,
+            shared_server: Some(shared),
+            shared_connectors_runtime: shared_cr,
+        })
+    }
+
     /// Start all configured binaries and create clients.
     pub async fn start(&mut self) -> Result<(), TestBinaryError> {
         self.start_internal(
@@ -97,6 +133,76 @@ impl TestHarness {
         self.start_internal(Some(seed)).await
     }
 
+    /// Start a shared-server harness without a seed function.
+    pub async fn start_shared(&mut self) -> Result<(), TestBinaryError> {
+        self.start_shared_internal(
+            None::<
+                fn(
+                    IggyClient,
+                )
+                    -> std::future::Ready<Result<(), Box<dyn std::error::Error 
+ Send + Sync>>>,
+            >,
+        )
+        .await
+    }
+
+    /// Start a shared-server harness with a seed function.
+    ///
+    /// The seed runs BEFORE the connector runtime starts, allowing it to
+    /// create streams/topics that the connector expects to find.
+    pub async fn start_shared_with_seed<F, Fut>(&mut self, seed: F) -> 
Result<(), TestBinaryError>
+    where
+        F: FnOnce(IggyClient) -> Fut,
+        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error 
+ Send + Sync>>>,
+    {
+        self.start_shared_internal(Some(seed)).await
+    }
+
+    async fn start_shared_internal<F, Fut>(
+        &mut self,
+        seed: Option<F>,
+    ) -> Result<(), TestBinaryError>
+    where
+        F: FnOnce(IggyClient) -> Fut,
+        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error 
+ Send + Sync>>>,
+    {
+        if self.started {
+            return Err(TestBinaryError::AlreadyStarted);
+        }
+
+        let shared = self
+            .shared_server
+            .as_ref()
+            .ok_or_else(|| TestBinaryError::InvalidState {
+                message: "start_shared called without shared 
server".to_string(),
+            })?;
+
+        let tcp_addr = shared
+            .tcp_addr()
+            .ok_or_else(|| TestBinaryError::InvalidState {
+                message: "Shared server has no TCP address".to_string(),
+            })?;
+
+        // Seed runs before connector runtime - creates streams/topics
+        if let Some(seed_fn) = seed {
+            let client = self.tcp_root_client().await?;
+            seed_fn(client)
+                .await
+                .map_err(|e| TestBinaryError::SeedFailed(e.to_string()))?;
+        }
+
+        // Connector runtime starts after seed - streams/topics exist now
+        if let Some(ref mut cr) = self.shared_connectors_runtime {
+            cr.set_iggy_address(tcp_addr);
+            cr.start()?;
+            cr.wait_ready().await?;
+        }
+
+        self.create_clients().await?;
+        self.started = true;
+        Ok(())
+    }
+
     async fn start_internal<F, Fut>(&mut self, seed: Option<F>) -> Result<(), 
TestBinaryError>
     where
         F: FnOnce(IggyClient) -> Fut,
@@ -138,6 +244,20 @@ impl TestHarness {
         }
         self.clients.clear();
 
+        // Stop per-test connector runtime (shared server mode)
+        if let Some(ref mut cr) = self.shared_connectors_runtime {
+            cr.stop()?;
+        }
+
+        // Release shared server ref - last test stops the server and cleans up
+        if let Some(shared) = self.shared_server.take() {
+            if std::thread::panicking() {
+                shared.mark_failed();
+            }
+            shared.release();
+        }
+
+        // Stop owned servers (non-shared mode only)
         for server in self.servers.iter_mut().rev() {
             server.stop_dependents()?;
             server.stop()?;
@@ -168,12 +288,24 @@ impl TestHarness {
     }
 
     /// Get reference to the first (primary) server handle.
+    ///
+    /// Not available in `shared_server` mode - the server is managed by 
`SharedServerRegistry`.
     pub fn server(&self) -> &ServerHandle {
+        assert!(
+            self.shared_server.is_none(),
+            "server() is not available in shared_server mode"
+        );
         self.servers.first().expect("No servers configured")
     }
 
     /// Get mutable reference to the first (primary) server handle.
+    ///
+    /// Not available in `shared_server` mode - the server is managed by 
`SharedServerRegistry`.
     pub fn server_mut(&mut self) -> &mut ServerHandle {
+        assert!(
+            self.shared_server.is_none(),
+            "server_mut() is not available in shared_server mode"
+        );
         self.servers.first_mut().expect("No servers configured")
     }
 
@@ -250,9 +382,15 @@ impl TestHarness {
 
     /// Get the connectors runtime handle from the primary server if 
configured.
     ///
+    /// Returns the per-test connector runtime for shared-server harnesses,
+    /// or the server-owned one for standard harnesses.
+    ///
     /// # Panics
     /// Panics if called on a cluster (multiple servers). Use 
`node(i).connectors_runtime()` instead.
     pub fn connectors_runtime(&self) -> Option<&ConnectorsRuntimeHandle> {
+        if self.shared_connectors_runtime.is_some() {
+            return self.shared_connectors_runtime.as_ref();
+        }
         assert!(
             self.servers.len() <= 1,
             "connectors_runtime() is only available for single-server setups. 
Use node(i).connectors_runtime() for clusters."
@@ -287,6 +425,18 @@ impl TestHarness {
         &self,
         transport: TransportProtocol,
     ) -> Result<ClientBuilder, TestBinaryError> {
+        if let Some(ref shared) = self.shared_server {
+            let connection = ServerConnection {
+                tcp_addr: shared.tcp_addr(),
+                http_addr: shared.http_addr(),
+                quic_addr: shared.quic_addr(),
+                websocket_addr: shared.websocket_addr(),
+                tls: None,
+                websocket_tls: None,
+                tls_ca_cert_path: shared.tls_ca_cert_path().cloned(),
+            };
+            return Ok(ClientBuilder::new(transport, connection));
+        }
         let server = 
self.servers.first().ok_or(TestBinaryError::MissingServer)?;
         match transport {
             TransportProtocol::Tcp => server.tcp_client(),
@@ -314,14 +464,7 @@ impl TestHarness {
         &self,
         transport: TransportProtocol,
     ) -> Result<IggyClient, TestBinaryError> {
-        let server = 
self.servers.first().ok_or(TestBinaryError::MissingServer)?;
-        let builder = match transport {
-            TransportProtocol::Tcp => server.tcp_client()?,
-            TransportProtocol::Http => server.http_client()?,
-            TransportProtocol::Quic => server.quic_client()?,
-            TransportProtocol::WebSocket => server.websocket_client()?,
-        };
-        builder.connect().await
+        self.client_builder_for(transport)?.connect().await
     }
 
     pub async fn tcp_root_client(&self) -> Result<IggyClient, TestBinaryError> 
{
@@ -421,16 +564,33 @@ impl TestHarness {
     }
 
     pub(super) async fn create_clients(&mut self) -> Result<(), 
TestBinaryError> {
-        let Some(server) = self.servers.first() else {
+        // Resolve addresses from either owned server or shared server
+        let (tcp, http, quic, ws, ca_cert) = if let Some(ref shared) = 
self.shared_server {
+            (
+                shared.tcp_addr(),
+                shared.http_addr(),
+                shared.quic_addr(),
+                shared.websocket_addr(),
+                shared.tls_ca_cert_path().cloned(),
+            )
+        } else if let Some(server) = self.servers.first() {
+            (
+                server.tcp_addr(),
+                server.http_addr(),
+                server.quic_addr(),
+                server.websocket_addr(),
+                server.tls_ca_cert_path(),
+            )
+        } else {
             return Ok(());
         };
 
         for config in &self.client_configs {
             let address = match config.transport {
-                TransportProtocol::Tcp => server.tcp_addr(),
-                TransportProtocol::Http => server.http_addr(),
-                TransportProtocol::Quic => server.quic_addr(),
-                TransportProtocol::WebSocket => server.websocket_addr(),
+                TransportProtocol::Tcp => tcp,
+                TransportProtocol::Http => http,
+                TransportProtocol::Quic => quic,
+                TransportProtocol::WebSocket => ws,
             };
 
             let Some(address) = address else {
@@ -441,9 +601,9 @@ impl TestHarness {
 
             let mut config = config.clone();
             if config.tls_enabled
-                && let Some(ca_cert_path) = server.tls_ca_cert_path()
+                && let Some(ref path) = ca_cert
             {
-                config.tls_ca_file = Some(ca_cert_path);
+                config.tls_ca_file = Some(path.clone());
             }
 
             let mut client = ClientHandle::new(config, address);
diff --git a/core/integration/src/harness/seeds.rs 
b/core/integration/src/harness/seeds.rs
index aa5554c70..c29ec25cf 100644
--- a/core/integration/src/harness/seeds.rs
+++ b/core/integration/src/harness/seeds.rs
@@ -19,7 +19,7 @@
 
 use iggy::prelude::{IggyClient, StreamClient, TopicClient};
 use iggy_common::{
-    CompressionAlgorithm, Consumer, Identifier, IggyExpiry, IggyMessage, 
MaxTopicSize,
+    CompressionAlgorithm, Consumer, Identifier, IggyError, IggyExpiry, 
IggyMessage, MaxTopicSize,
     Partitioning, PersonalAccessTokenExpiry, UserStatus,
 };
 use iggy_common::{
@@ -123,6 +123,75 @@ pub async fn connector_multi_topic_stream(client: 
&IggyClient) -> Result<(), See
     Ok(())
 }
 
+/// Idempotent seed for serial shared_server tests: creates stream and topic,
+/// silently skipping if they already exist from a previous test in the group.
+pub async fn connector_stream_idempotent(client: &IggyClient) -> Result<(), 
SeedError> {
+    let stream_id: Identifier = names::STREAM.try_into()?;
+
+    match client.create_stream(names::STREAM).await {
+        Ok(_) => {}
+        Err(e) if e.as_code() == 
IggyError::StreamNameAlreadyExists(String::new()).as_code() => {}
+        Err(e) => return Err(e.into()),
+    }
+
+    match client
+        .create_topic(
+            &stream_id,
+            names::TOPIC,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::ServerDefault,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+    {
+        Ok(_) => {}
+        Err(e)
+            if e.as_code()
+                == IggyError::TopicNameAlreadyExists(String::new(), 
Identifier::default())
+                    .as_code() => {}
+        Err(e) => return Err(e.into()),
+    }
+
+    Ok(())
+}
+
+/// Idempotent multi-topic seed for serial shared_server tests.
+pub async fn connector_multi_topic_stream_idempotent(client: &IggyClient) -> 
Result<(), SeedError> {
+    let stream_id: Identifier = names::STREAM.try_into()?;
+
+    match client.create_stream(names::STREAM).await {
+        Ok(_) => {}
+        Err(e) if e.as_code() == 
IggyError::StreamNameAlreadyExists(String::new()).as_code() => {}
+        Err(e) => return Err(e.into()),
+    }
+
+    for topic_name in [names::TOPIC, names::TOPIC_2] {
+        match client
+            .create_topic(
+                &stream_id,
+                topic_name,
+                1,
+                CompressionAlgorithm::None,
+                None,
+                IggyExpiry::ServerDefault,
+                MaxTopicSize::ServerDefault,
+            )
+            .await
+        {
+            Ok(_) => {}
+            Err(e)
+                if e.as_code()
+                    == IggyError::TopicNameAlreadyExists(String::new(), 
Identifier::default())
+                        .as_code() => {}
+            Err(e) => return Err(e.into()),
+        }
+    }
+
+    Ok(())
+}
+
 /// Standard MCP test data: stream, topic, message, consumer group, consumer 
offset, user, PAT.
 pub async fn mcp_standard(client: &IggyClient) -> Result<(), SeedError> {
     let stream_id: Identifier = names::STREAM.try_into()?;
diff --git a/core/integration/src/harness/shared.rs 
b/core/integration/src/harness/shared.rs
new file mode 100644
index 000000000..82f717ea0
--- /dev/null
+++ b/core/integration/src/harness/shared.rs
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::harness::config::TestServerConfig;
+use crate::harness::context::TestContext;
+use crate::harness::error::TestBinaryError;
+use crate::harness::handle::ServerHandle;
+use crate::harness::traits::TestBinary;
+use dashmap::DashMap;
+use std::net::SocketAddr;
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::{Arc, LazyLock};
+use tokio::sync::OnceCell;
+
+/// Shared server state: addresses, handles, and active user count.
+///
+/// Rust statics are not dropped at process exit, so we use an atomic counter
+/// to track active tests. The last test to finish stops the server and
+/// cleans up the test directory.
+pub struct SharedServerInfo {
+    // Addresses captured once at startup; None when a transport is disabled.
+    tcp_addr: Option<SocketAddr>,
+    http_addr: Option<SocketAddr>,
+    quic_addr: Option<SocketAddr>,
+    websocket_addr: Option<SocketAddr>,
+    // CA certificate path, if the server handle exposed one at startup.
+    tls_ca_cert_path: Option<PathBuf>,
+    // Set to None once the last test stops the server (see release()).
+    handle: std::sync::Mutex<Option<ServerHandle>>,
+    // Owns the on-disk test directory; cleaned up after the last release.
+    context: Arc<TestContext>,
+    // Number of tests currently holding this server (acquire/release pairs).
+    active_count: AtomicUsize,
+    // Sticky failure flag: when set, the test directory is preserved.
+    any_test_failed: AtomicBool,
+}
+
+impl SharedServerInfo {
+    /// TCP address of the shared server, if that transport is enabled.
+    pub fn tcp_addr(&self) -> Option<SocketAddr> {
+        self.tcp_addr
+    }
+
+    /// HTTP address of the shared server, if that transport is enabled.
+    pub fn http_addr(&self) -> Option<SocketAddr> {
+        self.http_addr
+    }
+
+    /// QUIC address of the shared server, if that transport is enabled.
+    pub fn quic_addr(&self) -> Option<SocketAddr> {
+        self.quic_addr
+    }
+
+    /// WebSocket address of the shared server, if that transport is enabled.
+    pub fn websocket_addr(&self) -> Option<SocketAddr> {
+        self.websocket_addr
+    }
+
+    /// CA certificate path captured at startup, when one was available.
+    pub fn tls_ca_cert_path(&self) -> Option<&PathBuf> {
+        self.tls_ca_cert_path.as_ref()
+    }
+
+    /// Increment the active test count. Called when a test acquires this server.
+    pub fn acquire(&self) {
+        self.active_count.fetch_add(1, Ordering::AcqRel);
+    }
+
+    /// Mark that a test using this server has failed.
+    /// Preserves the test directory for debugging.
+    pub fn mark_failed(&self) {
+        self.any_test_failed.store(true, Ordering::Release);
+    }
+
+    /// Decrement the active test count. When it reaches zero, stops the server
+    /// and cleans up the test directory (unless any test failed).
+    ///
+    /// NOTE(review): there is no underflow guard, so a `release()` without a
+    /// matching `acquire()` would wrap the counter — assumed the harness
+    /// macro always pairs them; confirm.
+    pub fn release(&self) {
+        // fetch_sub returns the value *before* the decrement, so prev == 1
+        // means this caller was the last active test. AcqRel ensures the last
+        // decrement observes the effects published by earlier releases.
+        let prev = self.active_count.fetch_sub(1, Ordering::AcqRel);
+        if prev == 1 {
+            match self.handle.lock() {
+                Ok(mut guard) => {
+                    if let Some(ref mut server) = *guard
+                        && let Err(e) = server.stop()
+                    {
+                        eprintln!("[SharedServer] failed to stop server: {e}");
+                    }
+                    // Drop the handle either way so it cannot be stopped twice.
+                    *guard = None;
+                }
+                Err(e) => {
+                    // Poisoned lock: a previous holder panicked. Skip the stop
+                    // and warn rather than risk touching a broken handle.
+                    eprintln!("[SharedServer] mutex poisoned, server process 
may leak: {e}");
+                }
+            }
+            // Keep the directory when any test in the group failed, so the
+            // server's data remains available for post-mortem debugging.
+            if !self.any_test_failed.load(Ordering::Acquire) {
+                self.context.cleanup();
+            }
+        }
+    }
+}
+
+// Process-global map from shared-server key to its lazily-initialized server
+// info. DashMap provides the concurrent key -> cell lookup; the OnceCell
+// layer ensures at most one caller performs the async server startup per key.
+static REGISTRY: LazyLock<DashMap<String, 
Arc<OnceCell<Arc<SharedServerInfo>>>>> =
+    LazyLock::new(DashMap::new);
+
+/// Global registry for shared server instances.
+///
+/// Tests with the same `shared_server` key share a single iggy-server process.
+/// The first test to request a key starts the server; all subsequent tests
+/// get a reference to the already-running server. The last test to finish
+/// stops the server and cleans up the test directory.
+pub struct SharedServerRegistry;
+
+impl SharedServerRegistry {
+    /// Get or start a shared server for the given key.
+    ///
+    /// Thread-safe: concurrent callers for the same key will block until the
+    /// first caller finishes initialization, then all receive the same `Arc`.
+    ///
+    /// Callers must call `SharedServerInfo::acquire()` after obtaining the
+    /// reference and `SharedServerInfo::release()` when done (typically in
+    /// `TestHarness::stop()`).
+    ///
+    /// # Errors
+    ///
+    /// Returns `TestBinaryError` when the test directory cannot be created
+    /// or the server process fails to start.
+    pub async fn get_or_start(
+        key: &str,
+        config: TestServerConfig,
+    ) -> Result<Arc<SharedServerInfo>, TestBinaryError> {
+        // Clone the per-key cell out of the map so no DashMap shard guard is
+        // held across the awaits below.
+        let cell = REGISTRY
+            .entry(key.to_string())
+            .or_insert_with(|| Arc::new(OnceCell::new()))
+            .value()
+            .clone();
+
+        // OnceCell serializes initialization: one caller runs the closure,
+        // the rest wait and clone the same Arc.
+        // NOTE(review): presumably the cell stays empty when the closure
+        // errors, so a later test retries the startup — confirm against
+        // tokio's OnceCell::get_or_try_init contract.
+        cell.get_or_try_init(|| async move {
+            let mut context = 
TestContext::new(Some(format!("shared_server_{key}")), true)?;
+            context.ensure_created()?;
+            let context = Arc::new(context);
+
+            let mut server = ServerHandle::with_config(config, 
context.clone());
+            server.start()?;
+
+            // Capture the addresses up front; the handle itself is stored
+            // behind a mutex and stopped by the last SharedServerInfo::release().
+            Ok(Arc::new(SharedServerInfo {
+                tcp_addr: server.tcp_addr(),
+                http_addr: server.http_addr(),
+                quic_addr: server.quic_addr(),
+                websocket_addr: server.websocket_addr(),
+                tls_ca_cert_path: server.tls_ca_cert_path(),
+                handle: std::sync::Mutex::new(Some(server)),
+                context,
+                active_count: AtomicUsize::new(0),
+                any_test_failed: AtomicBool::new(false),
+            }))
+        })
+        .await
+        .cloned()
+    }
+}
diff --git a/core/integration/src/lib.rs b/core/integration/src/lib.rs
index fc03d02d7..41ec34555 100644
--- a/core/integration/src/lib.rs
+++ b/core/integration/src/lib.rs
@@ -24,7 +24,8 @@ pub use harness_derive::iggy_harness;
 #[doc(hidden)]
 pub mod __macro_support {
     pub use crate::harness::{
-        ClientConfig, McpClient, McpConfig, TestHarness, TestServerConfig, 
TlsConfig,
+        ClientConfig, ConnectorsRuntimeConfig, McpClient, McpConfig, 
SharedServerRegistry,
+        TestHarness, TestServerConfig, TlsConfig,
     };
     pub use iggy::prelude::ClientWrapper;
     pub use iggy_common::TransportProtocol;
diff --git a/core/integration/tests/connectors/api/endpoints.rs 
b/core/integration/tests/connectors/api/endpoints.rs
index 69f6cdbfb..5fe987a29 100644
--- a/core/integration/tests/connectors/api/endpoints.rs
+++ b/core/integration/tests/connectors/api/endpoints.rs
@@ -27,8 +27,9 @@ use reqwest::Client;
 const API_KEY: &str = "test-api-key";
 
 #[iggy_harness(
+    shared_server = "connector_api",
     server(connectors_runtime(config_path = 
"tests/connectors/api/config.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn root_endpoint_returns_welcome_message(harness: &TestHarness) {
     let api_address = harness
@@ -49,8 +50,9 @@ async fn root_endpoint_returns_welcome_message(harness: 
&TestHarness) {
 }
 
 #[iggy_harness(
+    shared_server = "connector_api",
     server(connectors_runtime(config_path = 
"tests/connectors/api/config.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn health_endpoint_returns_healthy(harness: &TestHarness) {
     let api_address = harness
@@ -71,8 +73,9 @@ async fn health_endpoint_returns_healthy(harness: 
&TestHarness) {
 }
 
 #[iggy_harness(
+    shared_server = "connector_api",
     server(connectors_runtime(config_path = 
"tests/connectors/api/config.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn stats_endpoint_returns_runtime_stats(harness: &TestHarness) {
     let api_address = harness
@@ -100,8 +103,9 @@ async fn stats_endpoint_returns_runtime_stats(harness: 
&TestHarness) {
 }
 
 #[iggy_harness(
+    shared_server = "connector_api",
     server(connectors_runtime(config_path = 
"tests/connectors/api/config.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn metrics_endpoint_returns_prometheus_format(harness: &TestHarness) {
     let api_address = harness
@@ -127,8 +131,9 @@ async fn 
metrics_endpoint_returns_prometheus_format(harness: &TestHarness) {
 }
 
 #[iggy_harness(
+    shared_server = "connector_api",
     server(connectors_runtime(config_path = 
"tests/connectors/api/config.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn sources_endpoint_returns_list(harness: &TestHarness) {
     let api_address = harness
@@ -150,8 +155,9 @@ async fn sources_endpoint_returns_list(harness: 
&TestHarness) {
 }
 
 #[iggy_harness(
+    shared_server = "connector_api",
     server(connectors_runtime(config_path = 
"tests/connectors/api/config.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn sinks_endpoint_returns_list(harness: &TestHarness) {
     let api_address = harness
@@ -173,8 +179,9 @@ async fn sinks_endpoint_returns_list(harness: &TestHarness) 
{
 }
 
 #[iggy_harness(
+    shared_server = "connector_api",
     server(connectors_runtime(config_path = 
"tests/connectors/api/config.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn api_key_authentication_required(harness: &TestHarness) {
     let api_address = harness
@@ -217,8 +224,9 @@ async fn api_key_authentication_required(harness: 
&TestHarness) {
 }
 
 #[iggy_harness(
+    shared_server = "connector_api",
     server(connectors_runtime(config_path = 
"tests/connectors/api/config.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn api_key_authentication_rejected_with_invalid_key(harness: 
&TestHarness) {
     let api_address = harness
diff --git 
a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs 
b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
index ad6ea1c86..13190f9b5 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs
@@ -28,8 +28,9 @@ use integration::harness::seeds;
 use integration::iggy_harness;
 
 #[iggy_harness(
+    shared_server = "elasticsearch_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/elasticsearch/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn elasticsearch_sink_stores_json_messages(
     harness: &TestHarness,
@@ -86,8 +87,9 @@ async fn elasticsearch_sink_stores_json_messages(
 }
 
 #[iggy_harness(
+    shared_server = "elasticsearch_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/elasticsearch/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn elasticsearch_sink_handles_bulk_messages(
     harness: &TestHarness,
@@ -140,8 +142,9 @@ async fn elasticsearch_sink_handles_bulk_messages(
 }
 
 #[iggy_harness(
+    shared_server = "elasticsearch_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/elasticsearch/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn elasticsearch_sink_preserves_json_structure(
     harness: &TestHarness,
diff --git 
a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs 
b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
index 2beaad75c..f945f4889 100644
--- a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
+++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs
@@ -27,8 +27,9 @@ use std::time::Duration;
 use tokio::time::sleep;
 
 #[iggy_harness(
+    shared_server = "elasticsearch_source",
     server(connectors_runtime(config_path = 
"tests/connectors/elasticsearch/source.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn elasticsearch_source_produces_messages_to_iggy(
     harness: &TestHarness,
@@ -104,8 +105,9 @@ async fn elasticsearch_source_produces_messages_to_iggy(
 }
 
 #[iggy_harness(
+    shared_server = "elasticsearch_source",
     server(connectors_runtime(config_path = 
"tests/connectors/elasticsearch/source.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn elasticsearch_source_handles_empty_index(
     harness: &TestHarness,
@@ -144,8 +146,9 @@ async fn elasticsearch_source_handles_empty_index(
 }
 
 #[iggy_harness(
+    shared_server = "elasticsearch_source",
     server(connectors_runtime(config_path = 
"tests/connectors/elasticsearch/source.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn elasticsearch_source_produces_bulk_messages(
     harness: &TestHarness,
diff --git a/core/integration/tests/connectors/http/http_sink.rs 
b/core/integration/tests/connectors/http/http_sink.rs
index e41c59ab6..3056d69e3 100644
--- a/core/integration/tests/connectors/http/http_sink.rs
+++ b/core/integration/tests/connectors/http/http_sink.rs
@@ -196,8 +196,9 @@ use integration::iggy_harness;
 /// Validates `batch_mode=individual`: one HTTP POST per message, each with 
metadata envelope.
 /// Checks request count = message count, envelope structure, and 
`application/json` content type.
 #[iggy_harness(
+    shared_server = "http_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/http/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn individual_json_messages_delivered_as_separate_posts(
     harness: &TestHarness,
@@ -303,8 +304,9 @@ async fn 
individual_json_messages_delivered_as_separate_posts(
 /// Validates `batch_mode=ndjson`: all messages in one request as 
newline-delimited JSON.
 /// Checks single request, line count = message count, per-line envelope, 
`application/x-ndjson`.
 #[iggy_harness(
+    shared_server = "http_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/http/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn ndjson_messages_delivered_as_single_request(
     harness: &TestHarness,
@@ -395,8 +397,9 @@ async fn ndjson_messages_delivered_as_single_request(
 /// Validates `batch_mode=json_array`: all messages as a single JSON array in 
one request.
 /// Checks single request, array length = message count, per-item envelope, 
`application/json`.
 #[iggy_harness(
+    shared_server = "http_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/http/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn json_array_messages_delivered_as_single_request(
     harness: &TestHarness,
@@ -488,8 +491,9 @@ async fn json_array_messages_delivered_as_single_request(
 /// Validates `batch_mode=raw`: each message as raw bytes without metadata 
envelope.
 /// Checks request count = message count, no envelope wrapper, 
`application/octet-stream`.
 #[iggy_harness(
+    shared_server = "http_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/http/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn raw_binary_messages_delivered_without_envelope(
     harness: &TestHarness,
@@ -575,8 +579,9 @@ async fn raw_binary_messages_delivered_without_envelope(
 /// Validates `include_metadata=false`: bare payload without `{metadata, 
payload}` envelope.
 /// Checks no metadata field in body, original payload fields at top level.
 #[iggy_harness(
+    shared_server = "http_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/http/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn metadata_disabled_sends_bare_payload(
     harness: &TestHarness,
@@ -651,8 +656,9 @@ async fn metadata_disabled_sends_bare_payload(
 /// Validates sequential offset integrity: `iggy_offset` values are contiguous 
across
 /// 5 delivered messages. Sorts by offset and checks each = previous + 1.
 #[iggy_harness(
+    shared_server = "http_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/http/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn individual_messages_have_sequential_offsets(
     harness: &TestHarness,
@@ -738,8 +744,9 @@ async fn individual_messages_have_sequential_offsets(
 /// Validates multi-topic delivery: one connector consuming two topics on the 
same stream.
 /// Sends 2 messages to topic 1, 1 to topic 2, verifies `iggy_topic` metadata 
matches source.
 #[iggy_harness(
+    shared_server = "http_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/http/sink.toml")),
-    seed = seeds::connector_multi_topic_stream
+    seed = seeds::connector_multi_topic_stream_idempotent
 )]
 async fn multi_topic_messages_delivered_with_correct_topic_metadata(
     harness: &TestHarness,
@@ -749,8 +756,8 @@ async fn 
multi_topic_messages_delivered_with_correct_topic_metadata(
     let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap();
     let topic_1_id: Identifier = seeds::names::TOPIC.try_into().unwrap();
 
-    // Step 1: Both topics created by connector_multi_topic_stream seed (runs 
before
-    // connector runtime starts — runtime health check requires all configured 
topics).
+    // Both topics created by connector_multi_topic_stream_idempotent seed 
(runs before
+    // connector runtime starts - runtime health check requires all configured 
topics).
     let topic_2_id: Identifier = seeds::names::TOPIC_2.try_into().unwrap();
 
     // Step 2: Send 2 messages to topic 1 with source identifier in payload
@@ -822,7 +829,6 @@ async fn 
multi_topic_messages_delivered_with_correct_topic_metadata(
             )
         });
 
-        // Match against constants — not magic strings (code review M9)
         match iggy_topic {
             t if t == seeds::names::TOPIC => {
                 topic_1_count += 1;
diff --git 
a/core/integration/tests/connectors/http_config_provider/direct_responses.rs 
b/core/integration/tests/connectors/http_config_provider/direct_responses.rs
index eeebddf4b..17ce1279d 100644
--- a/core/integration/tests/connectors/http_config_provider/direct_responses.rs
+++ b/core/integration/tests/connectors/http_config_provider/direct_responses.rs
@@ -23,8 +23,9 @@ use integration::iggy_harness;
 use reqwest::StatusCode;
 
 #[iggy_harness(
+    shared_server = "http_config_direct",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_direct.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn source_configs_list_returns_all_versions(
     harness: &TestHarness,
@@ -65,8 +66,8 @@ async fn source_configs_list_returns_all_versions(
     assert_eq!(streams.len(), 1, "Should have 1 stream config");
 
     let stream = &streams[0];
-    assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
-    assert_eq!(stream["topic"].as_str().unwrap(), "test_topic");
+    assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
+    assert_eq!(stream["topic"].as_str().unwrap(), seeds::names::TOPIC);
     assert_eq!(stream["schema"].as_str().unwrap(), "json");
     assert!(stream["batch_length"].as_u64().is_some());
     assert!(stream["linger_time"].as_str().is_some());
@@ -80,8 +81,9 @@ async fn source_configs_list_returns_all_versions(
 }
 
 #[iggy_harness(
+    shared_server = "http_config_direct",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_direct.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn source_config_by_version_returns_specific_version(
     harness: &TestHarness,
@@ -111,12 +113,13 @@ async fn 
source_config_by_version_returns_specific_version(
 
     let streams = body["streams"].as_array().unwrap();
     assert_eq!(streams.len(), 1);
-    assert_eq!(streams[0]["topic"].as_str().unwrap(), "test_topic");
+    assert_eq!(streams[0]["topic"].as_str().unwrap(), seeds::names::TOPIC);
 }
 
 #[iggy_harness(
+    shared_server = "http_config_direct",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_direct.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn source_active_config_returns_current_version(
     harness: &TestHarness,
@@ -151,8 +154,9 @@ async fn source_active_config_returns_current_version(
 }
 
 #[iggy_harness(
+    shared_server = "http_config_direct",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_direct.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn sink_configs_list_returns_all_versions(
     harness: &TestHarness,
@@ -193,10 +197,10 @@ async fn sink_configs_list_returns_all_versions(
     assert_eq!(streams.len(), 1, "Should have 1 stream config");
 
     let stream = &streams[0];
-    assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
+    assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
     let topics = stream["topics"].as_array().unwrap();
     assert_eq!(topics.len(), 1);
-    assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+    assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
     assert_eq!(stream["schema"].as_str().unwrap(), "json");
     assert!(stream["batch_length"].as_u64().is_some());
     assert!(stream["poll_interval"].as_str().is_some());
@@ -207,8 +211,9 @@ async fn sink_configs_list_returns_all_versions(
 }
 
 #[iggy_harness(
+    shared_server = "http_config_direct",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_direct.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn sink_config_by_version_returns_specific_version(
     harness: &TestHarness,
@@ -239,12 +244,13 @@ async fn sink_config_by_version_returns_specific_version(
     let streams = body["streams"].as_array().unwrap();
     assert_eq!(streams.len(), 1);
     let topics = streams[0]["topics"].as_array().unwrap();
-    assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+    assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
 }
 
 #[iggy_harness(
+    shared_server = "http_config_direct",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_direct.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn sink_active_config_returns_current_version(
     harness: &TestHarness,
diff --git 
a/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs 
b/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
index 1ef5f99ae..aab8df975 100644
--- 
a/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
+++ 
b/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs
@@ -23,8 +23,9 @@ use integration::iggy_harness;
 use reqwest::StatusCode;
 
 #[iggy_harness(
+    shared_server = "http_config_wrapped",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_wrapped.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn source_configs_list_returns_all_versions(
     harness: &TestHarness,
@@ -65,8 +66,8 @@ async fn source_configs_list_returns_all_versions(
     assert_eq!(streams.len(), 1, "Should have 1 stream config");
 
     let stream = &streams[0];
-    assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
-    assert_eq!(stream["topic"].as_str().unwrap(), "test_topic");
+    assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
+    assert_eq!(stream["topic"].as_str().unwrap(), seeds::names::TOPIC);
     assert_eq!(stream["schema"].as_str().unwrap(), "json");
     assert!(stream["batch_length"].as_u64().is_some());
     assert!(stream["linger_time"].as_str().is_some());
@@ -80,8 +81,9 @@ async fn source_configs_list_returns_all_versions(
 }
 
 #[iggy_harness(
+    shared_server = "http_config_wrapped",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_wrapped.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn source_config_by_version_returns_specific_version(
     harness: &TestHarness,
@@ -111,12 +113,13 @@ async fn 
source_config_by_version_returns_specific_version(
 
     let streams = body["streams"].as_array().unwrap();
     assert_eq!(streams.len(), 1);
-    assert_eq!(streams[0]["topic"].as_str().unwrap(), "test_topic");
+    assert_eq!(streams[0]["topic"].as_str().unwrap(), seeds::names::TOPIC);
 }
 
 #[iggy_harness(
+    shared_server = "http_config_wrapped",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_wrapped.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn source_active_config_returns_current_version(
     harness: &TestHarness,
@@ -151,8 +154,9 @@ async fn source_active_config_returns_current_version(
 }
 
 #[iggy_harness(
+    shared_server = "http_config_wrapped",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_wrapped.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn sink_configs_list_returns_all_versions(
     harness: &TestHarness,
@@ -193,10 +197,10 @@ async fn sink_configs_list_returns_all_versions(
     assert_eq!(streams.len(), 1, "Should have 1 stream config");
 
     let stream = &streams[0];
-    assert_eq!(stream["stream"].as_str().unwrap(), "test_stream");
+    assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM);
     let topics = stream["topics"].as_array().unwrap();
     assert_eq!(topics.len(), 1);
-    assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+    assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
     assert_eq!(stream["schema"].as_str().unwrap(), "json");
     assert!(stream["batch_length"].as_u64().is_some());
     assert!(stream["poll_interval"].as_str().is_some());
@@ -207,8 +211,9 @@ async fn sink_configs_list_returns_all_versions(
 }
 
 #[iggy_harness(
+    shared_server = "http_config_wrapped",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_wrapped.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn sink_config_by_version_returns_specific_version(
     harness: &TestHarness,
@@ -239,12 +244,13 @@ async fn sink_config_by_version_returns_specific_version(
     let streams = body["streams"].as_array().unwrap();
     assert_eq!(streams.len(), 1);
     let topics = streams[0]["topics"].as_array().unwrap();
-    assert_eq!(topics[0].as_str().unwrap(), "test_topic");
+    assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC);
 }
 
 #[iggy_harness(
+    shared_server = "http_config_wrapped",
     server(connectors_runtime(config_path = 
"tests/connectors/http_config_provider/config_wrapped.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn sink_active_config_returns_current_version(
     harness: &TestHarness,
diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs 
b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
index 777214116..ad031dd80 100644
--- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs
+++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs
@@ -37,8 +37,9 @@ const SNAPSHOT_POLL_INTERVAL_MS: u64 = 500;
 const BULK_SNAPSHOT_POLL_ATTEMPTS: usize = 60;
 
 #[iggy_harness(
+    shared_server = "iceberg_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/iceberg/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn iceberg_sink_initializes_and_runs(
     harness: &TestHarness,
@@ -68,8 +69,9 @@ async fn iceberg_sink_initializes_and_runs(
 }
 
 #[iggy_harness(
+    shared_server = "iceberg_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/iceberg/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn iceberg_sink_consumes_json_messages(
     harness: &TestHarness,
@@ -139,8 +141,9 @@ async fn iceberg_sink_consumes_json_messages(
 }
 
 #[iggy_harness(
+    shared_server = "iceberg_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/iceberg/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn iceberg_sink_handles_bulk_messages(
     harness: &TestHarness,
diff --git a/core/integration/tests/connectors/influxdb/influxdb_sink.rs 
b/core/integration/tests/connectors/influxdb/influxdb_sink.rs
index cda484cfc..6b82d6428 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_sink.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_sink.rs
@@ -33,7 +33,8 @@ use serde_json::json;
 
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_writes_messages_to_bucket(
     harness: &TestHarness,
@@ -74,7 +75,8 @@ async fn influxdb_sink_writes_messages_to_bucket(
 
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_handles_bulk_messages(harness: &TestHarness, fixture: 
InfluxDbSinkFixture) {
     let client = harness.root_client().await.unwrap();
@@ -112,7 +114,8 @@ async fn influxdb_sink_handles_bulk_messages(harness: 
&TestHarness, fixture: Inf
 
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_payload_fields_stored_correctly(
     harness: &TestHarness,
@@ -150,7 +153,8 @@ async fn influxdb_sink_payload_fields_stored_correctly(
 
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_large_batch(harness: &TestHarness, fixture: 
InfluxDbSinkFixture) {
     let client = harness.root_client().await.unwrap();
@@ -184,7 +188,8 @@ async fn influxdb_sink_large_batch(harness: &TestHarness, 
fixture: InfluxDbSinkF
 
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_recovers_backlogged_messages(
     harness: &TestHarness,
@@ -224,7 +229,8 @@ async fn influxdb_sink_recovers_backlogged_messages(
 
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_multiple_partitions(harness: &TestHarness, fixture: 
InfluxDbSinkFixture) {
     let client = harness.root_client().await.unwrap();
diff --git 
a/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs 
b/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
index d1225c179..e68e78b55 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs
@@ -52,7 +52,8 @@ use integration::iggy_harness;
 /// with printable ASCII.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink_text.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_text_writes_points(harness: &TestHarness, fixture: 
InfluxDbSinkTextFixture) {
     let client = harness.root_client().await.unwrap();
@@ -91,7 +92,8 @@ async fn influxdb_sink_text_writes_points(harness: 
&TestHarness, fixture: Influx
 /// Covers: `write_field_string` quote-escape branch.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink_text.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_text_with_quote_characters(
     harness: &TestHarness,
@@ -131,7 +133,8 @@ async fn influxdb_sink_text_with_quote_characters(
 /// Covers: `write_field_string` backslash-escape branch.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink_text.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_text_with_backslash(
     harness: &TestHarness,
@@ -170,7 +173,8 @@ async fn influxdb_sink_text_with_backslash(
 /// path under the Text format.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink_text.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_text_bulk(harness: &TestHarness, fixture: 
InfluxDbSinkTextFixture) {
     let client = harness.root_client().await.unwrap();
@@ -212,7 +216,8 @@ async fn influxdb_sink_text_bulk(harness: &TestHarness, 
fixture: InfluxDbSinkTex
 /// `general_purpose::STANDARD.encode`.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink_base64.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_base64_writes_points(
     harness: &TestHarness,
@@ -257,7 +262,8 @@ async fn influxdb_sink_base64_writes_points(
 /// needed for base64 output, but exercises the full field-write path).
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink_base64.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_base64_roundtrip(harness: &TestHarness, fixture: 
InfluxDbSinkBase64Fixture) {
     let client = harness.root_client().await.unwrap();
@@ -302,7 +308,8 @@ async fn influxdb_sink_base64_roundtrip(harness: 
&TestHarness, fixture: InfluxDb
 /// test covers the next-closest case (1-byte payload → 4-char base64 string).
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink_base64.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_base64_single_byte_payload(
     harness: &TestHarness,
@@ -346,7 +353,8 @@ async fn influxdb_sink_base64_single_byte_payload(
 /// branches inside `append_line`.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink_no_metadata.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_no_metadata_writes_points(
     harness: &TestHarness,
@@ -387,7 +395,8 @@ async fn influxdb_sink_no_metadata_writes_points(
 /// with the no-metadata tag configuration.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink_no_metadata.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_no_metadata_bulk(
     harness: &TestHarness,
@@ -432,7 +441,8 @@ async fn influxdb_sink_no_metadata_bulk(
 /// Covers: `"ns"` arm of `to_precision_timestamp`.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_ns_precision_writes_points(
     harness: &TestHarness,
@@ -475,7 +485,8 @@ async fn influxdb_sink_ns_precision_writes_points(
 /// in `PayloadFormat::Json` and `write_field_string` with `{`, `}`, `:`, `"`.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_json_nested_payload(harness: &TestHarness, fixture: 
InfluxDbSinkFixture) {
     let client = harness.root_client().await.unwrap();
@@ -523,7 +534,8 @@ async fn influxdb_sink_json_nested_payload(harness: 
&TestHarness, fixture: Influ
 /// Covers: `'\n' => buf.push_str("\\n")` branch in `write_field_string`.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/sink_text.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_sink",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_sink_text_with_newline(harness: &TestHarness, fixture: 
InfluxDbSinkTextFixture) {
     let client = harness.root_client().await.unwrap();
diff --git a/core/integration/tests/connectors/influxdb/influxdb_source.rs 
b/core/integration/tests/connectors/influxdb/influxdb_source.rs
index f6eab572e..e2716aeb9 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_source.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_source.rs
@@ -26,7 +26,8 @@ use tracing::info;
 
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_polls_and_produces_messages(
     harness: &TestHarness,
@@ -84,7 +85,8 @@ async fn influxdb_source_polls_and_produces_messages(
 
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_message_payload_structure(
     harness: &TestHarness,
@@ -140,7 +142,8 @@ async fn influxdb_source_message_payload_structure(
 
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_empty_bucket_produces_no_messages(
     harness: &TestHarness,
@@ -179,7 +182,8 @@ async fn influxdb_source_empty_bucket_produces_no_messages(
 
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_multiple_measurements(
     harness: &TestHarness,
diff --git 
a/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs 
b/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs
index 98cede6db..089637ce5 100644
--- a/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs
+++ b/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs
@@ -51,7 +51,8 @@ use tracing::info;
 /// path), `schema()` → `Schema::Text`.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source_text.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_text_payload_column(
     harness: &TestHarness,
@@ -110,7 +111,8 @@ async fn influxdb_source_text_payload_column(
 /// Covers: `PayloadFormat::Text` → `Ok(raw_value.into_bytes())`.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source_text.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_text_whitespace_value(
     harness: &TestHarness,
@@ -172,7 +174,8 @@ async fn influxdb_source_text_whitespace_value(
 /// STANDARD.decode`.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source_raw.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_raw_payload_column(
     harness: &TestHarness,
@@ -229,7 +232,8 @@ async fn influxdb_source_raw_payload_column(
 /// Covers: the loop in `poll_messages` with `PayloadFormat::Raw`.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source_raw.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_raw_multiple_rows(
     harness: &TestHarness,
@@ -290,7 +294,8 @@ async fn influxdb_source_raw_multiple_rows(
 /// Covers: `parse_scalar` for `i64`, `f64`, `bool`, and string branches.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_scalar_type_integer(
     harness: &TestHarness,
@@ -350,7 +355,8 @@ async fn influxdb_source_scalar_type_integer(
 /// Covers `parse_scalar` for `f64` values.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_scalar_type_float(harness: &TestHarness, fixture: 
InfluxDbSourceFixture) {
     let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64;
@@ -401,7 +407,8 @@ async fn influxdb_source_scalar_type_float(harness: 
&TestHarness, fixture: Influ
 /// Covers `parse_scalar` for boolean values (`true`/`false`).
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_scalar_type_bool(harness: &TestHarness, fixture: 
InfluxDbSourceFixture) {
     let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64;
@@ -474,7 +481,8 @@ async fn influxdb_source_scalar_type_bool(harness: 
&TestHarness, fixture: Influx
 /// cursor state persisted via `state.last_timestamp`.
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_cursor_advances_between_batches(
     harness: &TestHarness,
@@ -565,7 +573,8 @@ async fn influxdb_source_cursor_advances_between_batches(
 /// `build_payload` (whole-row path).
 #[iggy_harness(
     server(connectors_runtime(config_path = 
"tests/connectors/influxdb/source.toml")),
-    seed = seeds::connector_stream
+    shared_server = "influxdb_source",
+    seed = seeds::connector_stream_idempotent
 )]
 async fn influxdb_source_no_metadata_core_fields_present(
     harness: &TestHarness,
diff --git a/core/integration/tests/connectors/mongodb/mongodb_sink.rs 
b/core/integration/tests/connectors/mongodb/mongodb_sink.rs
index 474b809ed..09c400eb3 100644
--- a/core/integration/tests/connectors/mongodb/mongodb_sink.rs
+++ b/core/integration/tests/connectors/mongodb/mongodb_sink.rs
@@ -48,8 +48,9 @@ fn build_expected_document_id_for_topic(topic_name: &str, 
message_id: u128) -> S
 }
 
 #[iggy_harness(
+    shared_server = "mongodb_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn json_messages_sink_to_mongodb(harness: &TestHarness, fixture: 
MongoDbSinkJsonFixture) {
     let client = harness.root_client().await.unwrap();
@@ -136,8 +137,9 @@ async fn json_messages_sink_to_mongodb(harness: 
&TestHarness, fixture: MongoDbSi
 }
 
 #[iggy_harness(
+    shared_server = "mongodb_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn binary_messages_sink_as_bson_binary(harness: &TestHarness, fixture: 
MongoDbSinkFixture) {
     let client = harness.root_client().await.unwrap();
@@ -204,8 +206,9 @@ async fn binary_messages_sink_as_bson_binary(harness: 
&TestHarness, fixture: Mon
 }
 
 #[iggy_harness(
+    shared_server = "mongodb_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn large_batch_processed_correctly(harness: &TestHarness, fixture: 
MongoDbSinkBatchFixture) {
     let client = harness.root_client().await.unwrap();
@@ -258,8 +261,9 @@ async fn large_batch_processed_correctly(harness: 
&TestHarness, fixture: MongoDb
 }
 
 #[iggy_harness(
+    shared_server = "mongodb_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn duplicate_key_is_idempotent_replay_not_sink_error(
     harness: &TestHarness,
@@ -441,8 +445,9 @@ async fn duplicate_key_is_idempotent_replay_not_sink_error(
 }
 
 #[iggy_harness(
+    shared_server = "mongodb_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn ordered_duplicate_partial_insert_has_exact_accounting(
     harness: &TestHarness,
@@ -617,8 +622,9 @@ async fn 
ordered_duplicate_partial_insert_has_exact_accounting(
 }
 
 #[iggy_harness(
+    shared_server = "mongodb_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix(
     harness: &TestHarness,
@@ -729,8 +735,9 @@ async fn 
schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix(
 }
 
 #[iggy_harness(
+    shared_server = "mongodb_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn write_concern_timeout_does_not_report_full_success(
     harness: &TestHarness,
@@ -820,8 +827,9 @@ async fn write_concern_timeout_does_not_report_full_success(
 }
 
 #[iggy_harness(
+    shared_server = "mongodb_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn retryable_write_failover_keeps_single_doc_per_id(
     harness: &TestHarness,
@@ -967,8 +975,9 @@ async fn retryable_write_failover_keeps_single_doc_per_id(
 }
 
 #[iggy_harness(
+    shared_server = "mongodb_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn no_writes_performed_label_path_preserves_state_accuracy(
     harness: &TestHarness,
@@ -1115,8 +1124,9 @@ async fn 
no_writes_performed_label_path_preserves_state_accuracy(
 }
 
 #[iggy_harness(
+    shared_server = "mongodb_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/mongodb/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn auto_create_collection_on_open(
     harness: &TestHarness,
@@ -1157,6 +1167,5 @@ async fn auto_create_collection_on_open(
         "Collection should be empty after open() with no messages"
     );
 
-    // Suppress unused harness warning.
     let _ = harness;
 }
diff --git a/core/integration/tests/connectors/postgres/postgres_sink.rs 
b/core/integration/tests/connectors/postgres/postgres_sink.rs
index 2032c025e..6b5b87022 100644
--- a/core/integration/tests/connectors/postgres/postgres_sink.rs
+++ b/core/integration/tests/connectors/postgres/postgres_sink.rs
@@ -35,8 +35,9 @@ type SinkRow = (i64, String, String, Vec<u8>);
 type SinkJsonRow = (i64, serde_json::Value);
 
 #[iggy_harness(
+    shared_server = "postgres_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/postgres/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn json_messages_sink_stores_as_bytea(harness: &TestHarness, fixture: 
PostgresSinkFixture) {
     let client = harness.root_client().await.unwrap();
@@ -97,8 +98,9 @@ async fn json_messages_sink_stores_as_bytea(harness: 
&TestHarness, fixture: Post
 }
 
 #[iggy_harness(
+    shared_server = "postgres_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/postgres/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn binary_messages_sink_stores_as_bytea(
     harness: &TestHarness,
@@ -161,8 +163,9 @@ async fn binary_messages_sink_stores_as_bytea(
 }
 
 #[iggy_harness(
+    shared_server = "postgres_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/postgres/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn json_messages_sink_stores_as_jsonb(
     harness: &TestHarness,
diff --git a/core/integration/tests/connectors/postgres/postgres_source.rs 
b/core/integration/tests/connectors/postgres/postgres_source.rs
index 5f4a79af8..ce5cb99e2 100644
--- a/core/integration/tests/connectors/postgres/postgres_source.rs
+++ b/core/integration/tests/connectors/postgres/postgres_source.rs
@@ -32,8 +32,9 @@ use std::time::Duration;
 use tokio::time::sleep;
 
 #[iggy_harness(
+    shared_server = "postgres_source",
     server(connectors_runtime(config_path = 
"tests/connectors/postgres/source.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn json_rows_source_produces_messages_to_iggy(
     harness: &TestHarness,
@@ -113,8 +114,9 @@ async fn json_rows_source_produces_messages_to_iggy(
 }
 
 #[iggy_harness(
+    shared_server = "postgres_source",
     server(connectors_runtime(config_path = 
"tests/connectors/postgres/source.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn bytea_rows_source_produces_raw_messages_to_iggy(
     harness: &TestHarness,
@@ -176,8 +178,9 @@ async fn bytea_rows_source_produces_raw_messages_to_iggy(
 }
 
 #[iggy_harness(
+    shared_server = "postgres_source",
     server(connectors_runtime(config_path = 
"tests/connectors/postgres/source.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn jsonb_rows_source_produces_json_messages_to_iggy(
     harness: &TestHarness,
@@ -243,8 +246,9 @@ async fn jsonb_rows_source_produces_json_messages_to_iggy(
 }
 
 #[iggy_harness(
+    shared_server = "postgres_source",
     server(connectors_runtime(config_path = 
"tests/connectors/postgres/source.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn delete_after_read_source_removes_rows_after_producing(
     harness: &TestHarness,
@@ -313,8 +317,9 @@ async fn 
delete_after_read_source_removes_rows_after_producing(
 }
 
 #[iggy_harness(
+    shared_server = "postgres_source",
     server(connectors_runtime(config_path = 
"tests/connectors/postgres/source.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn processed_column_source_marks_rows_after_producing(
     harness: &TestHarness,
diff --git a/core/integration/tests/connectors/postgres/restart.rs 
b/core/integration/tests/connectors/postgres/restart.rs
index e6e5f203b..cdba541ee 100644
--- a/core/integration/tests/connectors/postgres/restart.rs
+++ b/core/integration/tests/connectors/postgres/restart.rs
@@ -38,8 +38,9 @@ const SINK_KEY: &str = "postgres";
 type SinkRow = (i64, String, String, Vec<u8>);
 
 #[iggy_harness(
+    shared_server = "postgres_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/postgres/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn restart_sink_connector_continues_processing(
     harness: &TestHarness,
@@ -127,8 +128,9 @@ async fn restart_sink_connector_continues_processing(
 }
 
 #[iggy_harness(
+    shared_server = "postgres_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/postgres/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn parallel_restart_requests_should_not_break_connector(
     harness: &TestHarness,
diff --git a/core/integration/tests/connectors/quickwit/quickwit_sink.rs 
b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
index ce3abd364..b7fe69b10 100644
--- a/core/integration/tests/connectors/quickwit/quickwit_sink.rs
+++ b/core/integration/tests/connectors/quickwit/quickwit_sink.rs
@@ -28,8 +28,9 @@ use integration::iggy_harness;
 use serde::{Deserialize, Serialize};
 
 #[iggy_harness(
+    shared_server = "quickwit_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/quickwit/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn given_existent_quickwit_index_should_store(
     harness: &TestHarness,
@@ -83,8 +84,9 @@ async fn given_existent_quickwit_index_should_store(
 }
 
 #[iggy_harness(
+    shared_server = "quickwit_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/quickwit/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn given_nonexistent_quickwit_index_should_create_and_store(
     harness: &TestHarness,
@@ -138,8 +140,9 @@ async fn 
given_nonexistent_quickwit_index_should_create_and_store(
 }
 
 #[iggy_harness(
+    shared_server = "quickwit_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/quickwit/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn given_bulk_message_send_should_store(
     harness: &TestHarness,
@@ -193,8 +196,9 @@ async fn given_bulk_message_send_should_store(
 }
 
 #[iggy_harness(
+    shared_server = "quickwit_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/quickwit/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn given_invalid_messages_should_not_store(harness: &TestHarness, 
fixture: QuickwitFixture) {
     let client = harness.root_client().await.unwrap();
diff --git a/core/integration/tests/connectors/random/random_source.rs 
b/core/integration/tests/connectors/random/random_source.rs
index 5efb6fd08..4d44d1bfe 100644
--- a/core/integration/tests/connectors/random/random_source.rs
+++ b/core/integration/tests/connectors/random/random_source.rs
@@ -25,8 +25,9 @@ use std::time::Duration;
 use tokio::time::sleep;
 
 #[iggy_harness(
+    shared_server = "random_source",
     server(connectors_runtime(config_path = 
"tests/connectors/random/source.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn random_source_produces_messages(harness: &TestHarness) {
     sleep(Duration::from_secs(1)).await;
diff --git a/core/integration/tests/connectors/stdout/stdout_sink.rs 
b/core/integration/tests/connectors/stdout/stdout_sink.rs
index be247588c..2dfe8758e 100644
--- a/core/integration/tests/connectors/stdout/stdout_sink.rs
+++ b/core/integration/tests/connectors/stdout/stdout_sink.rs
@@ -32,8 +32,9 @@ use tokio::time::sleep;
 const API_KEY: &str = "test-api-key";
 
 #[iggy_harness(
+    shared_server = "stdout_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/stdout/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn stdout_sink_consumes_messages(harness: &TestHarness) {
     let client = harness.root_client().await.unwrap();
@@ -90,8 +91,9 @@ async fn stdout_sink_consumes_messages(harness: &TestHarness) 
{
 }
 
 #[iggy_harness(
+    shared_server = "stdout_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/stdout/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn stdout_sink_reports_metrics(harness: &TestHarness) {
     let client = harness.root_client().await.unwrap();
@@ -150,8 +152,9 @@ async fn stdout_sink_reports_metrics(harness: &TestHarness) 
{
 }
 
 #[iggy_harness(
+    shared_server = "stdout_sink",
     server(connectors_runtime(config_path = 
"tests/connectors/stdout/sink.toml")),
-    seed = seeds::connector_stream
+    seed = seeds::connector_stream_idempotent
 )]
 async fn stdout_sink_handles_bulk_messages(harness: &TestHarness) {
     let client = harness.root_client().await.unwrap();


Reply via email to