Hi all, I was also looking into this because I have a use case in which topics could change.
This was previously discussed on the user mailing list (https://lists.apache.org/thread/ys3171grvbw843lfjph8gcnoydqcwq29), and this is the JIRA ticket that was created to track exposing the KafkaSubscriber: https://issues.apache.org/jira/browse/FLINK-24660.

While working on it, I ran into some issues around topics changing. Since the Kafka subscriber is used to discover the current splits, some previously consumed topics may no longer need to be consumed, which requires a split removal mechanism. I talked with Qingsheng about this here: https://lists.apache.org/thread/3wxfr39t2rz1wvxw2vsz5hsrp9t8qrwx (there is a simple example where splits would need to be removed, because the user changes the topic list while the reader state still reflects the old topic list).

I think there needs to be some sort of filtering logic on splits, where the readers wait for the enumerator to assign splits (even though the readers may already hold some state), plus a split removal mechanism via enumerator RPC.

Best,
Mason

On 2021/12/01 16:48:12 Till Rohrmann wrote:
> Hi Etienne,
>
> I think this is a reasonable request. The question we need to answer is
> what kind of stability guarantees we want to provide for the
> KafkaSubscriber interface. Once this is done, I am sure that we can also
> expose this part for advanced use cases. I am pulling in Arvid and Fabian
> who are working on these parts.
>
> Cheers,
> Till
>
> On Wed, Dec 1, 2021 at 3:10 PM CARRIERE Etienne
> <et...@socgen.com.invalid> wrote:
>
> > Hello,
> >
> > Thank you for the work on the dynamic topic/partition topology possible with
> > KafkaSource, which lets us do a lot of things (tested in 1.14). But, in
> > our organization, we have a custom need to have the list of topics defined
> > from an external source (SQL database in our case).
> > We plan to write a custom KafkaSubscriber
> > <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java>
> > that will implement our logic.
> > Unfortunately, we can't inject our custom subscriber in KafkaSourceBuilder
> > (the builder only has methods for the standard subscribers), nor in
> > KafkaSource, as the constructor is not public.
> >
> > What would be a good way to implement this need?
> > Would a PR that adds an "advanced" method to the builder, or that makes
> > the KafkaSource constructor public, be accepted?
> >
> > Thanks in advance,
> >
> > Regards,
> >
> > Etienne Carrière
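
To make the split filtering idea from this thread a bit more concrete, here is a minimal sketch in plain Java with no Flink dependencies. It only shows the decision the enumerator would have to make: compare the splits restored from reader state against the topic list currently returned by the subscriber, and separate the splits to keep from the ones to remove. `Split`, `Plan`, and `plan` are hypothetical names made up for illustration; they are not part of the KafkaSource or KafkaSubscriber API, and how the removal would actually be sent to the readers is left open.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SplitFilterSketch {

    /** Hypothetical stand-in for a Kafka partition split; not a Flink class. */
    static final class Split {
        final String topic;
        final int partition;

        Split(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Hypothetical result type: which restored splits stay and which go. */
    static final class Plan {
        final List<Split> toKeep = new ArrayList<>();
        final List<Split> toRemove = new ArrayList<>();
    }

    /**
     * Partitions the restored splits by whether their topic is still in the
     * currently subscribed topic set.
     */
    static Plan plan(List<Split> restoredSplits, Set<String> subscribedTopics) {
        Plan result = new Plan();
        for (Split split : restoredSplits) {
            if (subscribedTopics.contains(split.topic)) {
                result.toKeep.add(split);
            } else {
                result.toRemove.add(split);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Reader state restored from a checkpoint taken with the old topic list.
        List<Split> restored = Arrays.asList(
                new Split("orders", 0), new Split("orders", 1), new Split("legacy", 0));
        // Topic list after the user changed the subscription.
        Set<String> subscribed = new HashSet<>(Arrays.asList("orders"));

        Plan p = plan(restored, subscribed);
        System.out.println("keep: " + p.toKeep);
        System.out.println("remove: " + p.toRemove);
    }
}
```

In this sketch the readers would not resume their restored splits on their own; they would wait for the enumerator to confirm the `toKeep` set and to signal removal of the `toRemove` set.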