Hi everyone, thank you for reviewing the FLIP.

> 1. Manually specifying the topic ID seems to be a bit costly for users.
> Why does it require flink users to pass the topic id manually or
> explicitly? It would be helpful for users if flink source or sink are
> able to fetch topic id automatically in the beginning, right?

Thanks Rui for your suggestion.
Topic ID auto-retrieval indeed sounds like a better UX; I will update the FLIP accordingly.
However, a downside to be aware of is that the topic name to ID mapping must be checkpointed to retain the same ID across recoveries, which imposes statefulness on so-far stateless operators such as the non-transactional sink.

> 2. On the Sink side, the proposal uses "setPartitionDiscoveryIntervalMs".
> Since "Partition Discovery" in Flink implies Source behavior (finding
> splits), and the Sink is doing a metadata update, have you considered
> "setMetadataRefreshIntervalMs"? Maybe that aligns better with the
> underlying Kafka Client terminology (Metadata Fetch/Lookup/Refresh).

> introducing “Partition Discovery” for sinks feels conceptually
> inconsistent with Flink’s existing design.

Thanks Aleksandr and Leonard.
The concept of partition discovery isn’t tied to the source. It was introduced to address increases in partition count, which may happen on both source and sink topics.
Assigning splits to the newly found partitions is a source implementation detail, and is not mentioned in the docs either.
I decided to advertise the feature as "sink partition discovery" to help folks already familiar with the concept better understand the implications. What do you think?

> 3. Is it correct that enabling partition discovery is strictly required
> for the integrity check?

The integrity verification is performed upon metadata refresh.
On both source and sink, disabling partition discovery means metadata isn't refreshed, so the integrity check will be performed only once, on startup.

> 4. The disclaimer mentions an "inevitable short period of time" where the
> job reads/writes to the new topic before detection. If a checkpoint
> completes successfully during this window, do we risk to have a corrupted
> state? Is this a known limitation?

The risk of data corruption upon topic recreation is discussed in the FLIP. We cannot save users from themselves.
After all, if they decide to recreate a topic, the data is lost.
Indeed, the proposed check cannot guarantee 100% protection, because it runs periodically.
Nevertheless, the job will fail shortly after, and jobs restored from that checkpoint will not be allowed to start.
For more sensitive jobs you could decrease the interval, at some cost in performance.

> 5. Will there be metrics exposed for monitoring the integrity checks?

We will not expose metrics for the integrity checks, as a failed check immediately triggers a global unrecoverable failure.

Efrat

On Tue, 20 Jan 2026 at 05:58, Leonard Xu <[email protected]> wrote:

> Hi Efrat,
>
> Thanks for kicking off this discussion, I’m also in favor of adding a
> check for the Kafka Connector.
>
> > My only concern is that manually specifying the topic ID seems to be
> > a bit costly for users. Why does it require flink users to pass the
> > topic id manually or explicitly? It would be helpful for users if flink
> > source or sink are able to fetch topic id automatically in the
> > beginning, right?
>
> I share Rui’s concern about requiring users to manually specify the
> topicId. Given that topicId-based communication isn’t part of Kafka’s
> current roadmap (as noted in your “Future Plan” section) and is starting
> available in Kafka 4.0–4.2, it seems premature, and potentially
> burdensome, to expose it in Flink’s user-facing API. Ideally, if needed at
> all, the source or sink should be able to resolve the topicId automatically
> during initialization.
>
> Additionally, I agree with Aleksandr that introducing “Partition
> Discovery” for sinks feels conceptually inconsistent with Flink’s existing
> design.
>
> Best,
> Leonard
>
> > On Thu, Jan 15, 2026 at 12:38 PM Efrat Levitan <[email protected]>
> > wrote:
> >
> >> Hi everyone, I'd like to start a discussion on FLIP-562 [1] to implement
> >> topic integrity checks on kafka connector, as currently the connector is
> >> blind to topic recreations, presenting risks to job consistency.
> >>
> >> Kafka APIs traditionally rely on topic names, which do not guarantee
> >> uniqueness over time.
> >> Though both KIP-516 [2] and KIP-848 [3] discuss topicId based
> >> communication, client support is not on the roadmap [4].
> >>
> >> The FLIP contains the new proposal to make flink kafka connector sensitive
> >> to topicId changes through integrity checks over the periodical metadata
> >> fetching (AKA topic partition discovery), and sets the grounds for future
> >> topicId based communication with both the user and kafka server.
> >>
> >> I'd appreciate your feedback on the proposed changes.
> >>
> >> Thanks,
> >> Efrat.
> >>
> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-562%3A+Topic+integrity+checks+in+Kafka+Connector
> >> [2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers
> >> [3] https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
> >> [4] https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers/#KIP516:TopicIdentifiers-Clients
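
For illustration only, here is a minimal sketch of the kind of check discussed in this thread: remember the topic ID first seen for each topic name, keep that mapping in checkpointed state, and fail when a later metadata refresh reports a different ID for the same name. Everything in it is an assumption made for the example and not taken from the FLIP: the names `TopicIdRegistry`, `verify` and `snapshot` are hypothetical, `java.util.UUID` stands in for the Kafka topic ID type, and the metadata fetch itself is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical sketch, not the FLIP-562 implementation: remembers the first topic ID
 * seen for each topic name and rejects later metadata that reports a different ID.
 */
final class TopicIdRegistry {

    // Topic name -> topic ID recorded on first sight. In a real connector this map
    // would have to be part of checkpointed state to survive recoveries.
    private final Map<String, UUID> knownIds = new HashMap<>();

    /** Restores a previously checkpointed mapping (empty on a fresh start). */
    TopicIdRegistry(Map<String, UUID> restored) {
        knownIds.putAll(restored);
    }

    /**
     * Compares freshly fetched metadata against the recorded IDs.
     * Unknown topics are recorded; a changed ID is treated as a topic recreation.
     */
    void verify(Map<String, UUID> fetched) {
        for (Map.Entry<String, UUID> e : fetched.entrySet()) {
            UUID previous = knownIds.putIfAbsent(e.getKey(), e.getValue());
            if (previous != null && !previous.equals(e.getValue())) {
                throw new IllegalStateException(
                        "Topic '" + e.getKey() + "' was recreated: expected ID "
                                + previous + " but found " + e.getValue());
            }
        }
    }

    /** Copy of the mapping, i.e. what would be written into a checkpoint. */
    Map<String, UUID> snapshot() {
        return new HashMap<>(knownIds);
    }
}
```

In this sketch `verify` would be called once on startup and then on every metadata refresh, which matches the point above that disabling the periodic refresh leaves only the startup check. A registry rebuilt from `snapshot()` rejects a recreated topic on its first `verify` call, which is the "restored jobs will not be allowed to start" behaviour in miniature.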
