Re: [DISCUSS] Contribution of Multi Cluster Kafka Source
Hi all,

Thanks for the feedback! I'm adding the users who responded on the user mailing list to this thread.

@Qingsheng - Yes, I would prefer to reuse the existing Kafka connector module. It makes a lot of sense since the dependencies are the same, and the implementation can also extend and improve some of the test utilities you have been working on for the FLIP-27 Kafka Source. I will enumerate the migration steps in the FLIP template.

@Ryan - I don't have a public branch available yet, but I would appreciate your review of the FLIP design! Once the FLIP design is approved by the devs and the community, I can start committing our implementation to a fork.

@Andrew - Yup, one of the requirements of the connector is to read from multiple clusters within a single source, so it should work well with your use case.

@Devs - what do I need to get started on the FLIP design? I see the FLIP template and I have an account (mason6345), but I don't have access to create a page.

Best,
Mason

On Sun, Jun 26, 2022 at 8:08 PM Qingsheng Ren wrote:

> Hi Mason,
>
> It sounds like an exciting enhancement to the Kafka source, and I believe it will benefit a lot of users.
>
> Would you prefer to reuse the existing flink-connector-kafka module or create a new one for the new multi-cluster feature? Personally I prefer the former, because users won't need to introduce another dependency module to their projects in order to use the feature.
>
> Thanks for the effort on this and looking forward to your FLIP!
>
> Best,
> Qingsheng
>
> > On Jun 24, 2022, at 09:43, Mason Chen wrote:
> >
> > Hi community,
> >
> > We have been working on a Multi Cluster Kafka Source and are looking to contribute it upstream. I've given a talk about the features and design at a Flink meetup: https://youtu.be/H1SYOuLcUTI.
> >
> > The main features it provides are:
> > 1. Reading multiple Kafka clusters within a single source.
> > 2. Adjusting the clusters and topics the source consumes from dynamically, without a Flink job restart.
> >
> > Some of the challenging use cases that these features solve are:
> > 1. Transparent Kafka cluster migration without Flink job restart.
> > 2. Transparent Kafka topic migration without Flink job restart.
> > 3. Direct integration with Hybrid Source.
> >
> > In addition, this is designed by wrapping and managing the existing KafkaSource components to enable these features, so it can continue to benefit from KafkaSource improvements and bug fixes. It can be considered a form of composite source.
> >
> > I think the contribution of this source could benefit a lot of users who have asked on the mailing list about Flink handling Kafka migrations and removing topics in the past. I would love to hear and address your thoughts and feedback, and if possible drive a FLIP!
> >
> > Best,
> > Mason
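For context, here is a minimal illustrative sketch (not part of the original thread) of what reading from two Kafka clusters looks like today with the existing FLIP-27 KafkaSource: one source instance per cluster, unioned downstream. Adding or removing a cluster changes the job graph and forces a restart, which is the limitation the proposed multi-cluster source addresses. The bootstrap servers, topic, and group id below are placeholders.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoClusterExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One KafkaSource per cluster; addresses and topics are illustrative.
        KafkaSource<String> clusterA = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-a:9092")
                .setTopics("events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSource<String> clusterB = KafkaSource.<String>builder()
                .setBootstrapServers("kafka-b:9092")
                .setTopics("events")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Today: union two independent sources. Changing the set of clusters
        // requires changing the job graph and restarting the job.
        DataStream<String> a = env.fromSource(clusterA, WatermarkStrategy.noWatermarks(), "kafka-a");
        DataStream<String> b = env.fromSource(clusterB, WatermarkStrategy.noWatermarks(), "kafka-b");
        a.union(b).print();

        env.execute("two-cluster-union");
    }
}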
Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.1 released
Thanks Gyula for working on the first patch release for the Flink Kubernetes Operator project.

Best,
Yang

Gyula Fóra wrote on Tue, Jun 28, 2022 at 00:22:

> The Apache Flink community is very happy to announce the release of Apache Flink Kubernetes Operator 1.0.1.
>
> The Flink Kubernetes Operator allows users to manage their Apache Flink applications and their lifecycle through native k8s tooling like kubectl.
> <https://flink.apache.org/news/2022/04/03/release-kubernetes-operator-0.1.0.html>
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> The official Docker image for Flink Kubernetes Operator applications can be found at:
> https://hub.docker.com/r/apache/flink-kubernetes-operator
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351812
>
> We would like to thank all contributors of the Apache Flink community who made this release possible!
>
> Regards,
> Gyula Fora
Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.1 released
Sorry for the incorrect blog post link, please ignore it. There is no blog post for 1.0.1 :)

Gyula

On Tue, Jun 28, 2022 at 9:43 AM Yang Wang wrote:

> Thanks Gyula for working on the first patch release for the Flink Kubernetes Operator project.
>
> Best,
> Yang
>
> Gyula Fóra wrote on Tue, Jun 28, 2022 at 00:22:
>
>> The Apache Flink community is very happy to announce the release of Apache Flink Kubernetes Operator 1.0.1.
>>
>> The Flink Kubernetes Operator allows users to manage their Apache Flink applications and their lifecycle through native k8s tooling like kubectl.
>> <https://flink.apache.org/news/2022/04/03/release-kubernetes-operator-0.1.0.html>
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> The official Docker image for Flink Kubernetes Operator applications can be found at:
>> https://hub.docker.com/r/apache/flink-kubernetes-operator
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351812
>>
>> We would like to thank all contributors of the Apache Flink community who made this release possible!
>>
>> Regards,
>> Gyula Fora
how to connect to the flink-state store and use it as cache to serve APIs.
Hi Team,

I am not sure if this is the right use case for the state store, but I wanted to serve APIs using queryable state. What are the different ways to achieve this?

I have come across a version where we can use the Job_Id to connect to the state, but is there any other way, e.g. to expose a specific REST endpoint?

Any sample example/GitHub link would be nice.

Thank you.
Re: Synchronizing streams in coprocessfunction
On Mon, Jun 27, 2022 at 7:47 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Gopi,
>
> Your use case is a little under-specified to give a specific answer, especially regarding the nature of the two input streams and the way events of both streams are correlated (joined):
>
> - *Is your fast-stream keyed?*
>   - If yes: keyed state and timers can be used; otherwise only operator state can be used to buffer events, and no timers.

Fast stream is keyed. The metadata is a broadcast.

> - *Is your metadata-stream keyed?* I.e.
>   - Metadata-stream events are combined only with fast-stream events having the same respective key:
>     - Implement a KeyedCoProcessFunction …
>   - Metadata-stream events apply to all fast-stream events irrespective of the key:
>     - Implement a KeyedBroadcastProcessFunction (after converting the metadata-stream to a broadcast stream)
>     - Then in the processBroadcastElement function you can iterate over all keys of all state primitives
> - *None of your streams are keyed?*
>   - That leaves you only the option of using operator state
>   - The current implementation of operator state is not incremental and thus it is completely generated/stored with each state checkpoint
>   - This allows only a moderate number of datapoints in operator state

Correct. The main concern is that there's no flow control that could be applied here. If the fast stream has very high throughput, a delay in the metadata stream will cause a lot of state to be persisted.

> - *Which version of Flink are you using?* Recommendations above refer to Flink 1.15.0

1.14, but I can use the latest version if needed.

> Looking forward to your answers (also please go a little more into detail on your use case) and follow-up questions …

Use case: We have a stream of events with a primary key (consider CDC). We need to store these events sorted by the primary key in a set of files (consider a DeltaLake table). The Flink pipeline is used to split the stream and map each event to the corresponding file where it should go. The events come in arbitrary order of primary keys, and the mapping of keys to files can change over time. The metadata stream is a list of key ranges regularly updated from the file system. The event stream is partitioned into multiple tasks (potentially by primary key), and each task gets the metadata via a broadcast connected to the keyed stream. In the CoProcessFunction of the connected stream, the file path is determined using the metadata, and then the stream is keyed again by the file path to group all events that belong to the same file.

Now the problem is that when the job starts, it might take some time to query the metadata. During this time, a large number of events will be buffered. Hope this clarifies the use case; if this implementation can be simplified, please let me know.

cdcStream
    .keyBy(r -> "primaryKey")
    .connect(tableStream.broadcast())
    .process(new KeyedCoProcessFunction<String, RowData, DeltaTable, Enriched>() {
        @Override
        public void processElement1(RowData value, Context ctx, Collector<Enriched> out) throws Exception {
        }

        @Override
        public void processElement2(DeltaTable value, Context ctx, Collector<Enriched> out) throws Exception {
        }
    })
    .keyBy(Enriched::getFilePath)
    .map(Enriched::getData)
    .sink()

See the sketch after this message for one way to buffer events until the first metadata element arrives.

> Greetings
>
> Thias
>
> *From:* Gopi Krishna M
> *Sent:* Monday, June 27, 2022 3:01 PM
> *To:* Qingsheng Ren
> *Cc:* user@flink.apache.org
> *Subject:* Re: Synchronizing streams in coprocessfunction
>
> Thanks Qingsheng, that would definitely work. But I'm unable to figure out how I can apply this with CoProcessFunction. One stream is windowed and the trigger implementation uses the 2nd stream.
>
> On Mon, Jun 27, 2022 at 3:29 PM Qingsheng Ren wrote:
>
> Hi Gopi,
>
> What about using a window with a custom trigger? The window does nothing but aggregate your input into a collection. The trigger accepts metadata from the low-throughput input stream, so it can fire and purge the window (emit all elements in the window downstream) on arrival of metadata.
>
> Best,
> Qingsheng
>
>> On Jun 27, 2022, at 12:46, Gopi Krishna M wrote:
>>
>> Hi,
>> I've a scenario where I use connected streams, where one is a low-throughput metadata stream and the other is a high-throughput data stream. I use a CoProcessFunction that operates on the data stream with behavior controlled by the metadata stream.
>>
>> Is there a way to slow down/pause the high-throughput data stream until I've received one entry from the metadata stream? It's possible that by the time I get the first element from the metadata stream, I might get 1000s of items from the data stream. One option is to create a stat
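As an illustration of the buffering pattern hinted at above, here is a minimal, untested sketch of a KeyedBroadcastProcessFunction that parks fast-stream events in keyed list state until the first broadcast metadata element arrives, then flushes all buffered events from processBroadcastElement via applyToKeyedState. The RowData, DeltaTable, and Enriched types (and the filePathFor lookup) are assumptions taken from the thread; this does not add real flow control, since state can still grow while metadata is delayed.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class MetadataAwareRouter
        extends KeyedBroadcastProcessFunction<String, RowData, DeltaTable, Enriched> {

    // Single-entry broadcast state holding the latest table metadata.
    static final MapStateDescriptor<String, DeltaTable> METADATA =
            new MapStateDescriptor<>("metadata", String.class, DeltaTable.class);

    // Per-key buffer for events that arrive before any metadata is known.
    private final ListStateDescriptor<RowData> bufferDescriptor =
            new ListStateDescriptor<>("buffered-events", RowData.class);

    private transient ListState<RowData> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(bufferDescriptor);
    }

    @Override
    public void processElement(RowData value, ReadOnlyContext ctx, Collector<Enriched> out) throws Exception {
        DeltaTable metadata = ctx.getBroadcastState(METADATA).get("latest");
        if (metadata == null) {
            // No metadata yet: park the event in keyed state instead of emitting it.
            buffer.add(value);
        } else {
            out.collect(route(value, metadata));
        }
    }

    @Override
    public void processBroadcastElement(final DeltaTable value, Context ctx, final Collector<Enriched> out)
            throws Exception {
        ctx.getBroadcastState(METADATA).put("latest", value);
        // Flush everything buffered so far, for every key this subtask owns.
        ctx.applyToKeyedState(bufferDescriptor, new KeyedStateFunction<String, ListState<RowData>>() {
            @Override
            public void process(String key, ListState<RowData> state) throws Exception {
                Iterable<RowData> buffered = state.get();
                if (buffered != null) {
                    for (RowData row : buffered) {
                        out.collect(route(row, value));
                    }
                }
                state.clear();
            }
        });
    }

    private Enriched route(RowData row, DeltaTable metadata) {
        // filePathFor(...) stands in for whatever key-range lookup the metadata supports.
        return new Enriched(metadata.filePathFor(row), row);
    }
}

It would be wired up roughly as: cdcStream.keyBy(...).connect(tableStream.broadcast(MetadataAwareRouter.METADATA)).process(new MetadataAwareRouter()).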
Re: Reply:DelegationTokenManager
Hi, Gabor Somogyi!

Because my fork of the Apache Flink repository was outdated, I didn't notice in time that some subtasks about KerberosDelegationTokenManager had already been done. I implemented this feature on the flink-1.13.5 release used in my scenario, and created a Jira, FLINK-28291, with a patch attached. If this is allowed, please assign it to the related people or review the patch at your convenience. If not, please drop the Jira. I am sorry to have caused this trouble.

Best Regards!

On Jun 21, 2022 at 16:20, Gabor Somogyi wrote:

Thanks for pinging me! Yes, finishing this feature is my main target, however there are major code parts which are still missing. Please have a look at the umbrella Jira to get a better understanding: https://issues.apache.org/jira/browse/FLINK-21232

In general it's not advised to use it for production use cases but only for testing and/or reporting bugs in test environments. When this is finished, the only thing that needs to be done as a framework user is to implement a new provider, and all the rest (TGT renewal, token obtainment, token propagation, etc.) will be handled automatically at the right time.

I've answered the related questions inline (in green); hope I've given answers to all your questions. In general we're open to making the design/implementation better, so feel free to contribute in any way. If you have Spark knowledge, that would answer a huge amount of questions, because I'm trying to apply that knowledge here (and not re-invent the wheel). Of course there are some minor differences, because these are obviously two completely different products, but the main concept is the same.

BR,
G

On Tue, Jun 21, 2022 at 8:43 AM Márton Balassi wrote:

Hi, for your information, G (cc'd) is actively working on this topic. [1] He is in the best position to answer your questions as far as I know. :-)

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-211%3A+Kerberos+delegation+token+framework

On Tue, Jun 21, 2022 at 8:38 AM vtygoss wrote:

Hi, Flink community! I don't know much about the details of KDC. Can different TaskManagers hold different tokens? If so, the driver and each worker can renew their tokens in their respective DelegationTokenManager individually.

Not sure of the intention of this question, but as a general rule one cluster means one active DelegationTokenManager which rules all the TaskManagers. So TaskManagers are not intended to be shared between two DelegationTokenManagers.

> Can different TaskManagers hold different tokens?

Physically yes, but it's not intended in the design we've put together. There is an exotic Hadoop feature which allows different DTs per TM, but it's more like a hack than a real feature (UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION).

> If so, the driver and each worker can renew their tokens in their respective DelegationTokenManager individually.

If you ask this, I'm not sure you understand the whole DT concept. The main point of DTs is that only one entity (in this case the JobManager) has Kerberos credentials and obtains tokens, and it then propagates that set of tokens to all TaskManagers. A TaskManager simply doesn't have the right to obtain any token, since that would be a security hole.

Thanks for any replies. Best Regards!

On Jun 21, 2022 at 13:30, vtygoss wrote:

Hi, Flink community! I am trying to do some work on renewing delegation tokens periodically for the DelegationTokenManager, and have met some problems.

1. Implementations of DelegationTokenProvider

There seem to be only two implementations, both for testing, defined by the SPI service: TestDelegationTokenProvider and ExceptionThrowingDelegationTokenProvider, with no HDFS or HBase provider.

That said, the implementation is ongoing and far from finished. Currently I'm working on the HadoopFSDelegationTokenProvider, which is half-way implemented. The implementation depends on one of my currently open PRs, and until that is merged I'm fully blocked. If you would like to help, I suggest reviewing my PRs.

2. RPCGateway

When the renewer thread of the DelegationTokenManager in the resource manager renews the tokens of all providers periodically, the DelegationTokenManager should update the tokens of all TaskManagers via an RPCGateway, which seems to be the appropriate way for now. But the registered TaskManagers are managed by the TaskExecutorManager in the RM, which the DelegationTokenManager doesn't have a pointer to. So, how about using a global context to hold all necessary services, e.g. RPCService, TaskExecutorManager or HA? In my mind, RPCGateway seems a little clunky (just my personal view) for extensions; why not consider an async typed message model between driver and worker in the design?

At the moment there is no DT propagation to TaskManagers since it's not yet implemented. There will be an RPC call which will transmit all the obtained tokens to the TaskManagers, and all of them will have the same set of DTs. According to the plans it wi
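To make the provider discussion more concrete, below is a purely hypothetical sketch of what an HDFS token provider could look like. The DelegationTokenProvider interface shown here is invented for illustration only; the real FLIP-211 interface was still being implemented when this thread was written. The Hadoop calls (FileSystem.addDelegationTokens into a Credentials holder) are real Hadoop APIs, and the renewer principal is a placeholder.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.Credentials;

public class ExampleHadoopFsTokenProvider {

    // Illustrative provider contract (NOT the actual Flink interface):
    // obtain delegation tokens into a shared Credentials holder.
    interface DelegationTokenProvider {
        String serviceName();

        void obtainDelegationTokens(Credentials credentials) throws Exception;
    }

    static class HadoopFsProvider implements DelegationTokenProvider {
        private final Configuration hadoopConf = new Configuration();

        @Override
        public String serviceName() {
            return "hadoopfs";
        }

        @Override
        public void obtainDelegationTokens(Credentials credentials) throws Exception {
            // Only the JobManager (the single entity holding Kerberos credentials)
            // asks the file system for delegation tokens; the renewer is a placeholder.
            FileSystem fs = FileSystem.get(hadoopConf);
            fs.addDelegationTokens("yarn/renewer@EXAMPLE.COM", credentials);
            // A DelegationTokenManager would then propagate the collected credentials
            // to all TaskManagers over a single RPC, as described in the thread.
        }
    }
}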
Re: how to connect to the flink-state store and use it as cache to serve APIs.
Hi, laxmi.

There are two ways that users can currently access the state store:

1. Queryable State [1], which lets you access state at runtime.
2. The State Processor API [2], which lets you access state (from a snapshot) offline.

However, we have marked Queryable State as "Reaching End-of-Life" [3]. We are also trying to find a graceful and effective way for users to debug and troubleshoot. So could you share your case about what you want to use it for? Your feedback is important for us to design this for the long term. Thanks!

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
[3] https://flink.apache.org/roadmap.html

Best,
Hangxiang.

On Tue, Jun 28, 2022 at 8:26 PM laxmi narayan wrote:

> Hi Team,
>
> I am not sure if this is the right use case for the state store, but I wanted to serve APIs using queryable state. What are the different ways to achieve this?
>
> I have come across a version where we can use the Job_Id to connect to the state, but is there any other way, e.g. to expose a specific REST endpoint?
>
> Any sample example/GitHub link would be nice.
>
> Thank you.
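As a minimal sketch of the Queryable State option (1) mentioned above: assuming the job has registered a queryable ValueState named "my-counter" keyed by String, a client outside the cluster could query it roughly like this. It requires the flink-queryable-state-client-java dependency; the hostname, port, state name, and key below are placeholders, and the running job's ID is passed as the first program argument.

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.concurrent.CompletableFuture;

public class QueryableStateLookup {

    public static void main(String[] args) throws Exception {
        // Host and port of a TaskManager's queryable state proxy (9069 is the default port).
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        // The ID of the running job that registered the queryable state.
        JobID jobId = JobID.fromHexString(args[0]);

        // Must match the descriptor used when the state was made queryable in the job.
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("my-counter", Long.class);

        CompletableFuture<ValueState<Long>> result = client.getKvState(
                jobId,
                "my-counter",                    // name passed to asQueryableState(...) / setQueryable(...)
                "some-key",                      // the key to look up
                BasicTypeInfo.STRING_TYPE_INFO,  // type of the key
                descriptor);

        System.out.println("value = " + result.get().value());
        client.shutdownAndWait();
    }
}

Note that the queryable state proxy must be enabled on the TaskManagers (queryable-state.enable: true) for this to work, and that Queryable State is marked as approaching end-of-life, so the State Processor API [2] is the safer long-term choice for offline access.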