ryerraguntla commented on code in PR #3140:
URL: https://github.com/apache/iggy/pull/3140#discussion_r3142655148
##########
core/connectors/sinks/influxdb_sink/src/lib.rs:
##########
@@ -31,88 +34,323 @@ use reqwest::Url;
use reqwest_middleware::ClientWithMiddleware;
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
+use std::fmt::Write as _;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
-use std::time::Duration;
-use std::time::SystemTime;
-use std::time::UNIX_EPOCH;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::{debug, error, info, warn};
+
sink_connector!(InfluxDbSink);
const DEFAULT_MAX_RETRIES: u32 = 3;
const DEFAULT_RETRY_DELAY: &str = "1s";
const DEFAULT_TIMEOUT: &str = "30s";
const DEFAULT_PRECISION: &str = "us";
-// Maximum attempts for open() connectivity retries
const DEFAULT_MAX_OPEN_RETRIES: u32 = 10;
-// Cap for exponential backoff in open() — never wait longer than this
const DEFAULT_OPEN_RETRY_MAX_DELAY: &str = "60s";
-// Cap for exponential backoff on per-write retries — kept short so a
-// transient InfluxDB blip does not stall message delivery for too long
const DEFAULT_RETRY_MAX_DELAY: &str = "5s";
-// How many consecutive batch failures open the circuit breaker
const DEFAULT_CIRCUIT_BREAKER_THRESHOLD: u32 = 5;
-// How long the circuit stays open before allowing a probe attempt
const DEFAULT_CIRCUIT_COOL_DOWN: &str = "30s";
-// ---------------------------------------------------------------------------
-// Main connector structs
-// ---------------------------------------------------------------------------
+// ── Configuration ─────────────────────────────────────────────────────────────
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct V2SinkConfig {
+ pub(crate) url: String,
+ pub(crate) org: String,
+ pub(crate) bucket: String,
+ #[serde(serialize_with = "serialize_secret")]
+ pub(crate) token: SecretString,
+ pub(crate) measurement: Option<String>,
+ pub(crate) precision: Option<String>,
+ pub(crate) batch_size: Option<u32>,
+ pub(crate) include_metadata: Option<bool>,
+ pub(crate) include_checksum: Option<bool>,
+ pub(crate) include_origin_timestamp: Option<bool>,
+ pub(crate) include_stream_tag: Option<bool>,
+ pub(crate) include_topic_tag: Option<bool>,
+ pub(crate) include_partition_tag: Option<bool>,
+ pub(crate) payload_format: Option<String>,
+ pub(crate) verbose_logging: Option<bool>,
+ pub(crate) max_retries: Option<u32>,
+ pub(crate) retry_delay: Option<String>,
+ pub(crate) timeout: Option<String>,
+ pub(crate) max_open_retries: Option<u32>,
+ pub(crate) open_retry_max_delay: Option<String>,
+ pub(crate) retry_max_delay: Option<String>,
+ pub(crate) circuit_breaker_threshold: Option<u32>,
+ pub(crate) circuit_breaker_cool_down: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct V3SinkConfig {
+ pub(crate) url: String,
+ pub(crate) db: String,
+ #[serde(serialize_with = "serialize_secret")]
+ pub(crate) token: SecretString,
+ pub(crate) measurement: Option<String>,
+ pub(crate) precision: Option<String>,
+ pub(crate) batch_size: Option<u32>,
+ pub(crate) include_metadata: Option<bool>,
+ pub(crate) include_checksum: Option<bool>,
+ pub(crate) include_origin_timestamp: Option<bool>,
+ pub(crate) include_stream_tag: Option<bool>,
+ pub(crate) include_topic_tag: Option<bool>,
+ pub(crate) include_partition_tag: Option<bool>,
+ pub(crate) payload_format: Option<String>,
+ pub(crate) verbose_logging: Option<bool>,
+ pub(crate) max_retries: Option<u32>,
+ pub(crate) retry_delay: Option<String>,
+ pub(crate) timeout: Option<String>,
+ pub(crate) max_open_retries: Option<u32>,
+ pub(crate) open_retry_max_delay: Option<String>,
+ pub(crate) retry_max_delay: Option<String>,
+ pub(crate) circuit_breaker_threshold: Option<u32>,
+ pub(crate) circuit_breaker_cool_down: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(tag = "version")]
+pub enum InfluxDbSinkConfig {
+ #[serde(rename = "v2")]
+ V2(V2SinkConfig),
+ #[serde(rename = "v3")]
+ V3(V3SinkConfig),
+}
+
+impl<'de> serde::Deserialize<'de> for InfluxDbSinkConfig {
+ fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
+ let raw = serde_json::Value::deserialize(d)?;
+ let version = raw.get("version").and_then(|v| v.as_str()).unwrap_or("v2");
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]