Hi Theo,
It reaches the FilterFunction while the ExecutionGraph is being created
at startup, but not at runtime when records are streaming in. So it is
not getting that far; it seems to be stuck in the

final SingleOutputStreamOperator<Map<String, Object>>
filteredKinesisStream = kinesisStream.filter

code.
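
One way to check Theo's question (is a filter dropping every event?) without a debugger is to run the predicate of the second filter, the one inside createAggregatedMonitoringGroupingWindowStream1, against a sample record outside Flink. The sketch below is a guess at what might be happening, not a confirmed diagnosis. It assumes that the GROUP_BY tag can hold several field names joined by KEY_DELIMITER, that the delimiter is "," (its real value is not shown in this thread), and it uses made-up field names. Under those assumptions, a lookup with the whole joined string as a single map key finds nothing, so the predicate rejects every record.

```java
import java.util.HashMap;
import java.util.Map;

class GroupByFilterCheck {
    // Assumption: stands in for KEY_DELIMITER, whose real value is not shown in the thread.
    static final String KEY_DELIMITER = ",";

    // Same test as the second filter in the job: look up the raw groupBy string as one key.
    static boolean passesAsWritten(Map<String, Object> record, String groupBy) {
        return record.get(groupBy) != null;
    }

    // Hypothetical alternative: require every individual group-by field to be present.
    static boolean passesPerField(Map<String, Object> record, String groupBy) {
        for (String field : groupBy.split(KEY_DELIMITER)) {
            if (record.get(field) == null) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Made-up sample record; the real field names are not shown in the thread.
        Map<String, Object> record = new HashMap<>();
        record.put("host", "h1");
        record.put("region", "us");

        System.out.println(passesAsWritten(record, "host,region"));
        System.out.println(passesPerField(record, "host,region"));
    }
}
```

If the first call reports false for a record that should match, nothing reaches keyBy or the window at runtime, even though the graph builds fine. An empty groupBy string would have the same effect, since the records are unlikely to contain an empty-string key.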

It doesn't seem to get past it: the watermarks keep incrementing, but
the watermark never seems to reach the end of the window. Maybe I am
doing something super simple and stupid.
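
For reference, here is the arithmetic behind "the watermark reaches the end of the window", as a plain-Java sketch with no Flink dependency. It mirrors how tumbling event-time windows with zero offset are aligned and the condition under which the event-time trigger fires (watermark >= window end - 1). The 15000 ms window, the 5000 ms spacing between records and the bound of 10 come from this thread. The timestamps starting at 0 and all the names are made up for illustration.

```java
class WindowFiringSketch {
    // Start of the tumbling window containing ts (zero offset, non-negative timestamps).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Largest timestamp that still belongs to the window containing ts: window end - 1.
    static long windowMaxTimestamp(long ts, long size) {
        return windowStart(ts, size) + size - 1;
    }

    // Watermark computed the way MonitoringTSWAssigner does: max timestamp seen minus bound.
    static long watermark(long maxSeenTs, long bound) {
        return maxSeenTs - bound;
    }

    // An event-time window fires once the watermark has reached its max timestamp.
    static boolean fires(long wm, long windowMaxTs) {
        return wm >= windowMaxTs;
    }

    public static void main(String[] args) {
        long size = 15_000L;
        long bound = 10L;
        long step = 5_000L;
        long firstWindowMax = windowMaxTimestamp(0L, size);
        for (long ts = 0L; ts <= 20_000L; ts += step) {
            long wm = watermark(ts, bound);
            System.out.println("ts=" + ts + " watermark=" + wm
                    + " firstWindowFires=" + fires(wm, firstWindowMax));
        }
    }
}
```

With these numbers the first window [0, 15000) fires only when the record stamped 20000 arrives, because the watermark after the record stamped 15000 is 14990, still below 14999. This only matters if records reach the window operator at all. Watermarks are forwarded through a filter even when the filter drops every record, so an advancing watermark with no window to fire is consistent with EventTimeTrigger.onElement() never being called.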

TIA,
Vijay

On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:

> Hi Vijay,
>
> Maybe a stupid question, but according to your comments, the code works
> fine up to a "flatMap" operation. It seems that this flatMap is directly
> followed by a filter function in the method
> createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out
> all events? Or is the filter function itself not even called (as your
> comments suggest)?
>
> Best regards
> Theo
>
> ------------------------------
> *From: *"Vijay Balakrishnan" <bvija...@gmail.com>
> *To: *"Dawid Wysakowicz" <dwysakow...@apache.org>
> *CC: *"user" <user@flink.apache.org>
> *Sent: *Tuesday, October 15, 2019 02:01:05
> *Subject: *Re: add() method of AggregateFunction not called even though
> new watermark is emitted
>
> Hi,
> Thanks for the replies, Congxian & Dawid.
> Watermarks are advancing. I am not sure how to check whether every newly
> generated watermark is reaching the end of the window.
>
> I did check the Flink UI for the currentInputWatermark and it is
> increasing monotonically.
>
> I narrowed the problem down to windowStream.aggregate not being called.
> I also *added a checkpoint* to see if it was causing the issue. It didn't
> seem to help.
> Most of the code is reached during the creation of the ExecutionGraph at
> the start of the program.
>
> I generate an incrementing sequence of timestamps (with a delay of 5000 ms
> between records) from a producer to Kinesis, and the job emits a new
> watermark as soon as it starts receiving the input records.
> My window size is 15s.
> I see a WindowedStream is created with windowAssigner:
> TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger
> but the *code never gets into EventTimeTrigger.onElement() or
> onEventTime() to fire the trigger*.
> It does get into TimestampsAndPunctuatedWatermarksOperator and
> emitWatermark().
> I even tried to use ProcessingTime, but that didn't help either.
>
>
> //code to create kinesis consumer successfully......
> for (Rule rule : rules.getRules()) {
> //gets in here fine
>     final SingleOutputStreamOperator<Map<String, Object>>
> filteredKinesisStream = kinesisStream.filter(mon -> {
>                 boolean result;
>                 String eventName = mon.get(MEASUREMENT) != null ? (String)
> mon.get(MEASUREMENT) : "";
>                 InputMetricSelector inputMetricSelector =
> rule.getInputMetricSelector();
>                 String measurement = inputMetricSelector != null ?
> inputMetricSelector.getMeasurement() : "";
>                 result = eventName.equals(measurement);
>                 if (result) {
>                     Map<String, String> inputTags = mon.get(TAGS) != null
> ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
>                     Map<String, String> ruleTags = inputMetricSelector !=
> null ? inputMetricSelector.getTags() : new HashMap<>();
>                     result = matchTags(inputTags, ruleTags);
>                 }
>                 return result;//*<== this is true*
>             }
>     ).flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>)
> (input, out) -> {
>             out.collect(input);//*<==== runs up till here fine*
>     }).returns(new TypeHint<Map<String, Object>>() {
>     });
> //*doesn't do anything beyond this point at runtime*
>     DataStream<InfluxDBPoint> enrichedMGStream =
> pms.createAggregatedMonitoringGroupingWindowStream1
>             (filteredKinesisStream, ruleFactory, rule, parallelProcess);
>     enrichedMGStream.addSink(influxSink)
>             .setParallelism(nbrSinks);
> }
>
> private DataStream<InfluxDBPoint>
> createAggregatedMonitoringGroupingWindowStream1(DataStream<Map<String,
> Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int
> parallelProcess) {
>     DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
>     RuleConfig ruleConfig = rule.getRuleConfig();
>     String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
>     RuleIF ruleImpl = ruleFactory.getRule(ruleType);
>     Map<String, Object> ruleProps = ruleConfig != null ?
> ruleConfig.getRuleProps() : new HashMap<>();
>     Object intervalObj = ruleProps.get("rule_eval_window");
>     String timeInterval = intervalObj != null ? (String) intervalObj : "";
>     org.apache.flink.streaming.api.windowing.time.Time timeWindow =
> getTimeWindowFromInterval(timeInterval);
>
>     Object windowTypeObj = ruleProps.get("window_type");
>     String windowType = windowTypeObj != null ? (String) windowTypeObj :
> "";
>
>     InputMetricSelector inputMetricSelector =
> rule.getInputMetricSelector();
>     Map<String, String> tags = inputMetricSelector != null ?
> inputMetricSelector.getTags() : new HashMap<>();
>     String groupByObj = tags.get(GROUP_BY);
>     String groupBy = groupByObj != null ? groupByObj : "";
>     kinesisStream = kinesisStream.filter((FilterFunction<Map<String,
> Object>>) inputMap -> {
>         Object groupByValueObj = inputMap.get(groupBy);
>         return groupByValueObj != null;
>     });
>     Set<String> groupBySet = new
> HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
>     String metric =
> Objects.requireNonNull(inputMetricSelector).getMetric();
> //till here, it went through fine during creation of ExecutionGraph
>     KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>             kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
>     //*<=== never gets into MapTupleKeySelector.getKey() - a similar class works in another project*
>     enrichedComponentInstanceStream1 =
> getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow,
> windowType, timeInterval, ruleImpl, rule, parallelProcess);
>     return enrichedComponentInstanceStream1;
> }
>
> private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(
>         KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
>         org.apache.flink.streaming.api.windowing.time.Time timeWindow,
>         String windowType,
>         String interval,
>         RuleIF ruleImpl, Rule rule, int parallelProcess) {
>     long slide = 100;
>     final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow>
> windowStream =
>             windowType.equalsIgnoreCase(SLIDING) ?
>                     monitoringTupleKeyedStream
>                             .timeWindow(timeWindow,
> org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide)) :
>                     monitoringTupleKeyedStream
>                             .timeWindow(timeWindow);
>     return windowStream.aggregate(
>             new MGroupingWindowAggregate(interval),//*<=== never gets into add() here*
>             new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>             .map(new MonitoringGroupingToInfluxDBPoint(rule));
>
> }
>
> On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
>> Hi Vijay,
>>
>> Could you check whether the watermark for the aggregate operator advances?
>> You should be able to check that in the Flink WebUI. Could it be that the
>> watermark does not advance for all of the upstream operators? The watermark
>> for a particular operator is the minimum of the watermarks received from
>> all of its upstream operators. Therefore, if some of them do not produce
>> any, the resulting watermark will not advance.
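
To make the minimum rule above concrete, here is a small plain-Java sketch of the bookkeeping. It is not Flink's implementation, and the class and method names are made up for illustration. Each input channel starts at Long.MIN_VALUE, and the operator's watermark is the minimum over all channels, so one silent channel holds the whole operator back. Whether that applies to this job is an assumption; it could happen, for example, if a source subtask reads no Kinesis shard.

```java
import java.util.Arrays;

class MinWatermarkSketch {
    // Last watermark seen per input channel; Long.MIN_VALUE means "none yet".
    private final long[] channelWatermarks;
    // Watermark of the operator itself; it never moves backwards.
    private long current = Long.MIN_VALUE;

    MinWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one channel and return the operator's resulting watermark.
    long onWatermark(int channel, long wm) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], wm);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min);
        return current;
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(2);
        // Channel 1 is still silent, so the operator's watermark stays at Long.MIN_VALUE.
        System.out.println(op.onWatermark(0, 20_000L));
        // Once channel 1 reports, the operator advances to the smaller of the two.
        System.out.println(op.onWatermark(1, 5_000L));
    }
}
```
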
>>
>> Best,
>>
>> Dawdi
>> On 11/10/2019 21:37, Vijay Balakrishnan wrote:
>>
>> Hi,
>> Here is my issue with *Event Processing*: the *add() method of
>> MGroupingWindowAggregate is not being called* even though a new
>> watermark is emitted.
>> 1. *Ingest data from Kinesis* (works fine).
>> 2. *Deserialize* in MonitoringMapKinesisSchema (*works fine*, and I get
>> JSON back).
>> 3. I *assign MonitoringTSWAssigner* (code below) to the source with a
>> bound of 10 (I have also tried 3000 and 30000). *It emits a new watermark*
>> with each incoming record, but the *windowStream.aggregate method doesn't
>> seem to fire* and I *don't see the add() method of
>> MGroupingWindowAggregate being called*. I *can see the new watermark being
>> emitted in TimestampsAndPunctuatedWatermarksOperator.processElement*.
>> 4. I have tried a timeWindow of 1m and of 15s.
>>
>> *Main* code:
>>
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> //Setup Kinesis Consumer
>> Properties kinesisConsumerConfig = new Properties();
>> ..
>> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>> ConsumerConfigConstants.InitialPosition.LATEST.name());//LATEST
>> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new
>> FlinkKinesisConsumer<>(
>>                 "kinesisTopicRead", new MonitoringMapKinesisSchema(true),
>> kinesisConsumerConfig);
>>
>> DataStream<Map<String, Object>> kinesisStream;
>> RichSinkFunction<InfluxDBPoint> influxSink;
>>
>> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
>> env.addSource(kinesisConsumer);
>> kinesisStream = monitoringDataStreamSource
>>         .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
>> influxSink = pms.createInfluxMonitoringSink(....);
>> ......
>> ...timeWindow = Time.seconds(timeIntervalL);//tried with timeIntervalL=15s, 1m
>>
>> KeyedStream<Map<String, Object>, MonitoringTuple>
>> monitoringTupleKeyedStream =
>>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
>> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow>
>> windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow);
>> DataStream<InfluxDBPoint> enrichedMGStream = windowStream.aggregate(//*<===== never reaches here*
>>         new MGroupingWindowAggregate(interval),
>>         new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
>> enrichedMGStream.addSink(influxSink);
>> env.execute("Aggregation of Map data");
>>
>> *MonitoringTSWAssigner* code:
>> public class MonitoringTSWAssigner implements
>> AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>>     private long bound = 5 * (long) 1000;//5 secs out-of-order bound in millisecs
>>     private long maxTimestamp = Long.MIN_VALUE;
>>
>>     public MonitoringTSWAssigner() {
>>     }
>>
>>     public MonitoringTSWAssigner(long bound) {
>>         this.bound = bound;
>>     }
>>
>>     public long extractTimestamp(Map<String, Object> monitoring, long
>> previousTS) {
>>         long extractedTS = getExtractedTS(monitoring);
>>         if (extractedTS > maxTimestamp) {
>>             maxTimestamp = extractedTS;
>>         }
>>
>>         return extractedTS;//return System.currentTimeMillis();
>>
>>     }
>>
>>     public long getExtractedTS(Map<String, Object> monitoring) {
>>         final String eventTimestamp =
>> monitoring.get(Utils.EVENT_TIMESTAMP) != null ? (String)
>> monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>>         return Utils.getLongFromDateStr(eventTimestamp);
>>     }
>>
>>     @Override
>>     public Watermark checkAndGetNextWatermark(Map<String, Object>
>> monitoring, long extractedTimestamp) {
>>         long extractedTS = getExtractedTS(monitoring);
>>         long nextWatermark = maxTimestamp - bound;
>>         return new Watermark(nextWatermark);
>>     }
>> }
>>
>> *MGroupingWindowAggregate*:
>> public class MGroupingWindowAggregate implements
>>         AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
>>     private final String interval;
>>     public MGroupingWindowAggregate(String interval) {
>>         this.interval = interval;
>>     }
>>     public Map<String, Object> createAccumulator() {
>>         return new ConcurrentHashMap<>();
>>     }
>>
>>     public Map<String, Object> add(Map<String, Object> monitoring,
>> Map<String, Object> timedMap) {
>> .....
>> }
>>
>> .....
>>
>> }
>>
>> TIA,
>>