ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3142656714


##########
core/connectors/sinks/influxdb_sink/src/lib.rs:
##########
@@ -31,88 +34,323 @@ use reqwest::Url;
 use reqwest_middleware::ClientWithMiddleware;
 use secrecy::{ExposeSecret, SecretString};
 use serde::{Deserialize, Serialize};
+use std::fmt::Write as _;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicU64, Ordering};
-use std::time::Duration;
-use std::time::SystemTime;
-use std::time::UNIX_EPOCH;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tracing::{debug, error, info, warn};
+
 sink_connector!(InfluxDbSink);
 
 const DEFAULT_MAX_RETRIES: u32 = 3;
 const DEFAULT_RETRY_DELAY: &str = "1s";
 const DEFAULT_TIMEOUT: &str = "30s";
 const DEFAULT_PRECISION: &str = "us";
-// Maximum attempts for open() connectivity retries
 const DEFAULT_MAX_OPEN_RETRIES: u32 = 10;
-// Cap for exponential backoff in open() — never wait longer than this
 const DEFAULT_OPEN_RETRY_MAX_DELAY: &str = "60s";
-// Cap for exponential backoff on per-write retries — kept short so a
-// transient InfluxDB blip does not stall message delivery for too long
 const DEFAULT_RETRY_MAX_DELAY: &str = "5s";
-// How many consecutive batch failures open the circuit breaker
 const DEFAULT_CIRCUIT_BREAKER_THRESHOLD: u32 = 5;
-// How long the circuit stays open before allowing a probe attempt
 const DEFAULT_CIRCUIT_COOL_DOWN: &str = "30s";
 
