ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3142656714
##########
core/connectors/sinks/influxdb_sink/src/lib.rs:
##########
@@ -31,88 +34,323 @@ use reqwest::Url;
use reqwest_middleware::ClientWithMiddleware;
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
+use std::fmt::Write as _;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
-use std::time::Duration;
-use std::time::SystemTime;
-use std::time::UNIX_EPOCH;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info, warn};
+
sink_connector!(InfluxDbSink);
const DEFAULT_MAX_RETRIES: u32 = 3;
const DEFAULT_RETRY_DELAY: &str = "1s";
const DEFAULT_TIMEOUT: &str = "30s";
const DEFAULT_PRECISION: &str = "us";
-// Maximum attempts for open() connectivity retries
const DEFAULT_MAX_OPEN_RETRIES: u32 = 10;
-// Cap for exponential backoff in open() — never wait longer than this
const DEFAULT_OPEN_RETRY_MAX_DELAY: &str = "60s";
-// Cap for exponential backoff on per-write retries — kept short so a
-// transient InfluxDB blip does not stall message delivery for too long
const DEFAULT_RETRY_MAX_DELAY: &str = "5s";
-// How many consecutive batch failures open the circuit breaker
const DEFAULT_CIRCUIT_BREAKER_THRESHOLD: u32 = 5;
-// How long the circuit stays open before allowing a probe attempt
const DEFAULT_CIRCUIT_COOL_DOWN: &str = "30s";
-// ---------------------------------------------------------------------------
-// Main connector structs
-// ---------------------------------------------------------------------------
+// ── Configuration ─────────────────────────────────────────────────────────────
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct V2SinkConfig {
+ pub(crate) url: String,
+ pub(crate) org: String,
+ pub(crate) bucket: String,
+ #[serde(serialize_with = "serialize_secret")]
+ pub(crate) token: SecretString,
+ pub(crate) measurement: Option<String>,
+ pub(crate) precision: Option<String>,
+ pub(crate) batch_size: Option<u32>,
+ pub(crate) include_metadata: Option<bool>,
+ pub(crate) include_checksum: Option<bool>,
+ pub(crate) include_origin_timestamp: Option<bool>,
+ pub(crate) include_stream_tag: Option<bool>,
+ pub(crate) include_topic_tag: Option<bool>,
+ pub(crate) include_partition_tag: Option<bool>,
+ pub(crate) payload_format: Option<String>,
+ pub(crate) verbose_logging: Option<bool>,
+ pub(crate) max_retries: Option<u32>,
+ pub(crate) retry_delay: Option<String>,
+ pub(crate) timeout: Option<String>,
+ pub(crate) max_open_retries: Option<u32>,
+ pub(crate) open_retry_max_delay: Option<String>,
+ pub(crate) retry_max_delay: Option<String>,
+ pub(crate) circuit_breaker_threshold: Option<u32>,
+ pub(crate) circuit_breaker_cool_down: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct V3SinkConfig {
+ pub(crate) url: String,
+ pub(crate) db: String,
+ #[serde(serialize_with = "serialize_secret")]
+ pub(crate) token: SecretString,
+ pub(crate) measurement: Option<String>,
+ pub(crate) precision: Option<String>,
+ pub(crate) batch_size: Option<u32>,
+ pub(crate) include_metadata: Option<bool>,
+ pub(crate) include_checksum: Option<bool>,
+ pub(crate) include_origin_timestamp: Option<bool>,
+ pub(crate) include_stream_tag: Option<bool>,
+ pub(crate) include_topic_tag: Option<bool>,
+ pub(crate) include_partition_tag: Option<bool>,
+ pub(crate) payload_format: Option<String>,
+ pub(crate) verbose_logging: Option<bool>,
+ pub(crate) max_retries: Option<u32>,
+ pub(crate) retry_delay: Option<String>,
+ pub(crate) timeout: Option<String>,
+ pub(crate) max_open_retries: Option<u32>,
+ pub(crate) open_retry_max_delay: Option<String>,
+ pub(crate) retry_max_delay: Option<String>,
+ pub(crate) circuit_breaker_threshold: Option<u32>,
+ pub(crate) circuit_breaker_cool_down: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(tag = "version")]
+pub enum InfluxDbSinkConfig {
+ #[serde(rename = "v2")]
+ V2(V2SinkConfig),
+ #[serde(rename = "v3")]
+ V3(V3SinkConfig),
+}
+
+impl<'de> serde::Deserialize<'de> for InfluxDbSinkConfig {
+ fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self,
D::Error> {
+ let raw = serde_json::Value::deserialize(d)?;
+ let version = raw.get("version").and_then(|v|
v.as_str()).unwrap_or("v2");
+ match version {
+ "v2" => serde_json::from_value::<V2SinkConfig>(raw)
+ .map(Self::V2)
+ .map_err(serde::de::Error::custom),
+ "v3" => serde_json::from_value::<V3SinkConfig>(raw)
+ .map(Self::V3)
+ .map_err(serde::de::Error::custom),
+ other => Err(serde::de::Error::custom(format!(
+ "unknown InfluxDB version {other:?}; expected \"v2\" or \"v3\""
+ ))),
+ }
+ }
+}
+
+/// Map a short precision string to InfluxDB 3's long-form equivalent.
+///
+/// InfluxDB 3 rejects the short forms (`"ns"`, `"us"`, `"ms"`, `"s"`) on the
+/// `/api/v3/write_lp` endpoint and expects full English words. Returns an
error
+/// for unrecognised values rather than silently defaulting.
+#[must_use = "precision mapping errors must be propagated — ignoring this
silently corrupts timestamps"]
+fn map_precision_v3(p: &str) -> Result<&'static str, Error> {
+ match p {
+ "ns" => Ok("nanosecond"),
+ "us" => Ok("microsecond"),
+ "ms" => Ok("millisecond"),
+ "s" => Ok("second"),
+ other => Err(Error::InvalidConfigValue(format!(
+ "unknown precision {other:?}; valid values are \"ns\", \"us\",
\"ms\", \"s\""
+ ))),
+ }
+}
+
// `delegate!` removes the `match self { Self::V2(c) => …, Self::V3(c) => … }`
// boilerplate for settings that exist with the same name and type in every config
// variant. Anything with version-specific behaviour (auth header, endpoint URLs,
// version label) is written out by hand instead.
//
// Forms:
//   delegate!(ref self.url)                 → `&T` borrow of the field
//   delegate!(opt self.measurement)         → `Option<&str>` from `Option<String>`
//   delegate!(str_or self.precision, "us")  → `&str`, using the fallback when unset
//   delegate!(unwrap self.batch_size, 500)  → `T: Copy`, using the fallback when unset
//
// Not a fit (write the `match` by hand): fields whose default differs per version,
// fields needing further transformation after unwrapping, or values assembled from
// several fields.
macro_rules! delegate {
    // Internal rule: evaluate `$body` with `$inner` bound to the active variant's
    // config struct. Every public form below expands through here.
    (@each $self:ident, $inner:ident => $body:expr) => {
        match $self {
            Self::V2($inner) => $body,
            Self::V3($inner) => $body,
        }
    };
    (ref $self:ident . $field:ident) => {
        delegate!(@each $self, inner => &inner.$field)
    };
    (opt $self:ident . $field:ident) => {
        delegate!(@each $self, inner => inner.$field.as_deref())
    };
    (str_or $self:ident . $field:ident, $default:expr) => {
        delegate!(@each $self, inner => inner.$field.as_deref().unwrap_or($default))
    };
    (unwrap $self:ident . $field:ident, $default:expr) => {
        delegate!(@each $self, inner => inner.$field.unwrap_or($default))
    };
}
+
+impl InfluxDbSinkConfig {
+ fn url(&self) -> &str {
+ delegate!(ref self.url)
+ }
+ fn base_url(&self) -> &str {
+ self.url().trim_end_matches('/')
+ }
+ fn measurement(&self) -> Option<&str> {
+ delegate!(opt self.measurement)
+ }
+ fn precision(&self) -> &str {
+ delegate!(str_or self.precision, DEFAULT_PRECISION)
+ }
+ fn batch_size(&self) -> u32 {
+ delegate!(unwrap self.batch_size, 500)
+ }
+ fn include_metadata(&self) -> bool {
+ delegate!(unwrap self.include_metadata, true)
+ }
+ fn include_checksum(&self) -> bool {
+ delegate!(unwrap self.include_checksum, true)
+ }
+ fn include_origin_timestamp(&self) -> bool {
+ delegate!(unwrap self.include_origin_timestamp, true)
+ }
+ fn include_stream_tag(&self) -> bool {
+ delegate!(unwrap self.include_stream_tag, true)
+ }
+ fn include_topic_tag(&self) -> bool {
+ delegate!(unwrap self.include_topic_tag, true)
+ }
+ fn include_partition_tag(&self) -> bool {
+ delegate!(unwrap self.include_partition_tag, true)
+ }
+ fn payload_format(&self) -> Option<&str> {
+ delegate!(opt self.payload_format)
+ }
+ fn verbose_logging(&self) -> bool {
+ delegate!(unwrap self.verbose_logging, false)
+ }
+ fn max_retries(&self) -> u32 {
+ delegate!(unwrap self.max_retries, DEFAULT_MAX_RETRIES)
+ }
+ fn retry_delay(&self) -> Option<&str> {
+ delegate!(opt self.retry_delay)
+ }
+ fn timeout(&self) -> Option<&str> {
+ delegate!(opt self.timeout)
+ }
+ fn max_open_retries(&self) -> u32 {
+ delegate!(unwrap self.max_open_retries, DEFAULT_MAX_OPEN_RETRIES)
+ }
+ fn open_retry_max_delay(&self) -> Option<&str> {
+ delegate!(opt self.open_retry_max_delay)
+ }
+ fn retry_max_delay(&self) -> Option<&str> {
+ delegate!(opt self.retry_max_delay)
+ }
+ fn circuit_breaker_threshold(&self) -> u32 {
+ delegate!(unwrap self.circuit_breaker_threshold,
DEFAULT_CIRCUIT_BREAKER_THRESHOLD)
+ }
+ fn circuit_breaker_cool_down(&self) -> Option<&str> {
+ delegate!(opt self.circuit_breaker_cool_down)
+ }
+
+ fn auth_header(&self) -> String {
+ match self {
+ Self::V2(c) => format!("Token {}", c.token.expose_secret()),
+ Self::V3(c) => format!("Bearer {}", c.token.expose_secret()),
+ }
+ }
+
+ #[must_use = "URL construction can fail; the error must be propagated or
open() will silently use a stale URL"]
+ fn build_write_url(&self) -> Result<Url, Error> {
+ let precision = self.precision();
+ match self {
+ Self::V2(c) => {
+ if c.org.trim().is_empty() {
+ return Err(Error::InvalidConfigValue(
+ "InfluxDB V2 'org' must not be empty".into(),
+ ));
+ }
+ let mut url = Url::parse(&format!("{}/api/v2/write",
self.base_url()))
+ .map_err(|e| Error::InvalidConfigValue(format!("Invalid
InfluxDB URL: {e}")))?;
+ url.query_pairs_mut()
+ .append_pair("org", &c.org)
+ .append_pair("bucket", &c.bucket)
+ .append_pair("precision", precision);
+ Ok(url)
+ }
+ Self::V3(c) => {
+ let mut url = Url::parse(&format!("{}/api/v3/write_lp",
self.base_url()))
+ .map_err(|e| Error::InvalidConfigValue(format!("Invalid
InfluxDB URL: {e}")))?;
+ url.query_pairs_mut()
+ .append_pair("db", &c.db)
+ .append_pair("precision", map_precision_v3(precision)?);
+ Ok(url)
+ }
+ }
+ }
+
+ #[must_use = "URL construction can fail; the error must be propagated"]
+ fn build_health_url(&self) -> Result<Url, Error> {
+ Url::parse(&format!("{}/health", self.base_url()))
+ .map_err(|e| Error::InvalidConfigValue(format!("Invalid InfluxDB
URL: {e}")))
+ }
+
+ fn version_label(&self) -> &'static str {
+ match self {
+ Self::V2(_) => "v2",
+ Self::V3(_) => "v3",
+ }
+ }
+}
+
+// ── Sink struct ───────────────────────────────────────────────────────────────
+/// InfluxDB sink connector state.
+///
+/// **Init-time fields** (populated in `new()` from config, never `None`):
+/// `id`, `config`, `circuit_breaker`, `verbose`, `retry_delay`,
`payload_format`,
+/// `measurement`, `precision`, `include_*`, `batch_size_limit`.
+///
+/// **Open-time fields** (populated in `open()`, guarded by `Option<T>`):
+/// `client`, `write_url`, `auth_header` — callers must invoke `open()` before
+/// any `process_batch()` call; `get_client()` returns an error otherwise.
#[derive(Debug)]
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]