Hi,

I have a requirement to send data to a third party in batches with a limited
number of elements, using the flow below.

kafkasource
mapToVendorPojo
processfunction
sinkToVendor

In my implementation, I continuously add elements to a list state
(ListState<AppEvent>) in the ProcessFunction. Once the list reaches the
configured size (rateConfig, e.g. 100), I emit the batch and start collecting
the next set.

    // Emit a batch once the buffered state holds exactly rateConfig elements
    if (rateConfig == Iterables.size(appEventState.get())) {
        List<AppEvent> holder = new ArrayList<>();
        appEventState.get().forEach(e -> holder.add(e));
        collector.collect(holder);
        appEventState.clear();
    }
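
To make the intended behavior concrete, here is a minimal sketch of the batching I am after, written against a plain in-memory list instead of Flink's ListState. BatchBuffer and its methods are hypothetical names for illustration only, not Flink APIs; in the actual job the buffer is the ListState shown above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the buffering logic; not part of Flink. */
class BatchBuffer<T> {
    private final int batchSize;
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /**
     * Buffers one element. Returns a copy of the full batch (and resets the
     * buffer) once batchSize elements have been collected, otherwise null.
     */
    List<T> add(T element) {
        buffer.add(element);
        if (buffer.size() < batchSize) {
            return null;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    /** Number of elements currently waiting for the next batch. */
    int pending() {
        return buffer.size();
    }
}
```

With a batch size of 3, the first two calls to add return null, the third returns a list of three elements, and the buffer is empty again afterwards.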

The problem is that the "if" condition above *never matches*, because the
size of appEventState is always *0 or 1*. The rateConfig is set to *20*.

What am I missing?

Thanks,
Patricia
