Hi Patricia,

What you are trying to implement can be achieved out of the box with windowing.

I assume these packets of 100 events are not per key but global.
In that case, use non-keyed windowing [1] with a count trigger (100) [3], and 
maybe add a processing-time trigger in case it takes too long to collect all 
100 events. Then create the output with a process window function [2].
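
A minimal sketch of the count-trigger part, assuming the Flink 1.17 DataStream API and using String as a stand-in for your AppEvent. The names BatchSketch, ToBatch and batch are made up for illustration, and the processing-time timeout is not included:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

public class BatchSketch {

    /** Copies the elements released by one trigger firing into a single list. */
    public static class ToBatch
            extends ProcessAllWindowFunction<String, List<String>, GlobalWindow> {
        @Override
        public void process(Context context, Iterable<String> elements,
                            Collector<List<String>> out) {
            List<String> batch = new ArrayList<>();
            elements.forEach(batch::add);
            out.collect(batch);
        }
    }

    /** Non-keyed global window that fires and purges every batchSize elements. */
    public static DataStream<List<String>> batch(DataStream<String> events, long batchSize) {
        return events
                .windowAll(GlobalWindows.create())
                .trigger(PurgingTrigger.of(CountTrigger.of(batchSize)))
                .process(new ToBatch());
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Sample input only; in the real job this would be the mapped Kafka stream.
        batch(env.fromElements("a", "b", "c", "d", "e"), 2).print();
        env.execute("batch-sketch");
    }
}
```

With the five sample elements and a batch size of 2, the trailing "e" is never emitted, because a count trigger only fires on a full batch. That is the gap the additional processing-time trigger would close; it needs a custom Trigger. Note also that windowAll runs with parallelism 1, and that countWindowAll(100) is shorthand for the same assigner and trigger pair.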

I hope this helps.

Thias


[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
[2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#processwindowfunction
[3] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#built-in-and-custom-triggers


From: patricia lee <plee3...@gmail.com>
Sent: Wednesday, August 30, 2023 6:54 AM
To: user@flink.apache.org
Subject: Rate Limit / Throttle Data to Send

Hi,

I have a requirement to send data to a third party with a limit on the 
number of elements, using the flow below.

kafkasource
mapToVendorPojo
processfunction
sinkToVendor

My implementation continuously adds elements to my list state, 
ListState<AppEvent>, in a ProcessFunction. Once it reaches 100 in size, I emit 
the data and start collecting the next set of 100.

// Emit a batch once the buffered state reaches the configured size.
if (rateConfig == Iterables.size(appEventState.get())) {
    List<AppEvent> holder = new ArrayList<>();
    appEventState.get().forEach(holder::add);
    collector.collect(holder);
    appEventState.clear();
}

The problem is that the "if" condition above never matches, because the 
appEventState size is always only 0 or 1. The rateConfig is set to 20.

What am I missing?

Thanks,
Patricia


