Hi,

I have the following pipeline:

1. A one-hour window that counts the number of records.
2. A one-day window that takes the aggregated data from #1 and emits the highest hourly count of that day.
3. A union of #1 and #2.
4. A logic operator that takes the data from #3, keeps a ListState of the results from #2, applies some logic to the records from #1 based on that state (e.g. comparing a single hour against the history of the daily maximum hours over the last X days), and emits the result.
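
To make the intended result concrete, here is a rough plain-Java sketch of what the four steps should compute. It deliberately does not use Flink, and every name in it (PipelineSketch, hourlyCounts, dailyMax, hoursAboveHistory) is hypothetical. The comparison rule (flag an hour whose count is higher than every daily maximum of the previous X days) is only an assumed stand-in for my real logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical, Flink-free sketch of the pipeline's intended semantics.
class PipelineSketch {
    static final long HOUR_MS = 3_600_000L;
    static final long DAY_MS = 24 * HOUR_MS;

    // Step 1: count records per one-hour tumbling window (key = window start).
    static SortedMap<Long, Long> hourlyCounts(List<Long> eventTimestamps) {
        SortedMap<Long, Long> counts = new TreeMap<>();
        for (long ts : eventTimestamps) {
            long windowStart = ts - Math.floorMod(ts, HOUR_MS);
            counts.merge(windowStart, 1L, Long::sum);
        }
        return counts;
    }

    // Step 2: per one-day window, keep the highest hourly count (key = day start).
    static SortedMap<Long, Long> dailyMax(SortedMap<Long, Long> hourly) {
        SortedMap<Long, Long> max = new TreeMap<>();
        for (Map.Entry<Long, Long> e : hourly.entrySet()) {
            long dayStart = e.getKey() - Math.floorMod(e.getKey(), DAY_MS);
            max.merge(dayStart, e.getValue(), Long::max);
        }
        return max;
    }

    // Steps 3 and 4: compare each hour with the daily maxima of the previous
    // `days` days. Assumed rule: flag the hour if its count exceeds all of
    // them; hours with no history at all are not flagged.
    static List<Long> hoursAboveHistory(SortedMap<Long, Long> hourly,
                                        SortedMap<Long, Long> daily,
                                        int days) {
        List<Long> flagged = new ArrayList<>();
        for (Map.Entry<Long, Long> e : hourly.entrySet()) {
            long dayStart = e.getKey() - Math.floorMod(e.getKey(), DAY_MS);
            Collection<Long> history =
                    daily.subMap(dayStart - days * DAY_MS, dayStart).values();
            if (!history.isEmpty() && e.getValue() > Collections.max(history)) {
                flagged.add(e.getKey());
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        // Day 0: two records in hour 0, one in hour 1. Day 1: three in hour 0.
        List<Long> ts = Arrays.asList(0L, 1_000L, HOUR_MS + 5L,
                                      DAY_MS, DAY_MS + 1L, DAY_MS + 2L);
        SortedMap<Long, Long> hourly = hourlyCounts(ts);
        SortedMap<Long, Long> daily = dailyMax(hourly);
        System.out.println(hoursAboveHistory(hourly, daily, 1)); // prints [86400000]
    }
}
```

Note that in this batch-style sketch all daily maxima are already computed before the comparison runs. That is the ordering I would like operator #4 to see in the test.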

The timestamp and watermark assignment uses BoundedOutOfOrdernessTimestampExtractor (event time), and I allow a lateness of 3 hours.

The problem is that when I try to unit test the whole pipeline, the data from #1 reaches #4 before #4 has received the daily maxima from #2 (via the union in #3), so it has no state yet (the state is always empty when the stream from #1 arrives). My source in the tests is a collection that represents the records.

Is there any way I can solve this?

[image: Screen Shot 2019-12-25 at 13.04.17.png]

I appreciate any help you can provide.

Cheers,
Avi