Hi, You can merge the logic of #2 into #4, it will be much simpler.
Best, Kurt On Wed, Dec 25, 2019 at 7:36 PM Avi Levi <avi.l...@bluevoyant.com> wrote: > Hi , > > I have the following pipeline : > 1. single hour window that counts the number of records > 2. single day window that accepts the aggregated data from #1 and emits > the highest hour count of that day > 3. union #1 + #2 > 4. Logic operator that accepts the data from #3 and keep a listState of #2 > and apply some logic on #1 based on that state (e.g comparing a single hour > the history of the max hours at the last X days ) and emits the result > > the timestamsAndWaterMarks is > using BoundedOutOfOrdernessTimestampExtractor (event-time) and I allow > lateness of 3 hours > > the problem is that when I try to do unit tests of all the pipeline, the > data from #1 rich #4 before the latter accepts the data from #3 hence it > doesn't have any state yet (state is always empty when the stream from #1 > arrives ). > My source in the tests is a collection that represents the records. > is there anyway I can solve this ? > [image: Screen Shot 2019-12-25 at 13.04.17.png] > I appreciate any help you can provide > Cheers > Avi > > >