Hi Matyas,

Sorry for the late reply. Dynamic Kafka Source was introduced quite a while
ago, and it is a little confusing for developers like me that
KafkaDynamicSource and DynamicKafkaSource live in the same Kafka connector
repo but serve different purposes, and now DynamicKafkaSink will be
introduced as well.
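
For illustration, these are the two classes I mean (fully qualified names as
I understand them from the connector repo, so please correct me if I am
wrong):

    // Table/SQL planner class behind the regular 'kafka' SQL connector
    org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource

    // DataStream source for dynamic/multiple Kafka clusters
    org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource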
I wonder:

1. Is there any plan to merge this code into one class?
2. Is there any plan to expose them to SQL?

Best,
Hongshun

On Wed, Mar 26, 2025 at 2:48 PM Arvid Heise <ahe...@confluent.io.invalid>
wrote:

> Hi Matyas,
>
> thanks for clarifying. The design LGTM.
>
> Based on your description, I'd phrase the state management a bit
> differently:
> * The source of truth is and always will be the MDS.
> * The response is cached/replicated in Flink state to bridge MDS downtime.
> * Since the MDS is queried periodically, the state is reconciled
> asynchronously to reflect the latest changes.
>
> My main confusion came from the fact that there were seemingly two
> sources of truth in the document. I take it that the writer will never
> update the MDS, but the MDS will update the writer.
>
> Best,
>
> Arvid
>
> On Mon, Mar 24, 2025 at 9:22 PM Őrhidi Mátyás <matyas.orh...@gmail.com>
> wrote:
> >
> > Hi Arvid,
> >
> > Thank you for taking the time to look at this proposal.
> >
> > The concept of a KafkaMetadataService is not new; it was already
> > introduced with the Dynamic Kafka Source:
> > https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamic-kafka/#kafka-metadata-service
> >
> > This is an external component, often hosted by Kafka infra teams
> > (typically as a REST API). In essence, the Kafka Metadata Service (MDS)
> > removes the burden of managing Kafka connection details and
> > infrastructure changes from the Flink developer.
> >
> > - *No hardcoding:* Instead of hardcoding Kafka broker addresses, topic
> > names, and other details directly into your Flink code, you can fetch
> > them dynamically from the MDS.
> > - *Dynamic changes:* If the Kafka setup changes (e.g., a broker is
> > added or removed, or a topic is moved), the MDS reflects these changes.
> > Your Flink application, using the MDS, can automatically adapt without
> > manual intervention or redeployment.
> > - *Environment agnostic:* You can easily switch between development,
> > testing, and production environments because the MDS provides the
> > correct Kafka details for each environment. Your code doesn't need
> > environment-specific configuration.
> > - *Easier topic management:* The MDS helps you discover available
> > topics and streams. You can find the right topic for your data without
> > needing to know the exact Kafka cluster details. You use a logical
> > "stream ID", and the MDS tells you the corresponding Kafka topic(s).
> >
> > The initial source of truth is the MDS; Flink state becomes the "source
> > of truth" only after the metadata has been fetched from the MDS. The
> > planned DynamicKafkaWriter caches this metadata and uses Flink's state
> > management to store the mapping between logical streams and physical
> > Kafka clusters/topics. The writer uses a background thread and a
> > reconciliation process to dynamically update this state based on
> > information from the MDS. The state is checkpointed for fault tolerance
> > and serves as the source of truth when the writer recovers its internal
> > state.
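> >
> > To make this concrete: the service is a small Java interface that
> > resolves logical stream IDs to physical clusters and topics. Paraphrased
> > from the connector code, so treat this as a sketch rather than the exact
> > signatures:
> >
> >     public interface KafkaMetadataService extends AutoCloseable, Serializable {
> >         // All streams the MDS knows about.
> >         Set<KafkaStream> getAllStreams();
> >
> >         // Resolve logical stream IDs to their clusters and topics.
> >         Map<String, KafkaStream> describeStreams(Collection<String> streamIds);
> >
> >         // Distinguishes removed clusters from transient lookup failures.
> >         boolean isClusterActive(String kafkaClusterId);
> >     }
> >
> > And this is roughly how a job wires the existing source against an MDS,
> > following the docs linked above (MyKafkaMetadataService stands in for a
> > REST-backed implementation):
> >
> >     Properties properties = new Properties(); // cluster-agnostic settings
> >
> >     DynamicKafkaSource<String> source =
> >         DynamicKafkaSource.<String>builder()
> >             .setStreamIds(Collections.singleton("input-stream"))
> >             .setKafkaMetadataService(new MyKafkaMetadataService())
> >             .setDeserializer(
> >                 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
> >             .setStartingOffsets(OffsetsInitializer.earliest())
> >             .setProperties(properties)
> >             .build();
> >
> > The planned DynamicKafkaSink follows the same pattern on the write path.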
> >
> > I hope this clarifies things. I will fix the copy-paste errors soon.
> >
> > Cheers,
> > Matyas
> >
> > On Wed, Mar 19, 2025 at 8:13 AM Arvid Heise <ar...@apache.org> wrote:
> >
> > > Hi Matyas,
> > >
> > > could you please provide more details on the KafkaMetadataService?
> > > Where does it live? How does it work? How does "Metadata state will be
> > > stored in Flink state as the source of truth for the Flink job" work?
> > >
> > > Also a nit: the test plan contains copy-paste errors.
> > >
> > > Best,
> > >
> > > Arvid
> > >
> > > On Thu, Mar 13, 2025 at 10:04 PM Őrhidi Mátyás <matyas.orh...@gmail.com>
> > > wrote:
> > > >
> > > > Hi devs,
> > > >
> > > > I'd like to start a discussion on FLIP-515: Dynamic Kafka Sink [1].
> > > > This is an addition to the existing Dynamic Kafka Source [2] that
> > > > completes the functionality.
> > > >
> > > > Feel free to share your thoughts and suggestions to make this
> > > > feature better.
> > > >
> > > > + Mason Chen
> > > >
> > > > Thanks,
> > > > Matyas
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-515%3A+Dynamic+Kafka+Sink
> > > >
> > > > [2]
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
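> > > >
> > > > To give a feel for the intended API, here is a rough usage sketch.
> > > > The builder and method names below are illustrative only (modeled on
> > > > DynamicKafkaSource and the existing KafkaSink builder) and may change
> > > > as the FLIP evolves:
> > > >
> > > >     // Hypothetical sketch; MyKafkaMetadataService and
> > > >     // MyRecordSerializer are placeholders for user implementations.
> > > >     DynamicKafkaSink<String> sink =
> > > >         DynamicKafkaSink.<String>builder()
> > > >             .setStreamId("output-stream") // logical stream, resolved via the MDS
> > > >             .setKafkaMetadataService(new MyKafkaMetadataService())
> > > >             .setRecordSerializer(new MyRecordSerializer())
> > > >             .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
> > > >             .build();
> > > >
> > > >     stream.sinkTo(sink);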