Hi,
Thanks for the replies, Congxian and Dawid.
Watermarks are advancing, but I am not sure how to check that every newly
generated watermark reaches the end of the window.
I checked currentInputWatermark in the Flink UI and it is increasing
monotonically.
I have narrowed the problem down to windowStream.aggregate never being invoked.
I also *added a checkpoint* to see whether that was causing the issue. It
didn't seem to help.
Most of the code is reached only during the creation of the ExecutionGraph at
the start of the program, not at runtime.
A Producer writes records with an incrementing sequence of timestamps (delay
of 5000 ms between records) to Kinesis, and the job emits a new watermark as
soon as it starts receiving the input records.
My window size is 15s.
I see that a WindowedStream is created with windowAssigner
TumblingEventTimeWindows(15000) and trigger EventTimeTrigger,
but the *code never gets into EventTimeTrigger.onElement() or
onEventTime() to fire the trigger*.
It does get into TimestampsAndPunctuatedWatermarksOperator and emitWatermark().
I even tried using ProcessingTime, but that didn't help either.
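To make the firing condition I am expecting concrete, here is a minimal sketch in plain Java (no Flink dependency). It shows how I understand a tumbling event-time window to work: a record with timestamp ts lands in the window starting at ts - (ts % size), and the event-time trigger fires once the watermark reaches the window's last timestamp (end - 1). The class and method names are made up for illustration, and the arithmetic assumes a window offset of 0 and non-negative timestamps.

```java
public class TumblingWindowSketch {

    // Start of the tumbling window containing ts (assumes offset 0 and ts >= 0).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Last timestamp that still belongs to the window containing ts.
    static long maxTimestamp(long ts, long size) {
        return windowStart(ts, size) + size - 1;
    }

    // True once the watermark has passed the end of the window containing ts.
    static boolean wouldFire(long watermark, long ts, long size) {
        return watermark >= maxTimestamp(ts, size);
    }

    public static void main(String[] args) {
        long size = 15_000L;  // 15 s window, as in my job
        long bound = 5_000L;  // out-of-order bound (assumed value for this sketch)
        long maxSeen = Long.MIN_VALUE;
        for (int i = 0; i < 6; i++) {
            long ts = i * 5_000L;              // records 5000 ms apart
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - bound;  // same rule as my assigner
            System.out.println("ts=" + ts + " watermark=" + watermark
                    + " firstWindowFires=" + wouldFire(watermark, 0L, size));
        }
    }
}
```

Under these assumptions the window only fires once a record arrives whose timestamp is at least (window end - 1 + bound), so after a handful of records onEventTime() should be reached if elements get to the window operator at all. Real epoch timestamps shift the window boundaries but not the rule.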
//code to create kinesis consumer successfully......
for (Rule rule : rules.getRules()) {
    //gets in here fine
    final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream =
            kinesisStream.filter(mon -> {
                boolean result;
                String eventName = mon.get(MEASUREMENT) != null
                        ? (String) mon.get(MEASUREMENT) : "";
                InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
                String measurement = inputMetricSelector != null
                        ? inputMetricSelector.getMeasurement() : "";
                result = eventName.equals(measurement);
                if (result) {
                    Map<String, String> inputTags = mon.get(TAGS) != null
                            ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
                    Map<String, String> ruleTags = inputMetricSelector != null
                            ? inputMetricSelector.getTags() : new HashMap<>();
                    result = matchTags(inputTags, ruleTags);
                }
                return result; // <== this is true
            }).flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>)
                    (input, out) -> {
                        out.collect(input); // <==== runs up till here fine
                    }).returns(new TypeHint<Map<String, Object>>() {
            });
    // doesn't do anything beyond this point at runtime
    DataStream<InfluxDBPoint> enrichedMGStream =
            pms.createAggregatedMonitoringGroupingWindowStream1(
                    filteredKinesisStream, ruleFactory, rule, parallelProcess);
    enrichedMGStream.addSink(influxSink)
            .setParallelism(nbrSinks);
}
private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(
        DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory,
        Rule rule, int parallelProcess) {
    DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
    RuleConfig ruleConfig = rule.getRuleConfig();
    String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
    RuleIF ruleImpl = ruleFactory.getRule(ruleType);
    Map<String, Object> ruleProps = ruleConfig != null
            ? ruleConfig.getRuleProps() : new HashMap<>();
    Object intervalObj = ruleProps.get("rule_eval_window");
    String timeInterval = intervalObj != null ? (String) intervalObj : "";
    org.apache.flink.streaming.api.windowing.time.Time timeWindow =
            getTimeWindowFromInterval(timeInterval);
    Object windowTypeObj = ruleProps.get("window_type");
    String windowType = windowTypeObj != null ? (String) windowTypeObj : "";
    InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
    Map<String, String> tags = inputMetricSelector != null
            ? inputMetricSelector.getTags() : new HashMap<>();
    String groupByObj = tags.get(GROUP_BY);
    String groupBy = groupByObj != null ? groupByObj : "";
    kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
        Object groupByValueObj = inputMap.get(groupBy);
        return groupByValueObj != null;
    });
    Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
    String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
    //till here, it went through fine during creation of ExecutionGraph
    // <=== never gets into MapTupleKeySelector.getKey() at runtime - a similar
    // class works in another project
    KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
            kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
    enrichedComponentInstanceStream1 =
            getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow,
                    windowType, timeInterval, ruleImpl, rule, parallelProcess);
    return enrichedComponentInstanceStream1;
}
private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(
        KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
        org.apache.flink.streaming.api.windowing.time.Time timeWindow,
        String windowType, String interval,
        RuleIF ruleImpl, Rule rule, int parallelProcess) {
    long slide = 100;
    final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
            windowType.equalsIgnoreCase(SLIDING)
                    ? monitoringTupleKeyedStream.timeWindow(timeWindow,
                            org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide))
                    : monitoringTupleKeyedStream.timeWindow(timeWindow);
    return windowStream.aggregate(
                    new MGroupingWindowAggregate(interval), // <=== never gets into add() here
                    new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
            .map(new MonitoringGroupingToInfluxDBPoint(rule));
}
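
On Dawid's point that an operator's watermark is the minimum of the watermarks received from all of its upstream operators, here is a small plain-Java sketch of that rule as I understand it. The class and method names are hypothetical and not Flink API. An input that has never emitted a watermark is modelled as Long.MIN_VALUE, which is how a single silent upstream subtask would keep the combined watermark from advancing.

```java
public class MinWatermarkSketch {

    // Stand-in for "this input has not produced any watermark yet".
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Combined watermark of an operator: the minimum over all of its inputs.
    // Returns Long.MAX_VALUE for an empty array (no inputs to wait for).
    static long combinedWatermark(long[] inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long wm : inputWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    public static void main(String[] args) {
        long[] allAdvancing = {20_000L, 15_000L, 30_000L};
        long[] oneSilent = {20_000L, NO_WATERMARK, 30_000L};
        System.out.println("all advancing: " + combinedWatermark(allAdvancing));
        System.out.println("one silent:    " + combinedWatermark(oneSilent));
    }
}
```

If this is what is happening in my job, the watermark shown for the window operator itself (rather than for the source) should stay stuck while the others increase.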
On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <[email protected]>
wrote:
> Hi Vijay,
>
> Could you check if the Watermark for the aggregate operator advances? You
> should be able to check that in the Flink WebUI. Could it be that the
> Watermark does not advance for all of the upstream operators? The watermark
> for a particular operator is the minimum of the watermarks received from all
> of the upstream operators. Therefore, if some of them do not produce any, the
> resulting watermark will not advance.
>
> Best,
>
> Dawid
> On 11/10/2019 21:37, Vijay Balakrishnan wrote:
>
> Hi,
> Here is my issue with *Event Processing*: the *add() method of
> MGroupingWindowAggregate is not being called* even though a new watermark is
> fired.
> 1. *Ingest data from Kinesis* (works fine)
> 2. *Deserialize* in MonitoringMapKinesisSchema (*works fine*, and I get JSON
> back)
> 3. I *assign MonitoringTSWAssigner* (code below) to the source with a
> bound of 10 (I have also tried 3000 and 30000). *It fires a new watermark*
> with each incoming record, but the *windowStream.aggregate method doesn't
> seem to fire* and I *don't see the add() method of MGroupingWindowAggregate
> being called*. I *can see the new watermark being emitted in
> TimestampsAndPunctuatedWatermarksOperator.processElement*.
> 4. I have tried timeWindow values of 1m and 15s.
>
> *Main* code:
>
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> //Setup Kinesis Consumer
> Properties kinesisConsumerConfig = new Properties();
> ..
> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>         ConsumerConfigConstants.InitialPosition.LATEST.name());//LATEST
> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer =
>         new FlinkKinesisConsumer<>(
>                 "kinesisTopicRead", new MonitoringMapKinesisSchema(true),
>                 kinesisConsumerConfig);
>
> DataStream<Map<String, Object>> kinesisStream;
> RichSinkFunction<InfluxDBPoint> influxSink;
>
> DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
>         env.addSource(kinesisConsumer);
> kinesisStream = monitoringDataStreamSource
>         .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
> influxSink = pms.createInfluxMonitoringSink(....);
> ......
> ...timeWindow = Time.seconds(timeIntervalL);//tried with
> //timeIntervalL=15s, 1m
>
> KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow>
>         windowStream = monitoringTupleKeyedStream.timeWindow(timeWindow);
> DataStream<InfluxDBPoint> enrichedMGStream =
>         windowStream.aggregate( // <===== never reaches here ?????
>                 new MGroupingWindowAggregate(interval),
>                 new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
> enrichedMGStream.addSink(influxSink);
> env.execute("Aggregation of Map data");
>
> *MonitoringTSWAssigner* code:
> public class MonitoringTSWAssigner implements
>         AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>     private long bound = 5 * (long) 1000;//5 secs out of order bound in millisecs
>     private long maxTimestamp = Long.MIN_VALUE;
>
>     public MonitoringTSWAssigner() {
>     }
>
>     public MonitoringTSWAssigner(long bound) {
>         this.bound = bound;
>     }
>
>     public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
>         long extractedTS = getExtractedTS(monitoring);
>         if (extractedTS > maxTimestamp) {
>             maxTimestamp = extractedTS;
>         }
>
>         return extractedTS;//return System.currentTimeMillis();
>     }
>
>     public long getExtractedTS(Map<String, Object> monitoring) {
>         final String eventTimestamp =
>                 monitoring.get(Utils.EVENT_TIMESTAMP) != null
>                         ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>         return Utils.getLongFromDateStr(eventTimestamp);
>     }
>
>     @Override
>     public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring,
>             long extractedTimestamp) {
>         long extractedTS = getExtractedTS(monitoring);
>         long nextWatermark = maxTimestamp - bound;
>         return new Watermark(nextWatermark);
>     }
> }
>
> *MGroupingWindowAggregate*:
> public class MGroupingWindowAggregate implements
>         AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
>     private final String interval;
>
>     public MGroupingWindowAggregate(String interval) {
>         this.interval = interval;
>     }
>
>     public Map<String, Object> createAccumulator() {
>         return new ConcurrentHashMap<>();
>     }
>
>     public Map<String, Object> add(Map<String, Object> monitoring,
>             Map<String, Object> timedMap) {
>         .....
>     }
>
>     .....
>
> }
>
> TIA,