Hi Sachin,

Not sure I understand all aspects of your use case, but let me explain a couple of things.
You've identified a real limitation, but the scenario you describe is unlikely to play out the way you think. The key point is how Flink handles failures and restarts:

- Key partitioning is deterministic: when a TaskManager restarts after a failure, Flink restores from the last successful checkpoint. The key-to-task assignment (via key groups) is deterministic, based on your parallelism and the key hash, so the same key always routes to the same task instance (possibly on a different TaskManager).
- State is restored: when recovering from a checkpoint, your hash state for each key is restored exactly as it was at checkpoint time. If a key had certain hashes stored before the failure, those are present after recovery.
- Upstream replay: after recovery, Kafka (or another source) replays from the last checkpointed offset. The replayed records are routed to the same keyed tasks that processed them before, where the state was restored.

There is, however, a deduplication gap. The real issue is the window between the last checkpoint and the failure:

- Records processed after checkpoint N but before the failure had their hashes stored in state.
- That state was not part of checkpoint N, so it is lost when the job recovers from checkpoint N.
- When Kafka replays those records, they are processed again as "new".

This is expected behavior and follows from Flink's guarantees:

- With checkpointing enabled, end-to-end delivery is at-least-once by default.
- For exactly-once semantics you need both an exactly-once source (e.g. Kafka with checkpointing) and an exactly-once sink (a transactional sink or idempotent writes).

Options for true deduplication:

1. Use an external deduplication store (Redis, a database) with idempotent writes. It survives Flink restarts but adds latency and an external dependency.
2. Use Flink's two-phase commit sinks for exactly-once end-to-end: downstream writes are only committed when a checkpoint completes, so records between checkpoints never reach the downstream system on failure. This is supported by the Kafka sink, JDBC with XA, etc.
3. Make the downstream system idempotent, i.e. design it to handle duplicates (e.g. upserts with unique keys).

Recommendation: if your downstream system is Kafka, use Flink's KafkaSink with the exactly-once delivery guarantee (there are two small sketches at the end of this mail). If it is a database, use transactions or unique constraints. State-based deduplication alone cannot give you exactly-once guarantees across failures.

Hope this helps.

BR, G
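
PS: In case it is useful, here is a minimal sketch of the state-based deduplication pattern we are talking about, using the Flink 1.x APIs and assuming the stream is keyed by a record hash; the class name, state name and keying are placeholders, not taken from your job. The "seen" flag lives in keyed state, which is exactly what gets restored from the last successful checkpoint, so flags written after that checkpoint are lost on failure and the replayed records look "new" again.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Deduplicates a stream keyed by a record hash (String); generic over the record type.
public class DedupByHash<T> extends KeyedProcessFunction<String, T, T> {

    // One flag per key, kept in Flink keyed state. On recovery this state is
    // restored from the last successful checkpoint only.
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void processElement(T record, Context ctx, Collector<T> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(record);   // first occurrence of this hash
        }
        // duplicates whose hash is already in (restored) state are dropped
    }
}

// usage, with hashOf() standing in for however you compute the hash:
// stream.keyBy(r -> hashOf(r)).process(new DedupByHash<>());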
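
And here is a sketch of the KafkaSink setup I recommended, again with placeholder broker address, topic name and serializer; adapt it to your pipeline. One thing to remember: consumers of the output topic must read with isolation.level=read_committed, otherwise they will still see the aborted/uncommitted duplicates.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceKafkaSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly-once needs checkpointing: the sink commits its Kafka transaction
        // only when the checkpoint covering those records completes.
        env.enableCheckpointing(60_000);   // default mode is EXACTLY_ONCE

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")                 // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")                   // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // must be unique per application; also make sure the producer's
                // transaction.timeout.ms fits within the broker's transaction.max.timeout.ms
                .setTransactionalIdPrefix("dedup-job")
                .build();

        // source -> keyBy -> DedupByHash -> sinkTo(sink), then env.execute(...)
    }
}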
