Hi Aljoscha,

My 2 cents on this would be that it is worth maintaining the access to the 
watermarks. I think having the option to customize this is a strong point of 
Flink.

Regarding the solution you proposed based on 2 input timers " would fire if the 
watermark from both inputs advances sufficiently far." I would propose to have 
the option to set a strategy for the timer. We could have:
- EgerTrigger - when the triggering is fired when any of the inputs watermarks 
has advanced sufficiently far
- LeftEgerTrigger - when the triggering is fired when any of left input 
watermarks has advanced sufficiently far
- RightEgerTrigger - when the triggering is fired when any of right input 
watermarks has advanced sufficiently far
- SyncTrigger - when the triggering is fired if the watermark from both inputs 
advances sufficiently far


We could potentially include here the custom handling of the watermarks under a 
CustomTrigger strategy implemented as an operator that can be provided.


-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Tuesday, September 27, 2016 11:28 AM
To: Dev
Subject: [DISCUSS] Timely User Functions and Watermarks

Hi Folks,
I'm in the process of implementing
https://issues.apache.org/jira/browse/FLINK-3674 and now I'm having a bit of a 
problem with deciding how watermarks should be treated for operators that have 
more than one input.

The problem is deciding when to fire event-time timers. For one-input operators 
it's pretty straightforward: fire once the watermark surpasses a given timer. 
For two-input operators we allow the operator implementer to observe the 
watermarks from the two inputs separately and react to that and also to decide 
what watermark to forward. With this it becomes hard to figure out when to fire 
timers.

My thinking is that we should not allow operators to observe the watermark 
anymore but route it past the operator and deal with watermarks and timers 
outside of the operator. A timer for operators with more than one inputs
(TwoInputOperator) would fire if the watermark from both inputs advances 
sufficiently far.

Alternatively, we could still let operators observe watermarks but grab the 
watermark before it enters the operator and still deal with timers in the same 
way as proposed above.

Any feedback on this is very welcome! What would you expect to happen for 
timers of operators with more than one input?

Cheers,
Aljoscha

P.S. One reason for why I want to deal with watermark outside of operators is 
that otherwise every operator has to implement the functionality to update the 
current watermark at the timer service. i.e. something like this:

@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {

output.collect(element.replace(userFunction.map(element.getValue())));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        timerService.updateWatermark(mark); // *<--- that's the thing I don't 
want*
        output.emitWatermark(mark);
    }
}

This becomes more complicated for two input operators which also do the merging 
of the watermarks from the two inputs right now.

Reply via email to