Hi Niels,
you can log the watermarks by implementing a custom operator. (Operators
have access to the watermarks; user functions such as a MapFunction do not.)
Flink's built-in map operator, StreamMap, is a good example of this:

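import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal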
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        output.emitWatermark(mark);
    }
}

In processWatermark() you would print or log the watermark before forwarding
it. You can write a simple identity operator that forwards every element and
every watermark unchanged, prints the watermarks, and insert it anywhere in
the pipeline.
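A minimal sketch of such an identity operator, assuming the Flink 1.x operator API used by StreamMap (AbstractStreamOperator and its protected `output` field). The class name WatermarkLogger is hypothetical, and printing to stdout stands in for whatever logging you prefer:

```java
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Hypothetical identity operator: forwards everything, prints watermarks. */
public class WatermarkLogger<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private static final long serialVersionUID = 1L;

    public WatermarkLogger() {
        // Chain with the neighbouring operators, like StreamMap does.
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        // Forward the record unchanged; it keeps its timestamp.
        output.collect(element);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // Print the watermark, then pass it on so downstream
        // operators (e.g. windows) still see it. This operator
        // registers no timers, so forwarding directly is enough.
        System.out.println("Watermark: " + mark.getTimestamp());
        output.emitWatermark(mark);
    }
}
```

You could then insert it right after your timestamp/watermark assigner with something like `events.transform("WatermarkLogger", events.getType(), new WatermarkLogger<MyEvent>())`, where `events` and `MyEvent` are placeholders for your own stream and event type. With a parallelism greater than one, every parallel instance prints the watermarks it receives, which is itself a useful hint when windows fire unexpectedly.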

For your second question (where the watermarks live relative to the data),
this section of the documentation should help:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html#watermarks-in-parallel-streams
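In short, watermarks travel inside the stream as special elements between the records, and an operator with several input channels (for example after a keyBy or a union) uses the minimum of the latest watermarks from its inputs as its own event time. The following is a plain-Java simulation of that rule, not Flink code; the class and method names are hypothetical:

```java
import java.util.Arrays;

/**
 * Plain-Java simulation (not the Flink API) of how an operator with
 * several inputs derives its watermark: it remembers the latest
 * watermark per input and uses the minimum over all inputs.
 */
public class WatermarkMerger {

    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    public WatermarkMerger(int numInputs) {
        channelWatermarks = new long[numInputs];
        // No watermark seen yet on any input.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /**
     * Records a watermark arriving on the given input channel.
     * Returns the new combined watermark if it advanced, otherwise null.
     */
    public Long onWatermark(int channel, long timestamp) {
        // Per-channel watermarks only move forward; stale ones are ignored.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        if (min > currentWatermark) {
            currentWatermark = min;
            return min; // this is what would be emitted downstream
        }
        return null;
    }

    public long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkMerger merger = new WatermarkMerger(2);
        System.out.println(merger.onWatermark(0, 10)); // null: input 1 has no watermark yet
        System.out.println(merger.onWatermark(1, 5));  // 5
        System.out.println(merger.onWatermark(1, 20)); // 10: held back by input 0
        System.out.println(merger.onWatermark(0, 15)); // 15
    }
}
```

The third call shows why one slow or idle input can hold back event time for the whole operator, so windows downstream of it only fire once every input has advanced.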

-Aljoscha

On Sat, 21 May 2016 at 16:05 Niels Basjes <ni...@basjes.nl> wrote:

> Hi,
>
> I was working on a streaming application last week and got stuck in a
> situation where I got the same time-based window many times.
> I suspect I made a mistake in the watermark-generating code, i.e. in how
> the watermarks I create relate to the data I have.
>
> Writing the events to the console (for debugging) is easy, yet I have not
> been able to write the watermarks to my console.
>
> My question is very simple: how do I log the watermarks to the console so
> I can see them alongside the data and understand my mistake?
>
> I would also like to know "where do the watermarks live" in relation to
> the actual data.
>
> Thanks.
>
> Niels Basjes
>
