ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3138901924


##########
core/connectors/sinks/influxdb_sink/src/lib.rs:
##########
@@ -125,316 +363,171 @@ enum PayloadFormat {
 impl PayloadFormat {
     fn from_config(value: Option<&str>) -> Self {
         match value.map(|v| v.to_ascii_lowercase()).as_deref() {
-            Some("text") | Some("utf8") => PayloadFormat::Text,
-            Some("base64") | Some("raw") => PayloadFormat::Base64,
-            Some("json") => PayloadFormat::Json,
-            other => {
-                warn!(
-                    "Unrecognized payload_format value {:?}, falling back to JSON. \
-                     Valid values are: \"json\", \"text\", \"utf8\", \"base64\", \"raw\".",
-                    other
-                );
-                PayloadFormat::Json
-            }
-        }
-    }
-}
-
-// ---------------------------------------------------------------------------
-// Helpers
-// ---------------------------------------------------------------------------
-
-/// Write an escaped measurement name into `buf`.
-/// Escapes: `\` → `\\`, `,` → `\,`, ` ` → `\ `, `\n` → `\\n`, `\r` → `\\r`
-///
-/// Newline (`\n`) and carriage-return (`\r`) are the InfluxDB line-protocol
-/// record delimiters; a literal newline inside a measurement name would split
-/// the line and corrupt the batch.
-fn write_measurement(buf: &mut String, value: &str) {
-    for ch in value.chars() {
-        match ch {
-            '\\' => buf.push_str("\\\\"),
-            ',' => buf.push_str("\\,"),
-            ' ' => buf.push_str("\\ "),
-            '\n' => buf.push_str("\\n"),
-            '\r' => buf.push_str("\\r"),
-            _ => buf.push(ch),
-        }
-    }
-}
-
-/// Write an escaped tag key/value into `buf`.
-/// Escapes: `\` → `\\`, `,` → `\,`, `=` → `\=`, ` ` → `\ `, `\n` → `\\n`, `\r` → `\\r`
-///
-/// Newline and carriage-return are escaped for the same reason as in
-/// [`write_measurement`]: they are InfluxDB line-protocol record delimiters.
-fn write_tag_value(buf: &mut String, value: &str) {
-    for ch in value.chars() {
-        match ch {
-            '\\' => buf.push_str("\\\\"),
-            ',' => buf.push_str("\\,"),
-            '=' => buf.push_str("\\="),
-            ' ' => buf.push_str("\\ "),
-            '\n' => buf.push_str("\\n"),
-            '\r' => buf.push_str("\\r"),
-            _ => buf.push(ch),
-        }
-    }
-}
-
-/// Write an escaped string field value (without surrounding quotes) into `buf`.
-/// Escapes: `\` → `\\`, `"` → `\"`, `\n` → `\\n`, `\r` → `\\r`
-///
-/// Newline and carriage-return are the InfluxDB line-protocol record
-/// delimiters; a literal newline inside a string field value (e.g. from a
-/// multi-line text payload) would split the line and corrupt the batch.
-fn write_field_string(buf: &mut String, value: &str) {
-    for ch in value.chars() {
-        match ch {
-            '\\' => buf.push_str("\\\\"),
-            '"' => buf.push_str("\\\""),
-            '\n' => buf.push_str("\\n"),
-            '\r' => buf.push_str("\\r"),
-            _ => buf.push(ch),
+            Some("text") | Some("utf8") => Self::Text,
+            Some("base64") | Some("raw") => Self::Base64,
+            _ => Self::Json,
         }
     }
 }
 
-// ---------------------------------------------------------------------------
-// InfluxDbSink implementation
-// ---------------------------------------------------------------------------
+// ── InfluxDbSink impl ─────────────────────────────────────────────────────────
 
 impl InfluxDbSink {
     pub fn new(id: u32, config: InfluxDbSinkConfig) -> Self {
-        let verbose = config.verbose_logging.unwrap_or(false);
-        let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY);
-        let payload_format = PayloadFormat::from_config(config.payload_format.as_deref());
-
-        // Build circuit breaker from config
-        let cb_threshold = config
-            .circuit_breaker_threshold
-            .unwrap_or(DEFAULT_CIRCUIT_BREAKER_THRESHOLD);
-        let cb_cool_down = parse_duration(
-            config.circuit_breaker_cool_down.as_deref(),
-            DEFAULT_CIRCUIT_COOL_DOWN,
-        );
-
-        InfluxDbSink {
+        let verbose = config.verbose_logging();
+        let retry_delay = parse_duration(config.retry_delay(), DEFAULT_RETRY_DELAY);
+        let payload_format = PayloadFormat::from_config(config.payload_format());
+        let circuit_breaker = Arc::new(CircuitBreaker::new(
+            config.circuit_breaker_threshold(),
+            parse_duration(
+                config.circuit_breaker_cool_down(),
+                DEFAULT_CIRCUIT_COOL_DOWN,
+            ),
+        ));
+        let measurement = config.measurement().unwrap_or("iggy_messages").to_string();
+        let precision = config.precision().to_string();
+        let include_metadata = config.include_metadata();
+        let include_checksum = config.include_checksum();
+        let include_origin_timestamp = config.include_origin_timestamp();
+        let include_stream_tag = config.include_stream_tag();
+        let include_topic_tag = config.include_topic_tag();
+        let include_partition_tag = config.include_partition_tag();
+        let batch_size_limit = config.batch_size().max(1) as usize;
+
+        Self {
             id,
             config,
             client: None,
             write_url: None,
+            auth_header: None,
+            circuit_breaker,
             messages_attempted: AtomicU64::new(0),
             write_success: AtomicU64::new(0),
             write_errors: AtomicU64::new(0),
             verbose,
             retry_delay,
             payload_format,
-            circuit_breaker: Arc::new(CircuitBreaker::new(cb_threshold, cb_cool_down)),
+            measurement,
+            precision,
+            include_metadata,
+            include_checksum,
+            include_origin_timestamp,
+            include_stream_tag,
+            include_topic_tag,
+            include_partition_tag,
+            batch_size_limit,
         }
     }
 
     fn build_raw_client(&self) -> Result<reqwest::Client, Error> {
-        let timeout = parse_duration(self.config.timeout.as_deref(), DEFAULT_TIMEOUT);
+        let timeout = parse_duration(self.config.timeout(), DEFAULT_TIMEOUT);
         reqwest::Client::builder()
             .timeout(timeout)
             .build()
             .map_err(|e| Error::InitError(format!("Failed to create HTTP client: {e}")))
     }
 
-    fn build_write_url(&self) -> Result<Url, Error> {
-        let base = self.config.url.trim_end_matches('/');
-        let mut url = Url::parse(&format!("{base}/api/v2/write"))
-            .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?;
-
-        let precision = self
-            .config
-            .precision
-            .as_deref()
-            .unwrap_or(DEFAULT_PRECISION);
-        url.query_pairs_mut()
-            .append_pair("org", &self.config.org)
-            .append_pair("bucket", &self.config.bucket)
-            .append_pair("precision", precision);
-
-        Ok(url)
-    }
-
-    fn build_health_url(&self) -> Result<Url, Error> {
-        let base = self.config.url.trim_end_matches('/');
-        Url::parse(&format!("{base}/health"))
-            .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))
-    }
-
     fn get_client(&self) -> Result<&ClientWithMiddleware, Error> {
-        self.client
-            .as_ref()
-            .ok_or_else(|| Error::Connection("InfluxDB client is not initialized".to_string()))
-    }
-
-    fn measurement(&self) -> &str {
-        self.config
-            .measurement
-            .as_deref()
-            .unwrap_or("iggy_messages")
-    }
-
-    fn payload_format(&self) -> PayloadFormat {
-        self.payload_format
-    }
-
-    fn timestamp_precision(&self) -> &str {
-        self.config
-            .precision
-            .as_deref()
-            .unwrap_or(DEFAULT_PRECISION)
-    }
-
-    fn get_max_retries(&self) -> u32 {
-        self.config
-            .max_retries
-            .unwrap_or(DEFAULT_MAX_RETRIES)
-            .max(1)
+        self.client.as_ref().ok_or_else(|| {
+            Error::Connection("InfluxDB client not initialized — call open() first".to_string())
+        })
     }
 
