atharvalade commented on code in PR #3103: URL: https://github.com/apache/iggy/pull/3103#discussion_r3138639703
########## core/connectors/sinks/s3_sink/src/lib.rs: ########## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy_connector_sdk::{Error, sink_connector}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fmt; + +pub mod buffer; +pub mod client; +pub mod formatter; +pub mod path; +pub mod sink; + +sink_connector!(S3Sink); + +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_RETRY_DELAY: &str = "1s"; +const DEFAULT_MAX_FILE_SIZE: &str = "8MiB"; +const DEFAULT_PATH_TEMPLATE: &str = "{stream}/{topic}/{date}/{hour}"; +const DEFAULT_OUTPUT_FORMAT: &str = "json_lines"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct S3SinkConfig { + pub bucket: String, + pub region: String, + #[serde(default)] + pub prefix: Option<String>, + #[serde(default)] + pub endpoint: Option<String>, + #[serde(default)] + pub access_key_id: Option<String>, + #[serde(default)] + pub secret_access_key: Option<String>, + #[serde(default = "default_path_template")] + pub path_template: String, + #[serde(default = "default_file_rotation")] + pub file_rotation: FileRotation, + #[serde(default = "default_max_file_size")] + pub max_file_size: String, + #[serde(default)] + pub 
max_messages_per_file: Option<u64>, + #[serde(default = "default_output_format")] + pub output_format: String, + #[serde(default = "default_true")] + pub include_metadata: bool, + #[serde(default)] + pub include_headers: bool, + #[serde(default)] + pub max_retries: Option<u32>, + #[serde(default)] + pub retry_delay: Option<String>, + #[serde(default)] + pub path_style: Option<bool>, +} + +fn default_path_template() -> String { + DEFAULT_PATH_TEMPLATE.to_string() +} + +fn default_file_rotation() -> FileRotation { + FileRotation::Size +} + +fn default_max_file_size() -> String { + DEFAULT_MAX_FILE_SIZE.to_string() +} + +fn default_output_format() -> String { + DEFAULT_OUTPUT_FORMAT.to_string() +} + +fn default_true() -> bool { + true +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum FileRotation { + Size, + Messages, +} + +impl fmt::Display for FileRotation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + FileRotation::Size => write!(f, "size"), + FileRotation::Messages => write!(f, "messages"), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OutputFormat { + JsonLines, + JsonArray, + Raw, +} + +impl OutputFormat { + pub fn from_str_config(s: &str) -> Result<Self, Error> { + match s.to_lowercase().as_str() { + "json_lines" | "jsonl" | "jsonlines" => Ok(OutputFormat::JsonLines), + "json_array" | "json" => Ok(OutputFormat::JsonArray), + "raw" => Ok(OutputFormat::Raw), + other => Err(Error::InvalidConfigValue(format!( + "Unknown output format: '{other}'. 
Expected: json_lines, json_array, or raw" + ))), + } + } + + pub fn file_extension(&self) -> &'static str { + match self { + OutputFormat::JsonLines => "jsonl", + OutputFormat::JsonArray => "json", + OutputFormat::Raw => "bin", + } + } +} + +#[derive(Debug)] +pub struct S3Sink { + id: u32, + config: S3SinkConfig, + bucket: Option<Box<s3::Bucket>>, + buffers: tokio::sync::Mutex<HashMap<BufferKey, buffer::FileBuffer>>, + max_file_size_bytes: u64, + output_format: OutputFormat, + state: tokio::sync::Mutex<SinkState>, + retry_delay: std::time::Duration, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct BufferKey { + pub stream: String, + pub topic: String, + pub partition_id: u32, +} + +#[derive(Debug)] +struct SinkState { + messages_processed: u64, + uploads_completed: u64, + upload_errors: u64, +} + +impl S3Sink { + pub fn new(id: u32, config: S3SinkConfig) -> Self { + let output_format = match OutputFormat::from_str_config(&config.output_format) { + Ok(fmt) => fmt, + Err(e) => { + tracing::warn!( + "S3 sink ID: {id} invalid output_format '{}': {e}, defaulting to json_lines", + config.output_format, + ); + OutputFormat::JsonLines + } + }; + + let max_file_size_bytes = match parse_file_size(&config.max_file_size) { + Ok(size) => size, + Err(e) => { + tracing::warn!( + "S3 sink ID: {id} invalid max_file_size '{}': {e}, defaulting to 8 MiB", + config.max_file_size, + ); + 8 * 1024 * 1024 + } + }; + + let delay_str = config.retry_delay.as_deref().unwrap_or(DEFAULT_RETRY_DELAY); + let retry_delay = match humantime::Duration::from_str(delay_str) { + Ok(d) => d.into(), + Err(e) => { + tracing::warn!( + "S3 sink ID: {id} invalid retry_delay '{delay_str}': {e}, defaulting to 1s", + ); + std::time::Duration::from_secs(1) Review Comment: Good point, I'll change these to return hard errors on invalid config -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
