Hi,
Thanks for the replies, Congxian and Dawid.
Watermarks are advancing. I am not sure how to check that every newly
generated watermark reaches the end of the window.

I did check the Flink UI for the currentInputWatermark and it is increasing
monotonically.
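
For reference, the rule Dawid describes in the quoted message below (an
operator's watermark is the minimum of the watermarks from its upstream
inputs) can be sketched in plain Java. This is only an illustration and not
Flink's implementation: the class and method names are made up, and
Long.MIN_VALUE is assumed to stand for an input that has not emitted any
watermark yet.

```java
import java.util.Arrays;

class MinWatermarkSketch {
    // Combined watermark of an operator: the minimum over all of its inputs.
    // Long.MIN_VALUE represents an input that has not emitted a watermark yet (assumption).
    public static long combinedWatermark(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Both inputs advance: the operator follows the slower one.
        System.out.println(combinedWatermark(14_990L, 19_990L));
        // One input never emits a watermark: the operator watermark cannot advance.
        System.out.println(combinedWatermark(14_990L, Long.MIN_VALUE) == Long.MIN_VALUE);
    }
}
```

So a watermark that increases on one input is not enough if another input of
the same operator stays silent.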

I have narrowed the problem down to windowStream.aggregate never being invoked.
I also *added a checkpoint* to see if it was causing the issue. It didn't seem
to help.
Most of the code is reached during the creation of the ExecutionGraph at
the start of the program.

I generate an incrementing sequence of timestamps (with a delay of 5000 ms
between records) from a producer to Kinesis, and the job emits a new watermark
as it starts receiving the input records.
My window size is 15s.
I see that a WindowedStream is created with windowAssigner
TumblingEventTimeWindows(15000) and trigger EventTimeTrigger,
but the *code never gets into EventTimeTrigger.onElement() or
onEventTime() to fire the trigger*.
It does get into TimestampsAndPunctuatedWatermarksOperator and emitWatermark().
I even tried to use ProcessingTime, but that also didn't help.
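
To make the expected behaviour concrete, here is a small plain-Java sketch of
the arithmetic I would expect for a 15s tumbling event-time window fed with
one record every 5000 ms and a watermark of (max timestamp seen - bound). It
does not use Flink; the class and method names are hypothetical, the
timestamps start at 0 instead of real epoch values, and a window offset of 0
is assumed. The firing condition is the usual one: the watermark has to reach
the window's max timestamp (window end - 1).

```java
class TumblingWindowSketch {
    static final long WINDOW_SIZE_MS = 15_000L; // window size from this post
    static final long BOUND_MS = 10L;           // bound passed to the watermark assigner

    // Start of the tumbling window containing the timestamp (offset 0, non-negative timestamps).
    public static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Largest timestamp that still belongs to the window: end - 1.
    public static long windowMaxTimestamp(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS - 1;
    }

    // Event-time firing condition: the watermark has reached the window's max timestamp.
    public static boolean wouldFire(long watermarkMs, long timestampMs) {
        return watermarkMs >= windowMaxTimestamp(timestampMs);
    }

    public static void main(String[] args) {
        long maxSeen = Long.MIN_VALUE;
        // One record every 5000 ms, as in the test producer.
        for (long ts = 0; ts <= 20_000L; ts += 5_000L) {
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - BOUND_MS;
            System.out.printf("ts=%d watermark=%d firstWindowFires=%b%n",
                    ts, watermark, wouldFire(watermark, 0L));
        }
    }
}
```

Under these assumptions the first window [0, 15000) should fire once the
record with timestamp 20000 arrives (watermark 19990 >= 14999), so with
continuous input I would expect add() and the trigger to be reached well
within a minute.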


//code to create kinesis consumer successfully......
for (Rule rule : rules.getRules()) {
    // gets in here fine
    final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream =
            kinesisStream.filter(mon -> {
                boolean result;
                String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : "";
                InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
                String measurement = inputMetricSelector != null ? inputMetricSelector.getMeasurement() : "";
                result = eventName.equals(measurement);
                if (result) {
                    Map<String, String> inputTags = mon.get(TAGS) != null
                            ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
                    Map<String, String> ruleTags = inputMetricSelector != null
                            ? inputMetricSelector.getTags() : new HashMap<>();
                    result = matchTags(inputTags, ruleTags);
                }
                return result; // <== this is true
            }).flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
                out.collect(input); // <==== runs up till here fine
            }).returns(new TypeHint<Map<String, Object>>() {
            });
    // doesn't do anything beyond this point at runtime
    DataStream<InfluxDBPoint> enrichedMGStream =
            pms.createAggregatedMonitoringGroupingWindowStream1(
                    filteredKinesisStream, ruleFactory, rule, parallelProcess);
    enrichedMGStream.addSink(influxSink)
            .setParallelism(nbrSinks);
}

private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(
        DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory,
        Rule rule, int parallelProcess) {
    DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
    RuleConfig ruleConfig = rule.getRuleConfig();
    String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
    RuleIF ruleImpl = ruleFactory.getRule(ruleType);
    Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>();
    Object intervalObj = ruleProps.get("rule_eval_window");
    String timeInterval = intervalObj != null ? (String) intervalObj : "";
    org.apache.flink.streaming.api.windowing.time.Time timeWindow =
            getTimeWindowFromInterval(timeInterval);

    Object windowTypeObj = ruleProps.get("window_type");
    String windowType = windowTypeObj != null ? (String) windowTypeObj : "";

    InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
    Map<String, String> tags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
    String groupByObj = tags.get(GROUP_BY);
    String groupBy = groupByObj != null ? groupByObj : "";
    kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
        Object groupByValueObj = inputMap.get(groupBy);
        return groupByValueObj != null;
    });
    Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
    String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
    // till here, it went through fine during creation of the ExecutionGraph
    KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
            kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
    // <=== never gets into MapTupleKeySelector.getKey() - a similar class works in another project
    enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(
            monitoringTupleKeyedStream, timeWindow, windowType, timeInterval, ruleImpl, rule, parallelProcess);
    return enrichedComponentInstanceStream1;
}

private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(
        KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
        org.apache.flink.streaming.api.windowing.time.Time timeWindow,
        String windowType,
        String interval,
        RuleIF ruleImpl, Rule rule, int parallelProcess) {
    long slide = 100;
    final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
            windowType.equalsIgnoreCase(SLIDING) ?
                    monitoringTupleKeyedStream
                            .timeWindow(timeWindow,
                                    org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide)) :
                    monitoringTupleKeyedStream
                            .timeWindow(timeWindow);
    return windowStream.aggregate(
            new MGroupingWindowAggregate(interval), // <=== never gets into add() here
            new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
            .map(new MonitoringGroupingToInfluxDBPoint(rule));

}

On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Vijay,
>
> Could you check if the Watermark for the aggregate operator advances? You
> should be able to check that in the Flink WebUI. Could it be that the
> Watermark does not advance for all of the upstream operators? The watermark
> for a particular operator is a minimum of watermarks received from all of
> the upstream operators. Therefore, if some of them do not produce any, the
> resulting watermark will not advance.
>
> Best,
>
> Dawid
> On 11/10/2019 21:37, Vijay Balakrishnan wrote:
>
> Hi,
> Here is my issue with *Event Processing*: the *add() method of
> MGroupingWindowAggregate is not being called* even though a new watermark is
> fired.
> 1. *Ingest data from Kinesis (works fine)*
> 2. *Deserialize* in MonitoringMapKinesisSchema (*works fine* and I get JSON
> back)
> 3. I *assign MonitoringTSWAssigner* (code below) to the source with a
> bound of 10 (I have also tried 3000 and 30000). *It fires a new watermark*
> with each incoming record, but the *windowStream.aggregate method doesn't
> seem to fire* and I *don't see the add() method of MGroupingWindowAggregate
> called*. I *can see the new watermark being emitted in
> TimestampsAndPunctuatedWatermarksOperator.processElement*
> 4. I have tried with a timeWindow of 1m and 15s
>
> *Main* code:
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.*EventTime*);
>
> //Setup Kinesis Consumer
> Properties kinesisConsumerConfig = new Properties();
> ..
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
> ConsumerConfigConstants.InitialPosition.LATEST.name());//LATEST
> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new
> FlinkKinesisConsumer<>(
>                 "kinesisTopicRead", new MonitoringMapKinesisSchema(true),
> kinesisConsumerConfig);
>
> DataStream<Map<String, Object>> kinesisStream;
> RichSinkFunction<InfluxDBPoint> influxSink;
>
> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
> env.addSource(kinesisConsumer);
> kinesisStream = monitoringDataStreamSource
>         .assignTimestampsAndWatermarks(new *MonitoringTSWAssigner*(bound
> ));
> influxSink = pms.createInfluxMonitoringSink(....);
> ......
> ...timeWindow = Time.seconds(*timeIntervalL*);//tried with
> timeIntervalL=15s, 1m
>
> KeyedStream<Map<String, Object>, MonitoringTuple>
> monitoringTupleKeyedStream =
>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow>
> windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow);
> DataStream<InfluxDBPoint> enrichedMGStream =
>         *windowStream.aggregate*( // *<===== never reaches here ?????*
>         *new MGroupingWindowAggregate(interval)*,
>         new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
> enrichedMGStream.addSink(influxSink);
> env.execute("Aggregation of Map data");
>
> *MonitoringTSWAssigner* code:
> public class MonitoringTSWAssigner implements
> AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>     private long bound = 5 * (long) 1000; // 5 s out-of-order bound, in milliseconds
>     private long maxTimestamp = Long.MIN_VALUE;
>
>     public MonitoringTSWAssigner() {
>     }
>
>     public MonitoringTSWAssigner(long bound) {
>         this.bound = bound;
>     }
>
>     public long extractTimestamp(Map<String, Object> monitoring, long
> previousTS) {
>         long extractedTS = getExtractedTS(monitoring);
>         if (extractedTS > maxTimestamp) {
>             maxTimestamp = extractedTS;
>         }
>
>    return extractedTS;//return System.currentTimeMillis();
>
>     }
>
>     public long getExtractedTS(Map<String, Object> monitoring) {
>         final String eventTimestamp =
> monitoring.get(Utils.EVENT_TIMESTAMP) != null ? (String)
> monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>         return Utils.getLongFromDateStr(eventTimestamp);
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(Map<String, Object>
> monitoring, long extractedTimestamp) {
>         long extractedTS = getExtractedTS(monitoring);
>         long nextWatermark = maxTimestamp - bound;
>         return new Watermark(nextWatermark);
>     }
> }
>
> *MGroupingWindowAggregate*:
> public class MGroupingWindowAggregate implements
>         *AggregateFunction*<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
>     private final String interval;
>     public MGroupingWindowAggregate(String interval) {
>         this.interval = interval;
>     }
>     public Map<String, Object> createAccumulator() {
>         return new ConcurrentHashMap<>();
>     }
>
>     public Map<String, Object> add(Map<String, Object> monitoring,
> Map<String, Object> timedMap) {
> .....
> }
>
> .....
>
> }
>
> TIA,
>
>
>
>
