Hi,

Thanks for the info.

I updated my code implementation. I am producing a list of elements in my
custom ProcessAllWindowFunction before it goes to my custom sink. The problem
I am getting is that the list doesn't get reset/cleared.

inputStream
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .trigger(CountTrigger.of(maxNo))
    .process(new CustomProcessAllWindowFunc());




CustomProcessAllWindowFunc's code:

eventList = new ArrayList<>();
if (rateConfig == eventList.size()) {
    collector.collect(eventList);

    // clear the list since I need to send only 100 elements at a time
    eventList.clear();
}
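
For context, the snippet sits inside the function roughly like this (a
simplified sketch, not my exact code; the AppEvent type is taken from my
earlier message, and the loop over the window elements is an assumption):

public class CustomProcessAllWindowFunc
        extends ProcessAllWindowFunction<AppEvent, List<AppEvent>, TimeWindow> {

    private final int rateConfig = 100;   // number of elements to send per batch
    private List<AppEvent> eventList;

    @Override
    public void process(Context context,
                        Iterable<AppEvent> elements,
                        Collector<List<AppEvent>> collector) {
        eventList = new ArrayList<>();
        // 'elements' holds whatever the window has collected when the trigger fires
        for (AppEvent event : elements) {
            eventList.add(event);
            if (rateConfig == eventList.size()) {
                collector.collect(eventList);
                // clear so only rateConfig elements go out per emission
                eventList.clear();
            }
        }
    }
}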

However, even though I cleared the eventList, the elements that previously
went to the sink are still there, and new elements just keep getting added to
the eventList together with the previous ones.

How can I correctly clear the list in the ProcessAllWindowFunction?




On Wed, Aug 30, 2023, 3:54 PM Schwalbe Matthias <matthias.schwa...@viseca.ch>
wrote:

> Hi Patricia,
>
>
>
> What you are trying to implement can be achieved out of the box with windowing.
>
>
>
> I assume these packets of 100 events are not per key but global.
>
> In that case, use non-keyed windowing [1] with a count trigger (100) [3], and
> maybe add a processing-time trigger if it takes too long to collect all 100
> events; then create the output with a process window function [2].
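>
> A rough, untested sketch of what I mean (the GlobalWindows/PurgingTrigger
> combination and the variable names are just an example; a processing-time
> trigger could be added on top):
>
> events
>     .windowAll(GlobalWindows.create())
>     .trigger(PurgingTrigger.of(CountTrigger.of(100)))
>     .process(new ProcessAllWindowFunction<AppEvent, List<AppEvent>, GlobalWindow>() {
>         @Override
>         public void process(Context context,
>                             Iterable<AppEvent> elements,
>                             Collector<List<AppEvent>> out) {
>             List<AppEvent> batch = new ArrayList<>();
>             elements.forEach(batch::add);
>             out.collect(batch);
>         }
>     });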
>
>
>
> I hope this helps
>
>
>
> Thias
>
>
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#processwindowfunction
>
> [3]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#built-in-and-custom-triggers
>
>
>
>
>
> From: patricia lee <plee3...@gmail.com>
> Sent: Wednesday, August 30, 2023 6:54 AM
> To: user@flink.apache.org
> Subject: Rate Limit / Throttle Data to Send
>
>
>
> Hi,
>
>
>
> I have a requirement to send data to a third party with a limited number of
> elements, with the flow below.
>
>
>
> kafkasource
>
> mapToVendorPojo
>
> processfunction
>
> sinkToVendor
>
>
>
> My implementation is that I continuously add the elements to my list state
> (ListState<AppEvent>) in a ProcessFunction, and once it reaches 100 elements
> I emit the data and start collecting the next set of 100.
>
>
>
> if (rateConfig == Iterables.size(appEventState.get())) {
>     List<AppEvent> holder = new ArrayList<>();
>     appEventState.get().forEach(e -> holder.add(e));
>     collector.collect(holder);
>     appEventState.clear();
> }
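>
> For reference, the surrounding function looks roughly like this (a simplified
> sketch; the class name, descriptor name, and key type are placeholders):
>
> public class BatchingProcessFunction
>         extends KeyedProcessFunction<String, AppEvent, List<AppEvent>> {
>
>     private transient ListState<AppEvent> appEventState;
>
>     @Override
>     public void open(Configuration parameters) {
>         appEventState = getRuntimeContext().getListState(
>                 new ListStateDescriptor<>("appEvents", AppEvent.class));
>     }
>
>     @Override
>     public void processElement(AppEvent event, Context ctx,
>                                Collector<List<AppEvent>> collector) throws Exception {
>         appEventState.add(event);
>         // the rateConfig check above runs here after each element is added
>     }
> }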
>
>
>
> The problem I am getting is that the "if" condition above never gets matched,
> because the appEventState size is always 0 or 1. The rateConfig is set to 20.
>
>
>
> What am I missing?
>
>
>
> Thanks,
>
> Patricia
>
>
>
