Hi Pedro,

if I read your code correctly, you are not assigning timestamps and
watermarks to the rules stream.

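Flink derives an operator's watermark from all of its input streams:
a two-input operator such as your CoFlatMap forwards the minimum of the
watermarks of both inputs. If you do not assign watermarks to a stream,
its watermark stays at the default, Long.MIN_VALUE, which is exactly
the value you are observing.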
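
To illustrate the reasoning, here is a minimal sketch in plain Java (no
Flink dependency). It is a simplified model, not Flink's actual
implementation, and the class and method names are made up for this
example. It only shows why one input without watermarks pins the combined
watermark at Long.MIN_VALUE.

```java
// Simplified model of how a two-input operator combines watermarks.
// Hypothetical names; this is NOT Flink's internal code.
final class TwoInputWatermarkSketch {

    // Each input starts at Long.MIN_VALUE until it reports a watermark.
    private long eventsWatermark = Long.MIN_VALUE;
    private long rulesWatermark = Long.MIN_VALUE;

    // Called when the events input reports a watermark (watermarks never go backwards).
    void onWatermarkFromEvents(long watermark) {
        eventsWatermark = Math.max(eventsWatermark, watermark);
    }

    // Called when the rules input reports a watermark.
    void onWatermarkFromRules(long watermark) {
        rulesWatermark = Math.max(rulesWatermark, watermark);
    }

    // The operator can only advance as far as its slowest input.
    long currentWatermark() {
        return Math.min(eventsWatermark, rulesWatermark);
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch operator = new TwoInputWatermarkSketch();

        // Only the events stream has an assigner; the rules stream stays silent.
        operator.onWatermarkFromEvents(1_479_924_480_000L);
        System.out.println("rules without watermarks: " + operator.currentWatermark());

        // Once the rules stream reports a watermark, the events side decides.
        operator.onWatermarkFromRules(Long.MAX_VALUE);
        System.out.println("rules with watermarks:    " + operator.currentWatermark());
    }
}
```

The second call in main assumes one possible workaround that is my
assumption and not something taken from your code: if the rules carry no
event time of their own, an assigner on the rules stream could report
Long.MAX_VALUE so that only the events stream determines the watermark
downstream.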

Best,
Fabian

2016-11-23 19:08 GMT+01:00 PedroMrChaves <pedro.mr.cha...@gmail.com>:

> Hello,
>
> I have an application which has two different streams of data, one
> represents a set of events and the other a set of rules that need to be
> matched against the events. In order to do this I use a coFlatMapOperator.
> The problem is that if I assign the timestamps and watermarks after the
> streams have been connected everything works fine but if I do it before, I
> get a negative *currentwatermark* at the window and the operations on
> windows have no effect. What could be the problem?
>
> If I assign *Before *the connect:
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10315/negativeWatermark.png>
>
> If I assign *After *the connect:
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10315/normalWatermark.png>
>
> *Main *Code:
>
> DataStream<CSVEvent> sourceStream = environment
>         .addSource(new SampleDataGenerator(sourceData, true))
>         .name("Source").setParallelism(1)
>         .assignTimestampsAndWatermarks(new TimestampAssigner());
> // if I assign the timestamps here the watermark seen at the window is
> // negative and the operations are not applied
>
> DataStream<String> rulesStream = environment
>         .socketTextStream(monitorAddress, monitorPort, DELIMITER)
>         .name("Rules Stream")
>         .setParallelism(1);
>
> SplitStream<RBEvent> processedStream = sourceStream.connect(rulesStream)
>         .flatMap(new RProcessor(rulesPath))
>         .name("RBProcessor").setParallelism(1)
>         //.assignTimestampsAndWatermarks(new DynamicTimestampAssigner())
>         //.name("Assign Timestamps").setParallelism(1)
>         // If I assign the watermarks here everything works fine
>         .split(new Spliter());
>
> processedStream
>         .select(RuleOperations.WINDOW_AGGRATION)
>         .keyBy(new DynamicKeySelector())
>         .window(new DynamicSlidingWindowAssigner())
>         .apply(new AggregationOperation())
>         .name("Aggregation Operation").setParallelism(1)
>         .print().name("Windowed Rule Output").setParallelism(1);
>
> (..omitted details..)
>
> *Timestamps and watermarks* assigner:
>
> public class TimestampAssigner
>         implements AssignerWithPeriodicWatermarks<CSVEvent> {
>
>     private final long MAX_DELAY = 2000; // 2 seconds
>     private long currentMaxTimestamp;
>     private long lastEmittedWatermark = Long.MIN_VALUE;
>
>     @Override
>     public long extractTimestamp(CSVEvent element,
>             long previousElementTimestamp) {
>         long timestamp =
>                 Long.parseLong(element.event.get(element.getTimeField()));
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         // return the watermark as current highest timestamp minus the
>         // out-of-orderness bound
>         long potentialWM = currentMaxTimestamp - MAX_DELAY;
>         if (potentialWM >= lastEmittedWatermark) {
>             lastEmittedWatermark = potentialWM;
>         }
>         return new Watermark(lastEmittedWatermark);
>     }
>
> }
>
> Regards,
> Pedro Chaves
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-Negative-currentWatermark-if-the-watermark-assignment-is-made-before-connecting-the-streams-tp10315.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>
