mlevkov commented on code in PR #2925:
URL: https://github.com/apache/iggy/pull/2925#discussion_r3019483663


##########
core/connectors/sinks/http_sink/src/lib.rs:
##########
@@ -0,0 +1,2069 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use base64::Engine;
+use base64::engine::general_purpose;
+use bytes::Bytes;
+use humantime::Duration as HumanDuration;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata,
+    convert::owned_value_to_serde_json, sink_connector,
+};
+use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
+use reqwest_retry::{
+    RetryTransientMiddleware, Retryable, RetryableStrategy, 
policies::ExponentialBackoff,
+};
+use reqwest_tracing::TracingMiddleware;
+use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tracing::{debug, error, info, warn};
+
+sink_connector!(HttpSink);
+
+const DEFAULT_TIMEOUT: &str = "30s";
+const DEFAULT_RETRY_DELAY: &str = "1s";
+const DEFAULT_MAX_RETRY_DELAY: &str = "30s";
+const DEFAULT_MAX_RETRIES: u32 = 3;
+const DEFAULT_BACKOFF_MULTIPLIER: u32 = 2;
+const DEFAULT_MAX_PAYLOAD_SIZE: u64 = 10 * 1024 * 1024; // 10 MB
+const DEFAULT_MAX_CONNECTIONS: usize = 10;
+/// TCP keep-alive interval for detecting dead connections behind load 
balancers.
+/// Cloud LBs silently drop idle connections (AWS ALB ~60s, GCP ~600s);
+/// probing at 30s detects these before requests fail.
+const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30;
+/// Close pooled connections unused for this long. Prevents stale connections
+/// from accumulating when traffic is bursty.
+const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90;
+/// Abort remaining messages in individual/raw mode after this many 
consecutive HTTP failures.
+/// Prevents hammering a dead endpoint with N sequential retry cycles per poll.
+const MAX_CONSECUTIVE_FAILURES: u32 = 3;
+
+const ENCODING_BASE64: &str = "base64";
+
+/// HTTP method enum — validated at deserialization, prevents invalid values 
like "DELEET" or "GETX".
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "UPPERCASE")]
+pub enum HttpMethod {
+    Get,
+    Head,
+    #[default]
+    Post,
+    Put,
+    Patch,
+    Delete,
+}
+
+/// Payload formatting mode for HTTP requests.
+#[derive(
+    Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, 
strum_macros::Display,
+)]
+#[serde(rename_all = "snake_case")]
+pub enum BatchMode {
+    /// One HTTP request per message (default). Note: with batch_length=50, 
this produces 50
+    /// sequential HTTP round trips per poll cycle. Use ndjson or json_array 
for higher throughput.
+    #[default]
+    #[strum(to_string = "individual")]
+    Individual,
+    /// All messages in one request, newline-delimited JSON.
+    #[strum(to_string = "NDJSON")]
+    NdJson,
+    /// All messages as a single JSON array.
+    #[strum(to_string = "JSON array")]
+    JsonArray,
+    /// Raw bytes, one request per message (for non-JSON payloads).
+    #[strum(to_string = "raw")]
+    Raw,
+}
+
+impl BatchMode {
+    /// Determine the Content-Type header based on batch mode.
+    fn content_type(&self) -> &'static str {
+        match self {
+            BatchMode::Individual | BatchMode::JsonArray => "application/json",
+            BatchMode::NdJson => "application/x-ndjson",
+            BatchMode::Raw => "application/octet-stream",
+        }
+    }
+}
+
+/// Metadata envelope wrapping a payload with Iggy message metadata.
+#[derive(Debug, Serialize)]
+struct MetadataEnvelope {
+    metadata: IggyMetadata,
+    payload: serde_json::Value,
+}
+
+/// Iggy message metadata fields.
+#[derive(Debug, Serialize)]
+struct IggyMetadata {
+    iggy_id: String,
+    iggy_offset: u64,
+    iggy_timestamp: u64,
+    iggy_stream: String,
+    iggy_topic: String,
+    iggy_partition_id: u32,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    iggy_checksum: Option<u64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    iggy_origin_timestamp: Option<u64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    iggy_headers: Option<serde_json::Map<String, serde_json::Value>>,
+}
+
+/// Binary payload with base64 encoding marker.
+#[derive(Debug, Serialize)]
+struct EncodedPayload {
+    data: String,
+    iggy_payload_encoding: &'static str,
+}
+
+/// Binary header value with base64 encoding marker.
+#[derive(Debug, Serialize)]
+struct EncodedHeader {
+    data: String,
+    iggy_header_encoding: &'static str,
+}
+
+/// Configuration for the HTTP sink connector, deserialized from 
[plugin_config] in config.toml.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct HttpSinkConfig {
+    /// Target URL for HTTP requests (required).
+    pub url: String,
+    /// HTTP method (default: POST).
+    pub method: Option<HttpMethod>,
+    /// Request timeout as a human-readable duration string, e.g. "30s" 
(default: 30s).
+    pub timeout: Option<String>,
+    /// Maximum HTTP body size in bytes (default: 10MB). Set to 0 to disable.
+    pub max_payload_size_bytes: Option<u64>,
+    /// Custom HTTP headers.
+    pub headers: Option<HashMap<String, String>>,
+    /// Payload formatting mode (default: individual).
+    pub batch_mode: Option<BatchMode>,
+    /// Include Iggy metadata envelope in payload (default: true).
+    pub include_metadata: Option<bool>,
+    /// Include message checksum in metadata (default: false).
+    pub include_checksum: Option<bool>,
+    /// Include origin timestamp in metadata (default: false).
+    pub include_origin_timestamp: Option<bool>,
+    /// Enable health check request in open() (default: false).
+    pub health_check_enabled: Option<bool>,
+    /// HTTP method for health check (default: HEAD).
+    pub health_check_method: Option<HttpMethod>,
+    /// Maximum number of retries for transient errors (default: 3).
+    pub max_retries: Option<u32>,
+    /// Retry delay as a human-readable duration string, e.g. "1s" (default: 
1s).
+    pub retry_delay: Option<String>,
+    /// Backoff multiplier for exponential retry delay (default: 2).
+    pub retry_backoff_multiplier: Option<u32>,
+    /// Maximum retry delay cap as a human-readable duration string (default: 
30s).
+    pub max_retry_delay: Option<String>,
+    /// HTTP status codes considered successful (default: [200, 201, 202, 
204]).
+    pub success_status_codes: Option<Vec<u16>>,
+    /// Accept invalid TLS certificates (default: false). Named to signal 
danger.
+    pub tls_danger_accept_invalid_certs: Option<bool>,
+    /// Maximum idle connections per host (default: 10).
+    pub max_connections: Option<usize>,
+    /// Enable verbose request/response logging (default: false).
+    pub verbose_logging: Option<bool>,
+}
+
+/// HTTP sink connector that delivers consumed messages to any HTTP endpoint.
+///
+/// Lifecycle: `new()` → `open()` → `consume()` (repeated) → `close()`.
+/// The `reqwest::Client` is built in `open()` (not `new()`) so that 
config-derived
+/// settings (timeout, TLS, connection pool) are applied. This matches the
+/// MongoDB/Elasticsearch/PostgreSQL sink initialization pattern.
+#[derive(Debug)]
+pub struct HttpSink {
+    id: u32,
+    url: String,
+    method: HttpMethod,
+    timeout: Duration,
+    max_payload_size_bytes: u64,
+    headers: HashMap<String, String>,
+    batch_mode: BatchMode,
+    include_metadata: bool,
+    include_checksum: bool,
+    include_origin_timestamp: bool,
+    health_check_enabled: bool,
+    health_check_method: HttpMethod,
+    max_retries: u32,
+    retry_delay: Duration,
+    retry_backoff_multiplier: u32,
+    max_retry_delay: Duration,
+    success_status_codes: HashSet<u16>,
+    tls_danger_accept_invalid_certs: bool,
+    max_connections: usize,
+    verbose: bool,
+    /// Pre-built HTTP headers (excluding Content-Type). Built once in 
`open()` from validated
+    /// `self.headers`, reused for every request. `None` before `open()` is 
called.
+    request_headers: Option<reqwest::header::HeaderMap>,
+    /// Initialized in `open()` with config-derived settings. `None` before 
`open()` is called.
+    client: Option<ClientWithMiddleware>,
+    send_attempts: AtomicU64,
+    messages_delivered: AtomicU64,
+    errors_count: AtomicU64,
+    /// Epoch seconds of last successful HTTP request.
+    last_success_timestamp: AtomicU64,
+}
+
+impl HttpSink {
+    pub fn new(id: u32, config: HttpSinkConfig) -> Self {
+        let url = config.url;
+        let method = config.method.unwrap_or_default();
+        let timeout = parse_duration(config.timeout.as_deref(), 
DEFAULT_TIMEOUT);
+        let max_payload_size_bytes = config
+            .max_payload_size_bytes
+            .unwrap_or(DEFAULT_MAX_PAYLOAD_SIZE);
+        let headers = config.headers.unwrap_or_default();
+        let batch_mode = config.batch_mode.unwrap_or_default();
+        let include_metadata = config.include_metadata.unwrap_or(true);
+        let include_checksum = config.include_checksum.unwrap_or(false);
+        let include_origin_timestamp = 
config.include_origin_timestamp.unwrap_or(false);
+        let health_check_enabled = 
config.health_check_enabled.unwrap_or(false);
+        let health_check_method = 
config.health_check_method.unwrap_or(HttpMethod::Head);
+        let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
+        let mut retry_delay = parse_duration(config.retry_delay.as_deref(), 
DEFAULT_RETRY_DELAY);
+        let retry_backoff_multiplier = config
+            .retry_backoff_multiplier
+            .unwrap_or(DEFAULT_BACKOFF_MULTIPLIER)
+            .max(1);
+        let mut max_retry_delay =
+            parse_duration(config.max_retry_delay.as_deref(), 
DEFAULT_MAX_RETRY_DELAY);
+        let success_status_codes: HashSet<u16> = config
+            .success_status_codes
+            .unwrap_or_else(|| vec![200, 201, 202, 204])
+            .into_iter()
+            .collect();
+        let tls_danger_accept_invalid_certs =
+            config.tls_danger_accept_invalid_certs.unwrap_or(false);
+        let max_connections = 
config.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS);
+        let verbose = config.verbose_logging.unwrap_or(false);
+
+        if retry_delay > max_retry_delay {
+            warn!(
+                "HTTP sink ID: {} — retry_delay ({:?}) exceeds max_retry_delay 
({:?}). \
+                 Swapping values to prevent ExponentialBackoff panic.",
+                id, retry_delay, max_retry_delay,
+            );
+            std::mem::swap(&mut retry_delay, &mut max_retry_delay);
+        }
+
+        if tls_danger_accept_invalid_certs {
+            warn!(
+                "HTTP sink ID: {} — tls_danger_accept_invalid_certs is 
enabled. \
+                 TLS certificate validation is DISABLED.",
+                id
+            );
+        }
+
+        if batch_mode == BatchMode::Raw && include_metadata {
+            warn!(
+                "HTTP sink ID: {} — batch_mode=raw ignores include_metadata. \
+                 Raw mode sends payload bytes directly without metadata 
envelope.",
+                id
+            );
+        }
+
+        if matches!(method, HttpMethod::Get | HttpMethod::Head)
+            && batch_mode != BatchMode::Individual
+        {
+            warn!(
+                "HTTP sink ID: {} — {:?} with batch_mode={:?} will send a 
request body. \
+                 Some servers may reject GET/HEAD requests with a body.",
+                id, method, batch_mode,
+            );
+        }
+
+        HttpSink {
+            id,
+            url,
+            method,
+            timeout,
+            max_payload_size_bytes,
+            headers,
+            batch_mode,
+            include_metadata,
+            include_checksum,
+            include_origin_timestamp,
+            health_check_enabled,
+            health_check_method,
+            max_retries,
+            retry_delay,
+            retry_backoff_multiplier,
+            max_retry_delay,
+            success_status_codes,
+            tls_danger_accept_invalid_certs,
+            max_connections,
+            verbose,
+            request_headers: None,
+            client: None,
+            send_attempts: AtomicU64::new(0),
+            messages_delivered: AtomicU64::new(0),
+            errors_count: AtomicU64::new(0),
+            last_success_timestamp: AtomicU64::new(0),
+        }
+    }
+
+    /// Build the `reqwest::Client` wrapped with retry and tracing middleware.
+    fn build_client(&self) -> Result<ClientWithMiddleware, Error> {
+        let raw_client = reqwest::Client::builder()
+            .timeout(self.timeout)
+            .pool_max_idle_per_host(self.max_connections)
+            
.pool_idle_timeout(Duration::from_secs(DEFAULT_POOL_IDLE_TIMEOUT_SECS))
+            .tcp_keepalive(Duration::from_secs(DEFAULT_TCP_KEEPALIVE_SECS))
+            .danger_accept_invalid_certs(self.tls_danger_accept_invalid_certs)
+            .build()
+            .map_err(|e| Error::InitError(format!("Failed to build HTTP 
client: {}", e)))?;
+
+        let retry_policy = ExponentialBackoff::builder()
+            .retry_bounds(self.retry_delay, self.max_retry_delay)
+            .base(self.retry_backoff_multiplier)
+            .build_with_max_retries(self.max_retries);
+
+        let retry_strategy = HttpSinkRetryStrategy {
+            success_status_codes: self.success_status_codes.clone(),
+        };
+
+        let retry_middleware =
+            
RetryTransientMiddleware::new_with_policy_and_strategy(retry_policy, 
retry_strategy);
+
+        Ok(ClientBuilder::new(raw_client)
+            .with(TracingMiddleware::default())
+            .with(retry_middleware)
+            .build())
+    }
+
+    /// Returns the initialized HTTP client, or an error if `open()` was not 
called.
+    fn client(&self) -> Result<&ClientWithMiddleware, Error> {
+        self.client.as_ref().ok_or_else(|| {
+            Error::InitError("HTTP client not initialized — was open() 
called?".to_string())
+        })
+    }
+
+    /// Convert a `Payload` to a JSON value for metadata wrapping.
+    /// Non-JSON payloads are base64-encoded with a `iggy_payload_encoding` 
marker.
+    ///
+    /// Note: All current `Payload` variants produce infallible conversions.
+    /// The `Result` return type exists as a safety net for future variants.
+    fn payload_to_json(&self, payload: Payload) -> Result<serde_json::Value, 
Error> {
+        match payload {
+            Payload::Json(value) => {
+                // Direct structural conversion (not serialization roundtrip).
+                // Follows the Elasticsearch sink pattern. NaN/Infinity f64 → 
null.
+                Ok(owned_value_to_serde_json(&value))
+            }
+            Payload::Text(text) => Ok(serde_json::Value::String(text)),
+            Payload::Raw(bytes) | Payload::FlatBuffer(bytes) => {
+                let encoded = EncodedPayload {
+                    data: general_purpose::STANDARD.encode(&bytes),
+                    iggy_payload_encoding: ENCODING_BASE64,
+                };
+                serde_json::to_value(encoded)
+                    .map_err(|e| Error::Serialization(format!("EncodedPayload: 
{}", e)))
+            }
+            Payload::Proto(proto_str) => {
+                let encoded = EncodedPayload {
+                    data: 
general_purpose::STANDARD.encode(proto_str.as_bytes()),
+                    iggy_payload_encoding: ENCODING_BASE64,
+                };
+                serde_json::to_value(encoded)
+                    .map_err(|e| Error::Serialization(format!("EncodedPayload: 
{}", e)))
+            }
+        }
+    }
+
+    /// Build a message envelope with optional metadata wrapping.
+    fn build_envelope(
+        &self,
+        message: &ConsumedMessage,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: &MessagesMetadata,
+        payload_json: serde_json::Value,
+    ) -> Result<serde_json::Value, Error> {
+        if !self.include_metadata {
+            return Ok(payload_json);
+        }
+
+        let headers_map = if let Some(ref headers) = message.headers
+            && !headers.is_empty()
+        {
+            let map: serde_json::Map<String, serde_json::Value> = headers
+                .iter()
+                .map(|(k, v)| {
+                    // Raw bytes: base64-encode to avoid Rust debug format in 
JSON output.
+                    // as_raw() returns Ok only for HeaderKind::Raw.
+                    let value = if let Ok(raw) = v.as_raw() {
+                        let encoded = EncodedHeader {
+                            data: general_purpose::STANDARD.encode(raw),
+                            iggy_header_encoding: ENCODING_BASE64,
+                        };
+                        serde_json::to_value(encoded)
+                            .map_err(|e| 
Error::Serialization(format!("EncodedHeader: {}", e)))?
+                    } else {
+                        serde_json::Value::String(v.to_string_value())
+                    };
+                    Ok((k.to_string_value(), value))
+                })
+                .collect::<Result<serde_json::Map<String, serde_json::Value>, 
Error>>()?;
+            Some(map)
+        } else {
+            None
+        };
+
+        let metadata = IggyMetadata {
+            iggy_id: format_u128_as_hex(message.id),
+            iggy_offset: message.offset,
+            iggy_timestamp: message.timestamp,
+            iggy_stream: topic_metadata.stream.clone(),
+            iggy_topic: topic_metadata.topic.clone(),
+            iggy_partition_id: messages_metadata.partition_id,
+            iggy_checksum: if self.include_checksum {
+                Some(message.checksum)
+            } else {
+                None
+            },
+            iggy_origin_timestamp: if self.include_origin_timestamp {
+                Some(message.origin_timestamp)
+            } else {
+                None
+            },
+            iggy_headers: headers_map,
+        };
+
+        let envelope = MetadataEnvelope {
+            metadata,
+            payload: payload_json,
+        };
+
+        serde_json::to_value(envelope)
+            .map_err(|e| Error::Serialization(format!("MetadataEnvelope: {}", 
e)))
+    }
+
+    /// Record a successful request timestamp.
+    fn record_success(&self) {
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs();
+        self.last_success_timestamp.store(now, Ordering::Relaxed);
+    }
+
+    /// Send an HTTP request with retry via reqwest-middleware. Returns Ok on 
success,
+    /// Err after exhausting retries. Retry logic (backoff, transient 
classification)
+    /// is handled by the middleware configured in `build_client()`.
+    async fn send_with_retry(&self, body: Bytes, content_type: &str) -> 
Result<(), Error> {
+        let client = self.client()?;
+        let headers = self.request_headers.as_ref().ok_or_else(|| {
+            Error::InitError("HTTP headers not initialized — was open() 
called?".to_string())
+        })?;
+
+        if self.verbose {
+            debug!(
+                "HTTP sink ID: {} — sending {:?} {} ({} bytes)",
+                self.id,
+                self.method,
+                self.url,

Review Comment:
   Fixed. Added `sanitize_url_for_log()` that strips userinfo 
(`user:password@`) before logging. All log/error messages now use 
`self.log_url` (sanitized); only actual HTTP requests use `self.url` (raw). 
Falls back to regex-style stripping when `Url::parse` fails (prevents 
credential leak even for malformed URLs). Good catch on the `user:password@` 
vector.



##########
core/connectors/sinks/http_sink/src/lib.rs:
##########
@@ -0,0 +1,2069 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use base64::Engine;
+use base64::engine::general_purpose;
+use bytes::Bytes;
+use humantime::Duration as HumanDuration;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata,
+    convert::owned_value_to_serde_json, sink_connector,
+};
+use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
+use reqwest_retry::{
+    RetryTransientMiddleware, Retryable, RetryableStrategy, 
policies::ExponentialBackoff,
+};
+use reqwest_tracing::TracingMiddleware;
+use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tracing::{debug, error, info, warn};
+
+sink_connector!(HttpSink);
+
+const DEFAULT_TIMEOUT: &str = "30s";
+const DEFAULT_RETRY_DELAY: &str = "1s";
+const DEFAULT_MAX_RETRY_DELAY: &str = "30s";
+const DEFAULT_MAX_RETRIES: u32 = 3;
+const DEFAULT_BACKOFF_MULTIPLIER: u32 = 2;
+const DEFAULT_MAX_PAYLOAD_SIZE: u64 = 10 * 1024 * 1024; // 10 MB
+const DEFAULT_MAX_CONNECTIONS: usize = 10;
+/// TCP keep-alive interval for detecting dead connections behind load 
balancers.
+/// Cloud LBs silently drop idle connections (AWS ALB ~60s, GCP ~600s);
+/// probing at 30s detects these before requests fail.
+const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30;
+/// Close pooled connections unused for this long. Prevents stale connections
+/// from accumulating when traffic is bursty.
+const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90;
+/// Abort remaining messages in individual/raw mode after this many 
consecutive HTTP failures.
+/// Prevents hammering a dead endpoint with N sequential retry cycles per poll.
+const MAX_CONSECUTIVE_FAILURES: u32 = 3;
+
+const ENCODING_BASE64: &str = "base64";
+
+/// HTTP method enum — validated at deserialization, prevents invalid values 
like "DELEET" or "GETX".
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "UPPERCASE")]
+pub enum HttpMethod {
+    Get,
+    Head,
+    #[default]
+    Post,
+    Put,
+    Patch,
+    Delete,
+}
+
+/// Payload formatting mode for HTTP requests.
+#[derive(
+    Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, 
strum_macros::Display,
+)]
+#[serde(rename_all = "snake_case")]

Review Comment:
   Fixed. Added `#[serde(rename = "ndjson")]` on the `NdJson` variant to 
override the `snake_case` default. All config files, README, and tests use 
`"ndjson"`. Added a comment explaining the rename precedence for future 
maintainers. The Rust variant stays `NdJson` (correct Rust casing), serializes 
as `"ndjson"` (industry standard per ndjson.org).



##########
core/connectors/sinks/http_sink/src/lib.rs:
##########
@@ -0,0 +1,2069 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use base64::Engine;
+use base64::engine::general_purpose;
+use bytes::Bytes;
+use humantime::Duration as HumanDuration;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata,
+    convert::owned_value_to_serde_json, sink_connector,
+};
+use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
+use reqwest_retry::{
+    RetryTransientMiddleware, Retryable, RetryableStrategy, 
policies::ExponentialBackoff,
+};
+use reqwest_tracing::TracingMiddleware;
+use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tracing::{debug, error, info, warn};
+
+sink_connector!(HttpSink);
+
+const DEFAULT_TIMEOUT: &str = "30s";
+const DEFAULT_RETRY_DELAY: &str = "1s";
+const DEFAULT_MAX_RETRY_DELAY: &str = "30s";
+const DEFAULT_MAX_RETRIES: u32 = 3;
+const DEFAULT_BACKOFF_MULTIPLIER: u32 = 2;
+const DEFAULT_MAX_PAYLOAD_SIZE: u64 = 10 * 1024 * 1024; // 10 MB
+const DEFAULT_MAX_CONNECTIONS: usize = 10;
+/// TCP keep-alive interval for detecting dead connections behind load 
balancers.
+/// Cloud LBs silently drop idle connections (AWS ALB ~60s, GCP ~600s);
+/// probing at 30s detects these before requests fail.
+const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30;
+/// Close pooled connections unused for this long. Prevents stale connections
+/// from accumulating when traffic is bursty.
+const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90;
+/// Abort remaining messages in individual/raw mode after this many 
consecutive HTTP failures.
+/// Prevents hammering a dead endpoint with N sequential retry cycles per poll.
+const MAX_CONSECUTIVE_FAILURES: u32 = 3;
+
+const ENCODING_BASE64: &str = "base64";
+
+/// HTTP method enum — validated at deserialization, prevents invalid values 
like "DELEET" or "GETX".
+#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "UPPERCASE")]
+pub enum HttpMethod {
+    Get,
+    Head,
+    #[default]
+    Post,
+    Put,
+    Patch,
+    Delete,
+}
+
+/// Payload formatting mode for HTTP requests.
+#[derive(
+    Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, 
strum_macros::Display,
+)]
+#[serde(rename_all = "snake_case")]
+pub enum BatchMode {
+    /// One HTTP request per message (default). Note: with batch_length=50, 
this produces 50
+    /// sequential HTTP round trips per poll cycle. Use ndjson or json_array 
for higher throughput.
+    #[default]
+    #[strum(to_string = "individual")]
+    Individual,
+    /// All messages in one request, newline-delimited JSON.
+    #[strum(to_string = "NDJSON")]
+    NdJson,
+    /// All messages as a single JSON array.
+    #[strum(to_string = "JSON array")]
+    JsonArray,
+    /// Raw bytes, one request per message (for non-JSON payloads).
+    #[strum(to_string = "raw")]
+    Raw,
+}
+
+impl BatchMode {
+    /// Determine the Content-Type header based on batch mode.
+    fn content_type(&self) -> &'static str {
+        match self {
+            BatchMode::Individual | BatchMode::JsonArray => "application/json",
+            BatchMode::NdJson => "application/x-ndjson",
+            BatchMode::Raw => "application/octet-stream",
+        }
+    }
+}
+
+/// Metadata envelope wrapping a payload with Iggy message metadata.
+#[derive(Debug, Serialize)]
+struct MetadataEnvelope {
+    metadata: IggyMetadata,
+    payload: serde_json::Value,
+}
+
+/// Iggy message metadata fields.
+#[derive(Debug, Serialize)]
+struct IggyMetadata {
+    iggy_id: String,
+    iggy_offset: u64,
+    iggy_timestamp: u64,
+    iggy_stream: String,
+    iggy_topic: String,
+    iggy_partition_id: u32,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    iggy_checksum: Option<u64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    iggy_origin_timestamp: Option<u64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    iggy_headers: Option<serde_json::Map<String, serde_json::Value>>,
+}
+
+/// Binary payload with base64 encoding marker.
+#[derive(Debug, Serialize)]
+struct EncodedPayload {
+    data: String,
+    iggy_payload_encoding: &'static str,
+}
+
+/// Binary header value with base64 encoding marker.
+#[derive(Debug, Serialize)]
+struct EncodedHeader {
+    data: String,
+    iggy_header_encoding: &'static str,
+}
+
+/// Configuration for the HTTP sink connector, deserialized from 
[plugin_config] in config.toml.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct HttpSinkConfig {
+    /// Target URL for HTTP requests (required).
+    pub url: String,
+    /// HTTP method (default: POST).
+    pub method: Option<HttpMethod>,
+    /// Request timeout as a human-readable duration string, e.g. "30s" 
(default: 30s).
+    pub timeout: Option<String>,
+    /// Maximum HTTP body size in bytes (default: 10MB). Set to 0 to disable.
+    pub max_payload_size_bytes: Option<u64>,
+    /// Custom HTTP headers.
+    pub headers: Option<HashMap<String, String>>,
+    /// Payload formatting mode (default: individual).
+    pub batch_mode: Option<BatchMode>,
+    /// Include Iggy metadata envelope in payload (default: true).
+    pub include_metadata: Option<bool>,
+    /// Include message checksum in metadata (default: false).
+    pub include_checksum: Option<bool>,
+    /// Include origin timestamp in metadata (default: false).
+    pub include_origin_timestamp: Option<bool>,
+    /// Enable health check request in open() (default: false).
+    pub health_check_enabled: Option<bool>,
+    /// HTTP method for health check (default: HEAD).
+    pub health_check_method: Option<HttpMethod>,
+    /// Maximum number of retries for transient errors (default: 3).
+    pub max_retries: Option<u32>,
+    /// Retry delay as a human-readable duration string, e.g. "1s" (default: 
1s).
+    pub retry_delay: Option<String>,
+    /// Backoff multiplier for exponential retry delay (default: 2).
+    pub retry_backoff_multiplier: Option<u32>,
+    /// Maximum retry delay cap as a human-readable duration string (default: 
30s).
+    pub max_retry_delay: Option<String>,
+    /// HTTP status codes considered successful (default: [200, 201, 202, 
204]).
+    pub success_status_codes: Option<Vec<u16>>,
+    /// Accept invalid TLS certificates (default: false). Named to signal 
danger.
+    pub tls_danger_accept_invalid_certs: Option<bool>,
+    /// Maximum idle connections per host (default: 10).
+    pub max_connections: Option<usize>,
+    /// Enable verbose request/response logging (default: false).
+    pub verbose_logging: Option<bool>,
+}
+
/// HTTP sink connector that delivers consumed messages to any HTTP endpoint.
///
/// Lifecycle: `new()` → `open()` → `consume()` (repeated) → `close()`.
/// The `reqwest::Client` is built in `open()` (not `new()`) so that config-derived
/// settings (timeout, TLS, connection pool) are applied. This matches the
/// MongoDB/Elasticsearch/PostgreSQL sink initialization pattern.
#[derive(Debug)]
pub struct HttpSink {
    /// Connector instance id, used as a prefix in every log line.
    id: u32,
    /// Target URL for all delivery (and health-check) requests.
    url: String,
    /// HTTP method used for message delivery.
    method: HttpMethod,
    /// Per-request timeout applied to the underlying client.
    timeout: Duration,
    /// Maximum request body size in bytes; 0 disables the check.
    max_payload_size_bytes: u64,
    /// User-configured headers as raw strings; validated in `open()`.
    headers: HashMap<String, String>,
    /// Payload formatting mode (individual / ndjson / json_array / raw).
    batch_mode: BatchMode,
    /// Wrap payloads in an Iggy metadata envelope (ignored by raw mode).
    include_metadata: bool,
    /// Include the message checksum in the metadata envelope.
    include_checksum: bool,
    /// Include the origin timestamp in the metadata envelope.
    include_origin_timestamp: bool,
    /// Perform a health-check request during `open()`.
    health_check_enabled: bool,
    /// Method used for the optional health-check request.
    health_check_method: HttpMethod,
    /// Maximum middleware retries for transient errors.
    max_retries: u32,
    /// Initial retry delay (lower bound of the exponential backoff).
    retry_delay: Duration,
    /// Exponential backoff base; clamped to >= 1 in `new()`.
    retry_backoff_multiplier: u32,
    /// Cap on the exponential retry delay (upper bound of the backoff).
    max_retry_delay: Duration,
    /// Status codes treated as success by both the retry strategy and final classification.
    success_status_codes: HashSet<u16>,
    /// Disables TLS certificate validation when true — logged loudly in `new()`.
    tls_danger_accept_invalid_certs: bool,
    /// Maximum idle pooled connections per host.
    max_connections: usize,
    /// Enables verbose per-request debug logging.
    verbose: bool,
    /// Pre-built HTTP headers (excluding Content-Type). Built once in `open()` from validated
    /// `self.headers`, reused for every request. `None` before `open()` is called.
    request_headers: Option<reqwest::header::HeaderMap>,
    /// Initialized in `open()` with config-derived settings. `None` before `open()` is called.
    client: Option<ClientWithMiddleware>,
    /// Total send calls attempted (middleware retries not counted individually).
    send_attempts: AtomicU64,
    /// Total messages successfully delivered.
    messages_delivered: AtomicU64,
    /// Total errors (HTTP failures, serialization skips, aborted-batch skips).
    errors_count: AtomicU64,
    /// Epoch seconds of last successful HTTP request.
    last_success_timestamp: AtomicU64,
}
+
+impl HttpSink {
+    pub fn new(id: u32, config: HttpSinkConfig) -> Self {
+        let url = config.url;
+        let method = config.method.unwrap_or_default();
+        let timeout = parse_duration(config.timeout.as_deref(), 
DEFAULT_TIMEOUT);
+        let max_payload_size_bytes = config
+            .max_payload_size_bytes
+            .unwrap_or(DEFAULT_MAX_PAYLOAD_SIZE);
+        let headers = config.headers.unwrap_or_default();
+        let batch_mode = config.batch_mode.unwrap_or_default();
+        let include_metadata = config.include_metadata.unwrap_or(true);
+        let include_checksum = config.include_checksum.unwrap_or(false);
+        let include_origin_timestamp = 
config.include_origin_timestamp.unwrap_or(false);
+        let health_check_enabled = 
config.health_check_enabled.unwrap_or(false);
+        let health_check_method = 
config.health_check_method.unwrap_or(HttpMethod::Head);
+        let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
+        let mut retry_delay = parse_duration(config.retry_delay.as_deref(), 
DEFAULT_RETRY_DELAY);
+        let retry_backoff_multiplier = config
+            .retry_backoff_multiplier
+            .unwrap_or(DEFAULT_BACKOFF_MULTIPLIER)
+            .max(1);
+        let mut max_retry_delay =
+            parse_duration(config.max_retry_delay.as_deref(), 
DEFAULT_MAX_RETRY_DELAY);
+        let success_status_codes: HashSet<u16> = config
+            .success_status_codes
+            .unwrap_or_else(|| vec![200, 201, 202, 204])
+            .into_iter()
+            .collect();
+        let tls_danger_accept_invalid_certs =
+            config.tls_danger_accept_invalid_certs.unwrap_or(false);
+        let max_connections = 
config.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS);
+        let verbose = config.verbose_logging.unwrap_or(false);
+
+        if retry_delay > max_retry_delay {
+            warn!(
+                "HTTP sink ID: {} — retry_delay ({:?}) exceeds max_retry_delay 
({:?}). \
+                 Swapping values to prevent ExponentialBackoff panic.",
+                id, retry_delay, max_retry_delay,
+            );
+            std::mem::swap(&mut retry_delay, &mut max_retry_delay);
+        }
+
+        if tls_danger_accept_invalid_certs {
+            warn!(
+                "HTTP sink ID: {} — tls_danger_accept_invalid_certs is 
enabled. \
+                 TLS certificate validation is DISABLED.",
+                id
+            );
+        }
+
+        if batch_mode == BatchMode::Raw && include_metadata {
+            warn!(
+                "HTTP sink ID: {} — batch_mode=raw ignores include_metadata. \
+                 Raw mode sends payload bytes directly without metadata 
envelope.",
+                id
+            );
+        }
+
+        if matches!(method, HttpMethod::Get | HttpMethod::Head)
+            && batch_mode != BatchMode::Individual
+        {
+            warn!(
+                "HTTP sink ID: {} — {:?} with batch_mode={:?} will send a 
request body. \
+                 Some servers may reject GET/HEAD requests with a body.",
+                id, method, batch_mode,
+            );
+        }
+
+        HttpSink {
+            id,
+            url,
+            method,
+            timeout,
+            max_payload_size_bytes,
+            headers,
+            batch_mode,
+            include_metadata,
+            include_checksum,
+            include_origin_timestamp,
+            health_check_enabled,
+            health_check_method,
+            max_retries,
+            retry_delay,
+            retry_backoff_multiplier,
+            max_retry_delay,
+            success_status_codes,
+            tls_danger_accept_invalid_certs,
+            max_connections,
+            verbose,
+            request_headers: None,
+            client: None,
+            send_attempts: AtomicU64::new(0),
+            messages_delivered: AtomicU64::new(0),
+            errors_count: AtomicU64::new(0),
+            last_success_timestamp: AtomicU64::new(0),
+        }
+    }
+
    /// Build the `reqwest::Client` wrapped with retry and tracing middleware.
    ///
    /// Timeout, pool sizing, keep-alive, and TLS settings come from the values
    /// captured in `new()`. The retry policy is an exponential backoff bounded by
    /// `retry_delay`..`max_retry_delay` with at most `max_retries` attempts, and
    /// `HttpSinkRetryStrategy` is seeded with the same `success_status_codes`
    /// set used for final response classification in `send_with_retry()`.
    fn build_client(&self) -> Result<ClientWithMiddleware, Error> {
        let raw_client = reqwest::Client::builder()
            .timeout(self.timeout)
            .pool_max_idle_per_host(self.max_connections)
            .pool_idle_timeout(Duration::from_secs(DEFAULT_POOL_IDLE_TIMEOUT_SECS))
            .tcp_keepalive(Duration::from_secs(DEFAULT_TCP_KEEPALIVE_SECS))
            .danger_accept_invalid_certs(self.tls_danger_accept_invalid_certs)
            .build()
            .map_err(|e| Error::InitError(format!("Failed to build HTTP client: {}", e)))?;

        let retry_policy = ExponentialBackoff::builder()
            .retry_bounds(self.retry_delay, self.max_retry_delay)
            .base(self.retry_backoff_multiplier)
            .build_with_max_retries(self.max_retries);

        let retry_strategy = HttpSinkRetryStrategy {
            success_status_codes: self.success_status_codes.clone(),
        };

        let retry_middleware =
            RetryTransientMiddleware::new_with_policy_and_strategy(retry_policy, retry_strategy);

        // Middleware order: tracing observes each attempt, retry wraps the transport.
        Ok(ClientBuilder::new(raw_client)
            .with(TracingMiddleware::default())
            .with(retry_middleware)
            .build())
    }
