Hi All,

I'm trying to understand how to create a sample trigger. Let's say that I have 
a stream like this one:

Event: "YELLOW, BLUE, WHITE, RED, GREEN, RED, GREEN, RED, YELLOW, YELLOW"
Event: "YELLOW, BLUE, BLACK, RED,BLUE, RED, PINK, RED, YELLOW, YELLOW"

My stream is then mapped, and I produce an enriched Stream with an additional 
Boolean variable that define when is the moment to trigger the window. (I'm 
using this twostep approach to also calculate a value for a dynamic session 
window parameter, but on this example to simplify I cut it out, it would be 
more precise to say that I should use a Tuple3). In this specific example if 
two consecutive messages with 2 "yellow" are seen MyFunctionToAddTheBoolean() 
will emit a "true".

DataStream<Tuple2<Event, Boolean>> Enriched = stream
                .keyBy(...)
                .map(new MyFunctionToAddTheBoolean());

Enriched:Tuple2<stream_A, Boolean>

With this new stream called "Enriched", I'm going to move on to the second step 
where I would like to use the parameter to trigger the window processing.

DataStream<String> WinStream = enriched
                .keyBy(new MyKeySelector())
                .window(EventTimeSessionWindows.withDynamicGap(new 
DynamicSessionWindows()))
                .trigger(MySuperTriggerFunction()_?)
                .process(new MyProcessWindowFunction());

My questions are:

1)Would be possible to have an example on how to write a function 
(MySuperTriggerFunction().)that can do this?
2)How the DynamicSessionWindows() and MySuperTriggerFunction() can work 
together, when on the DynamicSessionWindows() I'm giving to Flink an indication 
to process my data if the gap is greater than '1000' millis, but on the other 
hand I'm also giving a trigger(). Would the application be able to follow both 
and run a processWindowFunction if either of the two are respected, or do I 
have to decide which one of the two should be used? or prioritise?


I have been reading:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
Where i learn that: A Trigger determines when a window (as formed by the window 
assigner) is ready to be processed by the window function. Each WindowAssigner 
comes with a default Trigger. If the default trigger does not fit your needs, 
you can specify a custom trigger using trigger(...).

And:

https://gist.github.com/mxm/c5831ead9c9d9ad68731f5f2f3793154


But still... Some help would be really appreciated!

Thanks!

Simone

Reply via email to