hubcio commented on issue #3062: URL: https://github.com/apache/iggy/issues/3062#issuecomment-4229478979
hey @ryerraguntla, thanks for the detailed writeup. i reviewed this and here's where i landed.

the 4-layer `InfluxClient` trait approach is over-engineered for what actually differs between v2 and v3. i'd keep the existing 2 crates - one `influxdb_sink.so`, one `influxdb_source.so` - both supporting v2 and v3 via a `version` field in the config. no new crates, no unified trait. users deploy the same plugin and just flip the config.

the sink and source have very different divergence profiles, so they should be structured asymmetrically.

on the **sink** side, the v2/v3 differences are tiny (~30 lines: URL path, auth prefix, precision strings). line protocol (LP) construction, batching, the circuit breaker, and `process_batch()` orchestration are all identical. a simple `InfluxVersion` enum matched in `build_write_url()` and in the auth header construction is enough. no need for separate modules.

on the **source** side, the v2/v3 differences are fundamental: different query languages (Flux vs SQL), different response formats (CSV vs JSONL), different dedup strategies, and different injection prevention (regex validation for Flux vs parameterized queries for SQL). these need separate `v2.rs`/`v3.rs` modules, with a `common.rs` for the shared bits (HTTP client, circuit breaker, `PayloadFormat`).

### config

use `#[serde(tag = "version")]` with separate variant structs so illegal states (v3 + org + bucket) are unrepresentable. v2 configs must include `version = "v2"` explicitly - no backward-compat dance. this is a breaking change (i think the influxdb connector has no users yet).

```toml
# v2
[plugin_config]
version = "v2"
url = "http://localhost:8086"
org = "iggy"
bucket = "events"
token = "..."
```

```toml
# v3
[plugin_config]
version = "v3"
url = "http://localhost:8086"
database = "events"
token = "..."
```

### state persistence (important)

the v2 source state has `cursor_row_count` for `>= $cursor` + skip-N dedup: the query re-reads every row at the cursor timestamp and skips the first N, which were already delivered.
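to make the v2 scheme concrete, here's a minimal std-only sketch of `>= cursor` + skip-N dedup. `V2Cursor` and `dedup_v2` are hypothetical names, not the connector's code. the sketch assumes rows come back ordered by timestamp, with a stable order among rows that share a timestamp. that stable order is exactly the assumption that breaks on v3.

```rust
/// Hypothetical v2 cursor: `>= cursor` query plus skip-N dedup.
#[derive(Debug, Clone, PartialEq)]
struct V2Cursor {
    cursor: i64,             // timestamp of the last delivered row
    cursor_row_count: usize, // rows at `cursor` already delivered
}

/// `rows` are (timestamp, payload) pairs from a `>= cursor` query, assumed
/// ordered by timestamp with a *stable* order among equal timestamps.
/// Returns the payloads to deliver and the cursor to persist afterwards.
fn dedup_v2<'a>(state: &V2Cursor, rows: &'a [(i64, &'a str)]) -> (Vec<&'a str>, V2Cursor) {
    let mut skipped = 0;
    let mut out = Vec::new();
    let mut next = state.clone();
    for &(ts, payload) in rows {
        // Skip the first N rows at the cursor timestamp: they are *assumed*
        // to be the same N rows that were delivered last time.
        if ts == state.cursor && skipped < state.cursor_row_count {
            skipped += 1;
            continue;
        }
        out.push(payload);
        if ts == next.cursor {
            next.cursor_row_count += 1; // one more row delivered at this timestamp
        } else {
            next.cursor = ts; // moved to a newer timestamp
            next.cursor_row_count = 1;
        }
    }
    (out, next)
}
```

with a stable order, rows `a` and `b` (delivered last time at timestamp 10) are skipped and `c`, `d` come through. if the server returns the timestamp-10 rows as `b`, `c`, `a`, the same skip-2 silently drops `c` (never delivered) and re-sends `a`.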
this skip-N scheme is **provably unsafe on v3** - DataFusion/Parquet doesn't guarantee a stable order among rows that share a timestamp, so skip-N would silently skip different rows than the ones already delivered. v3 must use a strict `> cursor` with no skip-N.

fix: version-tag the persisted state as an enum (`PersistedState::V2 { ..., cursor_row_count }` / `PersistedState::V3 { ... }`). on startup, if the config version doesn't match the state version, refuse to start. since there are no existing deployments to migrate, we can switch to the versioned format directly.

### v3 stuck-timestamp handling

with a strict `> cursor`, if many rows share one timestamp and the batch size is smaller than the number of rows at that timestamp, the source gets stuck: it can neither advance past that timestamp without dropping rows nor fetch the rest of them. handling:

1. detect when a full batch shares a single cursor timestamp
2. deliver the batch, but refuse to advance the cursor (at-least-once must not become at-most-once)
3. auto-inflate the batch size on consecutive stuck polls (bounded by a configurable cap, default 10x)
4. trip the circuit breaker if the cap is reached, and log an ERROR with actionable guidance

### other notes

- **ship the v3 sink without `accept_partial=true`** - `Sink::consume()` returns a binary `Result<(), Error>`, so it can't report a partial write. add 207 support in a follow-up once the SDK trait evolves.
- **version is immutable for the connector's lifetime** - set once from config, never changed at runtime.

### phasing

1. **phase 1**: versioned `PersistedState` enum + v3 sink (enum dispatch, no `accept_partial`) + v3 source (SQL/JSONL, strict `>` cursor, stuck-timestamp detection)
2. **phase 2** (future): opt-in `accept_partial = true` for the v3 sink

i'd love to see a PR following this structure. let me know if anything is unclear.
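for the phase 1 `PersistedState` work, here's a minimal std-only sketch of the versioned state plus the refuse-to-start check. `InfluxVersion`, the `cursor` field and `check_state` are hypothetical placeholders, since the comment leaves the other fields as `...`. the real type would presumably derive serde traits so the version tag is stored alongside the state.

```rust
/// Which API the connector is configured for (fixed for its lifetime).
#[derive(Debug, Clone, Copy, PartialEq)]
enum InfluxVersion {
    V2,
    V3,
}

/// Hypothetical versioned source state; `cursor` stands in for the elided
/// fields. Only V2 carries `cursor_row_count` (skip-N dedup).
#[derive(Debug, Clone, PartialEq)]
enum PersistedState {
    V2 { cursor: i64, cursor_row_count: usize },
    V3 { cursor: i64 },
}

impl PersistedState {
    fn version(&self) -> InfluxVersion {
        match self {
            PersistedState::V2 { .. } => InfluxVersion::V2,
            PersistedState::V3 { .. } => InfluxVersion::V3,
        }
    }
}

/// Startup check: no saved state is fine (fresh start); a saved state from
/// the other version is a hard error, so the connector refuses to start.
fn check_state(config: InfluxVersion, saved: Option<&PersistedState>) -> Result<(), String> {
    match saved {
        Some(state) if state.version() != config => Err(format!(
            "persisted state is {:?} but config is {:?}; refusing to start",
            state.version(),
            config
        )),
        _ => Ok(()),
    }
}
```

because `cursor_row_count` only exists on the `V2` variant, v3 code has no way to reach skip-N state by accident.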
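for the phase 1 sink, here's a sketch of the enum dispatch. it folds the version into a config enum, so a v3 config can't carry `org`/`bucket`. the names are hypothetical. the endpoint paths (`/api/v2/write`, `/api/v3/write_lp`), the precision values and the `Token`/`Bearer` auth schemes are assumptions to check against the InfluxDB v2 and v3 HTTP API docs. serde derives are left out to keep it std-only.

```rust
/// Hypothetical sink config. The real crate would derive `serde::Deserialize`
/// with `#[serde(tag = "version", rename_all = "lowercase")]` so that
/// `version = "v2"` / `"v3"` in the TOML selects the variant.
#[derive(Debug, Clone)]
enum InfluxConfig {
    V2 { url: String, org: String, bucket: String, token: String },
    V3 { url: String, database: String, token: String },
}

impl InfluxConfig {
    /// Write endpoint per version (paths and precision values are assumptions).
    /// Real code should percent-encode the query values.
    fn build_write_url(&self) -> String {
        match self {
            InfluxConfig::V2 { url, org, bucket, .. } => format!(
                "{}/api/v2/write?org={org}&bucket={bucket}&precision=ns",
                url.trim_end_matches('/')
            ),
            InfluxConfig::V3 { url, database, .. } => format!(
                "{}/api/v3/write_lp?db={database}&precision=nanosecond",
                url.trim_end_matches('/')
            ),
        }
    }

    /// `Authorization` header value: `Token` for v2, `Bearer` for v3 (assumed).
    fn auth_header(&self) -> String {
        match self {
            InfluxConfig::V2 { token, .. } => format!("Token {token}"),
            InfluxConfig::V3 { token, .. } => format!("Bearer {token}"),
        }
    }
}
```

everything else in the sink (LP construction, batching, circuit breaker) stays version-agnostic. only these two matches know about the version.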
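for the phase 1 v3 source, here's a std-only sketch of the strict `> cursor` logic with stuck-timestamp handling. `StuckTracker` and `PollDecision` are hypothetical names. the sketch assumes each batch comes from something like `WHERE time > $cursor ORDER BY time LIMIT n`. it also makes two choices that are assumptions, not part of the proposal above. first, the batch size doubles on each consecutive stuck poll, up to the cap. second, for any full batch the cursor only advances to the last timestamp *before* the trailing group, since a full batch may have cut that group short.

```rust
/// What to do with the cursor after delivering one batch.
#[derive(Debug, PartialEq)]
enum PollDecision {
    /// Commit this timestamp as the new (exclusive) cursor.
    Advance(i64),
    /// Keep the old cursor; the next poll re-reads the same rows.
    Hold,
    /// Still stuck at the batch-size cap: trip the circuit breaker.
    TripBreaker,
}

/// Hypothetical stuck-timestamp tracker for the v3 source.
struct StuckTracker {
    base_batch_size: usize,
    cap_multiplier: usize, // the proposal's default cap is 10x
    current_batch_size: usize,
}

impl StuckTracker {
    fn new(base_batch_size: usize, cap_multiplier: usize) -> Self {
        Self { base_batch_size, cap_multiplier, current_batch_size: base_batch_size }
    }

    fn cap(&self) -> usize {
        self.base_batch_size * self.cap_multiplier
    }

    /// `timestamps` are the row times of a batch fetched with
    /// `LIMIT current_batch_size`, in ascending order.
    fn on_batch(&mut self, timestamps: &[i64]) -> PollDecision {
        let (first, last) = match (timestamps.first(), timestamps.last()) {
            (Some(&f), Some(&l)) => (f, l),
            // Empty batch: nothing new, keep the cursor.
            _ => return PollDecision::Hold,
        };
        if timestamps.len() < self.current_batch_size {
            // Short batch: every row after the cursor was returned.
            self.current_batch_size = self.base_batch_size;
            return PollDecision::Advance(last);
        }
        if first != last {
            // Full batch: the group at `last` may be truncated, so commit
            // only up to the greatest timestamp strictly below it.
            let safe = timestamps.iter().rev().copied().find(|&t| t < last).unwrap_or(first);
            self.current_batch_size = self.base_batch_size;
            return PollDecision::Advance(safe);
        }
        // Full batch, single timestamp: stuck. Never advance past it.
        if self.current_batch_size >= self.cap() {
            // Caller should log an ERROR with guidance (e.g. raise the cap).
            return PollDecision::TripBreaker;
        }
        self.current_batch_size = (self.current_batch_size * 2).min(self.cap());
        PollDecision::Hold
    }
}
```

the caller delivers the batch in every case, and the decision only governs the cursor. rows in a held-back group therefore get re-delivered (at-least-once), never skipped.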
