This is an automated email from the ASF dual-hosted git repository. mmodzelewski pushed a commit to branch connectors-runtime in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 3b620dc13b96768ce41c42a7dc975257df3cdec8
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Tue May 12 15:57:16 2026 +0200

    refactor(connectors): isolate per-connector init failures in runtime

    Previously, any failure during connector init (path resolution, dlopen,
    state load, plugin init, consumer/producer setup) was propagated as a
    fatal `RuntimeError` and aborted the entire runtime. A single bad config
    or missing plugin file took down every other healthy connector with it,
    and the failing connector was hidden from the API/health view since it
    never reached the manager.

    Capture per-connector failures inline against the offending plugin and
    continue iterating. Connectors that fail before their FFI container can
    be loaded are returned from `sink::init` / `source::init` as
    `FailedPlugin` entries and surfaced through the manager as
    `ConnectorStatus::Error` with the captured message in `last_error`, so
    operators can still see and diagnose them.

    The `Result` return is preserved on both init paths for symmetry and
    future fatal-error handling, but no system-level errors are currently
    emitted.
--- core/connectors/runtime/src/context.rs | 78 +++++++- core/connectors/runtime/src/main.rs | 20 +- core/connectors/runtime/src/sink.rs | 205 +++++++++++-------- core/connectors/runtime/src/source.rs | 219 +++++++++++++-------- .../tests/connectors/stdout/invalid_sink.toml | 20 ++ .../stdout/invalid_sink_config/stdout_invalid.toml | 39 ++++ .../stdout/invalid_sink_config/stdout_valid.toml | 39 ++++ .../tests/connectors/stdout/stdout_sink.rs | 68 ++++++- 8 files changed, 523 insertions(+), 165 deletions(-) diff --git a/core/connectors/runtime/src/context.rs b/core/connectors/runtime/src/context.rs index a15d4bea6..a5b1ea9f6 100644 --- a/core/connectors/runtime/src/context.rs +++ b/core/connectors/runtime/src/context.rs @@ -21,7 +21,7 @@ use crate::configs::runtime::ConnectorsRuntimeConfig; use crate::metrics::Metrics; use crate::stream::IggyClients; use crate::{ - SinkConnectorWrapper, SourceConnectorWrapper, + FailedPlugin, SinkConnectorWrapper, SourceConnectorWrapper, manager::{ sink::{SinkDetails, SinkInfo, SinkManager}, source::{SourceDetails, SourceInfo, SourceManager}, @@ -54,13 +54,20 @@ pub fn init( sources_config: &HashMap<String, SourceConfig>, sink_wrappers: &[SinkConnectorWrapper], source_wrappers: &[SourceConnectorWrapper], + failed_sinks: &[FailedPlugin], + failed_sources: &[FailedPlugin], config_provider: Box<dyn ConnectorsConfigProvider>, iggy_clients: Arc<IggyClients>, state_path: String, ) -> RuntimeContext { let metrics = Arc::new(Metrics::init()); - let sinks = SinkManager::new(map_sinks(sinks_config, sink_wrappers)); - let sources = SourceManager::new(map_sources(sources_config, source_wrappers)); + let mut sink_details = map_sinks(sinks_config, sink_wrappers); + sink_details.extend(map_failed_sinks(sinks_config, failed_sinks)); + let mut source_details = map_sources(sources_config, source_wrappers); + source_details.extend(map_failed_sources(sources_config, failed_sources)); + + let sinks = SinkManager::new(sink_details); + let sources = 
SourceManager::new(source_details); metrics.set_sinks_total(sinks_config.len() as u32); metrics.set_sources_total(sources_config.len() as u32); @@ -167,3 +174,68 @@ fn map_sources( } sources } + +const UNKNOWN_PLUGIN_VERSION: &str = "unknown"; + +fn map_failed_sinks( + sinks_config: &HashMap<String, SinkConfig>, + failed: &[FailedPlugin], +) -> Vec<SinkDetails> { + let mut sinks = Vec::with_capacity(failed.len()); + for plugin in failed { + let Some(config) = sinks_config.get(&plugin.key) else { + error!("Missing sink config for failed plugin: {}", plugin.key); + continue; + }; + sinks.push(SinkDetails { + info: SinkInfo { + id: plugin.id, + key: plugin.key.clone(), + name: plugin.name.clone(), + path: plugin.path.clone(), + version: UNKNOWN_PLUGIN_VERSION.to_owned(), + enabled: plugin.enabled, + status: ConnectorStatus::Error, + last_error: Some(ConnectorError::new(&plugin.error)), + plugin_config_format: plugin.config_format, + }, + config: config.clone(), + shutdown_tx: None, + task_handles: vec![], + container: None, + restart_guard: Arc::new(Mutex::new(())), + }); + } + sinks +} + +fn map_failed_sources( + sources_config: &HashMap<String, SourceConfig>, + failed: &[FailedPlugin], +) -> Vec<SourceDetails> { + let mut sources = Vec::with_capacity(failed.len()); + for plugin in failed { + let Some(config) = sources_config.get(&plugin.key) else { + error!("Missing source config for failed plugin: {}", plugin.key); + continue; + }; + sources.push(SourceDetails { + info: SourceInfo { + id: plugin.id, + key: plugin.key.clone(), + name: plugin.name.clone(), + path: plugin.path.clone(), + version: UNKNOWN_PLUGIN_VERSION.to_owned(), + enabled: plugin.enabled, + status: ConnectorStatus::Error, + last_error: Some(ConnectorError::new(&plugin.error)), + plugin_config_format: plugin.config_format, + }, + config: config.clone(), + handler_tasks: vec![], + container: None, + restart_guard: Arc::new(Mutex::new(())), + }); + } + sources +} diff --git 
a/core/connectors/runtime/src/main.rs b/core/connectors/runtime/src/main.rs index 908fe44ad..f4e0cac60 100644 --- a/core/connectors/runtime/src/main.rs +++ b/core/connectors/runtime/src/main.rs @@ -156,7 +156,7 @@ async fn main() -> Result<(), RuntimeError> { connectors_config.sinks().len() ); let sources_config = connectors_config.sources(); - let sources = source::init( + let (sources, failed_sources) = source::init( sources_config.clone(), &iggy_clients.producer, &config.state.path, @@ -164,7 +164,7 @@ async fn main() -> Result<(), RuntimeError> { .await?; let sinks_config = connectors_config.sinks(); - let sinks = sink::init(sinks_config.clone(), &iggy_clients.consumer).await?; + let (sinks, failed_sinks) = sink::init(sinks_config.clone(), &iggy_clients.consumer).await?; let mut sink_wrappers = vec![]; let mut sink_containers_by_key: HashMap<String, Arc<Container<SinkApi>>> = HashMap::new(); @@ -200,6 +200,8 @@ async fn main() -> Result<(), RuntimeError> { sources_config, &sink_wrappers, &source_wrappers, + &failed_sinks, + &failed_sources, connectors_config_provider, iggy_clients.clone(), config.state.path.clone(), @@ -459,6 +461,20 @@ struct SourceConnectorWrapper { plugins: Vec<SourceConnectorPlugin>, } +/// Records a connector that failed before its FFI container could be loaded +/// (path resolution, dlopen, or pre-container setup). Surfaced to the runtime +/// API/health view as `ConnectorStatus::Error` so one bad config does not hide +/// the connector or block healthy peers from running. 
+pub(crate) struct FailedPlugin { + pub id: u32, + pub key: String, + pub name: String, + pub path: String, + pub config_format: Option<ConfigFormat>, + pub error: String, + pub enabled: bool, +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index a0ab25b1a..092aef7e9 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -22,8 +22,8 @@ use crate::context::RuntimeContext; use crate::log::LOG_CALLBACK; use crate::metrics::Metrics; use crate::{ - PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, SinkConnectorPlugin, - SinkConnectorWrapper, resolve_plugin_path, transform, + FailedPlugin, PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer, + SinkConnectorPlugin, SinkConnectorWrapper, resolve_plugin_path, transform, }; use dlopen2::wrapper::Container; use futures::StreamExt; @@ -46,11 +46,23 @@ use tokio::sync::watch; use tokio::task::JoinHandle; use tracing::{debug, error, info, warn}; +/// Initializes all enabled sink connectors. +/// +/// Per-connector failures (path resolution, dlopen, plugin init, +/// consumer/decoder/transform setup) are captured against the offending +/// connector and do not abort the runtime. Connectors that fail before their +/// FFI container can be loaded are returned in the second tuple element so +/// they remain visible in health/status output. +/// +/// Only system-level errors that prevent any connector from running are +/// propagated as `Err`. 
pub async fn init( sink_configs: HashMap<String, SinkConfig>, iggy_client: &IggyClient, -) -> Result<HashMap<String, SinkConnector>, RuntimeError> { +) -> Result<(HashMap<String, SinkConnector>, Vec<FailedPlugin>), RuntimeError> { let mut sink_connectors: HashMap<String, SinkConnector> = HashMap::new(); + let mut failed_plugins: Vec<FailedPlugin> = Vec::new(); + for (key, config) in sink_configs { let name = config.name.clone(); if !config.enabled { @@ -59,102 +71,141 @@ pub async fn init( } let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst); - let path = resolve_plugin_path(&config.path)?; + + let path = match resolve_plugin_path(&config.path) { + Ok(path) => path, + Err(error) => { + let message = format!("Failed to resolve plugin path: {error}"); + error!("Sink: {name} ({key}) - {message}"); + failed_plugins.push(build_failed_plugin( + plugin_id, &key, &name, &config, message, + )); + continue; + } + }; + info!( "Initializing sink container with name: {name} ({key}), config version: {}, plugin: {path}", &config.version ); - let init_error: Option<String>; - if let Some(container) = sink_connectors.get_mut(&path) { - info!("Sink container for plugin: {path} is already loaded."); - let version = get_plugin_version(&container.container); - init_error = init_sink( - &container.container, - &config.plugin_config.clone().unwrap_or_default(), - plugin_id, - ) - .err() - .map(|error| error.to_string()); - container.plugins.push(SinkConnectorPlugin { - id: plugin_id, - key: key.clone(), - name: name.clone(), - path: path.clone(), - version, - config_format: config.plugin_config_format, - consumers: vec![], - error: init_error.clone(), - verbose: config.verbose, - }); - } else { - let container: Container<SinkApi> = unsafe { - Container::load(&path).map_err(|error| { - RuntimeError::InvalidConfiguration(format!( - "Failed to load sink container from {path}: {error}" - )) - })? 
+ + if !sink_connectors.contains_key(&path) { + let container = match unsafe { Container::<SinkApi>::load(&path) } { + Ok(container) => container, + Err(error) => { + let message = format!("Failed to load sink container from {path}: {error}"); + error!("Sink: {name} ({key}) - {message}"); + failed_plugins.push(build_failed_plugin( + plugin_id, &key, &name, &config, message, + )); + continue; + } }; info!("Sink container for plugin: {path} loaded successfully."); - let version = get_plugin_version(&container); - init_error = init_sink( - &container, - &config.plugin_config.clone().unwrap_or_default(), - plugin_id, - ) - .err() - .map(|error| error.to_string()); sink_connectors.insert( path.clone(), SinkConnector { container, - plugins: vec![SinkConnectorPlugin { - id: plugin_id, - key: key.clone(), - name: name.clone(), - path: path.clone(), - version, - config_format: config.plugin_config_format, - consumers: vec![], - error: init_error.clone(), - verbose: config.verbose, - }], + plugins: Vec::new(), }, ); + } else { + info!("Sink container for plugin: {path} is already loaded."); } + let connector = sink_connectors + .get_mut(&path) + .expect("sink container was just ensured for this path"); + let version = get_plugin_version(&connector.container); + let init_error = init_sink( + &connector.container, + &config.plugin_config.clone().unwrap_or_default(), + plugin_id, + ) + .err() + .map(|error| error.to_string()); + + connector.plugins.push(SinkConnectorPlugin { + id: plugin_id, + key: key.clone(), + name: name.clone(), + path: path.clone(), + version, + config_format: config.plugin_config_format, + consumers: vec![], + error: init_error.clone(), + verbose: config.verbose, + }); + if let Some(error) = init_error { error!("Failed to initialize sink container with name: {name} ({key}). {error}"); continue; - } else { - info!( - "Sink container with name: {name} ({key}), initialized successfully with ID: {plugin_id}." 
- ); } - let consumers = setup_sink_consumers(&key, &config, iggy_client).await?; - let connector = sink_connectors.get_mut(&path).ok_or_else(|| { - RuntimeError::InvalidConfiguration(format!("Sink connector not found for path: {path}")) - })?; - let plugin = connector - .plugins - .iter_mut() - .find(|p| p.id == plugin_id) - .ok_or_else(|| { - RuntimeError::InvalidConfiguration(format!( - "Sink plugin not found for ID: {plugin_id}" - )) - })?; - for (consumer, decoder, batch_size, transforms) in consumers { - plugin.consumers.push(SinkConnectorConsumer { - consumer, - decoder, - batch_size, - transforms, - }); + match setup_sink_consumers(&key, &config, iggy_client).await { + Ok(consumers) => { + let connector = sink_connectors + .get_mut(&path) + .expect("sink connector was inserted above"); + let plugin = connector + .plugins + .iter_mut() + .find(|plugin| plugin.id == plugin_id) + .expect("sink plugin was pushed above"); + for (consumer, decoder, batch_size, transforms) in consumers { + plugin.consumers.push(SinkConnectorConsumer { + consumer, + decoder, + batch_size, + transforms, + }); + } + info!( + "Sink container with name: {name} ({key}) initialized successfully with ID: {plugin_id}." 
+ ); + } + Err(error) => { + let message = format!("Failed to set up sink consumers: {error}"); + error!("Sink: {name} ({key}) - {message}"); + let connector = sink_connectors + .get_mut(&path) + .expect("sink connector was inserted above"); + let close_result = (connector.container.iggy_sink_close)(plugin_id); + if close_result != 0 { + warn!( + "iggy_sink_close returned {close_result} while cleaning up failed sink connector with ID: {plugin_id} ({key})" + ); + } + if let Some(plugin) = connector + .plugins + .iter_mut() + .find(|plugin| plugin.id == plugin_id) + { + plugin.error = Some(message); + } + } } } - Ok(sink_connectors) + Ok((sink_connectors, failed_plugins)) +} + +fn build_failed_plugin( + id: u32, + key: &str, + name: &str, + config: &SinkConfig, + error: String, +) -> FailedPlugin { + FailedPlugin { + id, + key: key.to_owned(), + name: name.to_owned(), + path: config.path.clone(), + config_format: config.plugin_config_format, + error, + enabled: config.enabled, + } } pub fn consume( diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index 2c20eb1b0..c160bf025 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -42,7 +42,7 @@ use crate::context::RuntimeContext; use crate::log::LOG_CALLBACK; use crate::metrics::ConnectorType; use crate::{ - PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin, + FailedPlugin, PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin, SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path, state::{FileStateProvider, StateProvider, StateStorage}, transform, @@ -56,12 +56,24 @@ pub fn cleanup_sender(plugin_id: u32) { SOURCE_SENDERS.remove(&plugin_id); } +/// Initializes all enabled source connectors. 
+/// +/// Per-connector failures (path resolution, dlopen, state load, plugin init, +/// producer/encoder/transform setup) are captured against the offending +/// connector and do not abort the runtime. Connectors that fail before their +/// FFI container can be loaded are returned in the second tuple element so +/// they remain visible in health/status output. +/// +/// Only system-level errors that prevent any connector from running (e.g. a +/// poisoned global state) are propagated as `Err`. pub async fn init( source_configs: HashMap<String, SourceConfig>, iggy_client: &IggyClient, state_path: &str, -) -> Result<HashMap<String, SourceConnector>, RuntimeError> { +) -> Result<(HashMap<String, SourceConnector>, Vec<FailedPlugin>), RuntimeError> { let mut source_connectors: HashMap<String, SourceConnector> = HashMap::new(); + let mut failed_plugins: Vec<FailedPlugin> = Vec::new(); + for (key, config) in source_configs { let name = config.name.clone(); if !config.enabled { @@ -70,110 +82,153 @@ pub async fn init( } let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst); - let path = resolve_plugin_path(&config.path)?; + + let path = match resolve_plugin_path(&config.path) { + Ok(path) => path, + Err(error) => { + let message = format!("Failed to resolve plugin path: {error}"); + error!("Source: {name} ({key}) - {message}"); + failed_plugins.push(build_failed_plugin( + plugin_id, &key, &name, &config, message, + )); + continue; + } + }; + info!( "Initializing source container with name: {name} ({key}), config version: {}, plugin: {path}", &config.version ); + let state_storage = get_state_storage(state_path, &key); let state = match &state_storage { - StateStorage::File(file) => file.load().await?, + StateStorage::File(file) => match file.load().await { + Ok(state) => state, + Err(error) => { + let message = format!("Failed to load source state: {error}"); + error!("Source: {name} ({key}) - {message}"); + failed_plugins.push(build_failed_plugin( + plugin_id, &key, 
&name, &config, message, + )); + continue; + } + }, }; - let init_error: Option<String>; - if let Some(container) = source_connectors.get_mut(&path) { - info!("Source container for plugin: {path} is already loaded."); - let version = get_plugin_version(&container.container); - init_error = init_source( - &container.container, - &config.plugin_config.clone().unwrap_or_default(), - plugin_id, - state, - ) - .err() - .map(|error| error.to_string()); - container.plugins.push(SourceConnectorPlugin { - id: plugin_id, - key: key.clone(), - name: name.clone(), - path: path.clone(), - version, - config_format: config.plugin_config_format, - producer: None, - transforms: vec![], - state_storage, - error: init_error.clone(), - verbose: config.verbose, - }); - } else { - let container: Container<SourceApi> = unsafe { - Container::load(&path).map_err(|error| { - RuntimeError::InvalidConfiguration(format!( - "Failed to load source container from {path}: {error}" - )) - })? + + if !source_connectors.contains_key(&path) { + let container = match unsafe { Container::<SourceApi>::load(&path) } { + Ok(container) => container, + Err(error) => { + let message = format!("Failed to load source container from {path}: {error}"); + error!("Source: {name} ({key}) - {message}"); + failed_plugins.push(build_failed_plugin( + plugin_id, &key, &name, &config, message, + )); + continue; + } }; info!("Source container for plugin: {path} loaded successfully."); - let version = get_plugin_version(&container); - init_error = init_source( - &container, - &config.plugin_config.clone().unwrap_or_default(), - plugin_id, - state, - ) - .err() - .map(|error| error.to_string()); source_connectors.insert( path.clone(), SourceConnector { container, - plugins: vec![SourceConnectorPlugin { - id: plugin_id, - key: key.clone(), - name: name.clone(), - path: path.clone(), - version, - config_format: config.plugin_config_format, - producer: None, - transforms: vec![], - state_storage, - error: init_error.clone(), - 
verbose: config.verbose, - }], + plugins: Vec::new(), }, ); + } else { + info!("Source container for plugin: {path} is already loaded."); } + let connector = source_connectors + .get_mut(&path) + .expect("source container was just ensured for this path"); + let version = get_plugin_version(&connector.container); + let init_error = init_source( + &connector.container, + &config.plugin_config.clone().unwrap_or_default(), + plugin_id, + state, + ) + .err() + .map(|error| error.to_string()); + + connector.plugins.push(SourceConnectorPlugin { + id: plugin_id, + key: key.clone(), + name: name.clone(), + path: path.clone(), + version, + config_format: config.plugin_config_format, + producer: None, + transforms: vec![], + state_storage, + error: init_error.clone(), + verbose: config.verbose, + }); + if let Some(error) = init_error { error!("Source container with name: {name} ({key}) failed to initialize: {error}"); continue; - } else { - info!( - "Source container with name: {name} ({key}), initialized successfully with ID: {plugin_id}." 
- ); } - let (producer, encoder, transforms) = - setup_source_producer(&key, &config, iggy_client).await?; - - let connector = source_connectors.get_mut(&path).ok_or_else(|| { - RuntimeError::InvalidConfiguration(format!( - "Source connector not found for path: {path}" - )) - })?; - let plugin = connector - .plugins - .iter_mut() - .find(|p| p.id == plugin_id) - .ok_or_else(|| { - RuntimeError::InvalidConfiguration(format!( - "Source plugin not found for ID: {plugin_id}" - )) - })?; - plugin.producer = Some(SourceConnectorProducer { producer, encoder }); - plugin.transforms = transforms; + match setup_source_producer(&key, &config, iggy_client).await { + Ok((producer, encoder, transforms)) => { + let connector = source_connectors + .get_mut(&path) + .expect("source connector was inserted above"); + let plugin = connector + .plugins + .iter_mut() + .find(|plugin| plugin.id == plugin_id) + .expect("source plugin was pushed above"); + plugin.producer = Some(SourceConnectorProducer { producer, encoder }); + plugin.transforms = transforms; + info!( + "Source container with name: {name} ({key}) initialized successfully with ID: {plugin_id}." 
+ ); + } + Err(error) => { + let message = format!("Failed to set up source producer: {error}"); + error!("Source: {name} ({key}) - {message}"); + let connector = source_connectors + .get_mut(&path) + .expect("source connector was inserted above"); + let close_result = (connector.container.iggy_source_close)(plugin_id); + if close_result != 0 { + warn!( + "iggy_source_close returned {close_result} while cleaning up failed source connector with ID: {plugin_id} ({key})" + ); + } + if let Some(plugin) = connector + .plugins + .iter_mut() + .find(|plugin| plugin.id == plugin_id) + { + plugin.error = Some(message); + } + } + } } - Ok(source_connectors) + Ok((source_connectors, failed_plugins)) +} + +fn build_failed_plugin( + id: u32, + key: &str, + name: &str, + config: &SourceConfig, + error: String, +) -> FailedPlugin { + FailedPlugin { + id, + key: key.to_owned(), + name: name.to_owned(), + path: config.path.clone(), + config_format: config.plugin_config_format, + error, + enabled: config.enabled, + } } fn get_plugin_version(container: &Container<SourceApi>) -> String { diff --git a/core/integration/tests/connectors/stdout/invalid_sink.toml b/core/integration/tests/connectors/stdout/invalid_sink.toml new file mode 100644 index 000000000..10ee15efc --- /dev/null +++ b/core/integration/tests/connectors/stdout/invalid_sink.toml @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[connectors] +config_type = "local" +config_dir = "tests/connectors/stdout/invalid_sink_config" diff --git a/core/integration/tests/connectors/stdout/invalid_sink_config/stdout_invalid.toml b/core/integration/tests/connectors/stdout/invalid_sink_config/stdout_invalid.toml new file mode 100644 index 000000000..b4230b888 --- /dev/null +++ b/core/integration/tests/connectors/stdout/invalid_sink_config/stdout_invalid.toml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Deliberately broken sink: `poll_interval` cannot be parsed as a duration. +# Used to assert that the runtime stays alive and reports this connector +# with `ConnectorStatus::Error` instead of aborting startup. 
+ +type = "sink" +key = "stdout_invalid" +enabled = true +version = 0 +name = "Stdout invalid sink" +path = "../../target/debug/libiggy_connector_stdout_sink" +verbose = true + +[[streams]] +stream = "test_stream" +topics = ["test_topic"] +schema = "json" +batch_length = 100 +poll_interval = "not-a-valid-duration" +consumer_group = "stdout_invalid_sink_connector" + +[plugin_config] +print_payload = true diff --git a/core/integration/tests/connectors/stdout/invalid_sink_config/stdout_valid.toml b/core/integration/tests/connectors/stdout/invalid_sink_config/stdout_valid.toml new file mode 100644 index 000000000..0670d68fd --- /dev/null +++ b/core/integration/tests/connectors/stdout/invalid_sink_config/stdout_valid.toml @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Healthy companion sink: verifies that a sibling connector with valid +# configuration still starts and runs when another connector in the same +# runtime fails initialization. 
+ +type = "sink" +key = "stdout_valid" +enabled = true +version = 0 +name = "Stdout valid sink" +path = "../../target/debug/libiggy_connector_stdout_sink" +verbose = true + +[[streams]] +stream = "test_stream" +topics = ["test_topic"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "stdout_valid_sink_connector" + +[plugin_config] +print_payload = true diff --git a/core/integration/tests/connectors/stdout/stdout_sink.rs b/core/integration/tests/connectors/stdout/stdout_sink.rs index be247588c..db7e01a5f 100644 --- a/core/integration/tests/connectors/stdout/stdout_sink.rs +++ b/core/integration/tests/connectors/stdout/stdout_sink.rs @@ -22,7 +22,9 @@ use bytes::Bytes; use iggy::prelude::{IggyMessage, Partitioning}; use iggy_common::Identifier; use iggy_common::MessageClient; -use iggy_connector_sdk::api::{ConnectorRuntimeStats, ConnectorStatus, SinkInfoResponse}; +use iggy_connector_sdk::api::{ + ConnectorRuntimeStats, ConnectorStatus, HealthResponse, SinkInfoResponse, +}; use integration::harness::seeds; use integration::iggy_harness; use reqwest::Client; @@ -205,3 +207,67 @@ async fn stdout_sink_handles_bulk_messages(harness: &TestHarness) { assert_eq!(sinks[0].status, ConnectorStatus::Running); assert!(sinks[0].last_error.is_none()); } + +#[iggy_harness( + server(connectors_runtime(config_path = "tests/connectors/stdout/invalid_sink.toml")), + seed = seeds::connector_stream +)] +async fn stdout_sink_with_invalid_config_does_not_abort_runtime(harness: &TestHarness) { + let api_address = harness + .connectors_runtime() + .expect("connector runtime should be available") + .http_url(); + let http_client = Client::new(); + + let health_response = http_client + .get(format!("{}/health", api_address)) + .send() + .await + .expect("Failed to query health endpoint"); + assert_eq!(health_response.status(), 200); + let health: HealthResponse = health_response + .json() + .await + .expect("Failed to parse health response"); + 
assert_eq!(health.status, "healthy"); + + let response = http_client + .get(format!("{}/sinks", api_address)) + .send() + .await + .expect("Failed to get sinks"); + + assert_eq!(response.status(), 200); + let sinks: Vec<SinkInfoResponse> = response.json().await.expect("Failed to parse sinks"); + + assert_eq!( + sinks.len(), + 2, + "Both the invalid and the valid sink should be visible in the API" + ); + + let invalid_sink = sinks + .iter() + .find(|sink| sink.key == "stdout_invalid") + .expect("Invalid sink should be reported"); + assert_eq!(invalid_sink.status, ConnectorStatus::Error); + let last_error = invalid_sink + .last_error + .as_ref() + .expect("Invalid sink should expose a last_error"); + assert!( + last_error.message.contains("poll interval"), + "last_error should mention the misconfigured poll_interval, got: {}", + last_error.message + ); + + let valid_sink = sinks + .iter() + .find(|sink| sink.key == "stdout_valid") + .expect("Healthy sibling sink should be reported"); + assert_eq!(valid_sink.status, ConnectorStatus::Running); + assert!( + valid_sink.last_error.is_none(), + "Healthy sibling sink should have no last_error" + ); +}
