Hi,

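You can merge the logic of #2 into #4, which will be much simpler: #4 can
then track the daily maximum itself from the hourly counts of #1, so it no
longer depends on the order in which the unioned records arrive.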
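
As a rough sketch of what the merged logic could look like, here is a plain
Java class with no Flink dependency. Everything in it is hypothetical: the
class and method names are made up, the comparison rule (flag an hour whose
count exceeds every daily maximum in the history) only stands in for your
actual logic, and it assumes hourly counts arrive in event-time order. In a
real job the fields would presumably live in keyed state inside the operator.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, Flink-free sketch of the logic of #2 merged into #4. */
public class MergedHourlyLogic {
    private static final long DAY_MILLIS = 86_400_000L;

    private final int historyDays;                               // X: number of past days to remember
    private final Deque<Long> pastDailyMax = new ArrayDeque<>(); // oldest day first
    private long currentDay = Long.MIN_VALUE;                    // day index of the hour being processed
    private long currentDayMax = 0;                              // highest hourly count seen so far today

    public MergedHourlyLogic(int historyDays) {
        this.historyDays = historyDays;
    }

    /**
     * Feed one hourly count from #1 (assumed to arrive in event-time order).
     * Returns true if the count is higher than every daily max in the history.
     */
    public boolean onHourlyCount(long hourStartMillis, long count) {
        long day = Math.floorDiv(hourStartMillis, DAY_MILLIS);
        if (day != currentDay) {
            // A new day started: close the previous day (this replaces window #2).
            if (currentDay != Long.MIN_VALUE) {
                pastDailyMax.addLast(currentDayMax);
                if (pastDailyMax.size() > historyDays) {
                    pastDailyMax.removeFirst();
                }
            }
            currentDay = day;
            currentDayMax = 0;
        }
        currentDayMax = Math.max(currentDayMax, count);

        // Placeholder comparison against the history of daily maxima.
        if (pastDailyMax.isEmpty()) {
            return false; // no history yet, nothing to compare against
        }
        for (long dailyMax : pastDailyMax) {
            if (count <= dailyMax) {
                return false;
            }
        }
        return true;
    }
}
```

Because the history is updated by the same operator that reads it, there is
no second stream to wait for, which should also make the unit test
deterministic.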

Best,
Kurt


On Wed, Dec 25, 2019 at 7:36 PM Avi Levi <avi.l...@bluevoyant.com> wrote:

> Hi,
>
> I have the following pipeline:
> 1. A one-hour window that counts the number of records
> 2. A one-day window that accepts the aggregated data from #1 and emits
> the highest hourly count of that day
> 3. A union of #1 + #2
> 4. A logic operator that accepts the data from #3, keeps a listState of #2,
> applies some logic to #1 based on that state (e.g. comparing a single hour
> to the history of the max hours over the last X days), and emits the result
>
> The timestamps and watermarks are assigned
> using BoundedOutOfOrdernessTimestampExtractor (event time), and I allow
> lateness of 3 hours.
>
> The problem is that when I try to unit test the whole pipeline, the
> data from #1 reaches #4 before the latter accepts the data from #3, hence it
> doesn't have any state yet (the state is always empty when the stream from #1
> arrives).
> My source in the tests is a collection that represents the records.
> Is there any way I can solve this?
> [image: Screen Shot 2019-12-25 at 13.04.17.png]
> I appreciate any help you can provide
> Cheers
> Avi
>
>
>
