Hi Patricia,

What you are trying to implement can be achieved out of the box with windowing.
I assume these packets of 100 events are not per key but global. In that case, use non-keyed windowing [1] with a count trigger (100) [3], and maybe add a processing-time trigger in case it takes too long to collect all 100 events. Then create the output with a process window function [2].

I hope this helps,
Thias

[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
[2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#processwindowfunction
[3] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#built-in-and-custom-triggers

From: patricia lee <plee3...@gmail.com>
Sent: Wednesday, August 30, 2023 6:54 AM
To: user@flink.apache.org
Subject: Rate Limit / Throttle Data to Send

Hi,

I have a requirement to send data to a third party with a limit on the number of elements, using the flow below:

    kafkasource
    mapToVendorPojo
    processfunction
    sinkToVendor

My implementation continuously adds the elements to my list state, ListState<AppEvent>, in a ProcessFunction. Once it reaches 100 in size, I emit the data and start collecting again for the next set of 100.

    if (rateConfig == Iterables.size(appEventState.get())) {
        List<AppEvent> holder = new ArrayList<>();
        appEventState.get().forEach(e -> holder.add(e));
        collector.collect(holder);
        appEventState.clear();
    }

The problem is that the "if" condition above never matches, because the appEventState size is always only 0 or 1. The rateConfig is set to 20.

What am I missing?

Thanks,
Patricia
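
For illustration, the batching behaviour suggested in the reply (emit once 100 events are collected, or earlier if collecting takes too long) can be modelled in plain Java. This is only a sketch of the semantics, not Flink API: the class CountOrTimeoutBatcher, its add and onTimer methods, and the 1-second timeout are hypothetical names and values chosen for the example. In Flink itself, the window operator and the triggers described in [1] and [3] would take over this bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Plain-Java model of "emit after N elements, or after a timeout" batching (hypothetical, not Flink API). */
final class CountOrTimeoutBatcher<T> {
    private final int maxCount;        // batch size, e.g. 100 in the thread
    private final long maxWaitMillis;  // assumed upper bound on how long an element may wait
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime;     // arrival time of the oldest buffered element

    CountOrTimeoutBatcher(int maxCount, long maxWaitMillis) {
        if (maxCount <= 0 || maxWaitMillis <= 0) {
            throw new IllegalArgumentException("maxCount and maxWaitMillis must be positive");
        }
        this.maxCount = maxCount;
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Buffers one element; returns a full batch once maxCount elements have been collected. */
    Optional<List<T>> add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis;
        }
        buffer.add(element);
        return buffer.size() >= maxCount ? Optional.of(drain()) : Optional.empty();
    }

    /** Plays the role of a timer callback; returns a partial batch if the oldest element waited too long. */
    Optional<List<T>> onTimer(long nowMillis) {
        boolean timedOut = !buffer.isEmpty() && nowMillis - firstElementTime >= maxWaitMillis;
        return timedOut ? Optional.of(drain()) : Optional.empty();
    }

    /** Copies the buffered elements into a new list and clears the buffer. */
    private List<T> drain() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    public static void main(String[] args) {
        CountOrTimeoutBatcher<String> batcher = new CountOrTimeoutBatcher<>(3, 1_000L);
        batcher.add("a", 0L);
        batcher.add("b", 10L);
        System.out.println(batcher.add("c", 20L));    // Optional[[a, b, c]]
        batcher.add("d", 30L);
        System.out.println(batcher.onTimer(500L));    // Optional.empty
        System.out.println(batcher.onTimer(1_030L));  // Optional[[d]]
    }
}
```

Timestamps are passed in explicitly so the behaviour is deterministic and easy to test. Note that the buffer here is a single global one, which matches the non-keyed assumption in the reply.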