This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch revert-shared-server in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 234abe3e233ac53421bd936cab95a6708b61ca36 Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Apr 7 15:00:10 2026 +0200 revert(test): remove shared_server from iggy_harness (#3085) nextest spawns each test in a separate executable, making shared in-process server state impossible across tests. The SharedServerRegistry, serial_test integration, and all 113 connector test migrations are reverted. This reverts commit 2afeaa3ff19bee422a4831713caaf97ed5be0243. --- .config/nextest.toml | 14 +- Cargo.lock | 1 - core/harness_derive/src/attrs.rs | 23 --- core/harness_derive/src/codegen.rs | 173 +----------------- core/integration/Cargo.toml | 1 - core/integration/src/harness/handle/mod.rs | 2 +- core/integration/src/harness/mod.rs | 2 - .../src/harness/orchestrator/builder.rs | 2 - .../src/harness/orchestrator/harness.rs | 198 ++------------------- core/integration/src/harness/seeds.rs | 71 +------- core/integration/src/harness/shared.rs | 159 ----------------- core/integration/src/lib.rs | 3 +- core/integration/tests/connectors/api/endpoints.rs | 24 +-- .../connectors/elasticsearch/elasticsearch_sink.rs | 9 +- .../elasticsearch/elasticsearch_source.rs | 9 +- .../integration/tests/connectors/http/http_sink.rs | 26 ++- .../http_config_provider/direct_responses.rs | 30 ++-- .../http_config_provider/wrapped_responses.rs | 30 ++-- .../tests/connectors/iceberg/iceberg_sink.rs | 9 +- .../tests/connectors/influxdb/influxdb_sink.rs | 18 +- .../connectors/influxdb/influxdb_sink_formats.rs | 36 ++-- .../tests/connectors/influxdb/influxdb_source.rs | 12 +- .../connectors/influxdb/influxdb_source_formats.rs | 27 +-- .../tests/connectors/mongodb/mongodb_sink.rs | 31 ++-- .../tests/connectors/postgres/postgres_sink.rs | 9 +- .../tests/connectors/postgres/postgres_source.rs | 15 +- .../tests/connectors/postgres/restart.rs | 6 +- .../tests/connectors/quickwit/quickwit_sink.rs | 12 +- .../tests/connectors/random/random_source.rs | 3 +- .../tests/connectors/stdout/stdout_sink.rs | 9 +- 30 files changed, 137 insertions(+), 827 deletions(-) diff --git a/.config/nextest.toml b/.config/nextest.toml index 2dd295fb8..bb29d9651 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -15,21 +15,11 @@ # specific language governing permissions and limitations # under the License. -[profile.default] -slow-timeout = { period = "30s", terminate-after = 4 } - [[profile.default.overrides]] # This is a solution (or actually a workaround) for the problem that nextest does not support # #[serial] macro which shall enforce sequential execution of the test case. filter = 'package(integration) and test(cli::system::test_cli_session_scenario::should_be_successful)' threads-required = "num-cpus" -# shared_server tests: the iggy_harness macro prefixes test names with -# shared_server_ enabling a single generic nextest filter. Bounded to limit -# resource contention from parallel fixture containers without fully serializing. -[test-groups.shared-server] -max-threads = 8 - -[[profile.default.overrides]] -filter = 'package(integration) and test(/shared_server_/)' -test-group = 'shared-server' +[profile.default] +slow-timeout = { period = "30s", terminate-after = 4 } diff --git a/Cargo.lock b/Cargo.lock index 26c32044b..cb11aa7ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6151,7 +6151,6 @@ dependencies = [ "configs", "configs_derive", "ctor", - "dashmap", "figment", "futures", "harness_derive", diff --git a/core/harness_derive/src/attrs.rs b/core/harness_derive/src/attrs.rs index fa67e9239..2e4389faa 100644 --- a/core/harness_derive/src/attrs.rs +++ b/core/harness_derive/src/attrs.rs @@ -48,12 +48,6 @@ impl ClusterNodesValue { } } -/// Shared server configuration. -#[derive(Debug, Clone)] -pub struct SharedServerAttrs { - pub key: String, -} - /// Parsed `#[iggy_harness(...)]` attributes. #[derive(Debug, Default)] pub struct IggyTestAttrs { @@ -63,7 +57,6 @@ pub struct IggyTestAttrs { pub server: ServerAttrs, pub seed_fn: Option<syn::Path>, pub cluster_nodes: ClusterNodesValue, - pub shared_server: Option<SharedServerAttrs>, } /// MCP configuration attributes. @@ -89,7 +82,6 @@ impl IggyTestAttrs { server: ServerAttrs::default(), seed_fn: None, cluster_nodes: ClusterNodesValue::None, - shared_server: None, } } } @@ -256,9 +248,6 @@ impl Parse for IggyTestAttrs { AttrItem::ClusterNodes(cluster) => { attrs.cluster_nodes = cluster; } - AttrItem::SharedServer(shared) => { - attrs.shared_server = Some(shared); - } } } @@ -275,7 +264,6 @@ enum AttrItem { Server(Box<ServerAttrs>), Seed(syn::Path), ClusterNodes(ClusterNodesValue), - SharedServer(SharedServerAttrs), } impl Parse for AttrItem { @@ -305,10 +293,6 @@ impl Parse for AttrItem { let path: syn::Path = input.parse()?; Ok(AttrItem::Seed(path)) } - "shared_server" => { - let shared = parse_shared_server_attrs(input)?; - Ok(AttrItem::SharedServer(shared)) - } _ => Err(syn::Error::new( ident.span(), format!("unknown attribute: {ident_str}"), @@ -489,13 +473,6 @@ fn parse_mcp_attrs(input: ParseStream) -> syn::Result<McpAttrs> { Ok(mcp) } -/// Parse `shared_server = "key"`. -fn parse_shared_server_attrs(input: ParseStream) -> syn::Result<SharedServerAttrs> { - input.parse::<Token![=]>()?; - let lit: LitStr = input.parse()?; - Ok(SharedServerAttrs { key: lit.value() }) -} - fn parse_connectors_runtime_attrs(input: ParseStream) -> syn::Result<ConnectorsRuntimeAttrs> { let mut attrs = ConnectorsRuntimeAttrs::default(); diff --git a/core/harness_derive/src/codegen.rs b/core/harness_derive/src/codegen.rs index ba3b2b679..2abd54368 100644 --- a/core/harness_derive/src/codegen.rs +++ b/core/harness_derive/src/codegen.rs @@ -138,12 +138,6 @@ pub fn generate_tests(attrs: &IggyTestAttrs, input: &ItemFn) -> syn::Result<Toke .collect(); let params = analyze_signature(&input.sig)?; - - // Validate shared_server constraints - if attrs.shared_server.is_some() { - validate_shared_server(attrs, ¶ms, fn_name)?; - } - let matrix_params_list = matrix_params(¶ms); let has_matrix_params = !matrix_params_list.is_empty(); @@ -178,52 +172,6 @@ pub fn generate_tests(attrs: &IggyTestAttrs, input: &ItemFn) -> syn::Result<Toke ) } -fn validate_shared_server( - attrs: &IggyTestAttrs, - params: &[DetectedParam], - fn_name: &Ident, -) -> syn::Result<()> { - let has_mut_harness = params - .iter() - .any(|p| matches!(p, DetectedParam::HarnessMut { .. })); - if has_mut_harness { - return Err(syn::Error::new( - fn_name.span(), - "shared_server is incompatible with &mut TestHarness", - )); - } - - if !attrs.server.config_overrides.is_empty() { - return Err(syn::Error::new( - fn_name.span(), - "shared_server is incompatible with server config overrides", - )); - } - - if !matches!(attrs.cluster_nodes, crate::attrs::ClusterNodesValue::None) { - return Err(syn::Error::new( - fn_name.span(), - "shared_server is incompatible with cluster_nodes", - )); - } - - if attrs.server.tls.is_some() || attrs.server.websocket_tls.is_some() { - return Err(syn::Error::new( - fn_name.span(), - "shared_server is incompatible with TLS configuration", - )); - } - - if attrs.transports.iter().any(|t| t.tls_mode().is_some()) { - return Err(syn::Error::new( - fn_name.span(), - "shared_server is incompatible with TLS transports", - )); - } - - Ok(()) -} - fn generate_single_test( fn_name: &Ident, fn_vis: &syn::Visibility, @@ -237,33 +185,10 @@ fn generate_single_test( let fixture_setup = generate_fixture_setup(params); let fixture_envs = generate_fixture_envs_collection(params); let fixture_param_bindings = generate_fixture_param_bindings(params); - let fixture_seed = generate_fixture_seed(params); - let harness_param_bindings = generate_harness_param_bindings(params); - - if let Some(ref shared) = attrs.shared_server { - let shared_harness_setup = generate_shared_harness_setup(variant, has_fixtures, attrs); - let shared_start_and_seed = generate_shared_start_and_seed(attrs, fixture_seed); - let key_ident = Ident::new(&shared.key, Span::call_site()); - let shared_fn_name = format_ident!("shared_server_{}", fn_name); - - return Ok(quote! { - #(#other_attrs)* - #[::tokio::test] - #[::serial_test::serial(#key_ident)] - #fn_vis async fn #shared_fn_name() { - #fixture_setup - #fixture_envs - #fixture_param_bindings - #shared_harness_setup - #shared_start_and_seed - #harness_param_bindings - #fn_body - } - }); - } - let harness_setup = generate_harness_setup(variant, has_fixtures, attrs); + let fixture_seed = generate_fixture_seed(params); let start_and_seed = generate_start_and_seed(attrs, fixture_seed); + let harness_param_bindings = generate_harness_param_bindings(params); Ok(quote! { #(#other_attrs)* @@ -620,99 +545,6 @@ fn generate_start_and_seed(attrs: &IggyTestAttrs, fixture_seed: TokenStream) -> } } -// ============================================================================ -// Shared server code generation helpers -// ============================================================================ - -fn generate_shared_harness_setup( - variant: &TestVariant, - has_fixtures: bool, - attrs: &IggyTestAttrs, -) -> TokenStream { - let shared = attrs - .shared_server - .as_ref() - .expect("shared_server required"); - let key = &shared.key; - let transport = variant.transport.variant_ident(); - let client_config_method = - Ident::new(variant.transport.client_config_method(), Span::call_site()); - - let cr_config = if let Some(ref cr) = attrs.server.connectors_runtime { - let config_path = cr.config_path.as_deref().unwrap_or(""); - if has_fixtures { - quote! { - Some(::integration::__macro_support::ConnectorsRuntimeConfig::builder() - .config_path(::std::path::PathBuf::from(#config_path)) - .extra_envs(__fixture_envs.clone()) - .build()) - } - } else { - quote! { - Some(::integration::__macro_support::ConnectorsRuntimeConfig::builder() - .config_path(::std::path::PathBuf::from(#config_path)) - .build()) - } - } - } else { - quote! { None } - }; - - quote! { - let __shared = ::integration::__macro_support::SharedServerRegistry::get_or_start( - #key, - ::integration::__macro_support::TestServerConfig::default(), - ).await.unwrap_or_else(|e| panic!("failed to get shared server: {e}")); - - let mut __harness = ::integration::__macro_support::TestHarness::from_shared( - __shared, - #cr_config, - Some(::integration::__macro_support::ClientConfig::#client_config_method()), - ).await.unwrap_or_else(|e| panic!("failed to build shared harness: {e}")); - let _ = ::integration::__macro_support::TransportProtocol::#transport; - } -} - -fn generate_shared_start_and_seed(attrs: &IggyTestAttrs, fixture_seed: TokenStream) -> TokenStream { - let has_fixture_seed = !fixture_seed.is_empty(); - - match (&attrs.seed_fn, has_fixture_seed) { - (Some(seed_fn), true) => { - quote! { - __harness.start_shared_with_seed(|__seed_client| async move { - #seed_fn(&__seed_client).await?; - #fixture_seed - Ok(()) - }).await.unwrap_or_else(|e| panic!("failed to start shared harness: {e}")); - } - } - (Some(seed_fn), false) => { - quote! { - __harness.start_shared_with_seed(|__seed_client| async move { - #seed_fn(&__seed_client).await - }).await.unwrap_or_else(|e| panic!("failed to start shared harness: {e}")); - } - } - (None, true) => { - quote! { - __harness.start_shared_with_seed(|__seed_client| async move { - #fixture_seed - Ok(()) - }).await.unwrap_or_else(|e| panic!("failed to start shared harness: {e}")); - } - } - (None, false) => { - quote! { - __harness.start_shared().await.unwrap_or_else(|e| panic!("failed to start shared harness: {e}")); - } - } - } -} - -// ============================================================================ -// Standard (non-shared) code generation helpers -// ============================================================================ - fn fixture_var_ident(name: &syn::Ident) -> syn::Ident { let name_str = name.to_string(); let clean_name = name_str.trim_start_matches('_'); @@ -963,7 +795,6 @@ mod tests { }, seed_fn: None, cluster_nodes: crate::attrs::ClusterNodesValue::None, - shared_server: None, }; let variants = generate_variants(&attrs); // 2 transports * 2 segment sizes * 2 cache modes = 8 variants diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index 513dc92d9..c8830b19f 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -36,7 +36,6 @@ compio = { workspace = true } configs = { workspace = true } configs_derive = { workspace = true } ctor = { workspace = true } -dashmap = { workspace = true } figment = { workspace = true } futures = { workspace = true } harness_derive = { workspace = true } diff --git a/core/integration/src/harness/handle/mod.rs b/core/integration/src/harness/handle/mod.rs index a31ad7535..da746d175 100644 --- a/core/integration/src/harness/handle/mod.rs +++ b/core/integration/src/harness/handle/mod.rs @@ -25,7 +25,7 @@ mod mcp; mod server; pub use client::ClientHandle; -pub use client_builder::{ClientBuilder, ServerConnection}; +pub use client_builder::ClientBuilder; pub use connectors_runtime::ConnectorsRuntimeHandle; pub use mcp::{McpClient, McpHandle}; pub use server::{ServerHandle, ServerLogs}; diff --git a/core/integration/src/harness/mod.rs b/core/integration/src/harness/mod.rs index c8d3757ae..13793ab05 100644 --- a/core/integration/src/harness/mod.rs +++ b/core/integration/src/harness/mod.rs @@ -51,7 +51,6 @@ mod helpers; mod orchestrator; mod port_reserver; pub mod seeds; -mod shared; mod traits; pub use config::{ @@ -72,4 +71,3 @@ pub use helpers::{ }; pub use fixture::TestFixture; -pub use shared::{SharedServerInfo, SharedServerRegistry}; diff --git a/core/integration/src/harness/orchestrator/builder.rs b/core/integration/src/harness/orchestrator/builder.rs index 668d9a193..46afddd3b 100644 --- a/core/integration/src/harness/orchestrator/builder.rs +++ b/core/integration/src/harness/orchestrator/builder.rs @@ -226,8 +226,6 @@ impl TestHarnessBuilder { primary_transport, primary_client_config: self.primary_client_config, started: false, - shared_server: None, - shared_connectors_runtime: None, }) } } diff --git a/core/integration/src/harness/orchestrator/harness.rs b/core/integration/src/harness/orchestrator/harness.rs index ca99731c2..6897b31b4 100644 --- a/core/integration/src/harness/orchestrator/harness.rs +++ b/core/integration/src/harness/orchestrator/harness.rs @@ -18,15 +18,14 @@ */ use super::builder::TestHarnessBuilder; -use crate::harness::config::{ClientConfig, ConnectorsRuntimeConfig}; +use crate::harness::config::ClientConfig; use crate::harness::context::TestContext; use crate::harness::error::TestBinaryError; use crate::harness::handle::{ - ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient, McpHandle, ServerConnection, - ServerHandle, ServerLogs, + ClientBuilder, ClientHandle, ConnectorsRuntimeHandle, McpClient, McpHandle, ServerHandle, + ServerLogs, }; -use crate::harness::shared::SharedServerInfo; -use crate::harness::traits::{IggyServerDependent, Restartable, TestBinary}; +use crate::harness::traits::{Restartable, TestBinary}; use futures::executor::block_on; use iggy::prelude::{ClientWrapper, IggyClient}; use iggy_common::TransportProtocol; @@ -48,8 +47,6 @@ pub struct TestHarness { pub(super) primary_transport: Option<TransportProtocol>, pub(super) primary_client_config: Option<ClientConfig>, pub(super) started: bool, - pub(super) shared_server: Option<Arc<SharedServerInfo>>, - pub(super) shared_connectors_runtime: Option<ConnectorsRuntimeHandle>, } impl std::fmt::Debug for TestHarness { @@ -75,39 +72,6 @@ impl TestHarness { TestHarnessBuilder::default() } - /// Create a harness that uses a shared server (no ownership of server process). - /// - /// Each test still gets its own `TestContext` (for logs), connector runtime, - /// and clients. Only the iggy-server process is shared. - pub async fn from_shared( - shared: Arc<SharedServerInfo>, - connectors_runtime_config: Option<ConnectorsRuntimeConfig>, - primary_client_config: Option<ClientConfig>, - ) -> Result<Self, TestBinaryError> { - let mut context = TestContext::new(None, true)?; - context.ensure_created()?; - let context = Arc::new(context); - - let shared_cr = connectors_runtime_config - .map(|cfg| ConnectorsRuntimeHandle::with_server_id(cfg, context.clone(), 0)); - - let primary_transport = primary_client_config.as_ref().map(|c| c.transport); - - shared.acquire(); - - Ok(TestHarness { - context, - servers: Vec::new(), - clients: Vec::new(), - client_configs: Vec::new(), - primary_transport, - primary_client_config, - started: false, - shared_server: Some(shared), - shared_connectors_runtime: shared_cr, - }) - } - /// Start all configured binaries and create clients. pub async fn start(&mut self) -> Result<(), TestBinaryError> { self.start_internal( @@ -133,76 +97,6 @@ impl TestHarness { self.start_internal(Some(seed)).await } - /// Start a shared-server harness without a seed function. - pub async fn start_shared(&mut self) -> Result<(), TestBinaryError> { - self.start_shared_internal( - None::< - fn( - IggyClient, - ) - -> std::future::Ready<Result<(), Box<dyn std::error::Error + Send + Sync>>>, - >, - ) - .await - } - - /// Start a shared-server harness with a seed function. - /// - /// The seed runs BEFORE the connector runtime starts, allowing it to - /// create streams/topics that the connector expects to find. - pub async fn start_shared_with_seed<F, Fut>(&mut self, seed: F) -> Result<(), TestBinaryError> - where - F: FnOnce(IggyClient) -> Fut, - Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>, - { - self.start_shared_internal(Some(seed)).await - } - - async fn start_shared_internal<F, Fut>( - &mut self, - seed: Option<F>, - ) -> Result<(), TestBinaryError> - where - F: FnOnce(IggyClient) -> Fut, - Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>, - { - if self.started { - return Err(TestBinaryError::AlreadyStarted); - } - - let shared = self - .shared_server - .as_ref() - .ok_or_else(|| TestBinaryError::InvalidState { - message: "start_shared called without shared server".to_string(), - })?; - - let tcp_addr = shared - .tcp_addr() - .ok_or_else(|| TestBinaryError::InvalidState { - message: "Shared server has no TCP address".to_string(), - })?; - - // Seed runs before connector runtime - creates streams/topics - if let Some(seed_fn) = seed { - let client = self.tcp_root_client().await?; - seed_fn(client) - .await - .map_err(|e| TestBinaryError::SeedFailed(e.to_string()))?; - } - - // Connector runtime starts after seed - streams/topics exist now - if let Some(ref mut cr) = self.shared_connectors_runtime { - cr.set_iggy_address(tcp_addr); - cr.start()?; - cr.wait_ready().await?; - } - - self.create_clients().await?; - self.started = true; - Ok(()) - } - async fn start_internal<F, Fut>(&mut self, seed: Option<F>) -> Result<(), TestBinaryError> where F: FnOnce(IggyClient) -> Fut, @@ -244,20 +138,6 @@ impl TestHarness { } self.clients.clear(); - // Stop per-test connector runtime (shared server mode) - if let Some(ref mut cr) = self.shared_connectors_runtime { - cr.stop()?; - } - - // Release shared server ref - last test stops the server and cleans up - if let Some(shared) = self.shared_server.take() { - if std::thread::panicking() { - shared.mark_failed(); - } - shared.release(); - } - - // Stop owned servers (non-shared mode only) for server in self.servers.iter_mut().rev() { server.stop_dependents()?; server.stop()?; @@ -288,24 +168,12 @@ impl TestHarness { } /// Get reference to the first (primary) server handle. - /// - /// Not available in `shared_server` mode - the server is managed by `SharedServerRegistry`. pub fn server(&self) -> &ServerHandle { - assert!( - self.shared_server.is_none(), - "server() is not available in shared_server mode" - ); self.servers.first().expect("No servers configured") } /// Get mutable reference to the first (primary) server handle. - /// - /// Not available in `shared_server` mode - the server is managed by `SharedServerRegistry`. pub fn server_mut(&mut self) -> &mut ServerHandle { - assert!( - self.shared_server.is_none(), - "server_mut() is not available in shared_server mode" - ); self.servers.first_mut().expect("No servers configured") } @@ -382,15 +250,9 @@ impl TestHarness { /// Get the connectors runtime handle from the primary server if configured. /// - /// Returns the per-test connector runtime for shared-server harnesses, - /// or the server-owned one for standard harnesses. - /// /// # Panics /// Panics if called on a cluster (multiple servers). Use `node(i).connectors_runtime()` instead. pub fn connectors_runtime(&self) -> Option<&ConnectorsRuntimeHandle> { - if self.shared_connectors_runtime.is_some() { - return self.shared_connectors_runtime.as_ref(); - } assert!( self.servers.len() <= 1, "connectors_runtime() is only available for single-server setups. Use node(i).connectors_runtime() for clusters." @@ -425,18 +287,6 @@ impl TestHarness { &self, transport: TransportProtocol, ) -> Result<ClientBuilder, TestBinaryError> { - if let Some(ref shared) = self.shared_server { - let connection = ServerConnection { - tcp_addr: shared.tcp_addr(), - http_addr: shared.http_addr(), - quic_addr: shared.quic_addr(), - websocket_addr: shared.websocket_addr(), - tls: None, - websocket_tls: None, - tls_ca_cert_path: shared.tls_ca_cert_path().cloned(), - }; - return Ok(ClientBuilder::new(transport, connection)); - } let server = self.servers.first().ok_or(TestBinaryError::MissingServer)?; match transport { TransportProtocol::Tcp => server.tcp_client(), @@ -464,7 +314,14 @@ impl TestHarness { &self, transport: TransportProtocol, ) -> Result<IggyClient, TestBinaryError> { - self.client_builder_for(transport)?.connect().await + let server = self.servers.first().ok_or(TestBinaryError::MissingServer)?; + let builder = match transport { + TransportProtocol::Tcp => server.tcp_client()?, + TransportProtocol::Http => server.http_client()?, + TransportProtocol::Quic => server.quic_client()?, + TransportProtocol::WebSocket => server.websocket_client()?, + }; + builder.connect().await } pub async fn tcp_root_client(&self) -> Result<IggyClient, TestBinaryError> { @@ -564,33 +421,16 @@ impl TestHarness { } pub(super) async fn create_clients(&mut self) -> Result<(), TestBinaryError> { - // Resolve addresses from either owned server or shared server - let (tcp, http, quic, ws, ca_cert) = if let Some(ref shared) = self.shared_server { - ( - shared.tcp_addr(), - shared.http_addr(), - shared.quic_addr(), - shared.websocket_addr(), - shared.tls_ca_cert_path().cloned(), - ) - } else if let Some(server) = self.servers.first() { - ( - server.tcp_addr(), - server.http_addr(), - server.quic_addr(), - server.websocket_addr(), - server.tls_ca_cert_path(), - ) - } else { + let Some(server) = self.servers.first() else { return Ok(()); }; for config in &self.client_configs { let address = match config.transport { - TransportProtocol::Tcp => tcp, - TransportProtocol::Http => http, - TransportProtocol::Quic => quic, - TransportProtocol::WebSocket => ws, + TransportProtocol::Tcp => server.tcp_addr(), + TransportProtocol::Http => server.http_addr(), + TransportProtocol::Quic => server.quic_addr(), + TransportProtocol::WebSocket => server.websocket_addr(), }; let Some(address) = address else { @@ -601,9 +441,9 @@ impl TestHarness { let mut config = config.clone(); if config.tls_enabled - && let Some(ref path) = ca_cert + && let Some(ca_cert_path) = server.tls_ca_cert_path() { - config.tls_ca_file = Some(path.clone()); + config.tls_ca_file = Some(ca_cert_path); } let mut client = ClientHandle::new(config, address); diff --git a/core/integration/src/harness/seeds.rs b/core/integration/src/harness/seeds.rs index c29ec25cf..aa5554c70 100644 --- a/core/integration/src/harness/seeds.rs +++ b/core/integration/src/harness/seeds.rs @@ -19,7 +19,7 @@ use iggy::prelude::{IggyClient, StreamClient, TopicClient}; use iggy_common::{ - CompressionAlgorithm, Consumer, Identifier, IggyError, IggyExpiry, IggyMessage, MaxTopicSize, + CompressionAlgorithm, Consumer, Identifier, IggyExpiry, IggyMessage, MaxTopicSize, Partitioning, PersonalAccessTokenExpiry, UserStatus, }; use iggy_common::{ @@ -123,75 +123,6 @@ pub async fn connector_multi_topic_stream(client: &IggyClient) -> Result<(), See Ok(()) } -/// Idempotent seed for serial shared_server tests: creates stream and topic, -/// silently skipping if they already exist from a previous test in the group. -pub async fn connector_stream_idempotent(client: &IggyClient) -> Result<(), SeedError> { - let stream_id: Identifier = names::STREAM.try_into()?; - - match client.create_stream(names::STREAM).await { - Ok(_) => {} - Err(e) if e.as_code() == IggyError::StreamNameAlreadyExists(String::new()).as_code() => {} - Err(e) => return Err(e.into()), - } - - match client - .create_topic( - &stream_id, - names::TOPIC, - 1, - CompressionAlgorithm::None, - None, - IggyExpiry::ServerDefault, - MaxTopicSize::ServerDefault, - ) - .await - { - Ok(_) => {} - Err(e) - if e.as_code() - == IggyError::TopicNameAlreadyExists(String::new(), Identifier::default()) - .as_code() => {} - Err(e) => return Err(e.into()), - } - - Ok(()) -} - -/// Idempotent multi-topic seed for serial shared_server tests. -pub async fn connector_multi_topic_stream_idempotent(client: &IggyClient) -> Result<(), SeedError> { - let stream_id: Identifier = names::STREAM.try_into()?; - - match client.create_stream(names::STREAM).await { - Ok(_) => {} - Err(e) if e.as_code() == IggyError::StreamNameAlreadyExists(String::new()).as_code() => {} - Err(e) => return Err(e.into()), - } - - for topic_name in [names::TOPIC, names::TOPIC_2] { - match client - .create_topic( - &stream_id, - topic_name, - 1, - CompressionAlgorithm::None, - None, - IggyExpiry::ServerDefault, - MaxTopicSize::ServerDefault, - ) - .await - { - Ok(_) => {} - Err(e) - if e.as_code() - == IggyError::TopicNameAlreadyExists(String::new(), Identifier::default()) - .as_code() => {} - Err(e) => return Err(e.into()), - } - } - - Ok(()) -} - /// Standard MCP test data: stream, topic, message, consumer group, consumer offset, user, PAT. pub async fn mcp_standard(client: &IggyClient) -> Result<(), SeedError> { let stream_id: Identifier = names::STREAM.try_into()?; diff --git a/core/integration/src/harness/shared.rs b/core/integration/src/harness/shared.rs deleted file mode 100644 index 82f717ea0..000000000 --- a/core/integration/src/harness/shared.rs +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use crate::harness::config::TestServerConfig; -use crate::harness::context::TestContext; -use crate::harness::error::TestBinaryError; -use crate::harness::handle::ServerHandle; -use crate::harness::traits::TestBinary; -use dashmap::DashMap; -use std::net::SocketAddr; -use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, LazyLock}; -use tokio::sync::OnceCell; - -/// Shared server state: addresses, handles, and active user count. -/// -/// Rust statics are not dropped at process exit, so we use an atomic counter -/// to track active tests. The last test to finish stops the server and -/// cleans up the test directory. -pub struct SharedServerInfo { - tcp_addr: Option<SocketAddr>, - http_addr: Option<SocketAddr>, - quic_addr: Option<SocketAddr>, - websocket_addr: Option<SocketAddr>, - tls_ca_cert_path: Option<PathBuf>, - handle: std::sync::Mutex<Option<ServerHandle>>, - context: Arc<TestContext>, - active_count: AtomicUsize, - any_test_failed: AtomicBool, -} - -impl SharedServerInfo { - pub fn tcp_addr(&self) -> Option<SocketAddr> { - self.tcp_addr - } - - pub fn http_addr(&self) -> Option<SocketAddr> { - self.http_addr - } - - pub fn quic_addr(&self) -> Option<SocketAddr> { - self.quic_addr - } - - pub fn websocket_addr(&self) -> Option<SocketAddr> { - self.websocket_addr - } - - pub fn tls_ca_cert_path(&self) -> Option<&PathBuf> { - self.tls_ca_cert_path.as_ref() - } - - /// Increment the active test count. Called when a test acquires this server. - pub fn acquire(&self) { - self.active_count.fetch_add(1, Ordering::AcqRel); - } - - /// Mark that a test using this server has failed. - /// Preserves the test directory for debugging. - pub fn mark_failed(&self) { - self.any_test_failed.store(true, Ordering::Release); - } - - /// Decrement the active test count. When it reaches zero, stops the server - /// and cleans up the test directory (unless any test failed). - pub fn release(&self) { - let prev = self.active_count.fetch_sub(1, Ordering::AcqRel); - if prev == 1 { - match self.handle.lock() { - Ok(mut guard) => { - if let Some(ref mut server) = *guard - && let Err(e) = server.stop() - { - eprintln!("[SharedServer] failed to stop server: {e}"); - } - *guard = None; - } - Err(e) => { - eprintln!("[SharedServer] mutex poisoned, server process may leak: {e}"); - } - } - if !self.any_test_failed.load(Ordering::Acquire) { - self.context.cleanup(); - } - } - } -} - -static REGISTRY: LazyLock<DashMap<String, Arc<OnceCell<Arc<SharedServerInfo>>>>> = - LazyLock::new(DashMap::new); - -/// Global registry for shared server instances. -/// -/// Tests with the same `shared_server` key share a single iggy-server process. -/// The first test to request a key starts the server; all subsequent tests -/// get a reference to the already-running server. The last test to finish -/// stops the server and cleans up the test directory. -pub struct SharedServerRegistry; - -impl SharedServerRegistry { - /// Get or start a shared server for the given key. - /// - /// Thread-safe: concurrent callers for the same key will block until the - /// first caller finishes initialization, then all receive the same `Arc`. - /// - /// Callers must call `SharedServerInfo::acquire()` after obtaining the - /// reference and `SharedServerInfo::release()` when done (typically in - /// `TestHarness::stop()`). - pub async fn get_or_start( - key: &str, - config: TestServerConfig, - ) -> Result<Arc<SharedServerInfo>, TestBinaryError> { - let cell = REGISTRY - .entry(key.to_string()) - .or_insert_with(|| Arc::new(OnceCell::new())) - .value() - .clone(); - - cell.get_or_try_init(|| async move { - let mut context = TestContext::new(Some(format!("shared_server_{key}")), true)?; - context.ensure_created()?; - let context = Arc::new(context); - - let mut server = ServerHandle::with_config(config, context.clone()); - server.start()?; - - Ok(Arc::new(SharedServerInfo { - tcp_addr: server.tcp_addr(), - http_addr: server.http_addr(), - quic_addr: server.quic_addr(), - websocket_addr: server.websocket_addr(), - tls_ca_cert_path: server.tls_ca_cert_path(), - handle: std::sync::Mutex::new(Some(server)), - context, - active_count: AtomicUsize::new(0), - any_test_failed: AtomicBool::new(false), - })) - }) - .await - .cloned() - } -} diff --git a/core/integration/src/lib.rs b/core/integration/src/lib.rs index 41ec34555..fc03d02d7 100644 --- a/core/integration/src/lib.rs +++ b/core/integration/src/lib.rs @@ -24,8 +24,7 @@ pub use harness_derive::iggy_harness; #[doc(hidden)] pub mod __macro_support { pub use crate::harness::{ - ClientConfig, ConnectorsRuntimeConfig, McpClient, McpConfig, SharedServerRegistry, - TestHarness, TestServerConfig, TlsConfig, + ClientConfig, McpClient, McpConfig, TestHarness, TestServerConfig, TlsConfig, }; pub use iggy::prelude::ClientWrapper; pub use iggy_common::TransportProtocol; diff --git a/core/integration/tests/connectors/api/endpoints.rs b/core/integration/tests/connectors/api/endpoints.rs index 5fe987a29..69f6cdbfb 100644 --- a/core/integration/tests/connectors/api/endpoints.rs +++ b/core/integration/tests/connectors/api/endpoints.rs @@ -27,9 +27,8 @@ use reqwest::Client; const API_KEY: &str = "test-api-key"; #[iggy_harness( - shared_server = "connector_api", server(connectors_runtime(config_path = "tests/connectors/api/config.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn root_endpoint_returns_welcome_message(harness: &TestHarness) { let api_address = harness @@ -50,9 +49,8 @@ async fn root_endpoint_returns_welcome_message(harness: &TestHarness) { } #[iggy_harness( - shared_server = "connector_api", server(connectors_runtime(config_path = "tests/connectors/api/config.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn health_endpoint_returns_healthy(harness: &TestHarness) { let api_address = harness @@ -73,9 +71,8 @@ async fn health_endpoint_returns_healthy(harness: &TestHarness) { } #[iggy_harness( - shared_server = "connector_api", server(connectors_runtime(config_path = "tests/connectors/api/config.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn stats_endpoint_returns_runtime_stats(harness: &TestHarness) { let api_address = harness @@ -103,9 +100,8 @@ async fn stats_endpoint_returns_runtime_stats(harness: &TestHarness) { } #[iggy_harness( - shared_server = "connector_api", server(connectors_runtime(config_path = "tests/connectors/api/config.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn metrics_endpoint_returns_prometheus_format(harness: &TestHarness) { let api_address = harness @@ -131,9 +127,8 @@ async fn metrics_endpoint_returns_prometheus_format(harness: &TestHarness) { } #[iggy_harness( - shared_server = "connector_api", server(connectors_runtime(config_path = "tests/connectors/api/config.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn sources_endpoint_returns_list(harness: &TestHarness) { let api_address = harness @@ -155,9 +150,8 @@ async fn sources_endpoint_returns_list(harness: &TestHarness) { } #[iggy_harness( - shared_server = "connector_api", server(connectors_runtime(config_path = "tests/connectors/api/config.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn sinks_endpoint_returns_list(harness: &TestHarness) { let api_address = harness @@ -179,9 +173,8 @@ async fn sinks_endpoint_returns_list(harness: &TestHarness) { } #[iggy_harness( - shared_server = "connector_api", server(connectors_runtime(config_path = "tests/connectors/api/config.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn api_key_authentication_required(harness: &TestHarness) { let api_address = harness @@ -224,9 +217,8 @@ async fn api_key_authentication_required(harness: &TestHarness) { } #[iggy_harness( - shared_server = "connector_api", server(connectors_runtime(config_path = "tests/connectors/api/config.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn api_key_authentication_rejected_with_invalid_key(harness: &TestHarness) { let api_address = harness diff --git a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs index 13190f9b5..ad6ea1c86 100644 --- a/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs +++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_sink.rs @@ -28,9 +28,8 @@ use integration::harness::seeds; use integration::iggy_harness; #[iggy_harness( - shared_server = "elasticsearch_sink", server(connectors_runtime(config_path = "tests/connectors/elasticsearch/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn elasticsearch_sink_stores_json_messages( harness: &TestHarness, @@ -87,9 +86,8 @@ async fn elasticsearch_sink_stores_json_messages( } #[iggy_harness( - shared_server = "elasticsearch_sink", server(connectors_runtime(config_path = "tests/connectors/elasticsearch/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn elasticsearch_sink_handles_bulk_messages( harness: &TestHarness, @@ -142,9 +140,8 @@ async fn elasticsearch_sink_handles_bulk_messages( } #[iggy_harness( - shared_server = "elasticsearch_sink", server(connectors_runtime(config_path = "tests/connectors/elasticsearch/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn elasticsearch_sink_preserves_json_structure( harness: &TestHarness, diff --git a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs index f945f4889..2beaad75c 100644 --- a/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs +++ b/core/integration/tests/connectors/elasticsearch/elasticsearch_source.rs @@ -27,9 +27,8 @@ use std::time::Duration; use tokio::time::sleep; #[iggy_harness( - shared_server = "elasticsearch_source", server(connectors_runtime(config_path = "tests/connectors/elasticsearch/source.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn elasticsearch_source_produces_messages_to_iggy( harness: &TestHarness, @@ -105,9 +104,8 @@ async fn elasticsearch_source_produces_messages_to_iggy( } #[iggy_harness( - shared_server = "elasticsearch_source", server(connectors_runtime(config_path = "tests/connectors/elasticsearch/source.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn elasticsearch_source_handles_empty_index( harness: &TestHarness, @@ -146,9 +144,8 @@ async fn elasticsearch_source_handles_empty_index( } #[iggy_harness( - shared_server = "elasticsearch_source", server(connectors_runtime(config_path = "tests/connectors/elasticsearch/source.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn elasticsearch_source_produces_bulk_messages( harness: &TestHarness, diff --git a/core/integration/tests/connectors/http/http_sink.rs b/core/integration/tests/connectors/http/http_sink.rs index 3056d69e3..e41c59ab6 100644 --- a/core/integration/tests/connectors/http/http_sink.rs +++ b/core/integration/tests/connectors/http/http_sink.rs @@ -196,9 +196,8 @@ use integration::iggy_harness; /// Validates `batch_mode=individual`: one HTTP POST per message, each with metadata envelope. /// Checks request count = message count, envelope structure, and `application/json` content type. #[iggy_harness( - shared_server = "http_sink", server(connectors_runtime(config_path = "tests/connectors/http/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn individual_json_messages_delivered_as_separate_posts( harness: &TestHarness, @@ -304,9 +303,8 @@ async fn individual_json_messages_delivered_as_separate_posts( /// Validates `batch_mode=ndjson`: all messages in one request as newline-delimited JSON. /// Checks single request, line count = message count, per-line envelope, `application/x-ndjson`. #[iggy_harness( - shared_server = "http_sink", server(connectors_runtime(config_path = "tests/connectors/http/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn ndjson_messages_delivered_as_single_request( harness: &TestHarness, @@ -397,9 +395,8 @@ async fn ndjson_messages_delivered_as_single_request( /// Validates `batch_mode=json_array`: all messages as a single JSON array in one request. /// Checks single request, array length = message count, per-item envelope, `application/json`. #[iggy_harness( - shared_server = "http_sink", server(connectors_runtime(config_path = "tests/connectors/http/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn json_array_messages_delivered_as_single_request( harness: &TestHarness, @@ -491,9 +488,8 @@ async fn json_array_messages_delivered_as_single_request( /// Validates `batch_mode=raw`: each message as raw bytes without metadata envelope. /// Checks request count = message count, no envelope wrapper, `application/octet-stream`. #[iggy_harness( - shared_server = "http_sink", server(connectors_runtime(config_path = "tests/connectors/http/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn raw_binary_messages_delivered_without_envelope( harness: &TestHarness, @@ -579,9 +575,8 @@ async fn raw_binary_messages_delivered_without_envelope( /// Validates `include_metadata=false`: bare payload without `{metadata, payload}` envelope. /// Checks no metadata field in body, original payload fields at top level. #[iggy_harness( - shared_server = "http_sink", server(connectors_runtime(config_path = "tests/connectors/http/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn metadata_disabled_sends_bare_payload( harness: &TestHarness, @@ -656,9 +651,8 @@ async fn metadata_disabled_sends_bare_payload( /// Validates sequential offset integrity: `iggy_offset` values are contiguous across /// 5 delivered messages. Sorts by offset and checks each = previous + 1. #[iggy_harness( - shared_server = "http_sink", server(connectors_runtime(config_path = "tests/connectors/http/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn individual_messages_have_sequential_offsets( harness: &TestHarness, @@ -744,9 +738,8 @@ async fn individual_messages_have_sequential_offsets( /// Validates multi-topic delivery: one connector consuming two topics on the same stream. /// Sends 2 messages to topic 1, 1 to topic 2, verifies `iggy_topic` metadata matches source. #[iggy_harness( - shared_server = "http_sink", server(connectors_runtime(config_path = "tests/connectors/http/sink.toml")), - seed = seeds::connector_multi_topic_stream_idempotent + seed = seeds::connector_multi_topic_stream )] async fn multi_topic_messages_delivered_with_correct_topic_metadata( harness: &TestHarness, @@ -756,8 +749,8 @@ async fn multi_topic_messages_delivered_with_correct_topic_metadata( let stream_id: Identifier = seeds::names::STREAM.try_into().unwrap(); let topic_1_id: Identifier = seeds::names::TOPIC.try_into().unwrap(); - // Both topics created by connector_multi_topic_stream_idempotent seed (runs before - // connector runtime starts - runtime health check requires all configured topics). + // Step 1: Both topics created by connector_multi_topic_stream seed (runs before + // connector runtime starts — runtime health check requires all configured topics). let topic_2_id: Identifier = seeds::names::TOPIC_2.try_into().unwrap(); // Step 2: Send 2 messages to topic 1 with source identifier in payload @@ -829,6 +822,7 @@ async fn multi_topic_messages_delivered_with_correct_topic_metadata( ) }); + // Match against constants — not magic strings (code review M9) match iggy_topic { t if t == seeds::names::TOPIC => { topic_1_count += 1; diff --git a/core/integration/tests/connectors/http_config_provider/direct_responses.rs b/core/integration/tests/connectors/http_config_provider/direct_responses.rs index 17ce1279d..eeebddf4b 100644 --- a/core/integration/tests/connectors/http_config_provider/direct_responses.rs +++ b/core/integration/tests/connectors/http_config_provider/direct_responses.rs @@ -23,9 +23,8 @@ use integration::iggy_harness; use reqwest::StatusCode; #[iggy_harness( - shared_server = "http_config_direct", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_direct.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn source_configs_list_returns_all_versions( harness: &TestHarness, @@ -66,8 +65,8 @@ async fn source_configs_list_returns_all_versions( assert_eq!(streams.len(), 1, "Should have 1 stream config"); let stream = &streams[0]; - assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM); - assert_eq!(stream["topic"].as_str().unwrap(), seeds::names::TOPIC); + assert_eq!(stream["stream"].as_str().unwrap(), "test_stream"); + assert_eq!(stream["topic"].as_str().unwrap(), "test_topic"); assert_eq!(stream["schema"].as_str().unwrap(), "json"); assert!(stream["batch_length"].as_u64().is_some()); assert!(stream["linger_time"].as_str().is_some()); @@ -81,9 +80,8 @@ async fn source_configs_list_returns_all_versions( } #[iggy_harness( - shared_server = "http_config_direct", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_direct.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn source_config_by_version_returns_specific_version( harness: &TestHarness, @@ -113,13 +111,12 @@ async fn source_config_by_version_returns_specific_version( let streams = body["streams"].as_array().unwrap(); assert_eq!(streams.len(), 1); - assert_eq!(streams[0]["topic"].as_str().unwrap(), seeds::names::TOPIC); + assert_eq!(streams[0]["topic"].as_str().unwrap(), "test_topic"); } #[iggy_harness( - shared_server = "http_config_direct", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_direct.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn source_active_config_returns_current_version( harness: &TestHarness, @@ -154,9 +151,8 @@ async fn source_active_config_returns_current_version( } #[iggy_harness( - shared_server = "http_config_direct", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_direct.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn sink_configs_list_returns_all_versions( harness: &TestHarness, @@ -197,10 +193,10 @@ async fn sink_configs_list_returns_all_versions( assert_eq!(streams.len(), 1, "Should have 1 stream config"); let stream = &streams[0]; - assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM); + assert_eq!(stream["stream"].as_str().unwrap(), "test_stream"); let topics = stream["topics"].as_array().unwrap(); assert_eq!(topics.len(), 1); - assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC); + assert_eq!(topics[0].as_str().unwrap(), "test_topic"); assert_eq!(stream["schema"].as_str().unwrap(), "json"); assert!(stream["batch_length"].as_u64().is_some()); assert!(stream["poll_interval"].as_str().is_some()); @@ -211,9 +207,8 @@ async fn sink_configs_list_returns_all_versions( } #[iggy_harness( - shared_server = "http_config_direct", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_direct.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn sink_config_by_version_returns_specific_version( harness: &TestHarness, @@ -244,13 +239,12 @@ async fn sink_config_by_version_returns_specific_version( let streams = body["streams"].as_array().unwrap(); assert_eq!(streams.len(), 1); let topics = streams[0]["topics"].as_array().unwrap(); - assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC); + assert_eq!(topics[0].as_str().unwrap(), "test_topic"); } #[iggy_harness( - shared_server = "http_config_direct", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_direct.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn sink_active_config_returns_current_version( harness: &TestHarness, diff --git a/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs b/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs index aab8df975..1ef5f99ae 100644 --- a/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs +++ b/core/integration/tests/connectors/http_config_provider/wrapped_responses.rs @@ -23,9 +23,8 @@ use integration::iggy_harness; use reqwest::StatusCode; #[iggy_harness( - shared_server = "http_config_wrapped", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_wrapped.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn source_configs_list_returns_all_versions( harness: &TestHarness, @@ -66,8 +65,8 @@ async fn source_configs_list_returns_all_versions( assert_eq!(streams.len(), 1, "Should have 1 stream config"); let stream = &streams[0]; - assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM); - assert_eq!(stream["topic"].as_str().unwrap(), seeds::names::TOPIC); + assert_eq!(stream["stream"].as_str().unwrap(), "test_stream"); + assert_eq!(stream["topic"].as_str().unwrap(), "test_topic"); assert_eq!(stream["schema"].as_str().unwrap(), "json"); assert!(stream["batch_length"].as_u64().is_some()); assert!(stream["linger_time"].as_str().is_some()); @@ -81,9 +80,8 @@ async fn source_configs_list_returns_all_versions( } #[iggy_harness( - shared_server = "http_config_wrapped", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_wrapped.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn source_config_by_version_returns_specific_version( harness: &TestHarness, @@ -113,13 +111,12 @@ async fn source_config_by_version_returns_specific_version( let streams = body["streams"].as_array().unwrap(); assert_eq!(streams.len(), 1); - assert_eq!(streams[0]["topic"].as_str().unwrap(), seeds::names::TOPIC); + assert_eq!(streams[0]["topic"].as_str().unwrap(), "test_topic"); } #[iggy_harness( - shared_server = "http_config_wrapped", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_wrapped.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn source_active_config_returns_current_version( harness: &TestHarness, @@ -154,9 +151,8 @@ async fn source_active_config_returns_current_version( } #[iggy_harness( - shared_server = "http_config_wrapped", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_wrapped.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn sink_configs_list_returns_all_versions( harness: &TestHarness, @@ -197,10 +193,10 @@ async fn sink_configs_list_returns_all_versions( assert_eq!(streams.len(), 1, "Should have 1 stream config"); let stream = &streams[0]; - assert_eq!(stream["stream"].as_str().unwrap(), seeds::names::STREAM); + assert_eq!(stream["stream"].as_str().unwrap(), "test_stream"); let topics = stream["topics"].as_array().unwrap(); assert_eq!(topics.len(), 1); - assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC); + assert_eq!(topics[0].as_str().unwrap(), "test_topic"); assert_eq!(stream["schema"].as_str().unwrap(), "json"); assert!(stream["batch_length"].as_u64().is_some()); assert!(stream["poll_interval"].as_str().is_some()); @@ -211,9 +207,8 @@ async fn sink_configs_list_returns_all_versions( } #[iggy_harness( - shared_server = "http_config_wrapped", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_wrapped.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn sink_config_by_version_returns_specific_version( harness: &TestHarness, @@ -244,13 +239,12 @@ async fn sink_config_by_version_returns_specific_version( let streams = body["streams"].as_array().unwrap(); assert_eq!(streams.len(), 1); let topics = streams[0]["topics"].as_array().unwrap(); - assert_eq!(topics[0].as_str().unwrap(), seeds::names::TOPIC); + assert_eq!(topics[0].as_str().unwrap(), "test_topic"); } #[iggy_harness( - shared_server = "http_config_wrapped", server(connectors_runtime(config_path = "tests/connectors/http_config_provider/config_wrapped.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn sink_active_config_returns_current_version( harness: &TestHarness, diff --git a/core/integration/tests/connectors/iceberg/iceberg_sink.rs b/core/integration/tests/connectors/iceberg/iceberg_sink.rs index ad031dd80..777214116 100644 --- a/core/integration/tests/connectors/iceberg/iceberg_sink.rs +++ b/core/integration/tests/connectors/iceberg/iceberg_sink.rs @@ -37,9 +37,8 @@ const SNAPSHOT_POLL_INTERVAL_MS: u64 = 500; const BULK_SNAPSHOT_POLL_ATTEMPTS: usize = 60; #[iggy_harness( - shared_server = "iceberg_sink", server(connectors_runtime(config_path = "tests/connectors/iceberg/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn iceberg_sink_initializes_and_runs( harness: &TestHarness, @@ -69,9 +68,8 @@ async fn iceberg_sink_initializes_and_runs( } #[iggy_harness( - shared_server = "iceberg_sink", server(connectors_runtime(config_path = "tests/connectors/iceberg/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn iceberg_sink_consumes_json_messages( harness: &TestHarness, @@ -141,9 +139,8 @@ async fn iceberg_sink_consumes_json_messages( } #[iggy_harness( - shared_server = "iceberg_sink", server(connectors_runtime(config_path = "tests/connectors/iceberg/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn iceberg_sink_handles_bulk_messages( harness: &TestHarness, diff --git a/core/integration/tests/connectors/influxdb/influxdb_sink.rs b/core/integration/tests/connectors/influxdb/influxdb_sink.rs index 6b82d6428..cda484cfc 100644 --- a/core/integration/tests/connectors/influxdb/influxdb_sink.rs +++ b/core/integration/tests/connectors/influxdb/influxdb_sink.rs @@ -33,8 +33,7 @@ use serde_json::json; #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_writes_messages_to_bucket( harness: &TestHarness, @@ -75,8 +74,7 @@ async fn influxdb_sink_writes_messages_to_bucket( #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_handles_bulk_messages(harness: &TestHarness, fixture: InfluxDbSinkFixture) { let client = harness.root_client().await.unwrap(); @@ -114,8 +112,7 @@ async fn influxdb_sink_handles_bulk_messages(harness: &TestHarness, fixture: Inf #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_payload_fields_stored_correctly( harness: &TestHarness, @@ -153,8 +150,7 @@ async fn influxdb_sink_payload_fields_stored_correctly( #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_large_batch(harness: &TestHarness, fixture: InfluxDbSinkFixture) { let client = harness.root_client().await.unwrap(); @@ -188,8 +184,7 @@ async fn influxdb_sink_large_batch(harness: &TestHarness, fixture: InfluxDbSinkF #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_recovers_backlogged_messages( harness: &TestHarness, @@ -229,8 +224,7 @@ async fn influxdb_sink_recovers_backlogged_messages( #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_multiple_partitions(harness: &TestHarness, fixture: InfluxDbSinkFixture) { let client = harness.root_client().await.unwrap(); diff --git a/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs b/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs index e68e78b55..d1225c179 100644 --- a/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs +++ b/core/integration/tests/connectors/influxdb/influxdb_sink_formats.rs @@ -52,8 +52,7 @@ use integration::iggy_harness; /// with printable ASCII. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink_text.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_text_writes_points(harness: &TestHarness, fixture: InfluxDbSinkTextFixture) { let client = harness.root_client().await.unwrap(); @@ -92,8 +91,7 @@ async fn influxdb_sink_text_writes_points(harness: &TestHarness, fixture: Influx /// Covers: `write_field_string` quote-escape branch. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink_text.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_text_with_quote_characters( harness: &TestHarness, @@ -133,8 +131,7 @@ async fn influxdb_sink_text_with_quote_characters( /// Covers: `write_field_string` backslash-escape branch. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink_text.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_text_with_backslash( harness: &TestHarness, @@ -173,8 +170,7 @@ async fn influxdb_sink_text_with_backslash( /// path under the Text format. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink_text.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_text_bulk(harness: &TestHarness, fixture: InfluxDbSinkTextFixture) { let client = harness.root_client().await.unwrap(); @@ -216,8 +212,7 @@ async fn influxdb_sink_text_bulk(harness: &TestHarness, fixture: InfluxDbSinkTex /// `general_purpose::STANDARD.encode`. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink_base64.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_base64_writes_points( harness: &TestHarness, @@ -262,8 +257,7 @@ async fn influxdb_sink_base64_writes_points( /// needed for base64 output, but exercises the full field-write path). #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink_base64.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_base64_roundtrip(harness: &TestHarness, fixture: InfluxDbSinkBase64Fixture) { let client = harness.root_client().await.unwrap(); @@ -308,8 +302,7 @@ async fn influxdb_sink_base64_roundtrip(harness: &TestHarness, fixture: InfluxDb /// test covers the next-closest case (1-byte payload → 4-char base64 string). #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink_base64.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_base64_single_byte_payload( harness: &TestHarness, @@ -353,8 +346,7 @@ async fn influxdb_sink_base64_single_byte_payload( /// branches inside `append_line`. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink_no_metadata.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_no_metadata_writes_points( harness: &TestHarness, @@ -395,8 +387,7 @@ async fn influxdb_sink_no_metadata_writes_points( /// with the no-metadata tag configuration. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink_no_metadata.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_no_metadata_bulk( harness: &TestHarness, @@ -441,8 +432,7 @@ async fn influxdb_sink_no_metadata_bulk( /// Covers: `"ns"` arm of `to_precision_timestamp`. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_ns_precision_writes_points( harness: &TestHarness, @@ -485,8 +475,7 @@ async fn influxdb_sink_ns_precision_writes_points( /// in `PayloadFormat::Json` and `write_field_string` with `{`, `}`, `:`, `"`. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_json_nested_payload(harness: &TestHarness, fixture: InfluxDbSinkFixture) { let client = harness.root_client().await.unwrap(); @@ -534,8 +523,7 @@ async fn influxdb_sink_json_nested_payload(harness: &TestHarness, fixture: Influ /// Covers: `'\n' => buf.push_str("\\n")` branch in `write_field_string`. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/sink_text.toml")), - shared_server = "influxdb_sink", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_sink_text_with_newline(harness: &TestHarness, fixture: InfluxDbSinkTextFixture) { let client = harness.root_client().await.unwrap(); diff --git a/core/integration/tests/connectors/influxdb/influxdb_source.rs b/core/integration/tests/connectors/influxdb/influxdb_source.rs index e2716aeb9..f6eab572e 100644 --- a/core/integration/tests/connectors/influxdb/influxdb_source.rs +++ b/core/integration/tests/connectors/influxdb/influxdb_source.rs @@ -26,8 +26,7 @@ use tracing::info; #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_polls_and_produces_messages( harness: &TestHarness, @@ -85,8 +84,7 @@ async fn influxdb_source_polls_and_produces_messages( #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_message_payload_structure( harness: &TestHarness, @@ -142,8 +140,7 @@ async fn influxdb_source_message_payload_structure( #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_empty_bucket_produces_no_messages( harness: &TestHarness, @@ -182,8 +179,7 @@ async fn influxdb_source_empty_bucket_produces_no_messages( #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_multiple_measurements( harness: &TestHarness, diff --git a/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs b/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs index 089637ce5..98cede6db 100644 --- a/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs +++ b/core/integration/tests/connectors/influxdb/influxdb_source_formats.rs @@ -51,8 +51,7 @@ use tracing::info; /// path), `schema()` → `Schema::Text`. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source_text.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_text_payload_column( harness: &TestHarness, @@ -111,8 +110,7 @@ async fn influxdb_source_text_payload_column( /// Covers: `PayloadFormat::Text` → `Ok(raw_value.into_bytes())`. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source_text.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_text_whitespace_value( harness: &TestHarness, @@ -174,8 +172,7 @@ async fn influxdb_source_text_whitespace_value( /// STANDARD.decode`. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source_raw.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_raw_payload_column( harness: &TestHarness, @@ -232,8 +229,7 @@ async fn influxdb_source_raw_payload_column( /// Covers: the loop in `poll_messages` with `PayloadFormat::Raw`. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source_raw.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_raw_multiple_rows( harness: &TestHarness, @@ -294,8 +290,7 @@ async fn influxdb_source_raw_multiple_rows( /// Covers: `parse_scalar` for `i64`, `f64`, `bool`, and string branches. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_scalar_type_integer( harness: &TestHarness, @@ -355,8 +350,7 @@ async fn influxdb_source_scalar_type_integer( /// Covers `parse_scalar` for `f64` values. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_scalar_type_float(harness: &TestHarness, fixture: InfluxDbSourceFixture) { let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64; @@ -407,8 +401,7 @@ async fn influxdb_source_scalar_type_float(harness: &TestHarness, fixture: Influ /// Covers `parse_scalar` for boolean values (`true`/`false`). #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_scalar_type_bool(harness: &TestHarness, fixture: InfluxDbSourceFixture) { let base_ts: u64 = Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64; @@ -481,8 +474,7 @@ async fn influxdb_source_scalar_type_bool(harness: &TestHarness, fixture: Influx /// cursor state persisted via `state.last_timestamp`. #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_cursor_advances_between_batches( harness: &TestHarness, @@ -573,8 +565,7 @@ async fn influxdb_source_cursor_advances_between_batches( /// `build_payload` (whole-row path). #[iggy_harness( server(connectors_runtime(config_path = "tests/connectors/influxdb/source.toml")), - shared_server = "influxdb_source", - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn influxdb_source_no_metadata_core_fields_present( harness: &TestHarness, diff --git a/core/integration/tests/connectors/mongodb/mongodb_sink.rs b/core/integration/tests/connectors/mongodb/mongodb_sink.rs index 09c400eb3..474b809ed 100644 --- a/core/integration/tests/connectors/mongodb/mongodb_sink.rs +++ b/core/integration/tests/connectors/mongodb/mongodb_sink.rs @@ -48,9 +48,8 @@ fn build_expected_document_id_for_topic(topic_name: &str, message_id: u128) -> S } #[iggy_harness( - shared_server = "mongodb_sink", server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn json_messages_sink_to_mongodb(harness: &TestHarness, fixture: MongoDbSinkJsonFixture) { let client = harness.root_client().await.unwrap(); @@ -137,9 +136,8 @@ async fn json_messages_sink_to_mongodb(harness: &TestHarness, fixture: MongoDbSi } #[iggy_harness( - shared_server = "mongodb_sink", server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn binary_messages_sink_as_bson_binary(harness: &TestHarness, fixture: MongoDbSinkFixture) { let client = harness.root_client().await.unwrap(); @@ -206,9 +204,8 @@ async fn binary_messages_sink_as_bson_binary(harness: &TestHarness, fixture: Mon } #[iggy_harness( - shared_server = "mongodb_sink", server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn large_batch_processed_correctly(harness: &TestHarness, fixture: MongoDbSinkBatchFixture) { let client = harness.root_client().await.unwrap(); @@ -261,9 +258,8 @@ async fn large_batch_processed_correctly(harness: &TestHarness, fixture: MongoDb } #[iggy_harness( - shared_server = "mongodb_sink", server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn duplicate_key_is_idempotent_replay_not_sink_error( harness: &TestHarness, @@ -445,9 +441,8 @@ async fn duplicate_key_is_idempotent_replay_not_sink_error( } #[iggy_harness( - shared_server = "mongodb_sink", server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn ordered_duplicate_partial_insert_has_exact_accounting( harness: &TestHarness, @@ -622,9 +617,8 @@ async fn ordered_duplicate_partial_insert_has_exact_accounting( } #[iggy_harness( - shared_server = "mongodb_sink", server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix( harness: &TestHarness, @@ -735,9 +729,8 @@ async fn schema_validation_mid_batch_surfaces_hard_error_and_partial_prefix( } #[iggy_harness( - shared_server = "mongodb_sink", server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn write_concern_timeout_does_not_report_full_success( harness: &TestHarness, @@ -827,9 +820,8 @@ async fn write_concern_timeout_does_not_report_full_success( } #[iggy_harness( - shared_server = "mongodb_sink", server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn retryable_write_failover_keeps_single_doc_per_id( harness: &TestHarness, @@ -975,9 +967,8 @@ async fn retryable_write_failover_keeps_single_doc_per_id( } #[iggy_harness( - shared_server = "mongodb_sink", server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn no_writes_performed_label_path_preserves_state_accuracy( harness: &TestHarness, @@ -1124,9 +1115,8 @@ async fn no_writes_performed_label_path_preserves_state_accuracy( } #[iggy_harness( - shared_server = "mongodb_sink", server(connectors_runtime(config_path = "tests/connectors/mongodb/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn auto_create_collection_on_open( harness: &TestHarness, @@ -1167,5 +1157,6 @@ async fn auto_create_collection_on_open( "Collection should be empty after open() with no messages" ); + // Suppress unused harness warning. let _ = harness; } diff --git a/core/integration/tests/connectors/postgres/postgres_sink.rs b/core/integration/tests/connectors/postgres/postgres_sink.rs index 6b5b87022..2032c025e 100644 --- a/core/integration/tests/connectors/postgres/postgres_sink.rs +++ b/core/integration/tests/connectors/postgres/postgres_sink.rs @@ -35,9 +35,8 @@ type SinkRow = (i64, String, String, Vec<u8>); type SinkJsonRow = (i64, serde_json::Value); #[iggy_harness( - shared_server = "postgres_sink", server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn json_messages_sink_stores_as_bytea(harness: &TestHarness, fixture: PostgresSinkFixture) { let client = harness.root_client().await.unwrap(); @@ -98,9 +97,8 @@ async fn json_messages_sink_stores_as_bytea(harness: &TestHarness, fixture: Post } #[iggy_harness( - shared_server = "postgres_sink", server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn binary_messages_sink_stores_as_bytea( harness: &TestHarness, @@ -163,9 +161,8 @@ async fn binary_messages_sink_stores_as_bytea( } #[iggy_harness( - shared_server = "postgres_sink", server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn json_messages_sink_stores_as_jsonb( harness: &TestHarness, diff --git a/core/integration/tests/connectors/postgres/postgres_source.rs b/core/integration/tests/connectors/postgres/postgres_source.rs index ce5cb99e2..5f4a79af8 100644 --- a/core/integration/tests/connectors/postgres/postgres_source.rs +++ b/core/integration/tests/connectors/postgres/postgres_source.rs @@ -32,9 +32,8 @@ use std::time::Duration; use tokio::time::sleep; #[iggy_harness( - shared_server = "postgres_source", server(connectors_runtime(config_path = "tests/connectors/postgres/source.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn json_rows_source_produces_messages_to_iggy( harness: &TestHarness, @@ -114,9 +113,8 @@ async fn json_rows_source_produces_messages_to_iggy( } #[iggy_harness( - shared_server = "postgres_source", server(connectors_runtime(config_path = "tests/connectors/postgres/source.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn bytea_rows_source_produces_raw_messages_to_iggy( harness: &TestHarness, @@ -178,9 +176,8 @@ async fn bytea_rows_source_produces_raw_messages_to_iggy( } #[iggy_harness( - shared_server = "postgres_source", server(connectors_runtime(config_path = "tests/connectors/postgres/source.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn jsonb_rows_source_produces_json_messages_to_iggy( harness: &TestHarness, @@ -246,9 +243,8 @@ async fn jsonb_rows_source_produces_json_messages_to_iggy( } #[iggy_harness( - shared_server = "postgres_source", server(connectors_runtime(config_path = "tests/connectors/postgres/source.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn delete_after_read_source_removes_rows_after_producing( harness: &TestHarness, @@ -317,9 +313,8 @@ async fn delete_after_read_source_removes_rows_after_producing( } #[iggy_harness( - shared_server = "postgres_source", server(connectors_runtime(config_path = "tests/connectors/postgres/source.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn processed_column_source_marks_rows_after_producing( harness: &TestHarness, diff --git a/core/integration/tests/connectors/postgres/restart.rs b/core/integration/tests/connectors/postgres/restart.rs index cdba541ee..e6e5f203b 100644 --- a/core/integration/tests/connectors/postgres/restart.rs +++ b/core/integration/tests/connectors/postgres/restart.rs @@ -38,9 +38,8 @@ const SINK_KEY: &str = "postgres"; type SinkRow = (i64, String, String, Vec<u8>); #[iggy_harness( - shared_server = "postgres_sink", server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn restart_sink_connector_continues_processing( harness: &TestHarness, @@ -128,9 +127,8 @@ async fn restart_sink_connector_continues_processing( } #[iggy_harness( - shared_server = "postgres_sink", server(connectors_runtime(config_path = "tests/connectors/postgres/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn parallel_restart_requests_should_not_break_connector( harness: &TestHarness, diff --git a/core/integration/tests/connectors/quickwit/quickwit_sink.rs b/core/integration/tests/connectors/quickwit/quickwit_sink.rs index b7fe69b10..ce3abd364 100644 --- a/core/integration/tests/connectors/quickwit/quickwit_sink.rs +++ b/core/integration/tests/connectors/quickwit/quickwit_sink.rs @@ -28,9 +28,8 @@ use integration::iggy_harness; use serde::{Deserialize, Serialize}; #[iggy_harness( - shared_server = "quickwit_sink", server(connectors_runtime(config_path = "tests/connectors/quickwit/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn given_existent_quickwit_index_should_store( harness: &TestHarness, @@ -84,9 +83,8 @@ async fn given_existent_quickwit_index_should_store( } #[iggy_harness( - shared_server = "quickwit_sink", server(connectors_runtime(config_path = "tests/connectors/quickwit/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn given_nonexistent_quickwit_index_should_create_and_store( harness: &TestHarness, @@ -140,9 +138,8 @@ async fn given_nonexistent_quickwit_index_should_create_and_store( } #[iggy_harness( - shared_server = "quickwit_sink", server(connectors_runtime(config_path = "tests/connectors/quickwit/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn given_bulk_message_send_should_store( harness: &TestHarness, @@ -196,9 +193,8 @@ async fn given_bulk_message_send_should_store( } #[iggy_harness( - shared_server = "quickwit_sink", server(connectors_runtime(config_path = "tests/connectors/quickwit/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn given_invalid_messages_should_not_store(harness: &TestHarness, fixture: QuickwitFixture) { let client = harness.root_client().await.unwrap(); diff --git a/core/integration/tests/connectors/random/random_source.rs b/core/integration/tests/connectors/random/random_source.rs index 4d44d1bfe..5efb6fd08 100644 --- a/core/integration/tests/connectors/random/random_source.rs +++ b/core/integration/tests/connectors/random/random_source.rs @@ -25,9 +25,8 @@ use std::time::Duration; use tokio::time::sleep; #[iggy_harness( - shared_server = "random_source", server(connectors_runtime(config_path = "tests/connectors/random/source.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn random_source_produces_messages(harness: &TestHarness) { sleep(Duration::from_secs(1)).await; diff --git a/core/integration/tests/connectors/stdout/stdout_sink.rs b/core/integration/tests/connectors/stdout/stdout_sink.rs index 2dfe8758e..be247588c 100644 --- a/core/integration/tests/connectors/stdout/stdout_sink.rs +++ b/core/integration/tests/connectors/stdout/stdout_sink.rs @@ -32,9 +32,8 @@ use tokio::time::sleep; const API_KEY: &str = "test-api-key"; #[iggy_harness( - shared_server = "stdout_sink", server(connectors_runtime(config_path = "tests/connectors/stdout/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn stdout_sink_consumes_messages(harness: &TestHarness) { let client = harness.root_client().await.unwrap(); @@ -91,9 +90,8 @@ async fn stdout_sink_consumes_messages(harness: &TestHarness) { } #[iggy_harness( - shared_server = "stdout_sink", server(connectors_runtime(config_path = "tests/connectors/stdout/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn stdout_sink_reports_metrics(harness: &TestHarness) { let client = harness.root_client().await.unwrap(); @@ -152,9 +150,8 @@ async fn stdout_sink_reports_metrics(harness: &TestHarness) { } #[iggy_harness( - shared_server = "stdout_sink", server(connectors_runtime(config_path = "tests/connectors/stdout/sink.toml")), - seed = seeds::connector_stream_idempotent + seed = seeds::connector_stream )] async fn stdout_sink_handles_bulk_messages(harness: &TestHarness) { let client = harness.root_client().await.unwrap();
