Hi Aljoscha,

My 2 cents: it is worth keeping access to the watermarks in operators. I think the option to customize watermark handling is a strong point of Flink.

Regarding the solution you proposed for two-input timers (a timer "would fire if the watermark from both inputs advances sufficiently far"), I would propose adding an option to set a firing strategy for the timer. We could have:

- EagerTrigger: fires when the watermark of any input has advanced sufficiently far
- LeftEagerTrigger: fires when the watermark of the left input has advanced sufficiently far
- RightEagerTrigger: fires when the watermark of the right input has advanced sufficiently far
- SyncTrigger: fires when the watermarks of both inputs have advanced sufficiently far

We could also cover custom handling of the watermarks with a CustomTrigger strategy, implemented as an operator that the user can provide.

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Tuesday, September 27, 2016 11:28 AM
To: Dev
Subject: [DISCUSS] Timely User Functions and Watermarks

Hi Folks,

I'm in the process of implementing https://issues.apache.org/jira/browse/FLINK-3674 and now I'm having a bit of a problem with deciding how watermarks should be treated for operators that have more than one input.

The problem is deciding when to fire event-time timers. For one-input operators it's pretty straightforward: fire once the watermark surpasses a given timer. For two-input operators we allow the operator implementer to observe the watermarks from the two inputs separately and react to that and also to decide what watermark to forward. With this it becomes hard to figure out when to fire timers.

My thinking is that we should not allow operators to observe the watermark anymore but route it past the operator and deal with watermarks and timers outside of the operator. A timer for operators with more than one input (TwoInputOperator) would fire if the watermark from both inputs advances sufficiently far.

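As a rough illustration of the firing rule above and of the per-input alternatives suggested earlier in this thread, the choice can be read as picking which combination of the two input watermarks a timer is compared against. The sketch below is only that: the enum, its constant names and both methods are hypothetical and not part of any Flink API, and it assumes a timer is due once the chosen watermark is greater than or equal to the timer's timestamp.

```java
/**
 * Hypothetical sketch, not Flink API: each constant selects which combination
 * of the two input watermarks an event-time timer is compared against.
 */
enum TimerFiringStrategy {
    EAGER,        // fire as soon as either input is far enough along
    LEFT_EAGER,   // only the left input's watermark counts
    RIGHT_EAGER,  // only the right input's watermark counts
    SYNC;         // both inputs must be far enough along

    /** Combines the two per-input watermarks into the one that timers are checked against. */
    long effectiveWatermark(long leftWatermark, long rightWatermark) {
        switch (this) {
            case EAGER:
                return Math.max(leftWatermark, rightWatermark);
            case LEFT_EAGER:
                return leftWatermark;
            case RIGHT_EAGER:
                return rightWatermark;
            case SYNC:
                return Math.min(leftWatermark, rightWatermark);
            default:
                throw new IllegalStateException("Unknown strategy: " + this);
        }
    }

    /** Assumption: a timer is due once the effective watermark reaches its timestamp. */
    boolean shouldFire(long timerTimestamp, long leftWatermark, long rightWatermark) {
        return effectiveWatermark(leftWatermark, rightWatermark) >= timerTimestamp;
    }
}
```

With a left watermark of 120, a right watermark of 50 and a timer at 100, EAGER and LEFT_EAGER would fire under these assumptions, while RIGHT_EAGER and SYNC would not.
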
Alternatively, we could still let operators observe watermarks but grab the watermark before it enters the operator and still deal with timers in the same way as proposed above.

Any feedback on this is very welcome! What would you expect to happen for timers of operators with more than one input?

Cheers,
Aljoscha

P.S. One reason for why I want to deal with watermark outside of operators is that otherwise every operator has to implement the functionality to update the current watermark at the timer service. i.e. something like this:

    @Internal
    public class StreamMap<IN, OUT>
            extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT> {

        private static final long serialVersionUID = 1L;

        public StreamMap(MapFunction<IN, OUT> mapper) {
            super(mapper);
            chainingStrategy = ChainingStrategy.ALWAYS;
        }

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            output.collect(element.replace(userFunction.map(element.getValue())));
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            timerService.updateWatermark(mark); // <--- that's the thing I don't want
            output.emitWatermark(mark);
        }
    }

This becomes more complicated for two input operators which also do the merging of the watermarks from the two inputs right now.
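
To make the "route the watermark past the operator" idea a bit more concrete, here is a minimal, self-contained sketch of a runtime-side helper that owns the timers, fires the ones that are due, and then forwards the watermark, so that the operator itself never has to touch the timer service. The class name WatermarkRouter, its methods and the two callbacks are hypothetical stand-ins, not Flink's actual classes, and watermarks are modeled as plain long timestamps.

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Hypothetical runtime-side helper; a stand-in, not Flink's actual timer service. */
final class WatermarkRouter {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final LongConsumer onTimer;        // operator callback, invoked once per due timer
    private final LongConsumer emitWatermark;  // forwards the watermark downstream
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkRouter(LongConsumer onTimer, LongConsumer emitWatermark) {
        this.onTimer = onTimer;
        this.emitWatermark = emitWatermark;
    }

    /** Called on behalf of the operator or user function to register an event-time timer. */
    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Called by the runtime for each incoming watermark, before or instead of the operator. */
    void processWatermark(long watermark) {
        if (watermark <= currentWatermark) {
            return; // ignore watermarks that do not advance event time
        }
        currentWatermark = watermark;
        // Fire every timer that is now due, earliest first (assumes "due" means timestamp <= watermark).
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            onTimer.accept(timers.poll());
        }
        emitWatermark.accept(watermark);
    }
}
```

In this arrangement an operator like the StreamMap above would only need processElement; the timer bookkeeping and the forwarding of the watermark would live in one place outside the operator.
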