ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3138844281
##########
core/connectors/sinks/influxdb_sink/src/lib.rs:
##########
@@ -125,316 +363,171 @@ enum PayloadFormat {
impl PayloadFormat {
fn from_config(value: Option<&str>) -> Self {
match value.map(|v| v.to_ascii_lowercase()).as_deref() {
- Some("text") | Some("utf8") => PayloadFormat::Text,
- Some("base64") | Some("raw") => PayloadFormat::Base64,
- Some("json") => PayloadFormat::Json,
- other => {
- warn!(
- "Unrecognized payload_format value {:?}, falling back to JSON. \
- Valid values are: \"json\", \"text\", \"utf8\", \"base64\", \"raw\".",
- other
- );
- PayloadFormat::Json
- }
- }
- }
-}
-
-// ---------------------------------------------------------------------------
-// Helpers
-// ---------------------------------------------------------------------------
-
-/// Write an escaped measurement name into `buf`.
-/// Escapes: `\` → `\\`, `,` → `\,`, ` ` → `\ `, `\n` → `\\n`, `\r` → `\\r`
-///
-/// Newline (`\n`) and carriage-return (`\r`) are the InfluxDB line-protocol
-/// record delimiters; a literal newline inside a measurement name would split
-/// the line and corrupt the batch.
-fn write_measurement(buf: &mut String, value: &str) {
- for ch in value.chars() {
- match ch {
- '\\' => buf.push_str("\\\\"),
- ',' => buf.push_str("\\,"),
- ' ' => buf.push_str("\\ "),
- '\n' => buf.push_str("\\n"),
- '\r' => buf.push_str("\\r"),
- _ => buf.push(ch),
- }
- }
-}
-
-/// Write an escaped tag key/value into `buf`.
-/// Escapes: `\` → `\\`, `,` → `\,`, `=` → `\=`, ` ` → `\ `, `\n` → `\\n`, `\r` → `\\r`
-///
-/// Newline and carriage-return are escaped for the same reason as in
-/// [`write_measurement`]: they are InfluxDB line-protocol record delimiters.
-fn write_tag_value(buf: &mut String, value: &str) {
- for ch in value.chars() {
- match ch {
- '\\' => buf.push_str("\\\\"),
- ',' => buf.push_str("\\,"),
- '=' => buf.push_str("\\="),
- ' ' => buf.push_str("\\ "),
- '\n' => buf.push_str("\\n"),
- '\r' => buf.push_str("\\r"),
- _ => buf.push(ch),
- }
- }
-}
-
-/// Write an escaped string field value (without surrounding quotes) into `buf`.
-/// Escapes: `\` → `\\`, `"` → `\"`, `\n` → `\\n`, `\r` → `\\r`
-///
-/// Newline and carriage-return are the InfluxDB line-protocol record
-/// delimiters; a literal newline inside a string field value (e.g. from a
-/// multi-line text payload) would split the line and corrupt the batch.
-fn write_field_string(buf: &mut String, value: &str) {
- for ch in value.chars() {
- match ch {
- '\\' => buf.push_str("\\\\"),
- '"' => buf.push_str("\\\""),
- '\n' => buf.push_str("\\n"),
- '\r' => buf.push_str("\\r"),
- _ => buf.push(ch),
+ Some("text") | Some("utf8") => Self::Text,
+ Some("base64") | Some("raw") => Self::Base64,
+ _ => Self::Json,
}
}
}
-// ---------------------------------------------------------------------------
-// InfluxDbSink implementation
-// ---------------------------------------------------------------------------
+// ── InfluxDbSink impl ─────────────────────────────────────────────────────────
impl InfluxDbSink {
pub fn new(id: u32, config: InfluxDbSinkConfig) -> Self {
- let verbose = config.verbose_logging.unwrap_or(false);
- let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY);
- let payload_format = PayloadFormat::from_config(config.payload_format.as_deref());
-
- // Build circuit breaker from config
- let cb_threshold = config
- .circuit_breaker_threshold
- .unwrap_or(DEFAULT_CIRCUIT_BREAKER_THRESHOLD);
- let cb_cool_down = parse_duration(
- config.circuit_breaker_cool_down.as_deref(),
- DEFAULT_CIRCUIT_COOL_DOWN,
- );
-
- InfluxDbSink {
+ let verbose = config.verbose_logging();
+ let retry_delay = parse_duration(config.retry_delay(), DEFAULT_RETRY_DELAY);
+ let payload_format = PayloadFormat::from_config(config.payload_format());
+ let circuit_breaker = Arc::new(CircuitBreaker::new(
+ config.circuit_breaker_threshold(),
+ parse_duration(
+ config.circuit_breaker_cool_down(),
+ DEFAULT_CIRCUIT_COOL_DOWN,
+ ),
+ ));
+ let measurement = config.measurement().unwrap_or("iggy_messages").to_string();
+ let precision = config.precision().to_string();
+ let include_metadata = config.include_metadata();
+ let include_checksum = config.include_checksum();
+ let include_origin_timestamp = config.include_origin_timestamp();
+ let include_stream_tag = config.include_stream_tag();
+ let include_topic_tag = config.include_topic_tag();
+ let include_partition_tag = config.include_partition_tag();
+ let batch_size_limit = config.batch_size().max(1) as usize;
+
+ Self {
id,
config,
client: None,
write_url: None,
+ auth_header: None,
+ circuit_breaker,
messages_attempted: AtomicU64::new(0),
write_success: AtomicU64::new(0),
write_errors: AtomicU64::new(0),
verbose,
retry_delay,
payload_format,
- circuit_breaker: Arc::new(CircuitBreaker::new(cb_threshold, cb_cool_down)),
+ measurement,
+ precision,
+ include_metadata,
+ include_checksum,
+ include_origin_timestamp,
+ include_stream_tag,
+ include_topic_tag,
+ include_partition_tag,
+ batch_size_limit,
}
}
fn build_raw_client(&self) -> Result<reqwest::Client, Error> {
- let timeout = parse_duration(self.config.timeout.as_deref(), DEFAULT_TIMEOUT);
+ let timeout = parse_duration(self.config.timeout(), DEFAULT_TIMEOUT);
reqwest::Client::builder()
.timeout(timeout)
.build()
.map_err(|e| Error::InitError(format!("Failed to create HTTP client: {e}")))
}
- fn build_write_url(&self) -> Result<Url, Error> {
- let base = self.config.url.trim_end_matches('/');
- let mut url = Url::parse(&format!("{base}/api/v2/write"))
- .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?;
-
- let precision = self
- .config
- .precision
- .as_deref()
- .unwrap_or(DEFAULT_PRECISION);
- url.query_pairs_mut()
- .append_pair("org", &self.config.org)
- .append_pair("bucket", &self.config.bucket)
- .append_pair("precision", precision);
-
- Ok(url)
- }
-
- fn build_health_url(&self) -> Result<Url, Error> {
- let base = self.config.url.trim_end_matches('/');
- Url::parse(&format!("{base}/health"))
- .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))
- }
-
fn get_client(&self) -> Result<&ClientWithMiddleware, Error> {
- self.client
- .as_ref()
- .ok_or_else(|| Error::Connection("InfluxDB client is not initialized".to_string()))
- }
-
- fn measurement(&self) -> &str {
- self.config
- .measurement
- .as_deref()
- .unwrap_or("iggy_messages")
- }
-
- fn payload_format(&self) -> PayloadFormat {
- self.payload_format
- }
-
- fn timestamp_precision(&self) -> &str {
- self.config
- .precision
- .as_deref()
- .unwrap_or(DEFAULT_PRECISION)
- }
-
- fn get_max_retries(&self) -> u32 {
- self.config
- .max_retries
- .unwrap_or(DEFAULT_MAX_RETRIES)
- .max(1)
+ self.client.as_ref().ok_or_else(|| {
+ Error::Connection("InfluxDB client not initialized — call open() first".to_string())
+ })
}
+ #[inline]
fn to_precision_timestamp(&self, micros: u64) -> u64 {
- match self.timestamp_precision() {
+ match self.precision.as_str() {
"ns" => micros.saturating_mul(1_000),
"us" => micros,
"ms" => micros / 1_000,
"s" => micros / 1_000_000,
- _ => micros,
+ _ => unreachable!("precision validated in open()"),
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]