Windowing and triggering on a keyed stream are done independently for each
key. Since you key the stream by the production cycle number, each key's
window only ever contains events with that one lunum. So for each key, your
custom trigger observes the lunumState change from null to a production
cycle number, but it will never change again -- because only those stream
elements with the same key are processed in the context of that item of
partitioned state.
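
Concretely, tracing your LunumTrigger (quoted below) for the key
lunum == 2000:

    onElement(12.0, 2000)   lunumState is null    -> FIRE_AND_PURGE (the window holds only this one event)
    onElement(12.3, 2000)   lunumState == lunum   -> CONTINUE
    onElement(12.2, 2000)   lunumState == lunum   -> CONTINUE

No element with a different lunum ever reaches this key, so the trigger
never fires again -- which is why each cycle is reported with only a single
event.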

One advantage of windowing on keyed streams is the parallelism made
possible by partitioning by key -- but in your case there's probably little
to be gained, assuming the production cycles are sequential rather than
overlapping. You could proceed by (1) not keying the stream (and using
windowAll instead of window), (2) adapting ImaginePaperWindowReportFunction
to process only the events of the cycle that just ended (if necessary), and
(3) writing a custom evictor to remove events once they've been reported
on. A rough sketch follows.
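
Untested, and just a sketch -- LunumChangeTrigger and ReportedCycleEvictor
are made-up names, and I'm assuming lunum is an Integer field on your POJO.
With windowAll, ImaginePaperWindowReportFunction would also need to become
a ProcessAllWindowFunction:

    // events = the converted, timestamped (but not keyed) stream
    events
        .windowAll(GlobalWindows.create())
        // like your LunumTrigger, but return FIRE (not FIRE_AND_PURGE), and
        // only when lunumState is non-null and differs from element.lunum --
        // the evictor below takes care of removing the reported events
        .trigger(new LunumChangeTrigger())
        .evictor(new ReportedCycleEvictor())
        .process(new ImaginePaperWindowReportFunction());

The evictor runs after each firing; at that point the window holds the
completed cycle plus the first event of the new cycle (the one that caused
the trigger to fire), so it keeps the newest lunum and drops the rest:

    import java.util.Iterator;

    import org.apache.flink.streaming.api.windowing.evictors.Evictor;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
    import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

    public class ReportedCycleEvictor
            implements Evictor<ImaginePaperData, GlobalWindow> {

        @Override
        public void evictBefore(Iterable<TimestampedValue<ImaginePaperData>> elements,
                int size, GlobalWindow window, EvictorContext ctx) {
            // nothing to evict before the window function runs
        }

        @Override
        public void evictAfter(Iterable<TimestampedValue<ImaginePaperData>> elements,
                int size, GlobalWindow window, EvictorContext ctx) {
            // the last element belongs to the cycle now in progress
            Integer currentLunum = null;
            for (TimestampedValue<ImaginePaperData> tv : elements) {
                currentLunum = tv.getValue().lunum;
            }
            // drop everything from the cycle that was just reported on
            for (Iterator<TimestampedValue<ImaginePaperData>> it = elements.iterator();
                    it.hasNext(); ) {
                if (!it.next().getValue().lunum.equals(currentLunum)) {
                    it.remove();
                }
            }
        }
    }

One caveat: the final cycle won't be reported unless something else
eventually fires the window after the last event arrives.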

On Tue, Jan 22, 2019 at 7:52 PM Daniel Krenn <notshur...@googlemail.com>
wrote:

> Hello people!
>
> I have a DataStream whose events carry a running number that signifies
> which production cycle they belong to. In essence, this is what the data
> looks like:
>
> value, production cycle
> 12.0, 2000
> 12.3, 2000   <- one production cycle
> 12.2, 2000
>
> 0.0, 2001
> 0.4, 2002    <- another production cycle
> 1.1, 2002
>
> 55.0, 2003
> 60.0, 2003   <- another production cycle
> 70.0, 2003
>
> I have to do some calculations over the events of each production cycle. I
> want to use Flink's window API for that. This is how I'm doing it right now:
>
> DataStream<String> test = streamExecEnv.readTextFile("C:/Projects/Python/testdata.txt")
>     .map(new ImaginePaperDataConverterTest())                    // convert data to POJO
>     .assignTimestampsAndWatermarks(new ImaginePaperAssigner())   // assign timestamps for event time
>     .keyBy((ImaginePaperData event) -> event.lunum)              // <- the production cycle number
>     .window(GlobalWindows.create())                              // create global window
>     .trigger(new LunumTrigger())                                 // "split" the window with a custom trigger
>     .process(new ImaginePaperWindowReportFunction());            // apply a function over the aggregated events
>
> I'm getting a "DataStream" out of a text file, just for testing purposes.
> The problem is that what I'm doing only aggregates one single event for a
> production cycle. Why is that? I thought keying the stream by the
> production cycle number already partitions the stream anyways. The trigger
> says when the production cycle number is changed, a new global window is
> started and the events of the current window are aggregated. What am I
> missing here?
> Just to be safe, here is my implementation of the custom trigger:
>
> public class LunumTrigger extends Trigger<ImaginePaperData, GlobalWindow> {
>
>     private static final long serialVersionUID = 1L;
>
>     private final ValueStateDescriptor<Integer> prevLunum =
>         new ValueStateDescriptor<>("lunum", Integer.class);
>
>     public LunumTrigger() {}
>
>     @Override
>     public TriggerResult onElement(ImaginePaperData element, long timestamp,
>             GlobalWindow window, TriggerContext ctx) throws Exception {
>
>         ValueState<Integer> lunumState = ctx.getPartitionedState(prevLunum);
>
>         if (lunumState.value() == null || !element.lunum.equals(lunumState.value())) {
>             System.out.println("LUNUM BEFORE: " + lunumState.value()
>                 + " NEW LUNUM: " + element.lunum + " ==> FIRE!");
>             lunumState.update(element.lunum);
>             return TriggerResult.FIRE_AND_PURGE;
>         }
>
>         System.out.println("LUNUM BEFORE: " + lunumState.value()
>             + " NEW LUNUM: " + element.lunum + " ==> HOLD!");
>         lunumState.update(element.lunum);
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, GlobalWindow window,
>             TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, GlobalWindow window,
>             TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
>         ctx.getPartitionedState(prevLunum).clear();
>     }
> }
>
> I'm very grateful for your help.
>
> Regards,
>
> Daniel
>
>

-- 
*David Anderson* | Training Coordinator
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
