Hi Pedro, if I read your code correctly, you are not assigning timestamps and watermarks to the rules stream.
Flink derives an operator's watermark from all of its input streams: an operator with two inputs, such as your connected flatMap, forwards the minimum of the two input watermarks. If you do not assign watermarks to a stream, its watermark stays at the default, Long.MIN_VALUE, which is exactly the value you are observing.

Best, Fabian

2016-11-23 19:08 GMT+01:00 PedroMrChaves <pedro.mr.cha...@gmail.com>:

> Hello,
>
> I have an application which has two different streams of data, one
> represents a set of events and the other a set of rules that need to be
> matched against the events. In order to do this I use a coFlatMapOperator.
> The problem is that if I assign the timestamps and watermarks after the
> streams have been connected everything works fine but if I do it before, I
> get a negative *currentwatermark* at the window and the operations on
> windows have no effect. What could be the problem?
>
> If I assign *before* the connect:
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10315/negativeWatermark.png>
>
> If I assign *after* the connect:
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n10315/normalWatermark.png>
>
> *Main* code:
>
>     DataStream<CSVEvent> sourceStream = environment
>             .addSource(new SampleDataGenerator(sourceData, true)).name("Source").setParallelism(1)
>             .assignTimestampsAndWatermarks(new TimestampAssigner());
>     // if I assign the timestamps here the watermark seen at the window is
>     // negative and the operations are not applied
>
>     DataStream<String> rulesStream = environment
>             .socketTextStream(monitorAddress, monitorPort, DELIMITER)
>             .name("Rules Stream")
>             .setParallelism(1);
>
>     SplitStream<RBEvent> processedStream = sourceStream.connect(rulesStream)
>             .flatMap(new RProcessor(rulesPath)).name("RBProcessor").setParallelism(1)
>             //.assignTimestampsAndWatermarks(new DynamicTimestampAssigner()).name("Assign Timestamps").setParallelism(1)
>             // If I assign the watermarks here everything works fine
>             .split(new Spliter());
>
>     processedStream
>             .select(RuleOperations.WINDOW_AGGRATION)
>             .keyBy(new DynamicKeySelector())
>             .window(new DynamicSlidingWindowAssigner())
>             .apply(new AggregationOperation()).name("Aggregation Operation").setParallelism(1)
>             .print().name("Windowed Rule Output").setParallelism(1);
>
>     (..omitted details..)
>
> *Timestamps and watermarks* assigner:
>
>     public class TimestampAssigner implements AssignerWithPeriodicWatermarks<CSVEvent> {
>
>         private final long MAX_DELAY = 2000; // 2 seconds
>         private long currentMaxTimestamp;
>         private long lastEmittedWatermark = Long.MIN_VALUE;
>
>         @Override
>         public long extractTimestamp(CSVEvent element, long previousElementTimestamp) {
>             long timestamp = Long.parseLong(element.event.get(element.getTimeField()));
>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>             return timestamp;
>         }
>
>         @Override
>         public Watermark getCurrentWatermark() {
>             // return the watermark as current highest timestamp minus the
>             // out-of-orderness bound
>             long potentialWM = currentMaxTimestamp - MAX_DELAY;
>             if (potentialWM >= lastEmittedWatermark) {
>                 lastEmittedWatermark = potentialWM;
>             }
>             return new Watermark(lastEmittedWatermark);
>         }
>
>     }
>
> Regards,
> Pedro Chaves
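
To illustrate the point made in the reply, here is a minimal plain-Java sketch (no Flink dependency) of how a two-input operator combines the watermarks of its inputs: it keeps the latest watermark seen on each input and reports the smaller of the two. The class and method names (TwoInputWatermark, onWatermark1, onWatermark2) are made up for this example and are not Flink API. The last step, advancing the rules side to Long.MAX_VALUE so that it no longer holds back event time, is only one possible workaround and an assumption on my side; whether it suits your job depends on whether the rules need event-time semantics of their own.

```java
// Plain-Java model of watermark handling in a two-input operator.
// All names here are hypothetical; this is not Flink code.
final class TwoInputWatermark {
    // Until an input delivers a watermark, its value is Long.MIN_VALUE.
    private long input1Watermark = Long.MIN_VALUE; // e.g. the events stream
    private long input2Watermark = Long.MIN_VALUE; // e.g. the rules stream

    /** Records a watermark from input 1 and returns the combined watermark. */
    long onWatermark1(long watermark) {
        input1Watermark = Math.max(input1Watermark, watermark);
        return currentWatermark();
    }

    /** Records a watermark from input 2 and returns the combined watermark. */
    long onWatermark2(long watermark) {
        input2Watermark = Math.max(input2Watermark, watermark);
        return currentWatermark();
    }

    /** The operator's event time is bounded by its slowest input. */
    long currentWatermark() {
        return Math.min(input1Watermark, input2Watermark);
    }

    public static void main(String[] args) {
        TwoInputWatermark op = new TwoInputWatermark();

        // Only the events stream has an assigner: the combined watermark
        // stays at Long.MIN_VALUE, so event-time windows never fire.
        System.out.println(op.onWatermark1(8_000L)); // prints -9223372036854775808

        // Once the rules side also advances (here straight to Long.MAX_VALUE),
        // the combined watermark follows the events stream.
        System.out.println(op.onWatermark2(Long.MAX_VALUE)); // prints 8000
    }
}
```

This also explains why assigning timestamps and watermarks after the connect works: the assigner then sits downstream of the two-input operator and generates watermarks from the single stream that the flatMap produces.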