Hi Theo,

It gets to the FilterFunction during the initial creation of the ExecutionGraph, but not at runtime when records are streaming in. So it is not getting that far; it seems to be stuck in the

final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter

code. It doesn't seem to get past it: the watermarks keep incrementing, but the watermark never seems to reach the end of the window. Maybe I am doing something super simple and stupid.

TIA,
Vijay

On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:

> Hi Vijay,
>
> Maybe a stupid question, but according to your comments, the code works
> fine up to a "flatMap" operation. It seems that this flatMap is directly
> followed by a filter function in the method
> createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out
> all events? Or is the filter function itself not even called? (Your
> comments suggest that.)
>
> Best regards
> Theo
>
> ------------------------------
> *From: *"Vijay Balakrishnan" <bvija...@gmail.com>
> *To: *"Dawid Wysakowicz" <dwysakow...@apache.org>
> *CC: *"user" <user@flink.apache.org>
> *Sent: *Tuesday, 15 October 2019 02:01:05
> *Subject: *Re: add() method of AggregateFunction not called even though
> new watermark is emitted
>
> Hi,
> Thanks for the replies, Congxian & Dawid.
> Watermarks are advancing. I'm not sure how to check whether each newly
> generated watermark is reaching the end of the window.
>
> I did check the Flink UI for the currentInputWatermark, and it is
> increasing monotonically.
>
> I narrowed the problem down to windowStream.aggregate not being called.
> I also *added a checkpoint* to see if that was causing the issue. It
> didn't seem to help.
> Most of the code is reached during the creation of the ExecutionGraph at
> the start of the program.
>
> I generate an incrementing sequence of timestamps (5000 ms between
> records) from a producer to Kinesis, and a new watermark is emitted as
> soon as the input records start arriving.
> My window size is 15s.
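The event-time arithmetic behind "the watermark never reaches the end of the window" can be checked by hand. Below is a minimal sketch, assuming window offset 0, non-negative epoch-millisecond timestamps, and the watermark rule of MonitoringTSWAssigner (largest timestamp seen so far minus `bound`). A tumbling window covering [start, start + size) fires once the watermark reaches its last millisecond, start + size - 1. The class `WindowFiringSketch` and its methods are hypothetical helpers for illustration, not Flink APIs.

```java
// Hypothetical helper for illustration; not part of Flink or the poster's code.
public final class WindowFiringSketch {

    // Start of the tumbling window containing `timestamp`.
    // For offset 0 and timestamp >= 0 this gives the same result as
    // Flink's TimeWindow.getWindowStartWithOffset(timestamp, 0, windowSize).
    public static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Last millisecond belonging to the window (end - 1); an event-time
    // trigger fires once the watermark is >= this value.
    public static long windowMaxTimestamp(long timestamp, long windowSize) {
        return windowStart(timestamp, windowSize) + windowSize - 1;
    }

    // Watermark rule of the posted assigner: max timestamp seen minus bound.
    public static long watermark(long maxTimestamp, long bound) {
        return maxTimestamp - bound;
    }

    // Timestamp of the first event (events spaced `step` ms apart, starting at
    // firstTs) whose watermark closes the window containing firstTs;
    // returns -1 if none of the first `limit` events does.
    public static long firstFiringEvent(long firstTs, long step, long windowSize, long bound, int limit) {
        long target = windowMaxTimestamp(firstTs, windowSize);
        long maxTs = Long.MIN_VALUE;
        for (int i = 0; i < limit; i++) {
            long ts = firstTs + i * step;
            maxTs = Math.max(maxTs, ts);
            if (watermark(maxTs, bound) >= target) {
                return ts;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long size = 15_000L;  // 15 s tumbling window
        long step = 5_000L;   // one record every 5 s
        long bound = 5_000L;  // out-of-orderness bound
        // prints: window [0, 15000) fires on event at 20000
        System.out.println("window [0, 15000) fires on event at "
                + firstFiringEvent(0L, step, size, bound, 10));
    }
}
```

So with these numbers the first window can only fire when the record stamped 20000 arrives, and only if the window operator's own input watermark (not just the assigner's) has advanced that far.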
> I see that a WindowedStream is created with windowAssigner
> TumblingEventTimeWindows(15000) and trigger EventTimeTrigger,
> but the *code never gets into EventTimeTrigger.onElement() or
> onEventTime() to fire the trigger*.
> It does get into TimestampsAndPunctuatedWatermarksOperator and emitWatermark().
> I even tried ProcessingTime, but that didn't help either.
>
>
> //code to create kinesis consumer successfully......
> for (Rule rule : rules.getRules()) {
>     //gets in here fine
>     final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream = kinesisStream.filter(mon -> {
>         boolean result;
>         String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : "";
>         InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
>         String measurement = inputMetricSelector != null ? inputMetricSelector.getMeasurement() : "";
>         result = eventName.equals(measurement);
>         if (result) {
>             Map<String, String> inputTags = mon.get(TAGS) != null ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
>             Map<String, String> ruleTags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
>             result = matchTags(inputTags, ruleTags);
>         }
>         return result; //*<== this is true*
>     }).flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
>         out.collect(input); //*<==== runs up till here fine*
>     }).returns(new TypeHint<Map<String, Object>>() {
>     });
>     //*doesn't do anything beyond this point at runtime*
>     DataStream<InfluxDBPoint> enrichedMGStream =
>             pms.createAggregatedMonitoringGroupingWindowStream1(filteredKinesisStream, ruleFactory, rule, parallelProcess);
>     enrichedMGStream.addSink(influxSink)
>             .setParallelism(nbrSinks);
> }
>
> private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(
>         DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int parallelProcess) {
>     DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
>     RuleConfig ruleConfig = rule.getRuleConfig();
>     String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
>     RuleIF ruleImpl = ruleFactory.getRule(ruleType);
>     Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>();
>     Object intervalObj = ruleProps.get("rule_eval_window");
>     String timeInterval = intervalObj != null ? (String) intervalObj : "";
>     org.apache.flink.streaming.api.windowing.time.Time timeWindow = getTimeWindowFromInterval(timeInterval);
>
>     Object windowTypeObj = ruleProps.get("window_type");
>     String windowType = windowTypeObj != null ? (String) windowTypeObj : "";
>
>     InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
>     Map<String, String> tags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
>     String groupByObj = tags.get(GROUP_BY);
>     String groupBy = groupByObj != null ? groupByObj : "";
>     kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
>         Object groupByValueObj = inputMap.get(groupBy);
>         return groupByValueObj != null;
>     });
>     Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
>     String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
>     //till here, it went through fine during creation of ExecutionGraph
>     KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>             kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); //*<=== never gets into MapTupleKeySelector.getKey(); a similar class works in another project*
>     enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow,
>             windowType, timeInterval, ruleImpl, rule, parallelProcess);
>     return enrichedComponentInstanceStream1;
> }
>
> private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(
>         KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
>         org.apache.flink.streaming.api.windowing.time.Time timeWindow, String windowType,
>         String interval, RuleIF ruleImpl, Rule rule, int parallelProcess) {
>     long slide = 100;
>     final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
>             windowType.equalsIgnoreCase(SLIDING) ?
>                     monitoringTupleKeyedStream
>                             .timeWindow(timeWindow, org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide)) :
>                     monitoringTupleKeyedStream
>                             .timeWindow(timeWindow);
>     return windowStream.aggregate(
>             new MGroupingWindowAggregate(interval), //*<=== never gets into add() here*
>             new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>             .map(new MonitoringGroupingToInfluxDBPoint(rule));
> }
>
> On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Vijay,
>>
>> Could you check whether the watermark for the aggregate operator advances? You
>> should be able to check that in the Flink WebUI. Could it be that the
>> watermark is not advancing in some of the upstream operators? The watermark
>> of a particular operator is the minimum of the watermarks received from all of
>> its upstream operators. Therefore, if some of them do not produce any, the
>> resulting watermark will not advance.
>>
>> Best,
>>
>> Dawid
>> On 11/10/2019 21:37, Vijay Balakrishnan wrote:
>>
>> Hi,
>> Here is my issue with *event processing*: the *add() method of
>> MGroupingWindowAggregate is not being called* even though a new watermark
>> is fired.
>> 1. *Ingest data from Kinesis (works fine)*
>> 2. *Deserialize* in MonitoringMapKinesisSchema (*works fine*, and I get JSON
>> back)
>> 3. I *assign MonitoringTSWAssigner* (code below) to the source with a
>> bound of 10 (I have also tried 3000 and 30000). *It fires a new watermark*
>> with each incoming record, but the *windowStream.aggregate method doesn't
>> seem to fire*, and I *don't see the add() method of MGroupingWindowAggregate
>> being called*. I *can see the new watermark being emitted in
>> TimestampsAndPunctuatedWatermarksOperator.processElement*.
>> 4. I have tried timeWindow values of 1m and 15s.
>>
>> *Main* code:
>>
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> //Setup Kinesis Consumer
>> Properties kinesisConsumerConfig = new Properties();
>> ..
>> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>>         ConsumerConfigConstants.InitialPosition.LATEST.name()); //LATEST
>> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>(
>>         "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig);
>>
>> DataStream<Map<String, Object>> kinesisStream;
>> RichSinkFunction<InfluxDBPoint> influxSink;
>>
>> DataStreamSource<Map<String, Object>> monitoringDataStreamSource = env.addSource(kinesisConsumer);
>> kinesisStream = monitoringDataStreamSource
>>         .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
>> influxSink = pms.createInfluxMonitoringSink(....);
>> ......
>> ...timeWindow = Time.seconds(timeIntervalL); //tried with timeIntervalL=15s, 1m
>>
>> KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
>> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
>>         monitoringTupleKeyedStream.timeWindow(timeWindow);
>> DataStream<InfluxDBPoint> enrichedMGStream = windowStream.aggregate( //*<===== never reaches here ?????*
>>         new MGroupingWindowAggregate(interval),
>>         new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
>> enrichedMGStream.addSink(influxSink);
>> env.execute("Aggregation of Map data");
>>
>> *MonitoringTSWAssigner* code:
>> public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>>     private long bound = 5 * (long) 1000; //5 secs out of order bound in millisecs
>>     private long maxTimestamp = Long.MIN_VALUE;
>>
>>     public MonitoringTSWAssigner() {
>>     }
>>
>>     public MonitoringTSWAssigner(long bound) {
>>         this.bound = bound;
>>     }
>>
>>     public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
>>         long extractedTS = getExtractedTS(monitoring);
>>         if (extractedTS > maxTimestamp) {
>>             maxTimestamp = extractedTS;
>>         }
>>
>>         return extractedTS; //return System.currentTimeMillis();
>>     }
>>
>>     public long getExtractedTS(Map<String, Object> monitoring) {
>>         final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
>>                 ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>>         return Utils.getLongFromDateStr(eventTimestamp);
>>     }
>>
>>     @Override
>>     public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
>>         long extractedTS = getExtractedTS(monitoring);
>>         long nextWatermark = maxTimestamp - bound;
>>         return new Watermark(nextWatermark);
>>     }
>> }
>>
>> *MGroupingWindowAggregate*:
>> public class MGroupingWindowAggregate implements AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
>>     private final String interval;
>>
>>     public MGroupingWindowAggregate(String interval) {
>>         this.interval = interval;
>>     }
>>
>>     public Map<String, Object> createAccumulator() {
>>         return new ConcurrentHashMap<>();
>>     }
>>
>>     public Map<String, Object> add(Map<String, Object> monitoring, Map<String, Object> timedMap) {
>>         .....
>>     }
>>
>>     .....
>>
>> }
>>
>> TIA,
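Dawid's point that an operator's watermark is the minimum of the watermarks received from all of its inputs can be illustrated with a small model. This is a simplified sketch, not Flink's actual implementation: the class `MinWatermarkSketch` and its methods are hypothetical, and it assumes an input channel that has never delivered a watermark counts as `Long.MIN_VALUE`. With a punctuated assigner such as MonitoringTSWAssigner, watermarks are only produced when a record arrives, so a parallel assigner instance that receives no records never emits one. Unless that input is treated as idle, the downstream window operator's watermark then cannot advance, even though the other instances keep emitting watermarks.

```java
import java.util.Arrays;

// Hypothetical, simplified model of how an operator combines the watermarks
// of its input channels; for illustration only, not Flink's implementation.
public final class MinWatermarkSketch {

    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    public MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        // A channel that has never delivered a watermark contributes Long.MIN_VALUE.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one input channel and returns the operator's
    // combined watermark (the minimum over all channels, never moving backwards).
    public long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    public long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        // Two upstream assigner instances; only channel 0 ever receives records.
        MinWatermarkSketch windowOperator = new MinWatermarkSketch(2);
        for (long wm = 0; wm <= 60_000; wm += 5_000) {
            windowOperator.onWatermark(0, wm);
        }
        // Still Long.MIN_VALUE, so no 15 s event-time window can fire.
        System.out.println(windowOperator.currentWatermark());
    }
}
```

As soon as channel 1 delivers a watermark, the combined value jumps to the smaller of the two channels, which is why checking the watermark of every upstream subtask in the WebUI, not just one, is the useful diagnostic.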