-// ---------------------------------------------------------------------------
-// Main connector structs
-// ---------------------------------------------------------------------------
+// ── Configuration ─────────────────────────────────────────────────────────────
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct V2SinkConfig {
+    pub(crate) url: String,
+    pub(crate) org: String,
+    pub(crate) bucket: String,
+    #[serde(serialize_with = "serialize_secret")]
+    pub(crate) token: SecretString,
+    pub(crate) measurement: Option<String>,
+    pub(crate) precision: Option<String>,
+    pub(crate) batch_size: Option<u32>,
+    pub(crate) include_metadata: Option<bool>,
+    pub(crate) include_checksum: Option<bool>,
+    pub(crate) include_origin_timestamp: Option<bool>,
+    pub(crate) include_stream_tag: Option<bool>,
+    pub(crate) include_topic_tag: Option<bool>,
+    pub(crate) include_partition_tag: Option<bool>,
+    pub(crate) payload_format: Option<String>,
+    pub(crate) verbose_logging: Option<bool>,
+    pub(crate) max_retries: Option<u32>,
+    pub(crate) retry_delay: Option<String>,
+    pub(crate) timeout: Option<String>,
+    pub(crate) max_open_retries: Option<u32>,
+    pub(crate) open_retry_max_delay: Option<String>,
+    pub(crate) retry_max_delay: Option<String>,
+    pub(crate) circuit_breaker_threshold: Option<u32>,
+    pub(crate) circuit_breaker_cool_down: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct V3SinkConfig {
+    pub(crate) url: String,
+    pub(crate) db: String,
+    #[serde(serialize_with = "serialize_secret")]
+    pub(crate) token: SecretString,
+    pub(crate) measurement: Option<String>,
+    pub(crate) precision: Option<String>,
+    pub(crate) batch_size: Option<u32>,
+    pub(crate) include_metadata: Option<bool>,
+    pub(crate) include_checksum: Option<bool>,
+    pub(crate) include_origin_timestamp: Option<bool>,
+    pub(crate) include_stream_tag: Option<bool>,
+    pub(crate) include_topic_tag: Option<bool>,
+    pub(crate) include_partition_tag: Option<bool>,
+    pub(crate) payload_format: Option<String>,
+    pub(crate) verbose_logging: Option<bool>,
+    pub(crate) max_retries: Option<u32>,
+    pub(crate) retry_delay: Option<String>,
+    pub(crate) timeout: Option<String>,
+    pub(crate) max_open_retries: Option<u32>,
+    pub(crate) open_retry_max_delay: Option<String>,
+    pub(crate) retry_max_delay: Option<String>,
+    pub(crate) circuit_breaker_threshold: Option<u32>,
+    pub(crate) circuit_breaker_cool_down: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(tag = "version")]
+pub enum InfluxDbSinkConfig {
+    #[serde(rename = "v2")]
+    V2(V2SinkConfig),
+    #[serde(rename = "v3")]
+    V3(V3SinkConfig),
+}
+
+impl<'de> serde::Deserialize<'de> for InfluxDbSinkConfig {
+    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
+        let raw = serde_json::Value::deserialize(d)?;
+        let version = raw.get("version").and_then(|v| v.as_str()).unwrap_or("v2");
+        match version {
+            "v2" => serde_json::from_value::<V2SinkConfig>(raw)
+                .map(Self::V2)
+                .map_err(serde::de::Error::custom),
+            "v3" => serde_json::from_value::<V3SinkConfig>(raw)
+                .map(Self::V3)
+                .map_err(serde::de::Error::custom),
+            other => Err(serde::de::Error::custom(format!(
+                "unknown InfluxDB version {other:?}; expected \"v2\" or \"v3\""
+            ))),
+        }
+    }
+}
+
+/// Map a short precision string to InfluxDB 3's long-form equivalent.
+///
+/// InfluxDB 3 rejects the short forms (`"ns"`, `"us"`, `"ms"`, `"s"`) on the
+/// `/api/v3/write_lp` endpoint and expects full English words. Returns an error
+/// for unrecognised values rather than silently defaulting.
+#[must_use = "precision mapping errors must be propagated — ignoring this silently corrupts timestamps"]
+fn map_precision_v3(p: &str) -> Result<&'static str, Error> {
+    match p {
+        "ns" => Ok("nanosecond"),
+        "us" => Ok("microsecond"),
+        "ms" => Ok("millisecond"),
+        "s" => Ok("second"),
+        other => Err(Error::InvalidConfigValue(format!(
+            "unknown precision {other:?}; valid values are \"ns\", \"us\", \"ms\", \"s\""
+        ))),
+    }
+}
+
+// Eliminates the repetitive "match self { V2(c) => …, V3(c) => … }" pattern for
+// fields that are identical across all config variants. Methods with version-specific
+// logic (auth_header, build_write_url, build_health_url, version_label) remain explicit.
+//
+// Supported patterns:
+//   delegate!(ref  self.url)                        →  &String (borrow)
+//   delegate!(opt  self.measurement)                →  Option<&str>
+//   delegate!(str_or self.precision, "us")          →  &str with string fallback
+//   delegate!(unwrap self.batch_size, 500)          →  T: Copy with value fallback
+//
+// Not supported (use explicit match arms instead):
+//   Fields with version-specific defaults (e.g. cursor_field: "_time" vs "time")
+//   Fields with chained transformations (e.g. .max(1))
+//   Fields requiring complex construction (e.g. auth_header building)
+macro_rules! delegate {
+    // &T field reference  →  fn foo(&self) -> &T
+    (ref $self:ident . $field:ident) => {
+        match $self {
+            Self::V2(c) => &c.$field,
+            Self::V3(c) => &c.$field,
+        }
+    };
+    // Option<String>  →  Option<&str>
+    (opt $self:ident . $field:ident) => {
+        match $self {
+            Self::V2(c) => c.$field.as_deref(),
+            Self::V3(c) => c.$field.as_deref(),
+        }
+    };
+    // Option<String>  →  &str with fallback
+    (str_or $self:ident . $field:ident, $default:expr) => {
+        match $self {
+            Self::V2(c) => c.$field.as_deref().unwrap_or($default),
+            Self::V3(c) => c.$field.as_deref().unwrap_or($default),
+        }
+    };
+    // Option<T: Copy>  →  T with fallback
+    (unwrap $self:ident . $field:ident, $default:expr) => {
+        match $self {
+            Self::V2(c) => c.$field.unwrap_or($default),
+            Self::V3(c) => c.$field.unwrap_or($default),
+        }
+    };
+}
+
+impl InfluxDbSinkConfig {
+    fn url(&self) -> &str {
+        delegate!(ref     self.url)
+    }
+    fn base_url(&self) -> &str {
+        self.url().trim_end_matches('/')
+    }
+    fn measurement(&self) -> Option<&str> {
+        delegate!(opt     self.measurement)
+    }
+    fn precision(&self) -> &str {
+        delegate!(str_or  self.precision, DEFAULT_PRECISION)
+    }
+    fn batch_size(&self) -> u32 {
+        delegate!(unwrap  self.batch_size, 500)
+    }
+    fn include_metadata(&self) -> bool {
+        delegate!(unwrap  self.include_metadata, true)
+    }
+    fn include_checksum(&self) -> bool {
+        delegate!(unwrap  self.include_checksum, true)
+    }
+    fn include_origin_timestamp(&self) -> bool {
+        delegate!(unwrap  self.include_origin_timestamp, true)
+    }
+    fn include_stream_tag(&self) -> bool {
+        delegate!(unwrap  self.include_stream_tag, true)
+    }
+    fn include_topic_tag(&self) -> bool {
+        delegate!(unwrap  self.include_topic_tag, true)
+    }
+    fn include_partition_tag(&self) -> bool {
+        delegate!(unwrap  self.include_partition_tag, true)
+    }
+    fn payload_format(&self) -> Option<&str> {
+        delegate!(opt     self.payload_format)
+    }
+    fn verbose_logging(&self) -> bool {
+        delegate!(unwrap  self.verbose_logging, false)
+    }
+    fn max_retries(&self) -> u32 {
+        delegate!(unwrap  self.max_retries, DEFAULT_MAX_RETRIES)
+    }
+    fn retry_delay(&self) -> Option<&str> {
+        delegate!(opt     self.retry_delay)
+    }
+    fn timeout(&self) -> Option<&str> {
+        delegate!(opt     self.timeout)
+    }
+    fn max_open_retries(&self) -> u32 {
+        delegate!(unwrap  self.max_open_retries, DEFAULT_MAX_OPEN_RETRIES)
+    }
+    fn open_retry_max_delay(&self) -> Option<&str> {
+        delegate!(opt   self.open_retry_max_delay)
+    }
+    fn retry_max_delay(&self) -> Option<&str> {
+        delegate!(opt     self.retry_max_delay)
+    }
+    fn circuit_breaker_threshold(&self) -> u32 {
+        delegate!(unwrap  self.circuit_breaker_threshold, DEFAULT_CIRCUIT_BREAKER_THRESHOLD)
+    }
+    fn circuit_breaker_cool_down(&self) -> Option<&str> {
+        delegate!(opt self.circuit_breaker_cool_down)
+    }
+
+    fn auth_header(&self) -> String {
+        match self {
+            Self::V2(c) => format!("Token {}", c.token.expose_secret()),
+            Self::V3(c) => format!("Bearer {}", c.token.expose_secret()),
+        }
+    }
+
+    #[must_use = "URL construction can fail; the error must be propagated or open() will silently use a stale URL"]
+    fn build_write_url(&self) -> Result<Url, Error> {
+        let precision = self.precision();
+        match self {
+            Self::V2(c) => {
+                if c.org.trim().is_empty() {
+                    return Err(Error::InvalidConfigValue(
+                        "InfluxDB V2 'org' must not be empty".into(),
+                    ));
+                }
+                let mut url = Url::parse(&format!("{}/api/v2/write", self.base_url()))
+                    .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?;
+                url.query_pairs_mut()
+                    .append_pair("org", &c.org)
+                    .append_pair("bucket", &c.bucket)
+                    .append_pair("precision", precision);
+                Ok(url)
+            }
+            Self::V3(c) => {
+                let mut url = Url::parse(&format!("{}/api/v3/write_lp", self.base_url()))
+                    .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))?;
+                url.query_pairs_mut()
+                    .append_pair("db", &c.db)
+                    .append_pair("precision", map_precision_v3(precision)?);
+                Ok(url)
+            }
+        }
+    }
+
+    #[must_use = "URL construction can fail; the error must be propagated"]
+    fn build_health_url(&self) -> Result<Url, Error> {
+        Url::parse(&format!("{}/health", self.base_url()))
+            .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB URL: {e}")))
+    }
+
+    fn version_label(&self) -> &'static str {
+        match self {
+            Self::V2(_) => "v2",
+            Self::V3(_) => "v3",
+        }
+    }
+}
+
+// ── Sink struct ───────────────────────────────────────────────────────────────
 
+/// InfluxDB sink connector state.
+///
+/// **Init-time fields** (populated in `new()` from config, never `None`):
+/// `id`, `config`, `circuit_breaker`, `verbose`, `retry_delay`, `payload_format`,
+/// `measurement`, `precision`, `include_*`, `batch_size_limit`.
+///
+/// **Open-time fields** (populated in `open()`, guarded by `Option<T>`):
+/// `client`, `write_url`, `auth_header` — callers must invoke `open()` before
+/// any `process_batch()` call; `get_client()` returns an error otherwise.
 #[derive(Debug)]

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to