Hi Theo,
You were right. For some reason (I still haven't figured out why), the
FilterFunction was causing issues. I commented it out, and execution started
reaching the add() method of the aggregate function.
/*
kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
    Object groupByValueObj = inputMap.get(groupBy);
    return groupByValueObj != null;
});
*/
//String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
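
One possible explanation, not verified in this thread: groupBy holds the raw
GROUP_BY tag value, and the quoted code below splits it on KEY_DELIMITER, so it
may contain several field names. If it does, inputMap.get(groupBy) looks up the
whole delimited string as a single key, finds nothing, and the filter drops
every record (the same happens when the tag is missing and groupBy is ""). The
sketch below shows that effect in plain Java without Flink. The "," delimiter,
the field names, and the perFieldFilter helper are assumptions made up for
illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class GroupByFilterSketch {
    // Assumption: stands in for KEY_DELIMITER, whose real value is not shown in this thread.
    static final String KEY_DELIMITER = ",";

    // Mirrors the commented-out filter: looks up the whole groupBy string as one key.
    static boolean originalFilter(Map<String, Object> inputMap, String groupBy) {
        return inputMap.get(groupBy) != null;
    }

    // Hypothetical alternative: require every individual group-by field to be present.
    static boolean perFieldFilter(Map<String, Object> inputMap, String groupBy) {
        Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
        for (String field : groupBySet) {
            if (inputMap.get(field) == null) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("host", "h1");
        record.put("region", "us-east");
        String groupBy = "host,region"; // made-up composite group-by value

        System.out.println("original filter keeps record: " + originalFilter(record, groupBy));
        System.out.println("per-field filter keeps record: " + perFieldFilter(record, groupBy));
    }
}
```

If the real groupBy value is a single field name that is present in the
records, this explanation does not apply, and the cause lies elsewhere.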
TIA,
Vijay
On Tue, Oct 15, 2019 at 9:34 AM Vijay Balakrishnan <[email protected]>
wrote:
> Hi Theo,
> It gets to the FilterFunction during the initial creation of the
> ExecutionGraph, but not at runtime when records are streaming in. So it is
> not getting that far; it seems to be stuck in the
>
> final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream =
> kinesisStream.filter code.
>
> It doesn't seem to get past it: the watermarks keep incrementing, but the
> watermark never seems to hit the end of the window. Maybe I am doing
> something super simple and stupid.
>
> TIA,
> Vijay
>
> On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <
> [email protected]> wrote:
>
>> Hi Vijay,
>>
>> Maybe a stupid question, but according to your comments, the code works
>> fine up to a "flatMap" operation. It seems that this flatMap is directly
>> followed by a filter function in the method
>> createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out
>> all events? Or is the filter function itself not even called (as your
>> comments suggest)?
>>
>> Best regards
>> Theo
>>
>> ------------------------------
>> *From: *"Vijay Balakrishnan" <[email protected]>
>> *To: *"Dawid Wysakowicz" <[email protected]>
>> *CC: *"user" <[email protected]>
>> *Sent: *Tuesday, 15 October 2019 02:01:05
>> *Subject: *Re: add() method of AggregateFunction not called even though
>> new watermark is emitted
>>
>> Hi,
>> Thanks for the replies, Congxian & Dawdi.
>> Watermarks are advancing. I am not sure how to check whether every newly
>> generated watermark is reaching the end of the window.
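
One way to reason about this offline: for a tumbling event-time window with
zero offset, the window containing a non-negative timestamp starts at
timestamp - (timestamp % size), and an event-time trigger can fire once the
watermark reaches the window's max timestamp, which is end - 1. The sketch
below re-implements that arithmetic in plain Java instead of calling Flink, so
the class and method names are illustrative only. It assumes the watermark
formula maxTimestamp - bound from MonitoringTSWAssigner, and the first event
timestamp is made up.

```java
class WindowEndSketch {
    // Start of the tumbling window containing the timestamp
    // (zero offset, non-negative timestamps assumed).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Last timestamp that still belongs to the window: end - 1.
    static long windowMaxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    // An event-time window can fire once the watermark has reached its max timestamp.
    static boolean canFire(long watermark, long windowMaxTimestamp) {
        return watermark >= windowMaxTimestamp;
    }

    public static void main(String[] args) {
        long size = 15_000L;    // 15s window, as in the thread
        long bound = 10L;       // out-of-orderness bound passed to the assigner
        long firstTs = 1_000L;  // made-up timestamp of the first record
        long target = windowMaxTimestamp(firstTs, size);

        long maxTs = Long.MIN_VALUE;
        for (int i = 0; i < 5; i++) {
            long ts = firstTs + i * 5_000L; // records 5000 ms apart
            maxTs = Math.max(maxTs, ts);
            long watermark = maxTs - bound;
            System.out.println("ts=" + ts + " watermark=" + watermark
                    + " firstWindowCanFire=" + canFire(watermark, target));
        }
    }
}
```

With these made-up numbers the first window becomes eligible to fire on the
fourth record (timestamp 16000, watermark 15990). Logging the same comparison
with the real extracted timestamps shows whether the watermark ever passes a
window end; if it does and add() is still not called, the records are probably
being dropped before they reach the window operator.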
>>
>> I did check the Flink UI for the currentInputWatermark and it is
>> increasing monotonically.
>>
>> I narrowed the problem down to windowStream.aggregate not being called.
>> I also *added a checkpoint* to see if that was causing the issue. It didn't
>> seem to help.
>> Most of the code is reached during the creation of the ExecutionGraph at
>> the start of the program.
>>
>> I generate an incrementing sequence of timestamps (with a delay of 5000 ms
>> between records) from a producer to Kinesis, and the job emits a new
>> watermark as it starts receiving the input records.
>> My window size is 15s.
>> I see that a WindowedStream is created with windowAssigner
>> TumblingEventTimeWindows(15000) and trigger EventTimeTrigger,
>> but the *code never gets into EventTimeTrigger.onElement() or
>> onEventTime() to fire the trigger*.
>> It gets into TimestampsAndPunctuatedWatermarksOperator and emitWatermark().
>> I even tried to use ProcessingTime, but that also didn't help.
>>
>>
>> // code to create kinesis consumer successfully......
>> for (Rule rule : rules.getRules()) {
>>     // gets in here fine
>>     final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream =
>>         kinesisStream.filter(mon -> {
>>             boolean result;
>>             String eventName = mon.get(MEASUREMENT) != null
>>                 ? (String) mon.get(MEASUREMENT) : "";
>>             InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
>>             String measurement = inputMetricSelector != null
>>                 ? inputMetricSelector.getMeasurement() : "";
>>             result = eventName.equals(measurement);
>>             if (result) {
>>                 Map<String, String> inputTags = mon.get(TAGS) != null
>>                     ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
>>                 Map<String, String> ruleTags = inputMetricSelector != null
>>                     ? inputMetricSelector.getTags() : new HashMap<>();
>>                 result = matchTags(inputTags, ruleTags);
>>             }
>>             return result; // <== this is true
>>         }).flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
>>             out.collect(input); // <==== runs up till here fine
>>         }).returns(new TypeHint<Map<String, Object>>() {
>>         });
>>     // doesn't do anything beyond this point at runtime
>>     DataStream<InfluxDBPoint> enrichedMGStream =
>>         pms.createAggregatedMonitoringGroupingWindowStream1(
>>             filteredKinesisStream, ruleFactory, rule, parallelProcess);
>>     enrichedMGStream.addSink(influxSink)
>>         .setParallelism(nbrSinks);
>> }
>>
>> private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(
>>         DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory,
>>         Rule rule, int parallelProcess) {
>>     DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
>>     RuleConfig ruleConfig = rule.getRuleConfig();
>>     String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
>>     RuleIF ruleImpl = ruleFactory.getRule(ruleType);
>>     Map<String, Object> ruleProps = ruleConfig != null
>>         ? ruleConfig.getRuleProps() : new HashMap<>();
>>     Object intervalObj = ruleProps.get("rule_eval_window");
>>     String timeInterval = intervalObj != null ? (String) intervalObj : "";
>>     org.apache.flink.streaming.api.windowing.time.Time timeWindow =
>>         getTimeWindowFromInterval(timeInterval);
>>
>>     Object windowTypeObj = ruleProps.get("window_type");
>>     String windowType = windowTypeObj != null ? (String) windowTypeObj : "";
>>
>>     InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
>>     Map<String, String> tags = inputMetricSelector != null
>>         ? inputMetricSelector.getTags() : new HashMap<>();
>>     String groupByObj = tags.get(GROUP_BY);
>>     String groupBy = groupByObj != null ? groupByObj : "";
>>     kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
>>         Object groupByValueObj = inputMap.get(groupBy);
>>         return groupByValueObj != null;
>>     });
>>     Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
>>     String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
>>     // till here, it went through fine during creation of the ExecutionGraph
>>     // <=== never gets into MapTupleKeySelector.getKey() - a similar class
>>     // works in another project
>>     KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
>>     enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(
>>         monitoringTupleKeyedStream, timeWindow, windowType, timeInterval,
>>         ruleImpl, rule, parallelProcess);
>>     return enrichedComponentInstanceStream1;
>> }
>>
>> private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(
>>         KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
>>         org.apache.flink.streaming.api.windowing.time.Time timeWindow,
>>         String windowType, String interval,
>>         RuleIF ruleImpl, Rule rule, int parallelProcess) {
>>     long slide = 100;
>>     final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
>>         windowType.equalsIgnoreCase(SLIDING)
>>             ? monitoringTupleKeyedStream.timeWindow(timeWindow,
>>                 org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide))
>>             : monitoringTupleKeyedStream.timeWindow(timeWindow);
>>     return windowStream.aggregate(
>>             new MGroupingWindowAggregate(interval), // <=== never gets into add() here
>>             new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
>> }
>>
>> On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <[email protected]>
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> Could you check whether the watermark for the aggregate operator advances?
>>> You should be able to check that in the Flink WebUI. Could it be that the
>>> watermark does not advance for all of the upstream operators? The watermark
>>> for a particular operator is the minimum of the watermarks received from all
>>> of its upstream operators. Therefore, if some of them do not produce any,
>>> the resulting watermark will not advance.
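
The minimum rule described above can be illustrated with a few lines of plain
Java. This is only a sketch of the arithmetic, not Flink's implementation: the
class name, the use of Long.MIN_VALUE for "no watermark yet", and the sample
values are assumptions for illustration.

```java
import java.util.Arrays;

class MinWatermarkSketch {
    // Assumption: Long.MIN_VALUE marks an input that has not emitted a watermark yet.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // The operator's watermark is the minimum over all of its inputs.
    static long combinedWatermark(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    public static void main(String[] args) {
        // All upstream inputs advance: the slowest one determines the result.
        System.out.println(combinedWatermark(15_990L, 20_000L, 17_500L));

        // One upstream input never emits a watermark: the result stays at NO_WATERMARK,
        // so no event-time window downstream can fire.
        System.out.println(combinedWatermark(15_990L, 20_000L, NO_WATERMARK));
    }
}
```

This is why a single idle upstream subtask, for example a source instance that
receives no records, can hold back every window downstream even though the
other inputs keep advancing.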
>>>
>>> Best,
>>>
>>> Dawdi
>>> On 11/10/2019 21:37, Vijay Balakrishnan wrote:
>>>
>>> Hi,
>>> Here is my issue with *Event Processing*: the *add() method of
>>> MGroupingWindowAggregate is not being called* even though a new watermark
>>> is fired.
>>> 1. *Ingest data from Kinesis (works fine)*
>>> 2. *Deserialize* in MonitoringMapKinesisSchema (*works fine*; I get JSON
>>> back)
>>> 3. I *assign MonitoringTSWAssigner* (code below) to the source with a
>>> bound of 10 (I have also tried 3000 and 30000). *It fires a new watermark*
>>> with each incoming record, but the *windowStream.aggregate method doesn't
>>> seem to fire* and I *don't see the add() method of
>>> MGroupingWindowAggregate called*. I *can see the new watermark being
>>> emitted in TimestampsAndPunctuatedWatermarksOperator.processElement*.
>>> 4. I have tried with a timeWindow of 1m and of 15s.
>>> *Main* code:
>>>
>>> final StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> // Setup Kinesis Consumer
>>> Properties kinesisConsumerConfig = new Properties();
>>> ..
>>> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>>>     ConsumerConfigConstants.InitialPosition.LATEST.name()); // LATEST
>>> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>(
>>>     "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig);
>>>
>>> DataStream<Map<String, Object>> kinesisStream;
>>> RichSinkFunction<InfluxDBPoint> influxSink;
>>>
>>> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
>>>     env.addSource(kinesisConsumer);
>>> kinesisStream = monitoringDataStreamSource
>>>     .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
>>> influxSink = pms.createInfluxMonitoringSink(....);
>>> ......
>>> ...timeWindow = Time.seconds(timeIntervalL); // tried with timeIntervalL=15s, 1m
>>>
>>> KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>>>     kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
>>> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
>>>     monitoringTupleKeyedStream.timeWindow(timeWindow);
>>> DataStream<InfluxDBPoint> enrichedMGStream =
>>>     windowStream.aggregate( // <===== never reaches here ?????
>>>             new MGroupingWindowAggregate(interval),
>>>             new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>>>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
>>> enrichedMGStream.addSink(influxSink);
>>> env.execute("Aggregation of Map data");
>>>
>>> *MonitoringTSWAssigner* code:
>>> public class MonitoringTSWAssigner implements
>>>         AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>>>     private long bound = 5 * (long) 1000; // 5 secs out of order bound in millisecs
>>>     private long maxTimestamp = Long.MIN_VALUE;
>>>
>>>     public MonitoringTSWAssigner() {
>>>     }
>>>
>>>     public MonitoringTSWAssigner(long bound) {
>>>         this.bound = bound;
>>>     }
>>>
>>>     public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
>>>         long extractedTS = getExtractedTS(monitoring);
>>>         if (extractedTS > maxTimestamp) {
>>>             maxTimestamp = extractedTS;
>>>         }
>>>         return extractedTS; // return System.currentTimeMillis();
>>>     }
>>>
>>>     public long getExtractedTS(Map<String, Object> monitoring) {
>>>         final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
>>>             ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>>>         return Utils.getLongFromDateStr(eventTimestamp);
>>>     }
>>>
>>>     @Override
>>>     public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring,
>>>             long extractedTimestamp) {
>>>         long extractedTS = getExtractedTS(monitoring);
>>>         long nextWatermark = maxTimestamp - bound;
>>>         return new Watermark(nextWatermark);
>>>     }
>>> }
>>>
>>> *MGroupingWindowAggregate*:
>>> public class MGroupingWindowAggregate implements
>>>         AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
>>>     private final String interval;
>>>
>>>     public MGroupingWindowAggregate(String interval) {
>>>         this.interval = interval;
>>>     }
>>>
>>>     public Map<String, Object> createAccumulator() {
>>>         return new ConcurrentHashMap<>();
>>>     }
>>>
>>>     public Map<String, Object> add(Map<String, Object> monitoring,
>>>             Map<String, Object> timedMap) {
>>>         .....
>>>     }
>>>
>>>     .....
>>>
>>> }
>>>
>>> TIA,
>>>
>>>
>>>
>>>