Hi,

Thanks for the replies, Congxian and Dawid. The watermarks are advancing, but I'm not sure how to check whether each newly generated watermark is reaching the end of the window.
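One way to check whether a watermark has reached the end of a window is to compute the window bounds by hand. The sketch below is a plain-Java model. It does not use Flink, and the class and method names are hypothetical. It covers tumbling event-time windows with zero offset: an element with timestamp t belongs to the window starting at t - (t mod size), and, as EventTimeTrigger does, the window fires once the watermark reaches the window's last millisecond (end - 1). It replays events 5000 ms apart into a 15 s window and applies the maxTimestamp - bound formula of MonitoringTSWAssigner with its default 5 s bound. The starting timestamp of 0 is an assumption.

```java
/**
 * Plain-Java model (not Flink code) of how an event-time tumbling window
 * with zero offset is assigned and when an event-time trigger fires it.
 */
public class TumblingWindowModel {

    /** Start of the tumbling window of size sizeMs that contains timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Last millisecond that still belongs to the window (window end - 1). */
    static long windowMaxTimestamp(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs - 1;
    }

    /** The window fires once the watermark has reached its last millisecond. */
    static boolean fires(long watermarkMs, long timestampMs, long sizeMs) {
        return watermarkMs >= windowMaxTimestamp(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        long size = 15_000L;  // 15 s tumbling window, as in the message
        long bound = 5_000L;  // default out-of-orderness bound of MonitoringTSWAssigner
        long maxTimestamp = Long.MIN_VALUE;
        // Events arrive every 5000 ms, starting at t = 0 (assumed start time).
        for (long t = 0; t <= 25_000L; t += 5_000L) {
            maxTimestamp = Math.max(maxTimestamp, t);
            long watermark = maxTimestamp - bound; // same formula as checkAndGetNextWatermark
            System.out.printf("event t=%d -> watermark=%d, window [0,15000) fires: %b%n",
                    t, watermark, fires(watermark, 0L, size));
        }
    }
}
```

With these numbers the window [0, 15000) fires only when the event stamped 20000 arrives, because maxTimestamp - bound must reach 14999. If no later event arrives, no watermark gets past 14999 and nothing is emitted for that window.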
I checked currentInputWatermark in the Flink UI and it increases monotonically. I narrowed the problem down to the functions passed to windowStream.aggregate() never being called. I also added a checkpoint to see whether that was causing the issue; it didn't help. Most of the code below is executed while the ExecutionGraph is built at program start.

I generate an incrementing sequence of timestamps (5000 ms between records) from a producer into Kinesis, and a new watermark is emitted as soon as the input records start arriving. My window size is 15 s. I can see that a WindowedStream is created with windowAssigner TumblingEventTimeWindows(15000) and trigger EventTimeTrigger, but the code never enters EventTimeTrigger.onElement() or onEventTime(), so the trigger never fires. It does enter TimestampsAndPunctuatedWatermarksOperator and emitWatermark(). I even tried processing time, but that didn't help either.

// code to create the Kinesis consumer successfully ......
for (Rule rule : rules.getRules()) {
    // gets in here fine
    final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream
            .filter(mon -> {
                boolean result;
                String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : "";
                InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
                String measurement = inputMetricSelector != null ? inputMetricSelector.getMeasurement() : "";
                result = eventName.equals(measurement);
                if (result) {
                    Map<String, String> inputTags = mon.get(TAGS) != null
                            ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
                    Map<String, String> ruleTags = inputMetricSelector != null
                            ? inputMetricSelector.getTags() : new HashMap<>();
                    result = matchTags(inputTags, ruleTags);
                }
                return result; // <== this is true
            })
            .flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
                out.collect(input); // <==== runs up till here fine
            })
            .returns(new TypeHint<Map<String, Object>>() { });
    // doesn't do anything beyond this point at runtime
    DataStream<InfluxDBPoint> enrichedMGStream = pms.createAggregatedMonitoringGroupingWindowStream1(
            filteredKinesisStream, ruleFactory, rule, parallelProcess);
    enrichedMGStream.addSink(influxSink)
            .setParallelism(nbrSinks);
}

private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(
        DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int parallelProcess) {
    DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
    RuleConfig ruleConfig = rule.getRuleConfig();
    String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
    RuleIF ruleImpl = ruleFactory.getRule(ruleType);
    Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>();
    Object intervalObj = ruleProps.get("rule_eval_window");
    String timeInterval = intervalObj != null ? (String) intervalObj : "";
    org.apache.flink.streaming.api.windowing.time.Time timeWindow = getTimeWindowFromInterval(timeInterval);
    Object windowTypeObj = ruleProps.get("window_type");
    String windowType = windowTypeObj != null ? (String) windowTypeObj : "";
    InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
    Map<String, String> tags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
    String groupByObj = tags.get(GROUP_BY);
    String groupBy = groupByObj != null ? groupByObj : "";
    kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
        Object groupByValueObj = inputMap.get(groupBy);
        return groupByValueObj != null;
    });
    Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
    String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
    // till here, it went through fine during creation of the ExecutionGraph
    KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
            kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); // <=== never gets into MapTupleKeySelector.getKey() - a similar class works in another project
    enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow,
            windowType, timeInterval, ruleImpl, rule, parallelProcess);
    return enrichedComponentInstanceStream1;
}

private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(
        KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
        org.apache.flink.streaming.api.windowing.time.Time timeWindow, String windowType, String interval,
        RuleIF ruleImpl, Rule rule, int parallelProcess) {
    long slide = 100;
    final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
            windowType.equalsIgnoreCase(SLIDING)
                    ? monitoringTupleKeyedStream.timeWindow(timeWindow,
                            org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide))
                    : monitoringTupleKeyedStream.timeWindow(timeWindow);
    return windowStream.aggregate(
                    new MGroupingWindowAggregate(interval), // <=== never gets into add() here
                    new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
            .map(new MonitoringGroupingToInfluxDBPoint(rule));
}

On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Vijay,
>
> Could you check whether the watermark for the aggregate operator advances? You
> should be able to check that in the Flink Web UI.
> Could it be that the watermark does not advance for all of the upstream
> operators? The watermark for a particular operator is the minimum of the
> watermarks received from all of its upstream operators. Therefore, if some of
> them do not produce any, the resulting watermark will not advance.
>
> Best,
>
> Dawid
>
> On 11/10/2019 21:37, Vijay Balakrishnan wrote:
>
> Hi,
> Here is my issue with event-time processing: the add() method of
> MGroupingWindowAggregate is not being called even though new watermarks are
> fired.
> 1. Ingest data from Kinesis (works fine).
> 2. Deserialize in MonitoringMapKinesisSchema (works fine and returns JSON).
> 3. Assign MonitoringTSWAssigner (code below) to the source with a bound of
>    10 ms (I have also tried 3000 and 30000). It fires a new watermark with
>    each incoming record, but windowStream.aggregate() doesn't seem to fire and
>    I don't see the add() method of MGroupingWindowAggregate being called. I can
>    see the new watermark being emitted in
>    TimestampsAndPunctuatedWatermarksOperator.processElement.
> 4. I have tried time windows of 1 min and 15 s.
>
> Main code:
>
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> // Set up the Kinesis consumer
> Properties kinesisConsumerConfig = new Properties();
> ..
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>         ConsumerConfigConstants.InitialPosition.LATEST.name()); // LATEST
> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>(
>         "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig);
>
> DataStream<Map<String, Object>> kinesisStream;
> RichSinkFunction<InfluxDBPoint> influxSink;
>
> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
>         env.addSource(kinesisConsumer);
> kinesisStream = monitoringDataStreamSource
>         .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
> influxSink = pms.createInfluxMonitoringSink(....);
> ......
> ...timeWindow = Time.seconds(timeIntervalL); // tried with timeIntervalL = 15s, 1m
>
> KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
>         monitoringTupleKeyedStream.timeWindow(timeWindow);
> DataStream<InfluxDBPoint> enrichedMGStream = windowStream.aggregate( // <===== never reaches here
>                 new MGroupingWindowAggregate(interval),
>                 new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
> enrichedMGStream.addSink(influxSink);
> env.execute("Aggregation of Map data");
>
> MonitoringTSWAssigner code:
>
> public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>     private long bound = 5 * (long) 1000; // 5 s out-of-order bound in milliseconds
>     private long maxTimestamp = Long.MIN_VALUE;
>
>     public MonitoringTSWAssigner() {
>     }
>
>     public MonitoringTSWAssigner(long bound) {
>         this.bound = bound;
>     }
>
>     @Override
>     public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
>         long extractedTS = getExtractedTS(monitoring);
>         if (extractedTS > maxTimestamp) {
>             maxTimestamp = extractedTS;
>         }
>         return extractedTS; // return System.currentTimeMillis();
>     }
>
>     public long getExtractedTS(Map<String, Object> monitoring) {
>         final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
>                 ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>         return Utils.getLongFromDateStr(eventTimestamp);
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
>         // extractTimestamp() has already updated maxTimestamp for this element
>         long nextWatermark = maxTimestamp - bound;
>         return new Watermark(nextWatermark);
>     }
> }
>
> MGroupingWindowAggregate:
>
> public class MGroupingWindowAggregate
>         implements AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
>     private final String interval;
>
>     public MGroupingWindowAggregate(String interval) {
>         this.interval = interval;
>     }
>
>     @Override
>     public Map<String, Object> createAccumulator() {
>         return new ConcurrentHashMap<>();
>     }
>
>     @Override
>     public Map<String, Object> add(Map<String, Object> monitoring, Map<String, Object> timedMap) {
>         .....
>     }
>
>     .....
> }
>
> TIA,
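Dawid's explanation above, that an operator's watermark is the minimum of the watermarks received from all of its upstream inputs, can be illustrated with a small plain-Java model. This is a simplified sketch, not Flink's implementation, and the class and method names are hypothetical: the operator remembers the latest watermark per input channel and only advances to the minimum across channels. An input that has never sent a watermark stays at Long.MIN_VALUE and holds the whole operator back. A punctuated assigner such as MonitoringTSWAssigner only emits a watermark when an element passes through it, so, for example, a parallel instance that receives no records would never emit one.

```java
import java.util.Arrays;

/**
 * Simplified plain-Java model (not Flink's implementation) of how an operator
 * combines watermarks from several input channels: its own watermark is the
 * minimum of the latest watermark seen on each channel.
 */
public class MinWatermarkModel {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    MinWatermarkModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        // A channel that has not sent any watermark yet counts as Long.MIN_VALUE.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel and returns the operator's combined watermark. */
    long onWatermark(int channel, long watermark) {
        // Watermarks on a single channel never move backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The combined watermark never moves backwards either.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkModel op = new MinWatermarkModel(2);
        // Channel 0 receives records and keeps advancing; channel 1 sends nothing.
        for (long wm = 0; wm <= 20_000L; wm += 5_000L) {
            System.out.println("channel 0 at " + wm + " -> operator watermark " + op.onWatermark(0, wm));
        }
        // Once channel 1 catches up, the operator watermark moves forward.
        System.out.println("channel 1 at 20000 -> operator watermark " + op.onWatermark(1, 20_000L));
    }
}
```

Running main prints the operator watermark as Long.MIN_VALUE (-9223372036854775808) for every watermark from channel 0, and 20000 only once channel 1 reports 20000.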