Hi ,

I have the following pipeline :
1. single hour window that counts the number of records
2. single day window that accepts the aggregated data from #1 and emits the
highest hour count of that day
3. union #1 + #2
4. Logic operator that accepts the data from #3 and keep a listState of #2
and apply some logic on #1 based on that state (e.g comparing a single hour
the history of the max hours at the last X days ) and emits the result

the timestamsAndWaterMarks is using BoundedOutOfOrdernessTimestampExtractor
(event-time)  and I allow lateness of 3 hours

 the problem is that when I try to do unit tests of all the pipeline, the
data from #1 rich #4 before the latter accepts the data from #3 hence it
doesn't have any state yet (state is always empty when the stream from #1
arrives ).
My source in the tests is a collection that represents the records.
 is there anyway I can solve this ?
[image: Screen Shot 2019-12-25 at 13.04.17.png]
I appreciate any help you can provide
Cheers
Avi

Reply via email to