Hi,
Thanks for the info. I updated my implementation: I now produce a list of elements in my custom ProcessAllWindowFunction before it goes to my custom sink. The problem is that the list doesn't get reset/cleared.

inputStream
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .trigger(CountTrigger.of(maxNo))
    .process(new CustomProcessAllWindowFunc());

CustomProcessAllWindowFunc's code:

eventList = new ArrayList<>();

if (rateConfig == eventList.size()) {
    collector.collect(eventList);
    // clear the list since I need to send only 100 elements at a time
    eventList.clear();
}

However, even though I clear the eventList, the elements that already went to the sink are still there, and new elements just keep getting added on top of the previous ones. How can I correctly clear the list in the ProcessAllWindowFunction?
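For context, here is a minimal, self-contained sketch along the lines of my current CustomProcessAllWindowFunc (the AppEvent element type, keeping eventList as an instance field, and passing rateConfig through the constructor are simplifications of my actual code):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CustomProcessAllWindowFunc
        extends ProcessAllWindowFunction<AppEvent, List<AppEvent>, TimeWindow> {

    private final int rateConfig;

    // Instance field: it lives for the lifetime of the operator,
    // not just for a single window firing.
    private final List<AppEvent> eventList = new ArrayList<>();

    public CustomProcessAllWindowFunc(int rateConfig) {
        this.rateConfig = rateConfig;
    }

    @Override
    public void process(Context context,
                        Iterable<AppEvent> elements,
                        Collector<List<AppEvent>> collector) {
        for (AppEvent event : elements) {
            eventList.add(event);
        }
        if (rateConfig == eventList.size()) {
            // collect() is handed the same list object that is cleared
            // on the next line.
            collector.collect(eventList);
            eventList.clear();
        }
    }
}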
On Wed, Aug 30, 2023, 3:54 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Patricia,
>
> What you are trying to implement can be achieved out of the box by windowing.
>
> I assume these packets of 100 events are not per key but global.
> In that case, use non-keyed windowing [1] with a count trigger (100) [3], maybe add a processing-time trigger if it takes too long to collect all 100 events, and then create the output with a process window function [2].
>
> I hope this helps
>
> Thias
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
> [2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#processwindowfunction
> [3] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#built-in-and-custom-triggers
>
> From: patricia lee <plee3...@gmail.com>
> Sent: Wednesday, August 30, 2023 6:54 AM
> To: user@flink.apache.org
> Subject: Rate Limit / Throttle Data to Send
>
> Hi,
>
> I have a requirement to send data to a third party with a limited number of elements, with the flow below:
>
> kafkasource
> mapToVendorPojo
> processfunction
> sinkToVendor
>
> My implementation is that I continuously add elements to my list state (ListState<AppEvent>) in a ProcessFunction, and once it reaches 100 in size I emit the data and start collecting the next set of 100.
>
> if (rateConfig == Iterables.size(appEventState.get())) {
>     List<AppEvent> holder = new ArrayList<>();
>     appEventState.get().forEach(e -> holder.add(e));
>     collector.collect(holder);
>     appEventState.clear();
> }
>
> The problem I am getting is that the "if" condition above never matches, because the appEventState size is always 0 or 1, while rateConfig is set to 20.
>
> What am I missing?
>
> Thanks,
> Patricia
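For reference, the ListState-based ProcessFunction from my original message (quoted above) roughly followed the sketch below; the String key type, the class and state descriptor names, and passing rateConfig through the constructor are simplified placeholders, and it assumes the stream is keyed before the process function (keyed state such as ListState requires that):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import com.google.common.collect.Iterables; // Guava, as in the original snippet

public class BatchingProcessFunction
        extends KeyedProcessFunction<String, AppEvent, List<AppEvent>> {

    private final int rateConfig;
    private transient ListState<AppEvent> appEventState;

    public BatchingProcessFunction(int rateConfig) {
        this.rateConfig = rateConfig;
    }

    @Override
    public void open(Configuration parameters) {
        appEventState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("appEvents", AppEvent.class));
    }

    @Override
    public void processElement(AppEvent event, Context ctx,
                               Collector<List<AppEvent>> collector) throws Exception {
        appEventState.add(event);
        // Keyed state is scoped per key, so this size is counted per key,
        // not across the whole stream.
        if (rateConfig == Iterables.size(appEventState.get())) {
            List<AppEvent> holder = new ArrayList<>();
            appEventState.get().forEach(holder::add);
            collector.collect(holder);
            appEventState.clear();
        }
    }
}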