atharvalade commented on code in PR #3103: URL: https://github.com/apache/iggy/pull/3103#discussion_r3138646648
########## core/connectors/sinks/s3_sink/src/lib.rs: ########## @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy_connector_sdk::{Error, sink_connector}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fmt; + +pub mod buffer; +pub mod client; +pub mod formatter; +pub mod path; +pub mod sink; + +sink_connector!(S3Sink); + +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_RETRY_DELAY: &str = "1s"; +const DEFAULT_MAX_FILE_SIZE: &str = "8MiB"; +const DEFAULT_PATH_TEMPLATE: &str = "{stream}/{topic}/{date}/{hour}"; +const DEFAULT_OUTPUT_FORMAT: &str = "json_lines"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct S3SinkConfig { + pub bucket: String, + pub region: String, + #[serde(default)] + pub prefix: Option<String>, + #[serde(default)] + pub endpoint: Option<String>, + #[serde(default)] + pub access_key_id: Option<String>, + #[serde(default)] + pub secret_access_key: Option<String>, + #[serde(default = "default_path_template")] + pub path_template: String, + #[serde(default = "default_file_rotation")] + pub file_rotation: FileRotation, + #[serde(default = "default_max_file_size")] + pub max_file_size: String, + #[serde(default)] + pub 
max_messages_per_file: Option<u64>, + #[serde(default = "default_output_format")] + pub output_format: String, + #[serde(default = "default_true")] + pub include_metadata: bool, + #[serde(default)] + pub include_headers: bool, + #[serde(default)] + pub max_retries: Option<u32>, + #[serde(default)] + pub retry_delay: Option<String>, + #[serde(default)] + pub path_style: Option<bool>, +} + +fn default_path_template() -> String { + DEFAULT_PATH_TEMPLATE.to_string() +} + +fn default_file_rotation() -> FileRotation { + FileRotation::Size +} + +fn default_max_file_size() -> String { + DEFAULT_MAX_FILE_SIZE.to_string() +} + +fn default_output_format() -> String { + DEFAULT_OUTPUT_FORMAT.to_string() +} + +fn default_true() -> bool { + true +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum FileRotation { + Size, + Messages, +} + +impl fmt::Display for FileRotation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + FileRotation::Size => write!(f, "size"), + FileRotation::Messages => write!(f, "messages"), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OutputFormat { + JsonLines, + JsonArray, + Raw, +} + +impl OutputFormat { + pub fn from_str_config(s: &str) -> Result<Self, Error> { + match s.to_lowercase().as_str() { + "json_lines" | "jsonl" | "jsonlines" => Ok(OutputFormat::JsonLines), + "json_array" | "json" => Ok(OutputFormat::JsonArray), + "raw" => Ok(OutputFormat::Raw), + other => Err(Error::InvalidConfigValue(format!( + "Unknown output format: '{other}'. Expected: json_lines, json_array, or raw" + ))), + } + } Review Comment: I'll implement `TryFrom<&str>` for `OutputFormat`, which is more idiomatic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
