This is an automated email from the ASF dual-hosted git repository.
hubcio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 0cd1bec3a refactor(connectors): isolate per-connector init failures in
runtime (#3244)
0cd1bec3a is described below
commit 0cd1bec3ab8c96ab4e3a9b35c60e9a50ba46bc1a
Author: Maciej Modzelewski <[email protected]>
AuthorDate: Thu May 14 00:30:34 2026 +0200
refactor(connectors): isolate per-connector init failures in runtime (#3244)
Previously, any failure during connector init (path resolution,
dlopen, state load, plugin init, consumer/producer setup) was
propagated as a fatal `RuntimeError` and aborted the entire
runtime. A single bad config or missing plugin file took down
every other healthy connector with it, and the failing connector
was hidden from the API/health view since it never reached the
manager.
Capture per-connector failures inline against the offending
plugin and continue iterating. Connectors that fail before their
FFI container can be loaded are returned from `sink::init` /
`source::init` as `FailedPlugin` entries and surfaced through the
manager as `ConnectorStatus::Error` with the captured message in
`last_error`, so operators can still see and diagnose them.
The `Result` return is preserved on both init paths for symmetry
and future fatal-error handling, but no system-level errors are
currently emitted.
---
core/connectors/runtime/src/context.rs | 78 ++++-
core/connectors/runtime/src/main.rs | 42 ++-
core/connectors/runtime/src/sink.rs | 199 ++++++++-----
core/connectors/runtime/src/source.rs | 219 ++++++++------
core/integration/tests/connectors/mod.rs | 1 +
.../tests/connectors/runtime/error_isolation.rs | 324 +++++++++++++++++++++
core/integration/tests/connectors/runtime/mod.rs | 20 ++
.../connectors/runtime/sink_invalid_config.toml | 20 ++
.../sink_invalid_config/stdout_invalid.toml | 39 +++
.../runtime/sink_invalid_config/stdout_valid.toml | 39 +++
.../connectors/runtime/sink_missing_plugin.toml | 20 ++
.../sink_missing_plugin_config/stdout_missing.toml | 40 +++
.../sink_missing_plugin_config/stdout_valid.toml | 39 +++
.../connectors/runtime/source_invalid_config.toml | 20 ++
.../source_invalid_config/random_invalid.toml | 42 +++
.../source_invalid_config/random_valid.toml | 40 +++
.../connectors/runtime/source_invalid_state.toml | 20 ++
.../random_invalid_state.toml | 45 +++
.../source_invalid_state_config/random_valid.toml | 40 +++
.../connectors/runtime/source_missing_plugin.toml | 20 ++
.../random_missing.toml | 42 +++
.../source_missing_plugin_config/random_valid.toml | 40 +++
22 files changed, 1225 insertions(+), 164 deletions(-)
diff --git a/core/connectors/runtime/src/context.rs
b/core/connectors/runtime/src/context.rs
index a15d4bea6..a5b1ea9f6 100644
--- a/core/connectors/runtime/src/context.rs
+++ b/core/connectors/runtime/src/context.rs
@@ -21,7 +21,7 @@ use crate::configs::runtime::ConnectorsRuntimeConfig;
use crate::metrics::Metrics;
use crate::stream::IggyClients;
use crate::{
- SinkConnectorWrapper, SourceConnectorWrapper,
+ FailedPlugin, SinkConnectorWrapper, SourceConnectorWrapper,
manager::{
sink::{SinkDetails, SinkInfo, SinkManager},
source::{SourceDetails, SourceInfo, SourceManager},
@@ -54,13 +54,20 @@ pub fn init(
sources_config: &HashMap<String, SourceConfig>,
sink_wrappers: &[SinkConnectorWrapper],
source_wrappers: &[SourceConnectorWrapper],
+ failed_sinks: &[FailedPlugin],
+ failed_sources: &[FailedPlugin],
config_provider: Box<dyn ConnectorsConfigProvider>,
iggy_clients: Arc<IggyClients>,
state_path: String,
) -> RuntimeContext {
let metrics = Arc::new(Metrics::init());
- let sinks = SinkManager::new(map_sinks(sinks_config, sink_wrappers));
- let sources = SourceManager::new(map_sources(sources_config,
source_wrappers));
+ let mut sink_details = map_sinks(sinks_config, sink_wrappers);
+ sink_details.extend(map_failed_sinks(sinks_config, failed_sinks));
+ let mut source_details = map_sources(sources_config, source_wrappers);
+ source_details.extend(map_failed_sources(sources_config, failed_sources));
+
+ let sinks = SinkManager::new(sink_details);
+ let sources = SourceManager::new(source_details);
metrics.set_sinks_total(sinks_config.len() as u32);
metrics.set_sources_total(sources_config.len() as u32);
@@ -167,3 +174,68 @@ fn map_sources(
}
sources
}
+
+const UNKNOWN_PLUGIN_VERSION: &str = "unknown";
+
+fn map_failed_sinks(
+ sinks_config: &HashMap<String, SinkConfig>,
+ failed: &[FailedPlugin],
+) -> Vec<SinkDetails> {
+ let mut sinks = Vec::with_capacity(failed.len());
+ for plugin in failed {
+ let Some(config) = sinks_config.get(&plugin.key) else {
+ error!("Missing sink config for failed plugin: {}", plugin.key);
+ continue;
+ };
+ sinks.push(SinkDetails {
+ info: SinkInfo {
+ id: plugin.id,
+ key: plugin.key.clone(),
+ name: plugin.name.clone(),
+ path: plugin.path.clone(),
+ version: UNKNOWN_PLUGIN_VERSION.to_owned(),
+ enabled: plugin.enabled,
+ status: ConnectorStatus::Error,
+ last_error: Some(ConnectorError::new(&plugin.error)),
+ plugin_config_format: plugin.config_format,
+ },
+ config: config.clone(),
+ shutdown_tx: None,
+ task_handles: vec![],
+ container: None,
+ restart_guard: Arc::new(Mutex::new(())),
+ });
+ }
+ sinks
+}
+
+fn map_failed_sources(
+ sources_config: &HashMap<String, SourceConfig>,
+ failed: &[FailedPlugin],
+) -> Vec<SourceDetails> {
+ let mut sources = Vec::with_capacity(failed.len());
+ for plugin in failed {
+ let Some(config) = sources_config.get(&plugin.key) else {
+ error!("Missing source config for failed plugin: {}", plugin.key);
+ continue;
+ };
+ sources.push(SourceDetails {
+ info: SourceInfo {
+ id: plugin.id,
+ key: plugin.key.clone(),
+ name: plugin.name.clone(),
+ path: plugin.path.clone(),
+ version: UNKNOWN_PLUGIN_VERSION.to_owned(),
+ enabled: plugin.enabled,
+ status: ConnectorStatus::Error,
+ last_error: Some(ConnectorError::new(&plugin.error)),
+ plugin_config_format: plugin.config_format,
+ },
+ config: config.clone(),
+ handler_tasks: vec![],
+ container: None,
+ restart_guard: Arc::new(Mutex::new(())),
+ });
+ }
+ sources
+}
diff --git a/core/connectors/runtime/src/main.rs
b/core/connectors/runtime/src/main.rs
index 908fe44ad..45ccff667 100644
--- a/core/connectors/runtime/src/main.rs
+++ b/core/connectors/runtime/src/main.rs
@@ -156,7 +156,7 @@ async fn main() -> Result<(), RuntimeError> {
connectors_config.sinks().len()
);
let sources_config = connectors_config.sources();
- let sources = source::init(
+ let (sources, failed_sources) = source::init(
sources_config.clone(),
&iggy_clients.producer,
&config.state.path,
@@ -164,7 +164,7 @@ async fn main() -> Result<(), RuntimeError> {
.await?;
let sinks_config = connectors_config.sinks();
- let sinks = sink::init(sinks_config.clone(),
&iggy_clients.consumer).await?;
+ let (sinks, failed_sinks) = sink::init(sinks_config.clone(),
&iggy_clients.consumer).await?;
let mut sink_wrappers = vec![];
let mut sink_containers_by_key: HashMap<String, Arc<Container<SinkApi>>> =
HashMap::new();
@@ -200,6 +200,8 @@ async fn main() -> Result<(), RuntimeError> {
sources_config,
&sink_wrappers,
&source_wrappers,
+ &failed_sinks,
+ &failed_sources,
connectors_config_provider,
iggy_clients.clone(),
config.state.path.clone(),
@@ -459,6 +461,42 @@ struct SourceConnectorWrapper {
plugins: Vec<SourceConnectorPlugin>,
}
+/// Records a connector that failed before its FFI container could be loaded
+/// (path resolution, source state load, or dlopen). Surfaced to the runtime
+/// API/health view as `ConnectorStatus::Error` so one bad config does not hide
+/// the connector or block healthy peers from running.
+pub(crate) struct FailedPlugin {
+ pub id: u32,
+ pub key: String,
+ pub name: String,
+ pub path: String,
+ pub config_format: Option<ConfigFormat>,
+ pub error: String,
+ pub enabled: bool,
+}
+
+impl FailedPlugin {
+ pub(crate) fn new(
+ id: u32,
+ key: &str,
+ name: &str,
+ path: &str,
+ config_format: Option<ConfigFormat>,
+ enabled: bool,
+ error: String,
+ ) -> Self {
+ Self {
+ id,
+ key: key.to_owned(),
+ name: name.to_owned(),
+ path: path.to_owned(),
+ config_format,
+ error,
+ enabled,
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/core/connectors/runtime/src/sink.rs
b/core/connectors/runtime/src/sink.rs
index a0ab25b1a..0ab3c9547 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -22,8 +22,8 @@ use crate::context::RuntimeContext;
use crate::log::LOG_CALLBACK;
use crate::metrics::Metrics;
use crate::{
- PLUGIN_ID, RuntimeError, SinkApi, SinkConnector, SinkConnectorConsumer,
SinkConnectorPlugin,
- SinkConnectorWrapper, resolve_plugin_path, transform,
+ FailedPlugin, PLUGIN_ID, RuntimeError, SinkApi, SinkConnector,
SinkConnectorConsumer,
+ SinkConnectorPlugin, SinkConnectorWrapper, resolve_plugin_path, transform,
};
use dlopen2::wrapper::Container;
use futures::StreamExt;
@@ -46,11 +46,23 @@ use tokio::sync::watch;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
+/// Initializes all enabled sink connectors.
+///
+/// Per-connector failures (path resolution, dlopen, plugin init,
+/// consumer/decoder/transform setup) are captured against the offending
+/// connector and do not abort the runtime. Connectors that fail before their
+/// FFI container can be loaded are returned in the second tuple element so
+/// they remain visible in health/status output.
+///
+/// `Err` is reserved for system-level errors that prevent any connector from
+/// running; no such error is currently emitted, so this always returns `Ok`.
pub async fn init(
sink_configs: HashMap<String, SinkConfig>,
iggy_client: &IggyClient,
-) -> Result<HashMap<String, SinkConnector>, RuntimeError> {
+) -> Result<(HashMap<String, SinkConnector>, Vec<FailedPlugin>), RuntimeError>
{
let mut sink_connectors: HashMap<String, SinkConnector> = HashMap::new();
+ let mut failed_plugins: Vec<FailedPlugin> = Vec::new();
+
for (key, config) in sink_configs {
let name = config.name.clone();
if !config.enabled {
@@ -59,102 +71,135 @@ pub async fn init(
}
let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
- let path = resolve_plugin_path(&config.path)?;
+
+ let path = match resolve_plugin_path(&config.path) {
+ Ok(path) => path,
+ Err(error) => {
+ let message = format!("Failed to resolve plugin path:
{error}");
+ error!("Sink: {name} ({key}) - {message}");
+ failed_plugins.push(FailedPlugin::new(
+ plugin_id,
+ &key,
+ &name,
+ &config.path,
+ config.plugin_config_format,
+ config.enabled,
+ message,
+ ));
+ continue;
+ }
+ };
+
info!(
"Initializing sink container with name: {name} ({key}), config
version: {}, plugin: {path}",
&config.version
);
- let init_error: Option<String>;
- if let Some(container) = sink_connectors.get_mut(&path) {
- info!("Sink container for plugin: {path} is already loaded.");
- let version = get_plugin_version(&container.container);
- init_error = init_sink(
- &container.container,
- &config.plugin_config.clone().unwrap_or_default(),
- plugin_id,
- )
- .err()
- .map(|error| error.to_string());
- container.plugins.push(SinkConnectorPlugin {
- id: plugin_id,
- key: key.clone(),
- name: name.clone(),
- path: path.clone(),
- version,
- config_format: config.plugin_config_format,
- consumers: vec![],
- error: init_error.clone(),
- verbose: config.verbose,
- });
- } else {
- let container: Container<SinkApi> = unsafe {
- Container::load(&path).map_err(|error| {
- RuntimeError::InvalidConfiguration(format!(
- "Failed to load sink container from {path}: {error}"
- ))
- })?
+
+ if !sink_connectors.contains_key(&path) {
+ let container = match unsafe { Container::<SinkApi>::load(&path) }
{
+ Ok(container) => container,
+ Err(error) => {
+ let message = format!("Failed to load sink container from
{path}: {error}");
+ error!("Sink: {name} ({key}) - {message}");
+ failed_plugins.push(FailedPlugin::new(
+ plugin_id,
+ &key,
+ &name,
+ &config.path,
+ config.plugin_config_format,
+ config.enabled,
+ message,
+ ));
+ continue;
+ }
};
info!("Sink container for plugin: {path} loaded successfully.");
- let version = get_plugin_version(&container);
- init_error = init_sink(
- &container,
- &config.plugin_config.clone().unwrap_or_default(),
- plugin_id,
- )
- .err()
- .map(|error| error.to_string());
sink_connectors.insert(
path.clone(),
SinkConnector {
container,
- plugins: vec![SinkConnectorPlugin {
- id: plugin_id,
- key: key.clone(),
- name: name.clone(),
- path: path.clone(),
- version,
- config_format: config.plugin_config_format,
- consumers: vec![],
- error: init_error.clone(),
- verbose: config.verbose,
- }],
+ plugins: Vec::new(),
},
);
+ } else {
+ info!("Sink container for plugin: {path} is already loaded.");
}
+ let connector = sink_connectors
+ .get_mut(&path)
+ .expect("sink container was just ensured for this path");
+ let version = get_plugin_version(&connector.container);
+ let init_error = init_sink(
+ &connector.container,
+ &config.plugin_config.clone().unwrap_or_default(),
+ plugin_id,
+ )
+ .err()
+ .map(|error| error.to_string());
+
+ connector.plugins.push(SinkConnectorPlugin {
+ id: plugin_id,
+ key: key.clone(),
+ name: name.clone(),
+ path: path.clone(),
+ version,
+ config_format: config.plugin_config_format,
+ consumers: vec![],
+ error: init_error.clone(),
+ verbose: config.verbose,
+ });
+
if let Some(error) = init_error {
error!("Failed to initialize sink container with name: {name}
({key}). {error}");
continue;
- } else {
- info!(
- "Sink container with name: {name} ({key}), initialized
successfully with ID: {plugin_id}."
- );
}
- let consumers = setup_sink_consumers(&key, &config,
iggy_client).await?;
- let connector = sink_connectors.get_mut(&path).ok_or_else(|| {
- RuntimeError::InvalidConfiguration(format!("Sink connector not
found for path: {path}"))
- })?;
- let plugin = connector
- .plugins
- .iter_mut()
- .find(|p| p.id == plugin_id)
- .ok_or_else(|| {
- RuntimeError::InvalidConfiguration(format!(
- "Sink plugin not found for ID: {plugin_id}"
- ))
- })?;
- for (consumer, decoder, batch_size, transforms) in consumers {
- plugin.consumers.push(SinkConnectorConsumer {
- consumer,
- decoder,
- batch_size,
- transforms,
- });
+ match setup_sink_consumers(&key, &config, iggy_client).await {
+ Ok(consumers) => {
+ let connector = sink_connectors
+ .get_mut(&path)
+ .expect("sink connector was inserted above");
+ let plugin = connector
+ .plugins
+ .iter_mut()
+ .find(|plugin| plugin.id == plugin_id)
+ .expect("sink plugin was pushed above");
+ for (consumer, decoder, batch_size, transforms) in consumers {
+ plugin.consumers.push(SinkConnectorConsumer {
+ consumer,
+ decoder,
+ batch_size,
+ transforms,
+ });
+ }
+ info!(
+ "Sink container with name: {name} ({key}) initialized
successfully with ID: {plugin_id}."
+ );
+ }
+ Err(error) => {
+ let message = format!("Failed to set up sink consumers:
{error}");
+ error!("Sink: {name} ({key}) - {message}");
+ let connector = sink_connectors
+ .get_mut(&path)
+ .expect("sink connector was inserted above");
+ let close_result =
(connector.container.iggy_sink_close)(plugin_id);
+ if close_result != 0 {
+ warn!(
+ "iggy_sink_close returned {close_result} while
cleaning up failed sink connector with ID: {plugin_id} ({key})"
+ );
+ }
+ if let Some(plugin) = connector
+ .plugins
+ .iter_mut()
+ .find(|plugin| plugin.id == plugin_id)
+ {
+ plugin.error = Some(message);
+ }
+ }
}
}
- Ok(sink_connectors)
+ Ok((sink_connectors, failed_plugins))
}
pub fn consume(
diff --git a/core/connectors/runtime/src/source.rs
b/core/connectors/runtime/src/source.rs
index 2c20eb1b0..5d992a60b 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -42,7 +42,7 @@ use crate::context::RuntimeContext;
use crate::log::LOG_CALLBACK;
use crate::metrics::ConnectorType;
use crate::{
- PLUGIN_ID, RuntimeError, SourceApi, SourceConnector, SourceConnectorPlugin,
+ FailedPlugin, PLUGIN_ID, RuntimeError, SourceApi, SourceConnector,
SourceConnectorPlugin,
SourceConnectorProducer, SourceConnectorWrapper, resolve_plugin_path,
state::{FileStateProvider, StateProvider, StateStorage},
transform,
@@ -56,12 +56,24 @@ pub fn cleanup_sender(plugin_id: u32) {
SOURCE_SENDERS.remove(&plugin_id);
}
+/// Initializes all enabled source connectors.
+///
+/// Per-connector failures (path resolution, dlopen, state load, plugin init,
+/// producer/encoder/transform setup) are captured against the offending
+/// connector and do not abort the runtime. Connectors that fail before their
+/// FFI container can be loaded are returned in the second tuple element so
+/// they remain visible in health/status output.
+///
+/// `Err` is reserved for system-level errors that prevent any connector from
+/// running; no such error is currently emitted, so this always returns `Ok`.
pub async fn init(
source_configs: HashMap<String, SourceConfig>,
iggy_client: &IggyClient,
state_path: &str,
-) -> Result<HashMap<String, SourceConnector>, RuntimeError> {
+) -> Result<(HashMap<String, SourceConnector>, Vec<FailedPlugin>),
RuntimeError> {
let mut source_connectors: HashMap<String, SourceConnector> =
HashMap::new();
+ let mut failed_plugins: Vec<FailedPlugin> = Vec::new();
+
for (key, config) in source_configs {
let name = config.name.clone();
if !config.enabled {
@@ -70,110 +82,153 @@ pub async fn init(
}
let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
- let path = resolve_plugin_path(&config.path)?;
+
+ let path = match resolve_plugin_path(&config.path) {
+ Ok(path) => path,
+ Err(error) => {
+ let message = format!("Failed to resolve plugin path:
{error}");
+ error!("Source: {name} ({key}) - {message}");
+ failed_plugins.push(FailedPlugin::new(
+ plugin_id,
+ &key,
+ &name,
+ &config.path,
+ config.plugin_config_format,
+ config.enabled,
+ message,
+ ));
+ continue;
+ }
+ };
+
info!(
"Initializing source container with name: {name} ({key}), config
version: {}, plugin: {path}",
&config.version
);
+
let state_storage = get_state_storage(state_path, &key);
let state = match &state_storage {
- StateStorage::File(file) => file.load().await?,
+ StateStorage::File(file) => match file.load().await {
+ Ok(state) => state,
+ Err(error) => {
+ let message = format!("Failed to load source state:
{error}");
+ error!("Source: {name} ({key}) - {message}");
+ failed_plugins.push(FailedPlugin::new(
+ plugin_id,
+ &key,
+ &name,
+ &config.path,
+ config.plugin_config_format,
+ config.enabled,
+ message,
+ ));
+ continue;
+ }
+ },
};
- let init_error: Option<String>;
- if let Some(container) = source_connectors.get_mut(&path) {
- info!("Source container for plugin: {path} is already loaded.");
- let version = get_plugin_version(&container.container);
- init_error = init_source(
- &container.container,
- &config.plugin_config.clone().unwrap_or_default(),
- plugin_id,
- state,
- )
- .err()
- .map(|error| error.to_string());
- container.plugins.push(SourceConnectorPlugin {
- id: plugin_id,
- key: key.clone(),
- name: name.clone(),
- path: path.clone(),
- version,
- config_format: config.plugin_config_format,
- producer: None,
- transforms: vec![],
- state_storage,
- error: init_error.clone(),
- verbose: config.verbose,
- });
- } else {
- let container: Container<SourceApi> = unsafe {
- Container::load(&path).map_err(|error| {
- RuntimeError::InvalidConfiguration(format!(
- "Failed to load source container from {path}: {error}"
- ))
- })?
+
+ if !source_connectors.contains_key(&path) {
+ let container = match unsafe { Container::<SourceApi>::load(&path)
} {
+ Ok(container) => container,
+ Err(error) => {
+ let message = format!("Failed to load source container
from {path}: {error}");
+ error!("Source: {name} ({key}) - {message}");
+ failed_plugins.push(FailedPlugin::new(
+ plugin_id,
+ &key,
+ &name,
+ &config.path,
+ config.plugin_config_format,
+ config.enabled,
+ message,
+ ));
+ continue;
+ }
};
info!("Source container for plugin: {path} loaded successfully.");
- let version = get_plugin_version(&container);
- init_error = init_source(
- &container,
- &config.plugin_config.clone().unwrap_or_default(),
- plugin_id,
- state,
- )
- .err()
- .map(|error| error.to_string());
source_connectors.insert(
path.clone(),
SourceConnector {
container,
- plugins: vec![SourceConnectorPlugin {
- id: plugin_id,
- key: key.clone(),
- name: name.clone(),
- path: path.clone(),
- version,
- config_format: config.plugin_config_format,
- producer: None,
- transforms: vec![],
- state_storage,
- error: init_error.clone(),
- verbose: config.verbose,
- }],
+ plugins: Vec::new(),
},
);
+ } else {
+ info!("Source container for plugin: {path} is already loaded.");
}
+ let connector = source_connectors
+ .get_mut(&path)
+ .expect("source container was just ensured for this path");
+ let version = get_plugin_version(&connector.container);
+ let init_error = init_source(
+ &connector.container,
+ &config.plugin_config.clone().unwrap_or_default(),
+ plugin_id,
+ state,
+ )
+ .err()
+ .map(|error| error.to_string());
+
+ connector.plugins.push(SourceConnectorPlugin {
+ id: plugin_id,
+ key: key.clone(),
+ name: name.clone(),
+ path: path.clone(),
+ version,
+ config_format: config.plugin_config_format,
+ producer: None,
+ transforms: vec![],
+ state_storage,
+ error: init_error.clone(),
+ verbose: config.verbose,
+ });
+
if let Some(error) = init_error {
error!("Source container with name: {name} ({key}) failed to
initialize: {error}");
continue;
- } else {
- info!(
- "Source container with name: {name} ({key}), initialized
successfully with ID: {plugin_id}."
- );
}
- let (producer, encoder, transforms) =
- setup_source_producer(&key, &config, iggy_client).await?;
-
- let connector = source_connectors.get_mut(&path).ok_or_else(|| {
- RuntimeError::InvalidConfiguration(format!(
- "Source connector not found for path: {path}"
- ))
- })?;
- let plugin = connector
- .plugins
- .iter_mut()
- .find(|p| p.id == plugin_id)
- .ok_or_else(|| {
- RuntimeError::InvalidConfiguration(format!(
- "Source plugin not found for ID: {plugin_id}"
- ))
- })?;
- plugin.producer = Some(SourceConnectorProducer { producer, encoder });
- plugin.transforms = transforms;
+ match setup_source_producer(&key, &config, iggy_client).await {
+ Ok((producer, encoder, transforms)) => {
+ let connector = source_connectors
+ .get_mut(&path)
+ .expect("source connector was inserted above");
+ let plugin = connector
+ .plugins
+ .iter_mut()
+ .find(|plugin| plugin.id == plugin_id)
+ .expect("source plugin was pushed above");
+ plugin.producer = Some(SourceConnectorProducer { producer,
encoder });
+ plugin.transforms = transforms;
+ info!(
+ "Source container with name: {name} ({key}) initialized
successfully with ID: {plugin_id}."
+ );
+ }
+ Err(error) => {
+ let message = format!("Failed to set up source producer:
{error}");
+ error!("Source: {name} ({key}) - {message}");
+ let connector = source_connectors
+ .get_mut(&path)
+ .expect("source connector was inserted above");
+ let close_result =
(connector.container.iggy_source_close)(plugin_id);
+ if close_result != 0 {
+ warn!(
+ "iggy_source_close returned {close_result} while
cleaning up failed source connector with ID: {plugin_id} ({key})"
+ );
+ }
+ if let Some(plugin) = connector
+ .plugins
+ .iter_mut()
+ .find(|plugin| plugin.id == plugin_id)
+ {
+ plugin.error = Some(message);
+ }
+ }
+ }
}
- Ok(source_connectors)
+ Ok((source_connectors, failed_plugins))
}
fn get_plugin_version(container: &Container<SourceApi>) -> String {
diff --git a/core/integration/tests/connectors/mod.rs
b/core/integration/tests/connectors/mod.rs
index 83600be27..adf97f1b7 100644
--- a/core/integration/tests/connectors/mod.rs
+++ b/core/integration/tests/connectors/mod.rs
@@ -29,6 +29,7 @@ mod mongodb;
mod postgres;
mod quickwit;
mod random;
+mod runtime;
mod stdout;
use iggy_common::IggyTimestamp;
diff --git a/core/integration/tests/connectors/runtime/error_isolation.rs
b/core/integration/tests/connectors/runtime/error_isolation.rs
new file mode 100644
index 000000000..3d6fbb977
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/error_isolation.rs
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Per-connector error isolation tests for the connectors runtime.
+//!
+//! Each test drives the runtime with two connectors of the same kind: one
+//! deliberately misconfigured and one healthy. The assertions verify that:
+//! * the runtime stays alive (`/health` returns "healthy"),
+//! * the broken connector is surfaced via the API with
+//! [`ConnectorStatus::Error`] and a populated `last_error`,
+//! * the sibling healthy connector still reaches
[`ConnectorStatus::Running`].
+//!
+//! Together the tests cover the following per-connector failure branches in
+//! `source::init` and `sink::init` (dlopen and plugin-init failures are untested):
+//! * plugin path resolution failure (missing `.so`),
+//! * source state-load failure (unreachable state file),
+//! * post-container setup failure (invalid duration in stream config).
+
+use iggy_connector_sdk::api::{
+ ConnectorStatus, HealthResponse, SinkInfoResponse, SourceInfoResponse,
+};
+use integration::harness::seeds;
+use integration::iggy_harness;
+use reqwest::Client;
+use std::time::Duration;
+use tokio::time::sleep;
+
+async fn assert_runtime_healthy(http_client: &Client, api_address: &str) {
+ let response = http_client
+ .get(format!("{api_address}/health"))
+ .send()
+ .await
+ .expect("Failed to query health endpoint");
+ assert_eq!(response.status(), 200);
+ let health: HealthResponse = response
+ .json()
+ .await
+ .expect("Failed to parse health response");
+ assert_eq!(health.status, "healthy");
+}
+
+async fn fetch_sinks(http_client: &Client, api_address: &str) ->
Vec<SinkInfoResponse> {
+ let response = http_client
+ .get(format!("{api_address}/sinks"))
+ .send()
+ .await
+ .expect("Failed to query /sinks");
+ assert_eq!(response.status(), 200);
+ response.json().await.expect("Failed to parse sinks")
+}
+
+async fn fetch_sources(http_client: &Client, api_address: &str) ->
Vec<SourceInfoResponse> {
+ let response = http_client
+ .get(format!("{api_address}/sources"))
+ .send()
+ .await
+ .expect("Failed to query /sources");
+ assert_eq!(response.status(), 200);
+ response.json().await.expect("Failed to parse sources")
+}
+
+#[iggy_harness(
+ server(connectors_runtime(
+ config_path = "tests/connectors/runtime/sink_invalid_config.toml"
+ )),
+ seed = seeds::connector_stream
+)]
+async fn sink_with_invalid_config_does_not_abort_runtime(harness:
&TestHarness) {
+ let api_address = harness
+ .connectors_runtime()
+ .expect("connector runtime should be available")
+ .http_url();
+ let http_client = Client::new();
+
+ assert_runtime_healthy(&http_client, &api_address).await;
+ let sinks = fetch_sinks(&http_client, &api_address).await;
+
+ assert_eq!(
+ sinks.len(),
+ 2,
+ "Both the invalid and the valid sink should be visible in the API"
+ );
+
+ let invalid_sink = sinks
+ .iter()
+ .find(|sink| sink.key == "stdout_invalid")
+ .expect("Invalid sink should be reported");
+ assert_eq!(invalid_sink.status, ConnectorStatus::Error);
+ let last_error = invalid_sink
+ .last_error
+ .as_ref()
+ .expect("Invalid sink should expose a last_error");
+ assert!(
+ last_error.message.contains("poll interval"),
+ "last_error should mention the misconfigured poll_interval, got: {}",
+ last_error.message
+ );
+
+ let valid_sink = sinks
+ .iter()
+ .find(|sink| sink.key == "stdout_valid")
+ .expect("Healthy sibling sink should be reported");
+ assert_eq!(valid_sink.status, ConnectorStatus::Running);
+ assert!(
+ valid_sink.last_error.is_none(),
+ "Healthy sibling sink should have no last_error"
+ );
+}
+
+#[iggy_harness(
+ server(connectors_runtime(
+ config_path = "tests/connectors/runtime/sink_missing_plugin.toml"
+ )),
+ seed = seeds::connector_stream
+)]
+async fn sink_with_missing_plugin_does_not_abort_runtime(harness:
&TestHarness) {
+ let api_address = harness
+ .connectors_runtime()
+ .expect("connector runtime should be available")
+ .http_url();
+ let http_client = Client::new();
+
+ assert_runtime_healthy(&http_client, &api_address).await;
+ let sinks = fetch_sinks(&http_client, &api_address).await;
+
+ assert_eq!(
+ sinks.len(),
+ 2,
+ "Both the missing-plugin sink and the valid sink should be visible in
the API"
+ );
+
+ let missing_plugin_sink = sinks
+ .iter()
+ .find(|sink| sink.key == "stdout_missing_plugin")
+ .expect("Missing-plugin sink should be reported");
+ assert_eq!(missing_plugin_sink.status, ConnectorStatus::Error);
+ let last_error = missing_plugin_sink
+ .last_error
+ .as_ref()
+ .expect("Missing-plugin sink should expose a last_error");
+ assert!(
+ last_error.message.contains("Plugin library not found")
+ || last_error.message.contains("Failed to resolve plugin path"),
+ "last_error should mention the missing plugin path, got: {}",
+ last_error.message
+ );
+
+ let valid_sink = sinks
+ .iter()
+ .find(|sink| sink.key == "stdout_valid")
+ .expect("Healthy sibling sink should be reported");
+ assert_eq!(valid_sink.status, ConnectorStatus::Running);
+ assert!(
+ valid_sink.last_error.is_none(),
+ "Healthy sibling sink should have no last_error"
+ );
+}
+
+#[iggy_harness(
+ server(connectors_runtime(
+ config_path = "tests/connectors/runtime/source_invalid_state.toml"
+ )),
+ seed = seeds::connector_stream
+)]
+async fn source_with_invalid_state_does_not_abort_runtime(harness:
&TestHarness) {
+ let api_address = harness
+ .connectors_runtime()
+ .expect("connector runtime should be available")
+ .http_url();
+ let http_client = Client::new();
+
+ sleep(Duration::from_millis(500)).await;
+
+ assert_runtime_healthy(&http_client, &api_address).await;
+ let sources = fetch_sources(&http_client, &api_address).await;
+
+ assert_eq!(
+ sources.len(),
+ 2,
+ "Both the broken-state source and the valid source should be visible
in the API"
+ );
+
+ let invalid_source = sources
+ .iter()
+ .find(|source| source.key == "random_invalid/state_missing_parent")
+ .expect("Source with broken state path should be reported");
+ assert_eq!(invalid_source.status, ConnectorStatus::Error);
+ let last_error = invalid_source
+ .last_error
+ .as_ref()
+ .expect("Source with broken state should expose a last_error");
+ assert!(
+ last_error.message.contains("Failed to load source state"),
+ "last_error should mention the failed state load, got: {}",
+ last_error.message
+ );
+
+ let valid_source = sources
+ .iter()
+ .find(|source| source.key == "random_valid")
+ .expect("Healthy sibling source should be reported");
+ assert_eq!(valid_source.status, ConnectorStatus::Running);
+ assert!(
+ valid_source.last_error.is_none(),
+ "Healthy sibling source should have no last_error"
+ );
+}
+
+#[iggy_harness(
+ server(connectors_runtime(
+ config_path = "tests/connectors/runtime/source_missing_plugin.toml"
+ )),
+ seed = seeds::connector_stream
+)]
+async fn source_with_missing_plugin_does_not_abort_runtime(harness:
&TestHarness) {
+ let api_address = harness
+ .connectors_runtime()
+ .expect("connector runtime should be available")
+ .http_url();
+ let http_client = Client::new();
+
+ sleep(Duration::from_millis(500)).await;
+
+ assert_runtime_healthy(&http_client, &api_address).await;
+ let sources = fetch_sources(&http_client, &api_address).await;
+
+ assert_eq!(
+ sources.len(),
+ 2,
+ "Both the missing-plugin source and the valid source should be visible
in the API"
+ );
+
+ let missing_plugin_source = sources
+ .iter()
+ .find(|source| source.key == "random_missing_plugin")
+ .expect("Missing-plugin source should be reported");
+ assert_eq!(missing_plugin_source.status, ConnectorStatus::Error);
+ let last_error = missing_plugin_source
+ .last_error
+ .as_ref()
+ .expect("Missing-plugin source should expose a last_error");
+ assert!(
+ last_error.message.contains("Plugin library not found")
+ || last_error.message.contains("Failed to resolve plugin path"),
+ "last_error should mention the missing plugin path, got: {}",
+ last_error.message
+ );
+
+ let valid_source = sources
+ .iter()
+ .find(|source| source.key == "random_valid")
+ .expect("Healthy sibling source should be reported");
+ assert_eq!(valid_source.status, ConnectorStatus::Running);
+ assert!(
+ valid_source.last_error.is_none(),
+ "Healthy sibling source should have no last_error"
+ );
+}
+
+#[iggy_harness(
+ server(connectors_runtime(
+ config_path = "tests/connectors/runtime/source_invalid_config.toml"
+ )),
+ seed = seeds::connector_stream
+)]
+async fn source_with_invalid_config_does_not_abort_runtime(harness:
&TestHarness) {
+ let api_address = harness
+ .connectors_runtime()
+ .expect("connector runtime should be available")
+ .http_url();
+ let http_client = Client::new();
+
+ sleep(Duration::from_millis(500)).await;
+
+ assert_runtime_healthy(&http_client, &api_address).await;
+ let sources = fetch_sources(&http_client, &api_address).await;
+
+ assert_eq!(
+ sources.len(),
+ 2,
+ "Both the invalid and the valid source should be visible in the API"
+ );
+
+ let invalid_source = sources
+ .iter()
+ .find(|source| source.key == "random_invalid")
+ .expect("Invalid source should be reported");
+ assert_eq!(invalid_source.status, ConnectorStatus::Error);
+ let last_error = invalid_source
+ .last_error
+ .as_ref()
+ .expect("Invalid source should expose a last_error");
+ assert!(
+ last_error.message.contains("linger time"),
+ "last_error should mention the misconfigured linger_time, got: {}",
+ last_error.message
+ );
+
+ let valid_source = sources
+ .iter()
+ .find(|source| source.key == "random_valid")
+ .expect("Healthy sibling source should be reported");
+ assert_eq!(valid_source.status, ConnectorStatus::Running);
+ assert!(
+ valid_source.last_error.is_none(),
+ "Healthy sibling source should have no last_error"
+ );
+}
diff --git a/core/integration/tests/connectors/runtime/mod.rs
b/core/integration/tests/connectors/runtime/mod.rs
new file mode 100644
index 000000000..e5b353fc3
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/mod.rs
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+mod error_isolation;
diff --git a/core/integration/tests/connectors/runtime/sink_invalid_config.toml
b/core/integration/tests/connectors/runtime/sink_invalid_config.toml
new file mode 100644
index 000000000..3d2c03553
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/sink_invalid_config.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/runtime/sink_invalid_config"
diff --git
a/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_invalid.toml
b/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_invalid.toml
new file mode 100644
index 000000000..b4230b888
--- /dev/null
+++
b/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_invalid.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Deliberately broken sink: `poll_interval` cannot be parsed as a duration.
+# Used to assert that the runtime stays alive and reports this connector
+# with `ConnectorStatus::Error` instead of aborting startup.
+
+type = "sink"
+key = "stdout_invalid"
+enabled = true
+version = 0
+name = "Stdout invalid sink"
+path = "../../target/debug/libiggy_connector_stdout_sink"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "not-a-valid-duration"
+consumer_group = "stdout_invalid_sink_connector"
+
+[plugin_config]
+print_payload = true
diff --git
a/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_valid.toml
b/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_valid.toml
new file mode 100644
index 000000000..0670d68fd
--- /dev/null
+++
b/core/integration/tests/connectors/runtime/sink_invalid_config/stdout_valid.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Healthy companion sink: verifies that a sibling connector with valid
+# configuration still starts and runs when another connector in the same
+# runtime fails initialization.
+
+type = "sink"
+key = "stdout_valid"
+enabled = true
+version = 0
+name = "Stdout valid sink"
+path = "../../target/debug/libiggy_connector_stdout_sink"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "stdout_valid_sink_connector"
+
+[plugin_config]
+print_payload = true
diff --git a/core/integration/tests/connectors/runtime/sink_missing_plugin.toml
b/core/integration/tests/connectors/runtime/sink_missing_plugin.toml
new file mode 100644
index 000000000..7eb99d3b4
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/sink_missing_plugin.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/runtime/sink_missing_plugin_config"
diff --git
a/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_missing.toml
b/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_missing.toml
new file mode 100644
index 000000000..608f34e32
--- /dev/null
+++
b/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_missing.toml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Deliberately broken sink: `path` points to a shared library that does not
+# exist on disk, exercising the pre-container failure branch in `sink::init`
+# (failed_plugins list, surfaced through `map_failed_sinks` with
+# `ConnectorStatus::Error`).
+
+type = "sink"
+key = "stdout_missing_plugin"
+enabled = true
+version = 0
+name = "Stdout missing plugin sink"
+path = "/nonexistent/path/to/libiggy_connector_stdout_sink_does_not_exist.so"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "stdout_missing_plugin_sink_connector"
+
+[plugin_config]
+print_payload = true
diff --git
a/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_valid.toml
b/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_valid.toml
new file mode 100644
index 000000000..f9a52dd06
--- /dev/null
+++
b/core/integration/tests/connectors/runtime/sink_missing_plugin_config/stdout_valid.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Healthy companion sink: verifies that a sibling connector with valid
+# configuration still starts and runs when another connector in the same
+# runtime fails to load its plugin.
+
+type = "sink"
+key = "stdout_valid"
+enabled = true
+version = 0
+name = "Stdout valid sink"
+path = "../../target/debug/libiggy_connector_stdout_sink"
+verbose = true
+
+[[streams]]
+stream = "test_stream"
+topics = ["test_topic"]
+schema = "json"
+batch_length = 100
+poll_interval = "5ms"
+consumer_group = "stdout_valid_sink_connector"
+
+[plugin_config]
+print_payload = true
diff --git
a/core/integration/tests/connectors/runtime/source_invalid_config.toml
b/core/integration/tests/connectors/runtime/source_invalid_config.toml
new file mode 100644
index 000000000..ec101f9cd
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/source_invalid_config.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/runtime/source_invalid_config"
diff --git
a/core/integration/tests/connectors/runtime/source_invalid_config/random_invalid.toml
b/core/integration/tests/connectors/runtime/source_invalid_config/random_invalid.toml
new file mode 100644
index 000000000..2df8c39c0
--- /dev/null
+++
b/core/integration/tests/connectors/runtime/source_invalid_config/random_invalid.toml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Deliberately broken source: `linger_time` cannot be parsed as a duration.
+# Used to assert that the runtime stays alive and reports this connector
+# with `ConnectorStatus::Error` after the FFI plugin instance has been
+# cleaned up (post-container Err arm in `source::init`).
+
+type = "source"
+key = "random_invalid"
+enabled = true
+version = 0
+name = "Random invalid source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "not-a-valid-duration"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
diff --git
a/core/integration/tests/connectors/runtime/source_invalid_config/random_valid.toml
b/core/integration/tests/connectors/runtime/source_invalid_config/random_valid.toml
new file mode 100644
index 000000000..8810177d1
--- /dev/null
+++
b/core/integration/tests/connectors/runtime/source_invalid_config/random_valid.toml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Healthy companion source: verifies that a sibling connector still starts
+# and runs when another source in the same runtime fails producer setup.
+
+type = "source"
+key = "random_valid"
+enabled = true
+version = 0
+name = "Random valid source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "5ms"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
diff --git
a/core/integration/tests/connectors/runtime/source_invalid_state.toml
b/core/integration/tests/connectors/runtime/source_invalid_state.toml
new file mode 100644
index 000000000..bdb23b871
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/source_invalid_state.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/runtime/source_invalid_state_config"
diff --git
a/core/integration/tests/connectors/runtime/source_invalid_state_config/random_invalid_state.toml
b/core/integration/tests/connectors/runtime/source_invalid_state_config/random_invalid_state.toml
new file mode 100644
index 000000000..c6632a152
--- /dev/null
+++
b/core/integration/tests/connectors/runtime/source_invalid_state_config/random_invalid_state.toml
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Deliberately broken source: the `/` in the connector key forces the runtime
+# to build a state file path of the form
+# `{state_dir}/source_random_invalid/state_missing_parent.state`.
+# The parent directory `source_random_invalid/` does not exist, so the state
+# load step in `source::init` returns `Err(CannotOpenStateFile)` and the
+# connector is recorded in the `failed_plugins` list. Exercises the
+# state-load failure branch of the per-connector error isolation.
+
+type = "source"
+key = "random_invalid/state_missing_parent"
+enabled = true
+version = 0
+name = "Random invalid state source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "5ms"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
diff --git
a/core/integration/tests/connectors/runtime/source_invalid_state_config/random_valid.toml
b/core/integration/tests/connectors/runtime/source_invalid_state_config/random_valid.toml
new file mode 100644
index 000000000..82d49229e
--- /dev/null
+++
b/core/integration/tests/connectors/runtime/source_invalid_state_config/random_valid.toml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Healthy companion source: verifies that a sibling connector keeps running
+# when another source in the same runtime fails its state load.
+
+type = "source"
+key = "random_valid"
+enabled = true
+version = 0
+name = "Random valid source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "5ms"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
diff --git
a/core/integration/tests/connectors/runtime/source_missing_plugin.toml
b/core/integration/tests/connectors/runtime/source_missing_plugin.toml
new file mode 100644
index 000000000..1d810b479
--- /dev/null
+++ b/core/integration/tests/connectors/runtime/source_missing_plugin.toml
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[connectors]
+config_type = "local"
+config_dir = "tests/connectors/runtime/source_missing_plugin_config"
diff --git
a/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_missing.toml
b/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_missing.toml
new file mode 100644
index 000000000..2deb6bbaf
--- /dev/null
+++
b/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_missing.toml
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Deliberately broken source: `path` points to a shared library that does not
+# exist on disk, exercising the pre-container failure branch in
+# `source::init` (failed_plugins list, surfaced through `map_failed_sources`
+# with `ConnectorStatus::Error`).
+
+type = "source"
+key = "random_missing_plugin"
+enabled = true
+version = 0
+name = "Random missing plugin source"
+path = "/nonexistent/path/to/libiggy_connector_random_source_does_not_exist.so"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "5ms"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200
diff --git
a/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_valid.toml
b/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_valid.toml
new file mode 100644
index 000000000..a06ad65aa
--- /dev/null
+++
b/core/integration/tests/connectors/runtime/source_missing_plugin_config/random_valid.toml
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Healthy companion source: verifies that a sibling connector keeps running
+# when another source in the same runtime fails to load its plugin.
+
+type = "source"
+key = "random_valid"
+enabled = true
+version = 0
+name = "Random valid source"
+path = "../../target/debug/libiggy_connector_random_source"
+plugin_config_format = "json"
+
+[[streams]]
+stream = "test_stream"
+topic = "test_topic"
+schema = "json"
+batch_length = 1000
+linger_time = "5ms"
+
+[plugin_config]
+interval = "100ms"
+max_count = 1_000_000
+messages_range = [1, 5]
+payload_size = 200