+
+    /// Returns the initialized HTTP client, or an error if `open()` was not 
called.
+    fn client(&self) -> Result<&ClientWithMiddleware, Error> {
+        self.client.as_ref().ok_or_else(|| {
+            Error::InitError("HTTP client not initialized — was open() 
called?".to_string())
+        })
+    }
+
+    /// Convert a `Payload` to a JSON value for metadata wrapping.
+    /// Non-JSON payloads are base64-encoded with a `iggy_payload_encoding` 
marker.
+    ///
+    /// Note: All current `Payload` variants produce infallible conversions.
+    /// The `Result` return type exists as a safety net for future variants.
+    fn payload_to_json(&self, payload: Payload) -> Result<serde_json::Value, 
Error> {
+        match payload {
+            Payload::Json(value) => {
+                // Direct structural conversion (not serialization roundtrip).
+                // Follows the Elasticsearch sink pattern. NaN/Infinity f64 → 
null.
+                Ok(owned_value_to_serde_json(&value))
+            }
+            Payload::Text(text) => Ok(serde_json::Value::String(text)),
+            Payload::Raw(bytes) | Payload::FlatBuffer(bytes) => {
+                let encoded = EncodedPayload {
+                    data: general_purpose::STANDARD.encode(&bytes),
+                    iggy_payload_encoding: ENCODING_BASE64,
+                };
+                serde_json::to_value(encoded)
+                    .map_err(|e| Error::Serialization(format!("EncodedPayload: 
{}", e)))
+            }
+            Payload::Proto(proto_str) => {
+                let encoded = EncodedPayload {
+                    data: 
general_purpose::STANDARD.encode(proto_str.as_bytes()),
+                    iggy_payload_encoding: ENCODING_BASE64,
+                };
+                serde_json::to_value(encoded)
+                    .map_err(|e| Error::Serialization(format!("EncodedPayload: 
{}", e)))
+            }
+        }
+    }
+
    /// Build a message envelope with optional metadata wrapping.
    ///
    /// When `include_metadata` is false the payload JSON is returned unchanged.
    /// Otherwise the payload is wrapped in a `MetadataEnvelope` carrying stream,
    /// topic, partition, offset, timestamps, and (optionally) checksum and
    /// message headers. Errors only arise from serde serialization failures.
    fn build_envelope(
        &self,
        message: &ConsumedMessage,
        topic_metadata: &TopicMetadata,
        messages_metadata: &MessagesMetadata,
        payload_json: serde_json::Value,
    ) -> Result<serde_json::Value, Error> {
        if !self.include_metadata {
            return Ok(payload_json);
        }

        // Empty or absent header maps are omitted entirely (serde skips None).
        let headers_map = if let Some(ref headers) = message.headers
            && !headers.is_empty()
        {
            let map: serde_json::Map<String, serde_json::Value> = headers
                .iter()
                .map(|(k, v)| {
                    // Raw bytes: base64-encode to avoid Rust debug format in JSON output.
                    // as_raw() returns Ok only for HeaderKind::Raw.
                    let value = if let Ok(raw) = v.as_raw() {
                        let encoded = EncodedHeader {
                            data: general_purpose::STANDARD.encode(raw),
                            iggy_header_encoding: ENCODING_BASE64,
                        };
                        serde_json::to_value(encoded)
                            .map_err(|e| Error::Serialization(format!("EncodedHeader: {}", e)))?
                    } else {
                        serde_json::Value::String(v.to_string_value())
                    };
                    Ok((k.to_string_value(), value))
                })
                .collect::<Result<serde_json::Map<String, serde_json::Value>, Error>>()?;
            Some(map)
        } else {
            None
        };

        let metadata = IggyMetadata {
            iggy_id: format_u128_as_hex(message.id),
            iggy_offset: message.offset,
            iggy_timestamp: message.timestamp,
            iggy_stream: topic_metadata.stream.clone(),
            iggy_topic: topic_metadata.topic.clone(),
            iggy_partition_id: messages_metadata.partition_id,
            // Optional fields stay None (and are skipped by serde) unless enabled in config.
            iggy_checksum: if self.include_checksum {
                Some(message.checksum)
            } else {
                None
            },
            iggy_origin_timestamp: if self.include_origin_timestamp {
                Some(message.origin_timestamp)
            } else {
                None
            },
            iggy_headers: headers_map,
        };

        let envelope = MetadataEnvelope {
            metadata,
            payload: payload_json,
        };

        serde_json::to_value(envelope)
            .map_err(|e| Error::Serialization(format!("MetadataEnvelope: {}", e)))
    }
+
+    /// Record a successful request timestamp.
+    fn record_success(&self) {
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs();
+        self.last_success_timestamp.store(now, Ordering::Relaxed);
+    }
+
    /// Send an HTTP request with retry via reqwest-middleware. Returns Ok on success,
    /// Err after exhausting retries. Retry logic (backoff, transient classification)
    /// is handled by the middleware configured in `build_client()`.
    ///
    /// Metrics: `send_attempts` is incremented once per call (not per middleware
    /// retry); `errors_count` is incremented once per failed call. A success also
    /// updates `last_success_timestamp`.
    async fn send_with_retry(&self, body: Bytes, content_type: &str) -> Result<(), Error> {
        let client = self.client()?;
        // Both client and request_headers are populated by open(); a None here
        // means the lifecycle contract (new → open → consume) was violated.
        let headers = self.request_headers.as_ref().ok_or_else(|| {
            Error::InitError("HTTP headers not initialized — was open() called?".to_string())
        })?;

        if self.verbose {
            debug!(
                "HTTP sink ID: {} — sending {:?} {} ({} bytes)",
                self.id,
                self.method,
                self.url,
                body.len(),
            );
        }

        self.send_attempts.fetch_add(1, Ordering::Relaxed);

        // Content-Type is set per request (it varies by batch mode), everything
        // else comes from the pre-built header map.
        let response = build_request(self.method, client, &self.url)
            .headers(headers.clone())
            .header("content-type", content_type)
            .body(body)
            .send()
            .await
            .map_err(|e| {
                self.errors_count.fetch_add(1, Ordering::Relaxed);
                error!(
                    "HTTP sink ID: {} — request to {} failed after middleware retries: {:#}",
                    self.id, self.url, e
                );
                Error::HttpRequestFailed(format!("HTTP {} — {}", self.url, e))
            })?;

        let status = response.status();
        // success_status_codes is checked in BOTH the retry strategy (to stop retrying)
        // AND here (to classify the final response). Both must use the same set.
        if self.success_status_codes.contains(&status.as_u16()) {
            if self.verbose {
                debug!(
                    "HTTP sink ID: {} — success (status {})",
                    self.id,
                    status.as_u16()
                );
            }
            self.record_success();
            return Ok(());
        }

        // Non-success status after middleware exhausted retries — read body for diagnostics
        let response_body = match response.text().await {
            Ok(body) => body,
            Err(e) => format!("<body read error: {}>", e),
        };

        error!(
            "HTTP sink ID: {} — request failed (status {}). Response: {}",
            self.id,
            status.as_u16(),
            truncate_response(&response_body, 500),
        );
        self.errors_count.fetch_add(1, Ordering::Relaxed);
        Err(Error::HttpRequestFailed(format!(
            "HTTP {} — status: {}",
            self.url,
            status.as_u16()
        )))
    }
+
    /// Shared per-message send loop for `individual` and `raw` modes.
    ///
    /// Iterates `messages`, builds a body for each via `build_body`, enforces payload size
    /// limits, sends via `send_with_retry`, and tracks partial delivery.
    /// Aborts after `MAX_CONSECUTIVE_FAILURES` consecutive HTTP failures.
    ///
    /// `build_body` takes ownership of each `ConsumedMessage` — callers must extract
    /// all needed fields (payload, metadata) within the closure.
    ///
    /// Returns the LAST error observed (if any); a partial delivery therefore
    /// surfaces as `Err` even when some messages succeeded. Per-message failures
    /// each add 1 to `errors_count`; an abort also counts the skipped remainder.
    async fn send_per_message<F>(
        &self,
        messages: Vec<ConsumedMessage>,
        content_type: &str,
        mut build_body: F,
    ) -> Result<(), Error>
    where
        F: FnMut(ConsumedMessage) -> Result<Vec<u8>, Error>,
    {
        let total = messages.len();
        let mut delivered = 0u64;
        let mut http_failures = 0u64;
        let mut serialization_failures = 0u64;
        // Consecutive-failure streak; reset on any success.
        let mut consecutive_failures = 0u32;
        let mut last_error: Option<Error> = None;

        for message in messages {
            // Capture offset before build_body consumes the message (needed for logs).
            let offset = message.offset;
            let body = match build_body(message) {
                Ok(b) => b,
                Err(e) => {
                    error!(
                        "HTTP sink ID: {} — failed to build {} body at offset {}: {}",
                        self.id, self.batch_mode, offset, e
                    );
                    self.errors_count.fetch_add(1, Ordering::Relaxed);
                    serialization_failures += 1;
                    last_error = Some(e);
                    continue;
                }
            };

            // 0 disables the size limit entirely.
            if self.max_payload_size_bytes > 0 && body.len() as u64 > self.max_payload_size_bytes {
                error!(
                    "HTTP sink ID: {} — {} payload at offset {} exceeds max size ({} > {} bytes). Skipping.",
                    self.id,
                    self.batch_mode,
                    offset,
                    body.len(),
                    self.max_payload_size_bytes,
                );
                self.errors_count.fetch_add(1, Ordering::Relaxed);
                serialization_failures += 1;
                last_error = Some(Error::HttpRequestFailed(format!(
                    "Payload exceeds max size: {} bytes",
                    body.len()
                )));
                continue;
            }

            match self.send_with_retry(Bytes::from(body), content_type).await {
                Ok(()) => {
                    delivered += 1;
                    consecutive_failures = 0;
                }
                Err(e) => {
                    error!(
                        "HTTP sink ID: {} — failed to deliver {} message at offset {} after retries: {}",
                        self.id, self.batch_mode, offset, e
                    );
                    http_failures += 1;
                    consecutive_failures += 1;
                    last_error = Some(e);

                    // Circuit-break: stop hammering a dead endpoint; count the
                    // not-yet-attempted remainder as errors.
                    if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
                        let processed = delivered + http_failures + serialization_failures;
                        debug_assert!(
                            processed <= total as u64,
                            "processed ({processed}) > total ({total}) — accounting bug"
                        );
                        let skipped = (total as u64).saturating_sub(processed);
                        error!(
                            "HTTP sink ID: {} — aborting {} batch after {} consecutive HTTP failures \
                             ({} remaining messages skipped)",
                            self.id, self.batch_mode, consecutive_failures, skipped,
                        );
                        self.errors_count.fetch_add(skipped, Ordering::Relaxed);
                        break;
                    }
                }
            }
        }

        self.messages_delivered
            .fetch_add(delivered, Ordering::Relaxed);

        match last_error {
            Some(e) => {
                error!(
                    "HTTP sink ID: {} — partial {} delivery: {}/{} delivered, \
                     {} HTTP failures, {} serialization errors",
                    self.id,
                    self.batch_mode,
                    delivered,
                    total,
                    http_failures,
                    serialization_failures,
                );
                Err(e)
            }
            None => Ok(()),
        }
    }
+
    /// Send messages in `individual` mode — one HTTP request per message.
    ///
    /// Each message's payload is converted to JSON and (optionally) wrapped in
    /// the metadata envelope before serialization. Delegates looping, size
    /// limits, and failure accounting to `send_per_message`.
    async fn send_individual(
        &self,
        topic_metadata: &TopicMetadata,
        messages_metadata: &MessagesMetadata,
        messages: Vec<ConsumedMessage>,
    ) -> Result<(), Error> {
        self.send_per_message(messages, self.batch_mode.content_type(), |mut message| {
            // mem::replace moves the payload out without cloning; the placeholder
            // empty Raw payload is never read (build_envelope only uses metadata fields).
            let payload = std::mem::replace(&mut message.payload, Payload::Raw(vec![]));
            let payload_json = self.payload_to_json(payload)?;
            let envelope =
                self.build_envelope(&message, topic_metadata, messages_metadata, payload_json)?;
            serde_json::to_vec(&envelope)
                .map_err(|e| Error::Serialization(format!("Envelope serialize: {}", e)))
        })
        .await
    }
+
    /// Sends a batch body and updates delivery/error accounting.
    ///
    /// Shared by `send_ndjson` and `send_json_array` — the post-send accounting logic
    /// (error propagation, skip warnings) is identical across batch modes.
    ///
    /// `count` is the number of messages serialized into `body`; `skipped` is the
    /// number dropped earlier due to serialization errors (already counted in
    /// `errors_count` by the caller).
    async fn send_batch_body(&self, body: Bytes, count: u64, skipped: u64) -> Result<(), Error> {
        debug_assert!(
            count > 0,
            "send_batch_body called with count=0 — callers must guard against empty batches"
        );
        if let Err(e) = self
            .send_with_retry(body, self.batch_mode.content_type())
            .await
        {
            // send_with_retry already added 1 to errors_count for the HTTP failure.
            // Add the remaining messages that were serialized but not delivered.
            if count > 1 {
                self.errors_count.fetch_add(count - 1, Ordering::Relaxed);
            }
            if skipped > 0 {
                error!(
                    "HTTP sink ID: {} — {} batch failed with {} serialization skips",
                    self.id, self.batch_mode, skipped,
                );
            }
            return Err(e);
        }
        self.messages_delivered.fetch_add(count, Ordering::Relaxed);
        if skipped > 0 {
            warn!(
                "HTTP sink ID: {} — {} batch: {} delivered, {} skipped (serialization errors)",
                self.id, self.batch_mode, count, skipped,
            );
        }
        Ok(())
    }
+
+    /// Send messages in `ndjson` mode — all messages in one request, 
newline-delimited.
+    /// Skips individual messages that fail serialization rather than aborting 
the batch.
+    async fn send_ndjson(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: &MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        let mut lines = Vec::with_capacity(messages.len());
+        let mut skipped = 0u64;
+
+        for mut message in messages {
+            let payload = std::mem::replace(&mut message.payload, 
Payload::Raw(vec![]));
+            let payload_json = match self.payload_to_json(payload) {
+                Ok(json) => json,
+                Err(e) => {
+                    error!(
+                        "HTTP sink ID: {} — skipping message at offset {} in 
NDJSON batch: {}",
+                        self.id, message.offset, e
+                    );
+                    self.errors_count.fetch_add(1, Ordering::Relaxed);
+                    skipped += 1;
+                    continue;
+                }
+            };
+            let envelope = match self.build_envelope(
+                &message,
+                topic_metadata,
+                messages_metadata,
+                payload_json,
+            ) {
+                Ok(env) => env,
+                Err(e) => {
+                    error!(
+                        "HTTP sink ID: {} — skipping message at offset {} in 
NDJSON batch (envelope): {}",
+                        self.id, message.offset, e
+                    );
+                    self.errors_count.fetch_add(1, Ordering::Relaxed);
+                    skipped += 1;
+                    continue;
+                }
+            };
+            match serde_json::to_string(&envelope) {
+                Ok(line) => lines.push(line),
+                Err(e) => {
+                    error!(
+                        "HTTP sink ID: {} — skipping message at offset {} in 
NDJSON batch (serialize): {}",
+                        self.id, message.offset, e
+                    );
+                    self.errors_count.fetch_add(1, Ordering::Relaxed);
+                    skipped += 1;
+                    continue;
+                }
+            }
+        }
+
+        if lines.is_empty() {
+            return Err(Error::Serialization(
+                "All messages in NDJSON batch failed 
serialization".to_string(),
+            ));
+        }
+
+        let count = lines.len() as u64;
+
+        let mut body_str = lines.join("\n");

Review Comment:
   Fixed. Replaced `Vec<String>` + `join("\n")` with direct 
`serde_json::to_writer(&mut body, &envelope)` into a single pre-sized 
`Vec<u8>`. Each message is serialized directly into the output buffer followed 
by `body.push(b'\n')`. On error, `body.truncate(pos)` rolls back the partial 
write. Eliminates both the intermediate `String` per message and the `join` 
copy. `Vec::with_capacity(messages.len() * 256)` avoids reallocation for 
typical payloads.



##########
core/connectors/sinks/http_sink/src/lib.rs:
##########
@@ -0,0 +1,2069 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use base64::Engine;
+use base64::engine::general_purpose;
+use bytes::Bytes;
+use humantime::Duration as HumanDuration;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata,
+    convert::owned_value_to_serde_json, sink_connector,
+};
+use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
+use reqwest_retry::{
+    RetryTransientMiddleware, Retryable, RetryableStrategy, 
policies::ExponentialBackoff,
+};
+use reqwest_tracing::TracingMiddleware;
+use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tracing::{debug, error, info, warn};
+
+sink_connector!(HttpSink);
+
/// Default request timeout when `timeout` is not configured.
const DEFAULT_TIMEOUT: &str = "30s";
/// Default initial retry delay (lower bound of the exponential backoff).
const DEFAULT_RETRY_DELAY: &str = "1s";
/// Default cap on the exponential retry delay.
const DEFAULT_MAX_RETRY_DELAY: &str = "30s";
/// Default maximum number of middleware retries for transient errors.
const DEFAULT_MAX_RETRIES: u32 = 3;
/// Default exponential backoff base multiplier.
const DEFAULT_BACKOFF_MULTIPLIER: u32 = 2;
/// Default maximum HTTP body size; 0 disables the limit.
const DEFAULT_MAX_PAYLOAD_SIZE: u64 = 10 * 1024 * 1024; // 10 MB
/// Default maximum idle pooled connections per host.
const DEFAULT_MAX_CONNECTIONS: usize = 10;
/// TCP keep-alive interval for detecting dead connections behind load balancers.
/// Cloud LBs silently drop idle connections (AWS ALB ~60s, GCP ~600s);
/// probing at 30s detects these before requests fail.
const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30;
/// Close pooled connections unused for this long. Prevents stale connections
/// from accumulating when traffic is bursty.
const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90;
/// Abort remaining messages in individual/raw mode after this many consecutive HTTP failures.
/// Prevents hammering a dead endpoint with N sequential retry cycles per poll.
const MAX_CONSECUTIVE_FAILURES: u32 = 3;

/// Marker value identifying base64-encoded binary payloads/headers in JSON output.
const ENCODING_BASE64: &str = "base64";
+
/// HTTP method enum — validated at deserialization, prevents invalid values like "DELEET" or "GETX".
///
/// Serialized/deserialized as the uppercase method name ("GET", "POST", …).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum HttpMethod {
    Get,
    Head,
    /// Default method for message delivery.
    #[default]
    Post,
    Put,
    Patch,
    Delete,
}
+
/// Payload formatting mode for HTTP requests.
///
/// Deserialized from snake_case config values ("individual", "nd_json", …);
/// the strum display strings are used in log messages.
#[derive(
    Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum_macros::Display,
)]
#[serde(rename_all = "snake_case")]
pub enum BatchMode {
    /// One HTTP request per message (default). Note: with batch_length=50, this produces 50
    /// sequential HTTP round trips per poll cycle. Use ndjson or json_array for higher throughput.
    #[default]
    #[strum(to_string = "individual")]
    Individual,
    /// All messages in one request, newline-delimited JSON.
    #[strum(to_string = "NDJSON")]
    NdJson,
    /// All messages as a single JSON array.
    #[strum(to_string = "JSON array")]
    JsonArray,
    /// Raw bytes, one request per message (for non-JSON payloads).
    #[strum(to_string = "raw")]
    Raw,
}
+
+impl BatchMode {
+    /// Determine the Content-Type header based on batch mode.
+    fn content_type(&self) -> &'static str {
+        match self {
+            BatchMode::Individual | BatchMode::JsonArray => "application/json",
+            BatchMode::NdJson => "application/x-ndjson",
+            BatchMode::Raw => "application/octet-stream",
+        }
+    }
+}
+
/// Metadata envelope wrapping a payload with Iggy message metadata.
/// This is the top-level JSON object sent when `include_metadata` is enabled.
#[derive(Debug, Serialize)]
struct MetadataEnvelope {
    /// Message provenance fields (stream, topic, offset, timestamps, …).
    metadata: IggyMetadata,
    /// The converted message payload (JSON value, string, or base64 envelope).
    payload: serde_json::Value,
}
+
/// Iggy message metadata fields.
/// Optional fields are omitted from the JSON output entirely when `None`.
#[derive(Debug, Serialize)]
struct IggyMetadata {
    /// Message id rendered as a hex string (u128 is not JSON-safe as a number).
    iggy_id: String,
    iggy_offset: u64,
    iggy_timestamp: u64,
    iggy_stream: String,
    iggy_topic: String,
    iggy_partition_id: u32,
    /// Present only when `include_checksum` is enabled.
    #[serde(skip_serializing_if = "Option::is_none")]
    iggy_checksum: Option<u64>,
    /// Present only when `include_origin_timestamp` is enabled.
    #[serde(skip_serializing_if = "Option::is_none")]
    iggy_origin_timestamp: Option<u64>,
    /// Present only when the message carries non-empty headers.
    #[serde(skip_serializing_if = "Option::is_none")]
    iggy_headers: Option<serde_json::Map<String, serde_json::Value>>,
}
+
/// Binary payload with base64 encoding marker.
/// Lets receivers distinguish encoded binary data from ordinary JSON payloads.
#[derive(Debug, Serialize)]
struct EncodedPayload {
    /// Base64 (standard alphabet) encoding of the raw payload bytes.
    data: String,
    /// Always `ENCODING_BASE64` ("base64").
    iggy_payload_encoding: &'static str,
}
+
/// Binary header value with base64 encoding marker.
/// Mirrors `EncodedPayload` but for raw message-header values.
#[derive(Debug, Serialize)]
struct EncodedHeader {
    /// Base64 (standard alphabet) encoding of the raw header bytes.
    data: String,
    /// Always `ENCODING_BASE64` ("base64").
    iggy_header_encoding: &'static str,
}
+
/// Configuration for the HTTP sink connector, deserialized from [plugin_config] in config.toml.
///
/// Every field except `url` is optional; defaults are applied in `HttpSink::new()`.
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpSinkConfig {
    /// Target URL for HTTP requests (required).
    pub url: String,
    /// HTTP method (default: POST).
    pub method: Option<HttpMethod>,
    /// Request timeout as a human-readable duration string, e.g. "30s" (default: 30s).
    pub timeout: Option<String>,
    /// Maximum HTTP body size in bytes (default: 10MB). Set to 0 to disable.
    pub max_payload_size_bytes: Option<u64>,
    /// Custom HTTP headers.
    pub headers: Option<HashMap<String, String>>,
    /// Payload formatting mode (default: individual).
    pub batch_mode: Option<BatchMode>,
    /// Include Iggy metadata envelope in payload (default: true).
    pub include_metadata: Option<bool>,
    /// Include message checksum in metadata (default: false).
    pub include_checksum: Option<bool>,
    /// Include origin timestamp in metadata (default: false).
    pub include_origin_timestamp: Option<bool>,
    /// Enable health check request in open() (default: false).
    pub health_check_enabled: Option<bool>,
    /// HTTP method for health check (default: HEAD).
    pub health_check_method: Option<HttpMethod>,
    /// Maximum number of retries for transient errors (default: 3).
    pub max_retries: Option<u32>,
    /// Retry delay as a human-readable duration string, e.g. "1s" (default: 1s).
    pub retry_delay: Option<String>,
    /// Backoff multiplier for exponential retry delay (default: 2).
    pub retry_backoff_multiplier: Option<u32>,
    /// Maximum retry delay cap as a human-readable duration string (default: 30s).
    pub max_retry_delay: Option<String>,
    /// HTTP status codes considered successful (default: [200, 201, 202, 204]).
    pub success_status_codes: Option<Vec<u16>>,
    /// Accept invalid TLS certificates (default: false). Named to signal danger.
    pub tls_danger_accept_invalid_certs: Option<bool>,
    /// Maximum idle connections per host (default: 10).
    pub max_connections: Option<usize>,
    /// Enable verbose request/response logging (default: false).
    pub verbose_logging: Option<bool>,
}
+
/// HTTP sink connector that delivers consumed messages to any HTTP endpoint.
///
/// Lifecycle: `new()` → `open()` → `consume()` (repeated) → `close()`.
/// The `reqwest::Client` is built in `open()` (not `new()`) so that config-derived
/// settings (timeout, TLS, connection pool) are applied. This matches the
/// MongoDB/Elasticsearch/PostgreSQL sink initialization pattern.
#[derive(Debug)]
pub struct HttpSink {
    id: u32,
    url: String,
    method: HttpMethod,
    timeout: Duration,
    /// 0 disables the size limit entirely.
    max_payload_size_bytes: u64,
    headers: HashMap<String, String>,
    batch_mode: BatchMode,
    include_metadata: bool,
    include_checksum: bool,
    include_origin_timestamp: bool,
    health_check_enabled: bool,
    health_check_method: HttpMethod,
    max_retries: u32,
    retry_delay: Duration,
    retry_backoff_multiplier: u32,
    max_retry_delay: Duration,
    success_status_codes: HashSet<u16>,
    tls_danger_accept_invalid_certs: bool,
    max_connections: usize,
    verbose: bool,
    /// Pre-built HTTP headers (excluding Content-Type). Built once in `open()` from validated
    /// `self.headers`, reused for every request. `None` before `open()` is called.
    request_headers: Option<reqwest::header::HeaderMap>,
    /// Initialized in `open()` with config-derived settings. `None` before `open()` is called.
    client: Option<ClientWithMiddleware>,
    /// Incremented once per `send_with_retry` invocation (before the request is made).
    send_attempts: AtomicU64,
    /// Messages confirmed delivered (successful HTTP responses).
    messages_delivered: AtomicU64,
    /// Failures of any kind: serialization, size-limit skips, and HTTP errors.
    errors_count: AtomicU64,
    /// Epoch seconds of last successful HTTP request.
    last_success_timestamp: AtomicU64,
}
+
+impl HttpSink {
    /// Construct a sink from its deserialized config, applying the documented
    /// default for every optional field. Performs no I/O — the HTTP client is
    /// built later, in `open()`. Misconfigurations are repaired (and logged)
    /// rather than rejected so the sink always starts.
    pub fn new(id: u32, config: HttpSinkConfig) -> Self {
        let url = config.url;
        let method = config.method.unwrap_or_default();
        let timeout = parse_duration(config.timeout.as_deref(), DEFAULT_TIMEOUT);
        let max_payload_size_bytes = config
            .max_payload_size_bytes
            .unwrap_or(DEFAULT_MAX_PAYLOAD_SIZE);
        let headers = config.headers.unwrap_or_default();
        let batch_mode = config.batch_mode.unwrap_or_default();
        let include_metadata = config.include_metadata.unwrap_or(true);
        let include_checksum = config.include_checksum.unwrap_or(false);
        let include_origin_timestamp = config.include_origin_timestamp.unwrap_or(false);
        let health_check_enabled = config.health_check_enabled.unwrap_or(false);
        let health_check_method = config.health_check_method.unwrap_or(HttpMethod::Head);
        let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
        let mut retry_delay = parse_duration(config.retry_delay.as_deref(), DEFAULT_RETRY_DELAY);
        // Clamp to >= 1: a multiplier of 0 would collapse the exponential backoff.
        let retry_backoff_multiplier = config
            .retry_backoff_multiplier
            .unwrap_or(DEFAULT_BACKOFF_MULTIPLIER)
            .max(1);
        let mut max_retry_delay =
            parse_duration(config.max_retry_delay.as_deref(), DEFAULT_MAX_RETRY_DELAY);
        let success_status_codes: HashSet<u16> = config
            .success_status_codes
            .unwrap_or_else(|| vec![200, 201, 202, 204])
            .into_iter()
            .collect();
        let tls_danger_accept_invalid_certs =
            config.tls_danger_accept_invalid_certs.unwrap_or(false);
        let max_connections = config.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS);
        let verbose = config.verbose_logging.unwrap_or(false);

        // ExponentialBackoff's retry_bounds(min, max) requires min <= max;
        // swap (rather than error out) so a misconfigured sink still starts.
        if retry_delay > max_retry_delay {
            warn!(
                "HTTP sink ID: {} — retry_delay ({:?}) exceeds max_retry_delay ({:?}). \
                 Swapping values to prevent ExponentialBackoff panic.",
                id, retry_delay, max_retry_delay,
            );
            std::mem::swap(&mut retry_delay, &mut max_retry_delay);
        }

        if tls_danger_accept_invalid_certs {
            warn!(
                "HTTP sink ID: {} — tls_danger_accept_invalid_certs is enabled. \
                 TLS certificate validation is DISABLED.",
                id
            );
        }

        // Surface config combinations that are accepted but likely unintended.
        if batch_mode == BatchMode::Raw && include_metadata {
            warn!(
                "HTTP sink ID: {} — batch_mode=raw ignores include_metadata. \
                 Raw mode sends payload bytes directly without metadata envelope.",
                id
            );
        }

        if matches!(method, HttpMethod::Get | HttpMethod::Head)
            && batch_mode != BatchMode::Individual
        {
            warn!(
                "HTTP sink ID: {} — {:?} with batch_mode={:?} will send a request body. \
                 Some servers may reject GET/HEAD requests with a body.",
                id, method, batch_mode,
            );
        }

        HttpSink {
            id,
            url,
            method,
            timeout,
            max_payload_size_bytes,
            headers,
            batch_mode,
            include_metadata,
            include_checksum,
            include_origin_timestamp,
            health_check_enabled,
            health_check_method,
            max_retries,
            retry_delay,
            retry_backoff_multiplier,
            max_retry_delay,
            success_status_codes,
            tls_danger_accept_invalid_certs,
            max_connections,
            verbose,
            // Client and headers are deferred to open() so config-derived
            // settings (timeout, TLS, pool) are applied at connect time.
            request_headers: None,
            client: None,
            send_attempts: AtomicU64::new(0),
            messages_delivered: AtomicU64::new(0),
            errors_count: AtomicU64::new(0),
            last_success_timestamp: AtomicU64::new(0),
        }
    }
+
+    /// Build the `reqwest::Client` wrapped with retry and tracing middleware.
+    fn build_client(&self) -> Result<ClientWithMiddleware, Error> {
+        let raw_client = reqwest::Client::builder()
+            .timeout(self.timeout)
+            .pool_max_idle_per_host(self.max_connections)
+            
.pool_idle_timeout(Duration::from_secs(DEFAULT_POOL_IDLE_TIMEOUT_SECS))
+            .tcp_keepalive(Duration::from_secs(DEFAULT_TCP_KEEPALIVE_SECS))
+            .danger_accept_invalid_certs(self.tls_danger_accept_invalid_certs)
+            .build()
+            .map_err(|e| Error::InitError(format!("Failed to build HTTP 
client: {}", e)))?;
+
+        let retry_policy = ExponentialBackoff::builder()
+            .retry_bounds(self.retry_delay, self.max_retry_delay)
+            .base(self.retry_backoff_multiplier)
+            .build_with_max_retries(self.max_retries);
+
+        let retry_strategy = HttpSinkRetryStrategy {
+            success_status_codes: self.success_status_codes.clone(),
+        };
+
+        let retry_middleware =
+            
RetryTransientMiddleware::new_with_policy_and_strategy(retry_policy, 
retry_strategy);
+
+        Ok(ClientBuilder::new(raw_client)
+            .with(TracingMiddleware::default())
+            .with(retry_middleware)
+            .build())
+    }
+
+    /// Returns the initialized HTTP client, or an error if `open()` was not 
called.
+    fn client(&self) -> Result<&ClientWithMiddleware, Error> {
+        self.client.as_ref().ok_or_else(|| {
+            Error::InitError("HTTP client not initialized — was open() 
called?".to_string())
+        })
+    }
+
+    /// Convert a `Payload` to a JSON value for metadata wrapping.
+    /// Non-JSON payloads are base64-encoded with a `iggy_payload_encoding` 
marker.
+    ///
+    /// Note: All current `Payload` variants produce infallible conversions.
+    /// The `Result` return type exists as a safety net for future variants.
+    fn payload_to_json(&self, payload: Payload) -> Result<serde_json::Value, 
Error> {
+        match payload {
+            Payload::Json(value) => {
+                // Direct structural conversion (not serialization roundtrip).
+                // Follows the Elasticsearch sink pattern. NaN/Infinity f64 → 
null.
+                Ok(owned_value_to_serde_json(&value))
+            }
+            Payload::Text(text) => Ok(serde_json::Value::String(text)),
+            Payload::Raw(bytes) | Payload::FlatBuffer(bytes) => {
+                let encoded = EncodedPayload {
+                    data: general_purpose::STANDARD.encode(&bytes),
+                    iggy_payload_encoding: ENCODING_BASE64,
+                };
+                serde_json::to_value(encoded)
+                    .map_err(|e| Error::Serialization(format!("EncodedPayload: 
{}", e)))
+            }
+            Payload::Proto(proto_str) => {
+                let encoded = EncodedPayload {
+                    data: 
general_purpose::STANDARD.encode(proto_str.as_bytes()),
+                    iggy_payload_encoding: ENCODING_BASE64,
+                };
+                serde_json::to_value(encoded)
+                    .map_err(|e| Error::Serialization(format!("EncodedPayload: 
{}", e)))
+            }
+        }
+    }
+
    /// Build a message envelope with optional metadata wrapping.
    ///
    /// When `include_metadata` is false, the payload JSON passes through
    /// unchanged. Otherwise the payload is wrapped in a `MetadataEnvelope`
    /// carrying offset/timestamp/stream/topic/partition plus the optional
    /// checksum, origin timestamp, and headers map.
    fn build_envelope(
        &self,
        message: &ConsumedMessage,
        topic_metadata: &TopicMetadata,
        messages_metadata: &MessagesMetadata,
        payload_json: serde_json::Value,
    ) -> Result<serde_json::Value, Error> {
        if !self.include_metadata {
            return Ok(payload_json);
        }

        // Headers map is only emitted when present AND non-empty, so the
        // serialized envelope omits `iggy_headers` entirely otherwise
        // (see skip_serializing_if on IggyMetadata).
        let headers_map = if let Some(ref headers) = message.headers
            && !headers.is_empty()
        {
            let map: serde_json::Map<String, serde_json::Value> = headers
                .iter()
                .map(|(k, v)| {
                    // Raw bytes: base64-encode to avoid Rust debug format in JSON output.
                    // as_raw() returns Ok only for HeaderKind::Raw.
                    let value = if let Ok(raw) = v.as_raw() {
                        let encoded = EncodedHeader {
                            data: general_purpose::STANDARD.encode(raw),
                            iggy_header_encoding: ENCODING_BASE64,
                        };
                        serde_json::to_value(encoded)
                            .map_err(|e| Error::Serialization(format!("EncodedHeader: {}", e)))?
                    } else {
                        serde_json::Value::String(v.to_string_value())
                    };
                    Ok((k.to_string_value(), value))
                })
                // Fallible collect: first header serialization error aborts the envelope.
                .collect::<Result<serde_json::Map<String, serde_json::Value>, Error>>()?;
            Some(map)
        } else {
            None
        };

        let metadata = IggyMetadata {
            iggy_id: format_u128_as_hex(message.id),
            iggy_offset: message.offset,
            iggy_timestamp: message.timestamp,
            iggy_stream: topic_metadata.stream.clone(),
            iggy_topic: topic_metadata.topic.clone(),
            iggy_partition_id: messages_metadata.partition_id,
            // Optional fields stay None (and are skipped in JSON) unless enabled in config.
            iggy_checksum: if self.include_checksum {
                Some(message.checksum)
            } else {
                None
            },
            iggy_origin_timestamp: if self.include_origin_timestamp {
                Some(message.origin_timestamp)
            } else {
                None
            },
            iggy_headers: headers_map,
        };

        let envelope = MetadataEnvelope {
            metadata,
            payload: payload_json,
        };

        serde_json::to_value(envelope)
            .map_err(|e| Error::Serialization(format!("MetadataEnvelope: {}", e)))
    }
+
+    /// Record a successful request timestamp.
+    fn record_success(&self) {
+        let now = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs();
+        self.last_success_timestamp.store(now, Ordering::Relaxed);
+    }
+
    /// Send an HTTP request with retry via reqwest-middleware. Returns Ok on success,
    /// Err after exhausting retries. Retry logic (backoff, transient classification)
    /// is handled by the middleware configured in `build_client()`.
    ///
    /// INVARIANT: every `Err` return increments `errors_count` exactly once
    /// (transport-error path and non-success-status path each add 1). Callers
    /// that do their own batch accounting rely on this.
    async fn send_with_retry(&self, body: Bytes, content_type: &str) -> Result<(), Error> {
        let client = self.client()?;
        let headers = self.request_headers.as_ref().ok_or_else(|| {
            Error::InitError("HTTP headers not initialized — was open() called?".to_string())
        })?;

        if self.verbose {
            debug!(
                "HTTP sink ID: {} — sending {:?} {} ({} bytes)",
                self.id,
                self.method,
                self.url,
                body.len(),
            );
        }

        self.send_attempts.fetch_add(1, Ordering::Relaxed);

        let response = build_request(self.method, client, &self.url)
            .headers(headers.clone())
            // Content-Type is set per request (varies with batch mode), so it is
            // excluded from the pre-built header map.
            .header("content-type", content_type)
            .body(body)
            .send()
            .await
            .map_err(|e| {
                // Transport-level failure (after middleware retries): count once.
                self.errors_count.fetch_add(1, Ordering::Relaxed);
                error!(
                    "HTTP sink ID: {} — request to {} failed after middleware retries: {:#}",
                    self.id, self.url, e
                );
                Error::HttpRequestFailed(format!("HTTP {} — {}", self.url, e))
            })?;

        let status = response.status();
        // success_status_codes is checked in BOTH the retry strategy (to stop retrying)
        // AND here (to classify the final response). Both must use the same set.
        if self.success_status_codes.contains(&status.as_u16()) {
            if self.verbose {
                debug!(
                    "HTTP sink ID: {} — success (status {})",
                    self.id,
                    status.as_u16()
                );
            }
            self.record_success();
            return Ok(());
        }

        // Non-success status after middleware exhausted retries — read body for diagnostics
        let response_body = match response.text().await {
            Ok(body) => body,
            Err(e) => format!("<body read error: {}>", e),
        };

        error!(
            "HTTP sink ID: {} — request failed (status {}). Response: {}",
            self.id,
            status.as_u16(),
            truncate_response(&response_body, 500),
        );
        // Status-level failure: count once (see INVARIANT above).
        self.errors_count.fetch_add(1, Ordering::Relaxed);
        Err(Error::HttpRequestFailed(format!(
            "HTTP {} — status: {}",
            self.url,
            status.as_u16()
        )))
    }
+
    /// Shared per-message send loop for `individual` and `raw` modes.
    ///
    /// Iterates `messages`, builds a body for each via `build_body`, enforces payload size
    /// limits, sends via `send_with_retry`, and tracks partial delivery.
    /// Aborts after `MAX_CONSECUTIVE_FAILURES` consecutive HTTP failures.
    ///
    /// `build_body` takes ownership of each `ConsumedMessage` — callers must extract
    /// all needed fields (payload, metadata) within the closure.
    ///
    /// Returns `Ok(())` only if every message was delivered; otherwise returns
    /// the LAST error encountered after logging a partial-delivery summary.
    async fn send_per_message<F>(
        &self,
        messages: Vec<ConsumedMessage>,
        content_type: &str,
        mut build_body: F,
    ) -> Result<(), Error>
    where
        F: FnMut(ConsumedMessage) -> Result<Vec<u8>, Error>,
    {
        let total = messages.len();
        // Local counters; folded into the atomic metrics once at the end
        // (delivered) or incrementally (errors_count).
        let mut delivered = 0u64;
        let mut http_failures = 0u64;
        let mut serialization_failures = 0u64;
        let mut consecutive_failures = 0u32;
        let mut last_error: Option<Error> = None;

        for message in messages {
            // Capture offset before build_body consumes the message (needed for logs).
            let offset = message.offset;
            let body = match build_body(message) {
                Ok(b) => b,
                Err(e) => {
                    error!(
                        "HTTP sink ID: {} — failed to build {} body at offset {}: {}",
                        self.id, self.batch_mode, offset, e
                    );
                    self.errors_count.fetch_add(1, Ordering::Relaxed);
                    serialization_failures += 1;
                    last_error = Some(e);
                    // Build failures don't count toward consecutive HTTP failures.
                    continue;
                }
            };

            // 0 means "no limit" (see config docs for max_payload_size_bytes).
            if self.max_payload_size_bytes > 0 && body.len() as u64 > self.max_payload_size_bytes {
                error!(
                    "HTTP sink ID: {} — {} payload at offset {} exceeds max size ({} > {} bytes). Skipping.",
                    self.id,
                    self.batch_mode,
                    offset,
                    body.len(),
                    self.max_payload_size_bytes,
                );
                self.errors_count.fetch_add(1, Ordering::Relaxed);
                // Counted as a serialization failure: no HTTP attempt was made.
                serialization_failures += 1;
                last_error = Some(Error::HttpRequestFailed(format!(
                    "Payload exceeds max size: {} bytes",
                    body.len()
                )));
                continue;
            }

            match self.send_with_retry(Bytes::from(body), content_type).await {
                Ok(()) => {
                    delivered += 1;
                    // Any success resets the consecutive-failure circuit breaker.
                    consecutive_failures = 0;
                }
                Err(e) => {
                    error!(
                        "HTTP sink ID: {} — failed to deliver {} message at offset {} after retries: {}",
                        self.id, self.batch_mode, offset, e
                    );
                    http_failures += 1;
                    consecutive_failures += 1;
                    last_error = Some(e);

                    if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
                        // Circuit breaker: stop hammering a dead endpoint.
                        // Remaining (unattempted) messages are charged to errors_count.
                        let processed = delivered + http_failures + serialization_failures;
                        debug_assert!(
                            processed <= total as u64,
                            "processed ({processed}) > total ({total}) — accounting bug"
                        );
                        let skipped = (total as u64).saturating_sub(processed);
                        error!(
                            "HTTP sink ID: {} — aborting {} batch after {} consecutive HTTP failures \
                             ({} remaining messages skipped)",
                            self.id, self.batch_mode, consecutive_failures, skipped,
                        );
                        self.errors_count.fetch_add(skipped, Ordering::Relaxed);
                        break;
                    }
                }
            }
        }

        self.messages_delivered
            .fetch_add(delivered, Ordering::Relaxed);

        match last_error {
            Some(e) => {
                error!(
                    "HTTP sink ID: {} — partial {} delivery: {}/{} delivered, \
                     {} HTTP failures, {} serialization errors",
                    self.id,
                    self.batch_mode,
                    delivered,
                    total,
                    http_failures,
                    serialization_failures,
                );
                Err(e)
            }
            None => Ok(()),
        }
    }
+
+    /// Send messages in `individual` mode — one HTTP request per message.
+    async fn send_individual(
+        &self,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: &MessagesMetadata,
+        messages: Vec<ConsumedMessage>,
+    ) -> Result<(), Error> {
+        self.send_per_message(messages, self.batch_mode.content_type(), |mut 
message| {
+            let payload = std::mem::replace(&mut message.payload, 
Payload::Raw(vec![]));
+            let payload_json = self.payload_to_json(payload)?;
+            let envelope =
+                self.build_envelope(&message, topic_metadata, 
messages_metadata, payload_json)?;
+            serde_json::to_vec(&envelope)
+                .map_err(|e| Error::Serialization(format!("Envelope serialize: 
{}", e)))
+        })
+        .await
+    }
+
+    /// Sends a batch body and updates delivery/error accounting.
+    ///
+    /// Shared by `send_ndjson` and `send_json_array` — the post-send 
accounting logic
+    /// (error propagation, skip warnings) is identical across batch modes.
+    async fn send_batch_body(&self, body: Bytes, count: u64, skipped: u64) -> 
Result<(), Error> {
+        debug_assert!(
+            count > 0,
+            "send_batch_body called with count=0 — callers must guard against 
empty batches"
+        );
+        if let Err(e) = self
+            .send_with_retry(body, self.batch_mode.content_type())
+            .await
+        {
+            // send_with_retry already added 1 to errors_count for the HTTP 
failure.
+            // Add the remaining messages that were serialized but not 
delivered.
+            if count > 1 {
+                self.errors_count.fetch_add(count - 1, Ordering::Relaxed);

Review Comment:
   Fixed. Changed `count - 1` to `count.saturating_sub(1)` and added an 
INVARIANT comment documenting the dependency on `send_with_retry` incrementing 
`errors_count` exactly once before returning `Err` (with line references to 
both error paths). The `debug_assert!(count > 0)` at the top of the function 
catches the zero case in dev builds.



##########
core/connectors/sinks/http_sink/src/lib.rs:
##########
@@ -0,0 +1,2069 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use base64::Engine;
+use base64::engine::general_purpose;
+use bytes::Bytes;
+use humantime::Duration as HumanDuration;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata,
+    convert::owned_value_to_serde_json, sink_connector,
+};
+use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
+use reqwest_retry::{
+    RetryTransientMiddleware, Retryable, RetryableStrategy, 
policies::ExponentialBackoff,
+};
+use reqwest_tracing::TracingMiddleware;
+use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tracing::{debug, error, info, warn};
+
// Registers HttpSink as a sink connector plugin with the SDK.
sink_connector!(HttpSink);

// Human-readable duration defaults, parsed by parse_duration (humantime format).
const DEFAULT_TIMEOUT: &str = "30s";
const DEFAULT_RETRY_DELAY: &str = "1s";
const DEFAULT_MAX_RETRY_DELAY: &str = "30s";
const DEFAULT_MAX_RETRIES: u32 = 3;
const DEFAULT_BACKOFF_MULTIPLIER: u32 = 2;
const DEFAULT_MAX_PAYLOAD_SIZE: u64 = 10 * 1024 * 1024; // 10 MB
const DEFAULT_MAX_CONNECTIONS: usize = 10;
/// TCP keep-alive interval for detecting dead connections behind load balancers.
/// Cloud LBs silently drop idle connections (AWS ALB ~60s, GCP ~600s);
/// probing at 30s detects these before requests fail.
const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30;
/// Close pooled connections unused for this long. Prevents stale connections
/// from accumulating when traffic is bursty.
const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90;
/// Abort remaining messages in individual/raw mode after this many consecutive HTTP failures.
/// Prevents hammering a dead endpoint with N sequential retry cycles per poll.
const MAX_CONSECUTIVE_FAILURES: u32 = 3;

/// Marker value written into `iggy_payload_encoding` / `iggy_header_encoding`
/// fields when binary data is base64-encoded for JSON transport.
const ENCODING_BASE64: &str = "base64";
+
/// HTTP method enum — validated at deserialization, prevents invalid values like "DELEET" or "GETX".
///
/// `rename_all = "UPPERCASE"` means config files use standard HTTP spelling
/// ("GET", "POST", ...). The default method is POST.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum HttpMethod {
    Get,
    Head,
    #[default]
    Post,
    Put,
    Patch,
    Delete,
}
+
/// Payload formatting mode for HTTP requests.
///
/// Config files use snake_case values ("individual", "nd_json", ...); the strum
/// display strings are only used in log messages.
#[derive(
    Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum_macros::Display,
)]
#[serde(rename_all = "snake_case")]
pub enum BatchMode {
    /// One HTTP request per message (default). Note: with batch_length=50, this produces 50
    /// sequential HTTP round trips per poll cycle. Use ndjson or json_array for higher throughput.
    #[default]
    #[strum(to_string = "individual")]
    Individual,
    /// All messages in one request, newline-delimited JSON.
    #[strum(to_string = "NDJSON")]
    NdJson,
    /// All messages as a single JSON array.
    #[strum(to_string = "JSON array")]
    JsonArray,
    /// Raw bytes, one request per message (for non-JSON payloads).
    #[strum(to_string = "raw")]
    Raw,
}
+
+impl BatchMode {
+    /// Determine the Content-Type header based on batch mode.
+    fn content_type(&self) -> &'static str {
+        match self {
+            BatchMode::Individual | BatchMode::JsonArray => "application/json",
+            BatchMode::NdJson => "application/x-ndjson",
+            BatchMode::Raw => "application/octet-stream",
+        }
+    }
+}
+
/// Metadata envelope wrapping a payload with Iggy message metadata.
/// Serialized as `{"metadata": {...}, "payload": ...}` when include_metadata is on.
#[derive(Debug, Serialize)]
struct MetadataEnvelope {
    metadata: IggyMetadata,
    payload: serde_json::Value,
}
+
/// Iggy message metadata fields.
/// Optional fields are omitted from the JSON output entirely when `None`
/// (controlled by include_checksum / include_origin_timestamp config and
/// header presence).
#[derive(Debug, Serialize)]
struct IggyMetadata {
    // Message id serialized as a hex string (u128 is not JSON-safe).
    iggy_id: String,
    iggy_offset: u64,
    iggy_timestamp: u64,
    iggy_stream: String,
    iggy_topic: String,
    iggy_partition_id: u32,
    #[serde(skip_serializing_if = "Option::is_none")]
    iggy_checksum: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    iggy_origin_timestamp: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    iggy_headers: Option<serde_json::Map<String, serde_json::Value>>,
}
+
/// Binary payload with base64 encoding marker.
/// `iggy_payload_encoding` is always ENCODING_BASE64, letting consumers
/// distinguish encoded binary data from ordinary JSON objects.
#[derive(Debug, Serialize)]
struct EncodedPayload {
    data: String,
    iggy_payload_encoding: &'static str,
}
+
/// Binary header value with base64 encoding marker.
/// Mirrors EncodedPayload but with a distinct marker key for header values.
#[derive(Debug, Serialize)]
struct EncodedHeader {
    data: String,
    iggy_header_encoding: &'static str,
}
+
/// Configuration for the HTTP sink connector, deserialized from [plugin_config] in config.toml.
/// Every field except `url` is optional; `None` falls back to the default
/// stated in the field docs.
#[derive(Debug, Serialize, Deserialize)]
pub struct HttpSinkConfig {
    /// Target URL for HTTP requests (required).
    pub url: String,
    /// HTTP method (default: POST).
    pub method: Option<HttpMethod>,
    /// Request timeout as a human-readable duration string, e.g. "30s" (default: 30s).
    pub timeout: Option<String>,
    /// Maximum HTTP body size in bytes (default: 10MB). Set to 0 to disable.
    pub max_payload_size_bytes: Option<u64>,
    /// Custom HTTP headers sent with every request.
    pub headers: Option<HashMap<String, String>>,
    /// Payload formatting mode (default: individual).
    pub batch_mode: Option<BatchMode>,
    /// Include Iggy metadata envelope in payload (default: true).
    pub include_metadata: Option<bool>,
    /// Include message checksum in metadata (default: false).
    pub include_checksum: Option<bool>,
    /// Include origin timestamp in metadata (default: false).
    pub include_origin_timestamp: Option<bool>,
    /// Enable health check request in open() (default: false).
    pub health_check_enabled: Option<bool>,
    /// HTTP method for health check (default: HEAD).
    pub health_check_method: Option<HttpMethod>,
    /// Maximum number of retries for transient errors (default: 3).
    pub max_retries: Option<u32>,
    /// Retry delay as a human-readable duration string, e.g. "1s" (default: 1s).
    pub retry_delay: Option<String>,
    /// Backoff multiplier for exponential retry delay (default: 2).
    pub retry_backoff_multiplier: Option<u32>,
    /// Maximum retry delay cap as a human-readable duration string (default: 30s).
    pub max_retry_delay: Option<String>,
    /// HTTP status codes considered successful (default: [200, 201, 202, 204]).
    pub success_status_codes: Option<Vec<u16>>,
    /// Accept invalid TLS certificates (default: false). Named to signal danger.
    pub tls_danger_accept_invalid_certs: Option<bool>,
    /// Maximum idle connections per host (default: 10).
    pub max_connections: Option<usize>,
    /// Enable verbose request/response logging (default: false).
    pub verbose_logging: Option<bool>,
}
+
/// HTTP sink connector that delivers consumed messages to any HTTP endpoint.
///
/// Lifecycle: `new()` → `open()` → `consume()` (repeated) → `close()`.
/// The `reqwest::Client` is built in `open()` (not `new()`) so that config-derived
/// settings (timeout, TLS, connection pool) are applied. This matches the
/// MongoDB/Elasticsearch/PostgreSQL sink initialization pattern.
#[derive(Debug)]
pub struct HttpSink {
    id: u32,
    url: String,
    method: HttpMethod,
    timeout: Duration,
    /// 0 disables the size limit entirely.
    max_payload_size_bytes: u64,
    headers: HashMap<String, String>,
    batch_mode: BatchMode,
    include_metadata: bool,
    include_checksum: bool,
    include_origin_timestamp: bool,
    health_check_enabled: bool,
    health_check_method: HttpMethod,
    max_retries: u32,
    retry_delay: Duration,
    retry_backoff_multiplier: u32,
    max_retry_delay: Duration,
    success_status_codes: HashSet<u16>,
    tls_danger_accept_invalid_certs: bool,
    max_connections: usize,
    verbose: bool,
    /// Pre-built HTTP headers (excluding Content-Type). Built once in `open()` from validated
    /// `self.headers`, reused for every request. `None` before `open()` is called.
    request_headers: Option<reqwest::header::HeaderMap>,
    /// Initialized in `open()` with config-derived settings. `None` before `open()` is called.
    client: Option<ClientWithMiddleware>,
    /// Incremented once per `send_with_retry` invocation (before the request is made).
    send_attempts: AtomicU64,
    /// Messages confirmed delivered (successful HTTP responses).
    messages_delivered: AtomicU64,
    /// Failures of any kind: serialization, size-limit skips, and HTTP errors.
    errors_count: AtomicU64,
    /// Epoch seconds of last successful HTTP request.
    last_success_timestamp: AtomicU64,
}
+
+impl HttpSink {
+    pub fn new(id: u32, config: HttpSinkConfig) -> Self {
+        let url = config.url;
+        let method = config.method.unwrap_or_default();
+        let timeout = parse_duration(config.timeout.as_deref(), 
DEFAULT_TIMEOUT);
+        let max_payload_size_bytes = config
+            .max_payload_size_bytes
+            .unwrap_or(DEFAULT_MAX_PAYLOAD_SIZE);
+        let headers = config.headers.unwrap_or_default();
+        let batch_mode = config.batch_mode.unwrap_or_default();
+        let include_metadata = config.include_metadata.unwrap_or(true);
+        let include_checksum = config.include_checksum.unwrap_or(false);
+        let include_origin_timestamp = 
config.include_origin_timestamp.unwrap_or(false);
+        let health_check_enabled = 
config.health_check_enabled.unwrap_or(false);
+        let health_check_method = 
config.health_check_method.unwrap_or(HttpMethod::Head);
+        let max_retries = config.max_retries.unwrap_or(DEFAULT_MAX_RETRIES);
+        let mut retry_delay = parse_duration(config.retry_delay.as_deref(), 
DEFAULT_RETRY_DELAY);
+        let retry_backoff_multiplier = config
+            .retry_backoff_multiplier
+            .unwrap_or(DEFAULT_BACKOFF_MULTIPLIER)
+            .max(1);
+        let mut max_retry_delay =
+            parse_duration(config.max_retry_delay.as_deref(), 
DEFAULT_MAX_RETRY_DELAY);
+        let success_status_codes: HashSet<u16> = config
+            .success_status_codes
+            .unwrap_or_else(|| vec![200, 201, 202, 204])
+            .into_iter()
+            .collect();
+        let tls_danger_accept_invalid_certs =
+            config.tls_danger_accept_invalid_certs.unwrap_or(false);
+        let max_connections = 
config.max_connections.unwrap_or(DEFAULT_MAX_CONNECTIONS);
+        let verbose = config.verbose_logging.unwrap_or(false);
+
+        if retry_delay > max_retry_delay {
+            warn!(
+                "HTTP sink ID: {} — retry_delay ({:?}) exceeds max_retry_delay 
({:?}). \
+                 Swapping values to prevent ExponentialBackoff panic.",
+                id, retry_delay, max_retry_delay,
+            );
+            std::mem::swap(&mut retry_delay, &mut max_retry_delay);
+        }
+
+        if tls_danger_accept_invalid_certs {
+            warn!(
+                "HTTP sink ID: {} — tls_danger_accept_invalid_certs is 
enabled. \
+                 TLS certificate validation is DISABLED.",
+                id
+            );
+        }
+
+        if batch_mode == BatchMode::Raw && include_metadata {
+            warn!(
+                "HTTP sink ID: {} — batch_mode=raw ignores include_metadata. \
+                 Raw mode sends payload bytes directly without metadata 
envelope.",
+                id
+            );
+        }
+
+        if matches!(method, HttpMethod::Get | HttpMethod::Head)
+            && batch_mode != BatchMode::Individual
+        {
+            warn!(
+                "HTTP sink ID: {} — {:?} with batch_mode={:?} will send a 
request body. \
+                 Some servers may reject GET/HEAD requests with a body.",
+                id, method, batch_mode,
+            );
+        }
+
+        HttpSink {
+            id,
+            url,
+            method,
+            timeout,
+            max_payload_size_bytes,
+            headers,
+            batch_mode,
+            include_metadata,
+            include_checksum,
+            include_origin_timestamp,
+            health_check_enabled,
+            health_check_method,
+            max_retries,
+            retry_delay,
+            retry_backoff_multiplier,
+            max_retry_delay,
+            success_status_codes,
+            tls_danger_accept_invalid_certs,
+            max_connections,
+            verbose,
+            request_headers: None,
+            client: None,
+            send_attempts: AtomicU64::new(0),
+            messages_delivered: AtomicU64::new(0),
+            errors_count: AtomicU64::new(0),
+            last_success_timestamp: AtomicU64::new(0),
+        }
+    }
+
+    /// Build the `reqwest::Client` wrapped with retry and tracing middleware.
+    fn build_client(&self) -> Result<ClientWithMiddleware, Error> {
+        let raw_client = reqwest::Client::builder()
+            .timeout(self.timeout)
+            .pool_max_idle_per_host(self.max_connections)
+            
.pool_idle_timeout(Duration::from_secs(DEFAULT_POOL_IDLE_TIMEOUT_SECS))
+            .tcp_keepalive(Duration::from_secs(DEFAULT_TCP_KEEPALIVE_SECS))
+            .danger_accept_invalid_certs(self.tls_danger_accept_invalid_certs)
+            .build()
+            .map_err(|e| Error::InitError(format!("Failed to build HTTP 
client: {}", e)))?;
+
+        let retry_policy = ExponentialBackoff::builder()
+            .retry_bounds(self.retry_delay, self.max_retry_delay)
+            .base(self.retry_backoff_multiplier)
+            .build_with_max_retries(self.max_retries);
+
+        let retry_strategy = HttpSinkRetryStrategy {
+            success_status_codes: self.success_status_codes.clone(),
+        };
+
+        let retry_middleware =
+            
RetryTransientMiddleware::new_with_policy_and_strategy(retry_policy, 
retry_strategy);
+
+        Ok(ClientBuilder::new(raw_client)
+            .with(TracingMiddleware::default())
+            .with(retry_middleware)
+            .build())
+    }
+
+    /// Returns the initialized HTTP client, or an error if `open()` was not 
called.
+    fn client(&self) -> Result<&ClientWithMiddleware, Error> {
+        self.client.as_ref().ok_or_else(|| {
+            Error::InitError("HTTP client not initialized — was open() 
called?".to_string())
+        })
+    }
+
+    /// Convert a `Payload` to a JSON value for metadata wrapping.
+    /// Non-JSON payloads are base64-encoded with a `iggy_payload_encoding` 
marker.
+    ///
+    /// Note: All current `Payload` variants produce infallible conversions.
+    /// The `Result` return type exists as a safety net for future variants.
+    fn payload_to_json(&self, payload: Payload) -> Result<serde_json::Value, 
Error> {
+        match payload {
+            Payload::Json(value) => {
+                // Direct structural conversion (not serialization roundtrip).
+                // Follows the Elasticsearch sink pattern. NaN/Infinity f64 → 
null.
+                Ok(owned_value_to_serde_json(&value))
+            }
+            Payload::Text(text) => Ok(serde_json::Value::String(text)),
+            Payload::Raw(bytes) | Payload::FlatBuffer(bytes) => {
+                let encoded = EncodedPayload {
+                    data: general_purpose::STANDARD.encode(&bytes),
+                    iggy_payload_encoding: ENCODING_BASE64,
+                };
+                serde_json::to_value(encoded)
+                    .map_err(|e| Error::Serialization(format!("EncodedPayload: 
{}", e)))
+            }
+            Payload::Proto(proto_str) => {
+                let encoded = EncodedPayload {
+                    data: 
general_purpose::STANDARD.encode(proto_str.as_bytes()),
+                    iggy_payload_encoding: ENCODING_BASE64,
+                };
+                serde_json::to_value(encoded)
+                    .map_err(|e| Error::Serialization(format!("EncodedPayload: 
{}", e)))
+            }
+        }
+    }
+
+    /// Build a message envelope with optional metadata wrapping.
+    fn build_envelope(
+        &self,
+        message: &ConsumedMessage,
+        topic_metadata: &TopicMetadata,
+        messages_metadata: &MessagesMetadata,
+        payload_json: serde_json::Value,
+    ) -> Result<serde_json::Value, Error> {
+        if !self.include_metadata {
+            return Ok(payload_json);
+        }
+
+        let headers_map = if let Some(ref headers) = message.headers
+            && !headers.is_empty()
+        {
+            let map: serde_json::Map<String, serde_json::Value> = headers
+                .iter()
+                .map(|(k, v)| {
+                    // Raw bytes: base64-encode to avoid Rust debug format in 
JSON output.
+                    // as_raw() returns Ok only for HeaderKind::Raw.
+                    let value = if let Ok(raw) = v.as_raw() {
+                        let encoded = EncodedHeader {
+                            data: general_purpose::STANDARD.encode(raw),
+                            iggy_header_encoding: ENCODING_BASE64,
+                        };
+                        serde_json::to_value(encoded)
+                            .map_err(|e| 
Error::Serialization(format!("EncodedHeader: {}", e)))?
+                    } else {
+                        serde_json::Value::String(v.to_string_value())
+                    };
+                    Ok((k.to_string_value(), value))
+                })
+                .collect::<Result<serde_json::Map<String, serde_json::Value>, 
Error>>()?;
+            Some(map)
+        } else {
+            None
+        };
+
+        let metadata = IggyMetadata {
+            iggy_id: format_u128_as_hex(message.id),
+            iggy_offset: message.offset,
+            iggy_timestamp: message.timestamp,
+            iggy_stream: topic_metadata.stream.clone(),
+            iggy_topic: topic_metadata.topic.clone(),
+            iggy_partition_id: messages_metadata.partition_id,
+            iggy_checksum: if self.include_checksum {
+                Some(message.checksum)
+            } else {
+                None
+            },
+            iggy_origin_timestamp: if self.include_origin_timestamp {
+                Some(message.origin_timestamp)
+            } else {
+                None
+            },
+            iggy_headers: headers_map,
+        };
+
+        let envelope = MetadataEnvelope {
+            metadata,
+            payload: payload_json,
+        };
+
+        serde_json::to_value(envelope)

Review Comment:
   Addressed. Initially added `build_envelope_vec` and 
`build_envelope_to_writer` to eliminate the `Value` intermediate, but 6-agent 
review flagged the triple maintenance surface as a higher risk than the 
~microsecond serialization overhead. Collapsed back to single `build_envelope` 
entry point — callers choose their serialization format (`to_vec` for 
individual, `to_writer` for ndjson, `Value` directly for json_array). 
`build_iggy_metadata` remains as the shared helper. The performance difference 
is negligible against HTTP round-trip cost.



##########
core/connectors/sinks/http_sink/src/lib.rs:
##########
@@ -0,0 +1,2069 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use async_trait::async_trait;
+use base64::Engine;
+use base64::engine::general_purpose;
+use bytes::Bytes;
+use humantime::Duration as HumanDuration;
+use iggy_connector_sdk::{
+    ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata,
+    convert::owned_value_to_serde_json, sink_connector,
+};
+use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
+use reqwest_retry::{
+    RetryTransientMiddleware, Retryable, RetryableStrategy, 
policies::ExponentialBackoff,
+};
+use reqwest_tracing::TracingMiddleware;
+use serde::{Deserialize, Serialize};
+use std::collections::{HashMap, HashSet};
+use std::str::FromStr;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tracing::{debug, error, info, warn};
+
+sink_connector!(HttpSink);
+
/// Default request timeout when the config omits `timeout` (humantime format).
const DEFAULT_TIMEOUT: &str = "30s";
/// Default initial delay between retry attempts (humantime format).
const DEFAULT_RETRY_DELAY: &str = "1s";
/// Default upper bound on the exponential retry delay (humantime format).
const DEFAULT_MAX_RETRY_DELAY: &str = "30s";
/// Default number of retries per request before giving up.
const DEFAULT_MAX_RETRIES: u32 = 3;
/// Default base multiplier for exponential backoff between retries.
const DEFAULT_BACKOFF_MULTIPLIER: u32 = 2;
/// Default cap on a single request payload size.
const DEFAULT_MAX_PAYLOAD_SIZE: u64 = 10 * 1024 * 1024; // 10 MB
/// Default cap on idle pooled connections per host.
const DEFAULT_MAX_CONNECTIONS: usize = 10;
/// TCP keep-alive interval for detecting dead connections behind load balancers.
/// Cloud LBs silently drop idle connections (AWS ALB ~60s, GCP ~600s);
/// probing at 30s detects these before requests fail.
const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30;
/// Close pooled connections unused for this long. Prevents stale connections
/// from accumulating when traffic is bursty.
const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90;
/// Abort remaining messages in individual/raw mode after this many consecutive HTTP failures.
/// Prevents hammering a dead endpoint with N sequential retry cycles per poll.
const MAX_CONSECUTIVE_FAILURES: u32 = 3;

/// Marker value written to `iggy_payload_encoding` / `iggy_header_encoding`
/// fields when a payload or header has been base64-encoded.
const ENCODING_BASE64: &str = "base64";
+
/// HTTP method enum — validated at deserialization, prevents invalid values like "DELEET" or "GETX".
///
/// Serialized/deserialized in UPPERCASE ("GET", "POST", ...) to match the
/// HTTP wire convention.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum HttpMethod {
    Get,
    Head,
    /// Default method used when the config omits `method`.
    #[default]
    Post,
    Put,
    Patch,
    Delete,
}
+
/// Payload formatting mode for HTTP requests.
///
/// `serde` names are the snake_case config values (`individual`, `nd_json`, ...);
/// the `strum` display strings are human-readable labels — NOTE(review):
/// presumably used in log/error messages, confirm at call sites.
#[derive(
    Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum_macros::Display,
)]
#[serde(rename_all = "snake_case")]
pub enum BatchMode {
    /// One HTTP request per message (default). Note: with batch_length=50, this produces 50
    /// sequential HTTP round trips per poll cycle. Use ndjson or json_array for higher throughput.
    #[default]
    #[strum(to_string = "individual")]
    Individual,
    /// All messages in one request, newline-delimited JSON.
    #[strum(to_string = "NDJSON")]
    NdJson,
    /// All messages as a single JSON array.
    #[strum(to_string = "JSON array")]
    JsonArray,
    /// Raw bytes, one request per message (for non-JSON payloads).
    #[strum(to_string = "raw")]
    Raw,
}
+
+impl BatchMode {
+    /// Determine the Content-Type header based on batch mode.
+    fn content_type(&self) -> &'static str {
+        match self {
+            BatchMode::Individual | BatchMode::JsonArray => "application/json",
+            BatchMode::NdJson => "application/x-ndjson",
+            BatchMode::Raw => "application/octet-stream",
+        }
+    }
+}
+
/// Metadata envelope wrapping a payload with Iggy message metadata.
#[derive(Debug, Serialize)]
struct MetadataEnvelope {
    // Iggy message context (id, offset, stream/topic, headers, ...).
    metadata: IggyMetadata,
    // The original message payload, already converted to JSON.
    payload: serde_json::Value,
}
+
+/// Iggy message metadata fields.
+#[derive(Debug, Serialize)]
+struct IggyMetadata {

Review Comment:
   Added a doc comment on `IggyMetadata` documenting the intentional 
divergence: HTTP sink uses `iggy_*` (standard JSON convention), ES sink uses 
`_iggy_*` (ES internal field convention). Both are defensible for their 
respective ecosystems. Agree a follow-up to unify would be valuable but it 
touches another component — happy to do it as a separate PR if the team has a 
preferred convention.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to