mlevkov commented on code in PR #2925: URL: https://github.com/apache/iggy/pull/2925#discussion_r2990343359
########## core/connectors/sinks/http_sink/src/lib.rs: ########## @@ -0,0 +1,2121 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use async_trait::async_trait; +use base64::Engine; +use base64::engine::general_purpose; +use bytes::Bytes; +use humantime::Duration as HumanDuration; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, + convert::owned_value_to_serde_json, sink_connector, +}; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tracing::{debug, error, info, warn}; + +sink_connector!(HttpSink); + +const DEFAULT_TIMEOUT: &str = "30s"; +const DEFAULT_RETRY_DELAY: &str = "1s"; +const DEFAULT_MAX_RETRY_DELAY: &str = "30s"; +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_BACKOFF_MULTIPLIER: f64 = 2.0; +const DEFAULT_MAX_PAYLOAD_SIZE: u64 = 10 * 1024 * 1024; // 10 MB +const DEFAULT_MAX_CONNECTIONS: usize = 10; +/// TCP keep-alive interval for detecting dead connections behind load balancers. 
+/// Cloud LBs silently drop idle connections (AWS ALB ~60s, GCP ~600s); +/// probing at 30s detects these before requests fail. +const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30; +/// Close pooled connections unused for this long. Prevents stale connections +/// from accumulating when traffic is bursty. +const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90; +/// Abort remaining messages in individual/raw mode after this many consecutive HTTP failures. +/// Prevents hammering a dead endpoint with N sequential retry cycles per poll. +const MAX_CONSECUTIVE_FAILURES: u32 = 3; + +/// HTTP method enum — validated at deserialization, prevents invalid values like "DELEET" or "GETX". +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "UPPERCASE")] +pub enum HttpMethod { + Get, + Head, + #[default] + Post, + Put, + Patch, + Delete, +} + +/// Payload formatting mode for HTTP requests. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum BatchMode { + /// One HTTP request per message (default). Note: with batch_length=50, this produces 50 + /// sequential HTTP round trips per poll cycle. Use ndjson or json_array for higher throughput. + #[default] + Individual, + /// All messages in one request, newline-delimited JSON. + Ndjson, + /// All messages as a single JSON array. + JsonArray, + /// Raw bytes, one request per message (for non-JSON payloads). + Raw, +} + +/// Configuration for the HTTP sink connector, deserialized from [plugin_config] in config.toml. +#[derive(Debug, Serialize, Deserialize)] +pub struct HttpSinkConfig { + /// Target URL for HTTP requests (required). + pub url: String, + /// HTTP method (default: POST). + pub method: Option<HttpMethod>, + /// Request timeout as a human-readable duration string, e.g. "30s" (default: 30s). + pub timeout: Option<String>, + /// Maximum HTTP body size in bytes (default: 10MB). Set to 0 to disable. 
+ pub max_payload_size_bytes: Option<u64>, + /// Custom HTTP headers. + pub headers: Option<HashMap<String, String>>, + /// Payload formatting mode (default: individual). + pub batch_mode: Option<BatchMode>, + /// Include Iggy metadata envelope in payload (default: true). + pub include_metadata: Option<bool>, + /// Include message checksum in metadata (default: false). + pub include_checksum: Option<bool>, + /// Include origin timestamp in metadata (default: false). + pub include_origin_timestamp: Option<bool>, + /// Enable health check request in open() (default: false). + pub health_check_enabled: Option<bool>, + /// HTTP method for health check (default: HEAD). + pub health_check_method: Option<HttpMethod>, + /// Maximum number of retries for transient errors (default: 3). + pub max_retries: Option<u32>, + /// Retry delay as a human-readable duration string, e.g. "1s" (default: 1s). + pub retry_delay: Option<String>, + /// Backoff multiplier for exponential retry delay (default: 2.0). + pub retry_backoff_multiplier: Option<f64>, + /// Maximum retry delay cap as a human-readable duration string (default: 30s). + pub max_retry_delay: Option<String>, + /// HTTP status codes considered successful (default: [200, 201, 202, 204]). + pub success_status_codes: Option<Vec<u16>>, + /// Accept invalid TLS certificates (default: false). Named to signal danger. + pub tls_danger_accept_invalid_certs: Option<bool>, + /// Maximum idle connections per host (default: 10). + pub max_connections: Option<usize>, + /// Enable verbose request/response logging (default: false). + pub verbose_logging: Option<bool>, +} + +/// HTTP sink connector that delivers consumed messages to any HTTP endpoint. +/// +/// Lifecycle: `new()` → `open()` → `consume()` (repeated) → `close()`. +/// The `reqwest::Client` is built in `open()` (not `new()`) so that config-derived +/// settings (timeout, TLS, connection pool) are applied. 
This matches the +/// MongoDB/Elasticsearch/PostgreSQL sink initialization pattern. +#[derive(Debug)] +pub struct HttpSink { + id: u32, + url: String, + method: HttpMethod, + timeout: Duration, + max_payload_size_bytes: u64, + headers: HashMap<String, String>, + batch_mode: BatchMode, + include_metadata: bool, + include_checksum: bool, + include_origin_timestamp: bool, + health_check_enabled: bool, + health_check_method: HttpMethod, + max_retries: u32, + retry_delay: Duration, + retry_backoff_multiplier: f64, + max_retry_delay: Duration, + success_status_codes: HashSet<u16>, + tls_danger_accept_invalid_certs: bool, + max_connections: usize, + verbose: bool, + /// Pre-built HTTP headers (excluding Content-Type). Built once in `open()` from validated + /// `self.headers`, reused for every request. `None` before `open()` is called. + request_headers: Option<reqwest::header::HeaderMap>, + /// Initialized in `open()` with config-derived settings. `None` before `open()` is called. + client: Option<reqwest::Client>, + requests_sent: AtomicU64, + messages_delivered: AtomicU64, + errors_count: AtomicU64, + retries_count: AtomicU64, + /// Epoch seconds of last successful HTTP request. + last_success_timestamp: AtomicU64, +} + +/// Parse a human-readable duration string, falling back to a default on failure. 
+fn parse_duration(input: Option<&str>, default: &str) -> Duration { + let raw = input.unwrap_or(default); + HumanDuration::from_str(raw) + .map(|d| *d) + .unwrap_or_else(|e| { + warn!( + "Invalid duration '{}': {}, using default '{}'", + raw, e, default + ); + *HumanDuration::from_str(default).expect("default duration must be valid") + }) +} + +impl HttpSink { + pub fn new(id: u32, config: HttpSinkConfig) -> Self { + let url = config.url; + let method = config.method.unwrap_or_default(); + let timeout = parse_duration(config.timeout.as_deref(), DEFAULT_TIMEOUT); + let max_payload_size_bytes = config + .max_payload_size_bytes + .unwrap_or(DEFAULT_MAX_PAYLOAD_SIZE); + let headers = config.headers.unwrap_or_default(); + let batch_mode = config.batch_mode.unwrap_or_default(); + let include_metadata = config.include_metadata.unwrap_or(true); + let include_checksum = config.include_checksum.unwrap_or(false); + let include_origin_timestamp = config.include_origin_timestamp.unwrap_or(false); + let health_check_enabled = config.health_check_enabled.unwrap_or(false); + let health_check_method = config.health_check_method.unwrap_or(HttpMethod::Head); + let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES); + let retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY); + let retry_backoff_multiplier = config + .retry_backoff_multiplier + .unwrap_or(DEFAULT_BACKOFF_MULTIPLIER) + .max(1.0); + let max_retry_delay = + parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY); + let success_status_codes: HashSet<u16> = config + .success_status_codes + .unwrap_or_else(|| vec![200, 201, 202, 204]) + .into_iter() + .collect(); + let tls_danger_accept_invalid_certs = + config.tls_danger_accept_invalid_certs.unwrap_or(false); + let max_connections = config.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS); + let verbose = config.verbose_logging.unwrap_or(false); + + if retry_delay > max_retry_delay { + warn!( + "HTTP sink ID: {} 
— retry_delay ({:?}) exceeds max_retry_delay ({:?}). \ + All retry delays will be capped to max_retry_delay.", + id, retry_delay, max_retry_delay, + ); + } + + if tls_danger_accept_invalid_certs { + warn!( + "HTTP sink ID: {} — tls_danger_accept_invalid_certs is enabled. \ + TLS certificate validation is DISABLED.", + id + ); + } + + if batch_mode == BatchMode::Raw && include_metadata { + warn!( + "HTTP sink ID: {} — batch_mode=raw ignores include_metadata. \ + Raw mode sends payload bytes directly without metadata envelope.", + id + ); + } + + if matches!(method, HttpMethod::Get | HttpMethod::Head) + && batch_mode != BatchMode::Individual + { + warn!( + "HTTP sink ID: {} — {:?} with batch_mode={:?} will send a request body. \ + Some servers may reject GET/HEAD requests with a body.", + id, method, batch_mode, + ); + } + + HttpSink { + id, + url, + method, + timeout, + max_payload_size_bytes, + headers, + batch_mode, + include_metadata, + include_checksum, + include_origin_timestamp, + health_check_enabled, + health_check_method, + max_retries, + retry_delay, + retry_backoff_multiplier, + max_retry_delay, + success_status_codes, + tls_danger_accept_invalid_certs, + max_connections, + verbose, + request_headers: None, + client: None, + requests_sent: AtomicU64::new(0), + messages_delivered: AtomicU64::new(0), + errors_count: AtomicU64::new(0), + retries_count: AtomicU64::new(0), + last_success_timestamp: AtomicU64::new(0), + } + } + + /// Build the `reqwest::Client` from resolved config. 
+ fn build_client(&self) -> Result<reqwest::Client, Error> { + let builder = reqwest::Client::builder() + .timeout(self.timeout) + .pool_max_idle_per_host(self.max_connections) + .pool_idle_timeout(Duration::from_secs(DEFAULT_POOL_IDLE_TIMEOUT_SECS)) + .tcp_keepalive(Duration::from_secs(DEFAULT_TCP_KEEPALIVE_SECS)) + .danger_accept_invalid_certs(self.tls_danger_accept_invalid_certs); + + builder + .build() + .map_err(|e| Error::InitError(format!("Failed to build HTTP client: {}", e))) + } + + /// Determine the Content-Type header based on batch mode. + fn content_type(&self) -> &'static str { + match self.batch_mode { + BatchMode::Individual | BatchMode::JsonArray => "application/json", + BatchMode::Ndjson => "application/x-ndjson", + BatchMode::Raw => "application/octet-stream", + } + } + + /// Convert a `Payload` to a JSON value for metadata wrapping. + /// Non-JSON payloads are base64-encoded with a `iggy_payload_encoding` marker. + /// + /// Note: All current `Payload` variants produce infallible conversions. + /// The `Result` return type exists as a safety net for future variants. + fn payload_to_json(&self, payload: Payload) -> Result<serde_json::Value, Error> { + match payload { + Payload::Json(value) => { + // Direct structural conversion (not serialization roundtrip). + // Follows the Elasticsearch sink pattern. NaN/Infinity f64 → null. + Ok(owned_value_to_serde_json(&value)) + } + Payload::Text(text) => Ok(serde_json::Value::String(text)), + Payload::Raw(bytes) | Payload::FlatBuffer(bytes) => Ok(serde_json::json!({ + "data": general_purpose::STANDARD.encode(&bytes), + "iggy_payload_encoding": "base64" + })), + Payload::Proto(proto_str) => Ok(serde_json::json!({ + "data": general_purpose::STANDARD.encode(proto_str.as_bytes()), + "iggy_payload_encoding": "base64" + })), + } + } + + /// Build a message envelope with optional metadata wrapping. 
+ fn build_envelope( + &self, + message: &ConsumedMessage, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + payload_json: serde_json::Value, + ) -> serde_json::Value { + if !self.include_metadata { + return payload_json; + } + + let mut metadata = serde_json::json!({ + "iggy_id": format_u128_as_uuid(message.id), + "iggy_offset": message.offset, + "iggy_timestamp": message.timestamp, + "iggy_stream": topic_metadata.stream, + "iggy_topic": topic_metadata.topic, + "iggy_partition_id": messages_metadata.partition_id, + }); + + if self.include_checksum { + metadata["iggy_checksum"] = serde_json::json!(message.checksum); + } + + if self.include_origin_timestamp { + metadata["iggy_origin_timestamp"] = serde_json::json!(message.origin_timestamp); + } + + if let Some(ref headers) = message.headers + && !headers.is_empty() + { + let headers_map: serde_json::Map<String, serde_json::Value> = headers + .iter() + .map(|(k, v)| { + // Raw bytes: base64-encode to avoid Rust debug format in JSON output. + // as_raw() returns Ok only for HeaderKind::Raw. + let value = if let Ok(raw) = v.as_raw() { + serde_json::json!({ + "data": general_purpose::STANDARD.encode(raw), + "iggy_header_encoding": "base64" + }) + } else { + serde_json::Value::String(v.to_string_value()) + }; + (k.to_string_value(), value) + }) + .collect(); + metadata["iggy_headers"] = serde_json::Value::Object(headers_map); + } + + serde_json::json!({ + "metadata": metadata, + "payload": payload_json, + }) + } + + /// Classify whether an HTTP status code is transient (worth retrying). + fn is_transient_status(status: reqwest::StatusCode) -> bool { + matches!(status.as_u16(), 429 | 500 | 502 | 503 | 504) + } + + /// Extract `Retry-After` header value as a Duration (seconds), capped to `max_retry_delay`. 
+ fn parse_retry_after(&self, response: &reqwest::Response) -> Option<Duration> { + let header_raw = response.headers().get(reqwest::header::RETRY_AFTER)?; + let header_value = match header_raw.to_str() { + Ok(s) => s, + Err(e) => { + warn!( + "HTTP sink ID: {} — Retry-After header contains non-ASCII bytes: {}. \ + Using computed backoff.", + self.id, e, + ); + return None; + } + }; + match header_value.parse::<u64>() { + Ok(secs) => Some(Duration::from_secs(secs).min(self.max_retry_delay)), + Err(_) => { + warn!( + "HTTP sink ID: {} — Retry-After header '{}' is not an integer delay; \ + HTTP-date format is not supported. Using computed backoff.", + self.id, header_value, + ); + None + } + } + } + + /// Compute the retry delay for a given attempt, applying exponential backoff + /// capped at `max_retry_delay`. Clamps before `Duration::from_secs_f64` to avoid + /// panics when extreme backoff configs produce infinity (e.g., multiplier=1000, retries=200). + fn compute_retry_delay(&self, attempt: u32) -> Duration { + let delay_secs = + self.retry_delay.as_secs_f64() * self.retry_backoff_multiplier.powi(attempt as i32); + let capped_secs = delay_secs.min(self.max_retry_delay.as_secs_f64()); + if !capped_secs.is_finite() || capped_secs < 0.0 { + return self.max_retry_delay; + } + Duration::from_secs_f64(capped_secs) + } + + /// Record a successful request timestamp. + fn record_success(&self) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + self.last_success_timestamp.store(now, Ordering::Relaxed); + } + + /// Send an HTTP request with retry logic. Returns Ok on success, Err after exhausting retries. + /// + /// Takes `Bytes` instead of `Vec<u8>` so retries clone via reference-count increment (O(1)) + /// rather than copying the entire payload on each attempt. 
+ async fn send_with_retry( + &self, + client: &reqwest::Client, + body: Bytes, + content_type: &str, + ) -> Result<(), Error> { + let mut attempt = 0u32; + + loop { + let headers = self.request_headers.as_ref().ok_or_else(|| { + Error::InitError("HTTP headers not initialized — was open() called?".to_string()) + })?; + let request = build_request(self.method, client, &self.url) + .headers(headers.clone()) + .header("content-type", content_type) + .body(body.clone()) + .build() + .map_err(|e| Error::HttpRequestFailed(format!("Request build error: {}", e)))?; + + if self.verbose { + debug!( + "HTTP sink ID: {} — sending {} {} (attempt {}/{}, {} bytes)", + self.id, + request.method(), + request.url(), + attempt + 1, + self.max_retries + 1, + body.len(), + ); + } + + self.requests_sent.fetch_add(1, Ordering::Relaxed); + + match client.execute(request).await { + Ok(response) => { + let status = response.status(); + + // Check for Retry-After before consuming the response + let retry_after = self.parse_retry_after(&response); + + if self.success_status_codes.contains(&status.as_u16()) { + if self.verbose { + debug!( + "HTTP sink ID: {} — success (status {})", + self.id, + status.as_u16() + ); + } + self.record_success(); + return Ok(()); + } + + // Non-success status — read body for diagnostics + let response_body = match response.text().await { + Ok(body) => body, + Err(e) => format!("<body read error: {}>", e), + }; + + if Self::is_transient_status(status) && attempt < self.max_retries { + let delay = + retry_after.unwrap_or_else(|| self.compute_retry_delay(attempt)); + warn!( + "HTTP sink ID: {} — transient error (status {}, attempt {}/{}). \ + Retrying in {:?}. 
Response: {}", + self.id, + status.as_u16(), + attempt + 1, + self.max_retries + 1, + delay, + truncate_response(&response_body, 200), + ); + self.retries_count.fetch_add(1, Ordering::Relaxed); + tokio::time::sleep(delay).await; + attempt += 1; + continue; + } + + // Non-transient or retries exhausted + error!( + "HTTP sink ID: {} — request failed (status {}, attempt {}/{}). \ + Response: {}", + self.id, + status.as_u16(), + attempt + 1, + self.max_retries + 1, + truncate_response(&response_body, 500), + ); + self.errors_count.fetch_add(1, Ordering::Relaxed); + return Err(Error::HttpRequestFailed(format!( + "HTTP {} — status: {}", + self.url, + status.as_u16() + ))); + } + Err(network_err) => { + if attempt < self.max_retries { + let delay = self.compute_retry_delay(attempt); + warn!( + "HTTP sink ID: {} — network error (attempt {}/{}): {}. \ + Retrying in {:?}.", + self.id, + attempt + 1, + self.max_retries + 1, + network_err, + delay, + ); + self.retries_count.fetch_add(1, Ordering::Relaxed); + tokio::time::sleep(delay).await; + attempt += 1; + continue; + } + + error!( + "HTTP sink ID: {} — network error after {} attempts: {}", + self.id, + attempt + 1, + network_err, + ); + self.errors_count.fetch_add(1, Ordering::Relaxed); + return Err(Error::HttpRequestFailed(format!( + "Network error after {} attempts: {}", + attempt + 1, + network_err + ))); + } + } + } + } + + /// Shared per-message send loop for `individual` and `raw` modes. + /// + /// Iterates `messages`, builds a body for each via `build_body`, enforces payload size + /// limits, sends via `send_with_retry`, and tracks partial delivery. + /// Aborts after `MAX_CONSECUTIVE_FAILURES` consecutive HTTP failures. + /// + /// `build_body` takes ownership of each `ConsumedMessage` — callers must extract + /// all needed fields (payload, metadata) within the closure. 
+ async fn send_per_message<F>( + &self, + client: &reqwest::Client, + messages: Vec<ConsumedMessage>, + content_type: &str, + mode_name: &str, + mut build_body: F, + ) -> Result<(), Error> + where + F: FnMut(ConsumedMessage) -> Result<Vec<u8>, Error>, + { + let total = messages.len(); + let mut delivered = 0u64; + let mut http_failures = 0u64; + let mut serialization_failures = 0u64; + let mut consecutive_failures = 0u32; + let mut last_error: Option<Error> = None; + + for message in messages { + let offset = message.offset; + let body = match build_body(message) { + Ok(b) => b, + Err(e) => { + error!( + "HTTP sink ID: {} — failed to build {} body at offset {}: {}", + self.id, mode_name, offset, e + ); + self.errors_count.fetch_add(1, Ordering::Relaxed); + serialization_failures += 1; + last_error = Some(e); + continue; + } + }; + + if self.max_payload_size_bytes > 0 && body.len() as u64 > self.max_payload_size_bytes { + error!( + "HTTP sink ID: {} — {} payload at offset {} exceeds max size ({} > {} bytes). 
Skipping.", + self.id, + mode_name, + offset, + body.len(), + self.max_payload_size_bytes, + ); + self.errors_count.fetch_add(1, Ordering::Relaxed); + serialization_failures += 1; + last_error = Some(Error::HttpRequestFailed(format!( + "Payload exceeds max size: {} bytes", + body.len() + ))); + continue; + } + + match self + .send_with_retry(client, Bytes::from(body), content_type) + .await + { + Ok(()) => { + delivered += 1; + consecutive_failures = 0; + } + Err(e) => { + error!( + "HTTP sink ID: {} — failed to deliver {} message at offset {} after retries: {}", + self.id, mode_name, offset, e + ); + http_failures += 1; + consecutive_failures += 1; + last_error = Some(e); + + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { + let processed = delivered + http_failures + serialization_failures; + debug_assert!( + processed <= total as u64, + "processed ({processed}) > total ({total}) — accounting bug" + ); + let skipped = (total as u64).saturating_sub(processed); + error!( + "HTTP sink ID: {} — aborting {} batch after {} consecutive HTTP failures \ + ({} remaining messages skipped)", + self.id, mode_name, consecutive_failures, skipped, + ); + self.errors_count.fetch_add(skipped, Ordering::Relaxed); + break; + } + } + } + } + + self.messages_delivered + .fetch_add(delivered, Ordering::Relaxed); + + match last_error { + Some(e) => { + error!( + "HTTP sink ID: {} — partial {} delivery: {}/{} delivered, \ + {} HTTP failures, {} serialization errors", + self.id, mode_name, delivered, total, http_failures, serialization_failures, + ); + Err(e) + } + None => Ok(()), + } + } + + /// Send messages in `individual` mode — one HTTP request per message. 
+ async fn send_individual( + &self, + client: &reqwest::Client, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + self.send_per_message( + client, + messages, + self.content_type(), + "individual", + |mut message| { + let payload = std::mem::replace(&mut message.payload, Payload::Raw(vec![])); + let payload_json = self.payload_to_json(payload)?; + let envelope = + self.build_envelope(&message, topic_metadata, messages_metadata, payload_json); + serde_json::to_vec(&envelope) + .map_err(|e| Error::Serialization(format!("Envelope serialize: {}", e))) + }, + ) + .await + } + + /// Sends a batch body and updates delivery/error accounting. + /// + /// Shared by `send_ndjson` and `send_json_array` — the post-send accounting logic + /// (error propagation, skip warnings) is identical across batch modes. + async fn send_batch_body( + &self, + client: &reqwest::Client, + body: Bytes, + count: u64, + skipped: u64, + batch_mode: &str, + ) -> Result<(), Error> { + debug_assert!( + count > 0, + "send_batch_body called with count=0 — callers must guard against empty batches" + ); + if let Err(e) = self + .send_with_retry(client, body, self.content_type()) + .await + { + // send_with_retry already added 1 to errors_count for the HTTP failure. + // Add the remaining messages that were serialized but not delivered. + if count > 1 { + self.errors_count.fetch_add(count - 1, Ordering::Relaxed); + } + if skipped > 0 { + error!( + "HTTP sink ID: {} — {} batch failed with {} serialization skips", + self.id, batch_mode, skipped, + ); + } + return Err(e); + } + self.messages_delivered.fetch_add(count, Ordering::Relaxed); + if skipped > 0 { + warn!( + "HTTP sink ID: {} — {} batch: {} delivered, {} skipped (serialization errors)", + self.id, batch_mode, count, skipped, + ); + } + Ok(()) + } + + /// Send messages in `ndjson` mode — all messages in one request, newline-delimited. 
+ /// Skips individual messages that fail serialization rather than aborting the batch. + async fn send_ndjson( + &self, + client: &reqwest::Client, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + let mut lines = Vec::with_capacity(messages.len()); + let mut skipped = 0u64; + + for mut message in messages { + let payload = std::mem::replace(&mut message.payload, Payload::Raw(vec![])); + let payload_json = match self.payload_to_json(payload) { + Ok(json) => json, + Err(e) => { + error!( + "HTTP sink ID: {} — skipping message at offset {} in NDJSON batch: {}", + self.id, message.offset, e + ); + self.errors_count.fetch_add(1, Ordering::Relaxed); + skipped += 1; + continue; + } + }; + let envelope = + self.build_envelope(&message, topic_metadata, messages_metadata, payload_json); + match serde_json::to_string(&envelope) { + Ok(line) => lines.push(line), + Err(e) => { + error!( + "HTTP sink ID: {} — skipping message at offset {} in NDJSON batch (serialize): {}", + self.id, message.offset, e + ); + self.errors_count.fetch_add(1, Ordering::Relaxed); + skipped += 1; + continue; + } + } + } + + if lines.is_empty() { + return Err(Error::Serialization( + "All messages in NDJSON batch failed serialization".to_string(), + )); + } + + let count = lines.len() as u64; + + let mut body_str = lines.join("\n"); + body_str.push('\n'); // NDJSON spec requires trailing newline + let body = body_str.into_bytes(); + + if self.max_payload_size_bytes > 0 && body.len() as u64 > self.max_payload_size_bytes { + error!( + "HTTP sink ID: {} — NDJSON batch exceeds max payload size ({} > {} bytes)", + self.id, + body.len(), + self.max_payload_size_bytes, + ); + // Count all successfully-serialized messages as errors (skipped already counted individually) + self.errors_count.fetch_add(count, Ordering::Relaxed); + return Err(Error::HttpRequestFailed(format!( + "NDJSON batch exceeds max size: {} bytes", + 
body.len() + ))); + } + + self.send_batch_body(client, Bytes::from(body), count, skipped, "NDJSON") + .await + } + + /// Send messages in `json_array` mode — all messages as a single JSON array. + /// Skips individual messages that fail serialization rather than aborting the batch. + async fn send_json_array( + &self, + client: &reqwest::Client, + topic_metadata: &TopicMetadata, + messages_metadata: &MessagesMetadata, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + let mut envelopes = Vec::with_capacity(messages.len()); + let mut skipped = 0u64; + + for mut message in messages { + let payload = std::mem::replace(&mut message.payload, Payload::Raw(vec![])); + let payload_json = match self.payload_to_json(payload) { + Ok(json) => json, + Err(e) => { + error!( + "HTTP sink ID: {} — skipping message at offset {} in JSON array batch: {}", + self.id, message.offset, e + ); + self.errors_count.fetch_add(1, Ordering::Relaxed); + skipped += 1; + continue; + } + }; + let envelope = + self.build_envelope(&message, topic_metadata, messages_metadata, payload_json); + envelopes.push(envelope); + } + + if envelopes.is_empty() { + return Err(Error::Serialization( + "All messages in JSON array batch failed serialization".to_string(), + )); + } + + let count = envelopes.len() as u64; + + let body = match serde_json::to_vec(&envelopes) { + Ok(b) => b, + Err(e) => { + error!( + "HTTP sink ID: {} — failed to serialize JSON array batch \ + ({} envelopes, {} skipped): {}", + self.id, + envelopes.len(), + skipped, + e, + ); + // Count all successfully-built envelopes as errors (skipped already counted individually) + self.errors_count.fetch_add(count, Ordering::Relaxed); + return Err(Error::Serialization(format!( + "JSON array serialize ({} envelopes): {}", + envelopes.len(), + e + ))); + } + }; + + if self.max_payload_size_bytes > 0 && body.len() as u64 > self.max_payload_size_bytes { + error!( + "HTTP sink ID: {} — JSON array batch exceeds max payload size ({} > {} 
bytes)", + self.id, + body.len(), + self.max_payload_size_bytes, + ); + // Count all successfully-serialized messages as errors (skipped already counted individually) + self.errors_count.fetch_add(count, Ordering::Relaxed); + return Err(Error::HttpRequestFailed(format!( + "JSON array batch exceeds max size: {} bytes", + body.len() + ))); + } + + self.send_batch_body(client, Bytes::from(body), count, skipped, "JSON array") + .await + } + + /// Send messages in `raw` mode — one HTTP request per message with raw bytes. + async fn send_raw( + &self, + client: &reqwest::Client, + messages: Vec<ConsumedMessage>, + ) -> Result<(), Error> { + self.send_per_message(client, messages, self.content_type(), "raw", |message| { + message + .payload + .try_into_vec() + .map_err(|e| Error::Serialization(format!("Raw payload convert: {}", e))) + }) + .await + } +} + +/// Map an `HttpMethod` to a `reqwest::RequestBuilder` for the given URL. +fn build_request( + method: HttpMethod, + client: &reqwest::Client, + url: &str, +) -> reqwest::RequestBuilder { + match method { + HttpMethod::Get => client.get(url), + HttpMethod::Head => client.head(url), + HttpMethod::Post => client.post(url), + HttpMethod::Put => client.put(url), + HttpMethod::Patch => client.patch(url), + HttpMethod::Delete => client.delete(url), + } +} + +/// Format a u128 message ID as an RFC 4122 v8 (custom) UUID. +/// +/// Uses `Uuid::new_v8()` which sets version=8 and variant=RFC4122 bits, +/// producing UUIDs that downstream libraries accept as valid. +/// Note: `new_v8()` overwrites 6 bits (version nibble + variant bits), so the +/// UUID is not round-trippable to the original u128 value. +fn format_u128_as_uuid(id: u128) -> String { + uuid::Uuid::new_v8(id.to_be_bytes()).to_string() Review Comment: Great question. 
Here's the trade-off analysis:

### UUID format options

| Option | What happens | Pros | Cons |
|--------|-------------|------|------|
| **v4 (random)** | Fresh random UUID per message | Valid RFC 4122, universally accepted | Loses all correlation to Iggy message ID — can't trace back to source message |
| **v8 (current)** | Encodes u128 message ID with version/variant bits | Valid UUID, downstream libraries accept it, mostly preserves original ID | Overwrites 6 bits — not round-trippable; up to 64 distinct message IDs can map to the same UUID |
| **Simple hex** | `01234567-89ab-cdef-0123-456789abcdef` (no version bits) | Lossless — exact u128 preserved, fully round-trippable | Not a valid UUID — downstream `parse_uuid()` may reject it |
| **Configurable** | User picks format in config | Maximum flexibility | Adds config surface, complexity, testing burden for a display format |

### Recommendation

Revert to **simple hex** (lossless, the original approach) — it preserves the exact Iggy message ID for debugging/correlation, which is more valuable for an Iggy connector than strict UUID compliance. Downstream consumers already treat `iggy_id` as an opaque identifier, not as a UUID they need to validate.

If strict UUID compliance is needed, v8 is the right choice — but it sacrifices 6 bits of the original ID. Happy to go with whatever you prefer here.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
