Hi, I have a requirement that I need to send data to a third party with a limit number of elements with flow below.
kafkasource mapToVendorPojo processfunction sinkToVendor My implementation is I continuously add the elements to my list state ListState<AppEvent> in ProcessFunction and once it reaches 100 in size I emit the data and start collecting data again to another set of 100. *if (rateConfig == Iterables.size(appEventState.get()) {* *List<AppEvent> holder = new ArrayList();* *appEventState.get().forEach(e -> holder.add(e));* *collector.collect(holder);* *appEventState.clear()* *}* The problem I am getting is, *"if " condition above never gets matched*. Because the appEventState size is always *0 or 1 only*. The rateConfig is set to *20. * What am I missing? Thanks, Patricia