spetz commented on code in PR #2925: URL: https://github.com/apache/iggy/pull/2925#discussion_r2986278048
########## core/connectors/sinks/http_sink/src/lib.rs: ########## @@ -0,0 +1,2121 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use async_trait::async_trait; +use base64::Engine; +use base64::engine::general_purpose; +use bytes::Bytes; +use humantime::Duration as HumanDuration; +use iggy_connector_sdk::{ + ConsumedMessage, Error, MessagesMetadata, Payload, Sink, TopicMetadata, + convert::owned_value_to_serde_json, sink_connector, +}; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tracing::{debug, error, info, warn}; + +sink_connector!(HttpSink); + +const DEFAULT_TIMEOUT: &str = "30s"; +const DEFAULT_RETRY_DELAY: &str = "1s"; +const DEFAULT_MAX_RETRY_DELAY: &str = "30s"; +const DEFAULT_MAX_RETRIES: u32 = 3; +const DEFAULT_BACKOFF_MULTIPLIER: f64 = 2.0; +const DEFAULT_MAX_PAYLOAD_SIZE: u64 = 10 * 1024 * 1024; // 10 MB +const DEFAULT_MAX_CONNECTIONS: usize = 10; +/// TCP keep-alive interval for detecting dead connections behind load balancers. +/// Cloud LBs silently drop idle connections (AWS ALB ~60s, GCP ~600s); +/// probing at 30s detects these before requests fail. +const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30; +/// Close pooled connections unused for this long. Prevents stale connections +/// from accumulating when traffic is bursty. +const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90; +/// Abort remaining messages in individual/raw mode after this many consecutive HTTP failures. +/// Prevents hammering a dead endpoint with N sequential retry cycles per poll. +const MAX_CONSECUTIVE_FAILURES: u32 = 3; + +/// HTTP method enum — validated at deserialization, prevents invalid values like "DELEET" or "GETX". +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "UPPERCASE")] +pub enum HttpMethod { + Get, + Head, + #[default] + Post, + Put, + Patch, + Delete, +} + +/// Payload formatting mode for HTTP requests. +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum BatchMode { + /// One HTTP request per message (default). Note: with batch_length=50, this produces 50 + /// sequential HTTP round trips per poll cycle. Use ndjson or json_array for higher throughput. + #[default] + Individual, + /// All messages in one request, newline-delimited JSON. + Ndjson, Review Comment: Should it be `NdJson` (`nd_json`) instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
