mmodzelewski commented on code in PR #3244:
URL: https://github.com/apache/iggy/pull/3244#discussion_r3233658994
##########
core/connectors/runtime/src/sink.rs:
##########
@@ -59,102 +71,141 @@ pub async fn init(
}
let plugin_id = PLUGIN_ID.fetch_add(1, Ordering::SeqCst);
- let path = resolve_plugin_path(&config.path)?;
+
+ let path = match resolve_plugin_path(&config.path) {
+ Ok(path) => path,
+ Err(error) => {
+ let message = format!("Failed to resolve plugin path:
{error}");
+ error!("Sink: {name} ({key}) - {message}");
+ failed_plugins.push(build_failed_plugin(
+ plugin_id, &key, &name, &config, message,
+ ));
+ continue;
+ }
+ };
+
info!(
"Initializing sink container with name: {name} ({key}), config
version: {}, plugin: {path}",
&config.version
);
- let init_error: Option<String>;
- if let Some(container) = sink_connectors.get_mut(&path) {
- info!("Sink container for plugin: {path} is already loaded.");
- let version = get_plugin_version(&container.container);
- init_error = init_sink(
- &container.container,
- &config.plugin_config.clone().unwrap_or_default(),
- plugin_id,
- )
- .err()
- .map(|error| error.to_string());
- container.plugins.push(SinkConnectorPlugin {
- id: plugin_id,
- key: key.clone(),
- name: name.clone(),
- path: path.clone(),
- version,
- config_format: config.plugin_config_format,
- consumers: vec![],
- error: init_error.clone(),
- verbose: config.verbose,
- });
- } else {
- let container: Container<SinkApi> = unsafe {
- Container::load(&path).map_err(|error| {
- RuntimeError::InvalidConfiguration(format!(
- "Failed to load sink container from {path}: {error}"
- ))
- })?
+
+ if !sink_connectors.contains_key(&path) {
+ let container = match unsafe { Container::<SinkApi>::load(&path) }
{
+ Ok(container) => container,
+ Err(error) => {
+ let message = format!("Failed to load sink container from
{path}: {error}");
+ error!("Sink: {name} ({key}) - {message}");
+ failed_plugins.push(build_failed_plugin(
+ plugin_id, &key, &name, &config, message,
+ ));
+ continue;
+ }
};
info!("Sink container for plugin: {path} loaded successfully.");
- let version = get_plugin_version(&container);
- init_error = init_sink(
- &container,
- &config.plugin_config.clone().unwrap_or_default(),
- plugin_id,
- )
- .err()
- .map(|error| error.to_string());
sink_connectors.insert(
path.clone(),
SinkConnector {
container,
- plugins: vec![SinkConnectorPlugin {
- id: plugin_id,
- key: key.clone(),
- name: name.clone(),
- path: path.clone(),
- version,
- config_format: config.plugin_config_format,
- consumers: vec![],
- error: init_error.clone(),
- verbose: config.verbose,
- }],
+ plugins: Vec::new(),
},
);
+ } else {
+ info!("Sink container for plugin: {path} is already loaded.");
}
+ let connector = sink_connectors
+ .get_mut(&path)
+ .expect("sink container was just ensured for this path");
+ let version = get_plugin_version(&connector.container);
+ let init_error = init_sink(
+ &connector.container,
+ &config.plugin_config.clone().unwrap_or_default(),
+ plugin_id,
+ )
+ .err()
+ .map(|error| error.to_string());
+
+ connector.plugins.push(SinkConnectorPlugin {
+ id: plugin_id,
+ key: key.clone(),
+ name: name.clone(),
+ path: path.clone(),
+ version,
+ config_format: config.plugin_config_format,
+ consumers: vec![],
+ error: init_error.clone(),
+ verbose: config.verbose,
+ });
+
if let Some(error) = init_error {
error!("Failed to initialize sink container with name: {name}
({key}). {error}");
continue;
- } else {
- info!(
- "Sink container with name: {name} ({key}), initialized
successfully with ID: {plugin_id}."
- );
}
- let consumers = setup_sink_consumers(&key, &config,
iggy_client).await?;
- let connector = sink_connectors.get_mut(&path).ok_or_else(|| {
- RuntimeError::InvalidConfiguration(format!("Sink connector not
found for path: {path}"))
- })?;
- let plugin = connector
- .plugins
- .iter_mut()
- .find(|p| p.id == plugin_id)
- .ok_or_else(|| {
- RuntimeError::InvalidConfiguration(format!(
- "Sink plugin not found for ID: {plugin_id}"
- ))
- })?;
- for (consumer, decoder, batch_size, transforms) in consumers {
- plugin.consumers.push(SinkConnectorConsumer {
- consumer,
- decoder,
- batch_size,
- transforms,
- });
+ match setup_sink_consumers(&key, &config, iggy_client).await {
+ Ok(consumers) => {
+ let connector = sink_connectors
+ .get_mut(&path)
+ .expect("sink connector was inserted above");
+ let plugin = connector
+ .plugins
+ .iter_mut()
+ .find(|plugin| plugin.id == plugin_id)
+ .expect("sink plugin was pushed above");
+ for (consumer, decoder, batch_size, transforms) in consumers {
+ plugin.consumers.push(SinkConnectorConsumer {
+ consumer,
+ decoder,
+ batch_size,
+ transforms,
+ });
+ }
+ info!(
+ "Sink container with name: {name} ({key}) initialized
successfully with ID: {plugin_id}."
+ );
+ }
+ Err(error) => {
+ let message = format!("Failed to set up sink consumers:
{error}");
+ error!("Sink: {name} ({key}) - {message}");
+ let connector = sink_connectors
+ .get_mut(&path)
+ .expect("sink connector was inserted above");
+ let close_result =
(connector.container.iggy_sink_close)(plugin_id);
+ if close_result != 0 {
+ warn!(
+ "iggy_sink_close returned {close_result} while
cleaning up failed sink connector with ID: {plugin_id} ({key})"
+ );
+ }
+ if let Some(plugin) = connector
+ .plugins
+ .iter_mut()
+ .find(|plugin| plugin.id == plugin_id)
+ {
+ plugin.error = Some(message);
+ }
+ }
}
}
- Ok(sink_connectors)
+ Ok((sink_connectors, failed_plugins))
+}
+
+fn build_failed_plugin(
+ id: u32,
+ key: &str,
+ name: &str,
+ config: &SinkConfig,
+ error: String,
+) -> FailedPlugin {
+ FailedPlugin {
+ id,
+ key: key.to_owned(),
+ name: name.to_owned(),
+ path: config.path.clone(),
+ config_format: config.plugin_config_format,
+ error,
+ enabled: config.enabled,
+ }
}
Review Comment:
refactored the method to `new`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]