+    #[inline]
     fn to_precision_timestamp(&self, micros: u64) -> u64 {
-        match self.timestamp_precision() {
+        match self.precision.as_str() {
             "ns" => micros.saturating_mul(1_000),
             "us" => micros,
             "ms" => micros / 1_000,
             "s" => micros / 1_000_000,
-            _ => micros,
+            _ => unreachable!("precision validated in open()"),
         }
     }
 
-    /// Serialise one message as a line-protocol line, appending directly into
-    /// `buf` with no intermediate `Vec<String>` for tags or fields.
-    ///
-    /// # Allocation budget (per message, happy path)
-    /// - Zero `Vec` allocations for tags or fields.
-    /// - Zero per-tag/per-field `format!` allocations.
-    /// - One `Vec<u8>` for `payload_bytes` (unavoidable — payload must be
-    ///   decoded/serialised before it can be escaped into the buffer).
-    /// - The caller's `buf` grows in place; if it was pre-allocated with
-    ///   `with_capacity` it will not reallocate for typical message sizes.
     fn append_line(
         &self,
         buf: &mut String,
         topic_metadata: &TopicMetadata,
         messages_metadata: &MessagesMetadata,
         message: &ConsumedMessage,
     ) -> Result<(), Error> {
-        let include_metadata = self.config.include_metadata.unwrap_or(true);
-        let include_checksum = self.config.include_checksum.unwrap_or(true);
-        let include_origin_timestamp = self.config.include_origin_timestamp.unwrap_or(true);
-        let include_stream_tag = self.config.include_stream_tag.unwrap_or(true);
-        let include_topic_tag = self.config.include_topic_tag.unwrap_or(true);
-        let include_partition_tag = self.config.include_partition_tag.unwrap_or(true);
-
-        // ── Measurement ──────────────────────────────────────────────────────
-        write_measurement(buf, self.measurement());
-
-        // ── Tag set ──────────────────────────────────────────────────────────
-        // Tags are written as ",key=value" pairs directly into buf.
-        // The offset tag is always present — it makes every point unique in
-        // InfluxDB's deduplication key (measurement + tag set + timestamp),
-        // regardless of precision or how many messages share a timestamp.
-        if include_metadata && include_stream_tag {
+        write_measurement(buf, &self.measurement);
+
+        // Tag *key* strings below ("stream", "topic", "partition", "offset", etc.) are
+        // static ASCII literals — they contain no InfluxDB line-protocol special chars
+        // (comma, equals, space, backslash, newline) and therefore do not need escaping.
+        // Only the tag *values* (user-supplied stream/topic names) are escaped via
+        // `write_tag_value`. The user-supplied `measurement` is escaped via
+        // `write_measurement`.
+        if self.include_metadata && self.include_stream_tag {
             buf.push_str(",stream=");
             write_tag_value(buf, &topic_metadata.stream);
         }
-        if include_metadata && include_topic_tag {
+        if self.include_metadata && self.include_topic_tag {
             buf.push_str(",topic=");
             write_tag_value(buf, &topic_metadata.topic);
         }
-        if include_metadata && include_partition_tag {
-            use std::fmt::Write as _;
-            write!(buf, ",partition={}", messages_metadata.partition_id)
-                .expect("write to String is infallible");
-        }
-        // offset tag — always written, ensures point uniqueness
-        {
-            use std::fmt::Write as _;
-            write!(buf, ",offset={}", message.offset).expect("write to String is infallible");
+        if self.include_metadata && self.include_partition_tag {
+            write!(buf, ",partition={}", messages_metadata.partition_id).expect("infallible");
         }
+        // `offset` is always written as a tag regardless of `include_metadata`.
+        // It forms the deduplication key for idempotent writes: without it, two
+        // messages at the same timestamp in the same measurement+tag-set would
+        // silently overwrite each other in InfluxDB's last-write-wins model.
+        write!(buf, ",offset={}", message.offset).expect("infallible");
 
-        // ── Field set ────────────────────────────────────────────────────────
-        // First field: no leading comma.  All subsequent fields: leading comma.
         buf.push(' ');
-
         buf.push_str("message_id=\"");
         write_field_string(buf, &message.id.to_string());

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to