After following suggestion from SO
I added few changes, so now I'm using Event Time
Water marks are progressing, I've checked them in Flink's metrics. The
Window operator is triggered but still I don't see any late outputs for
this.
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000,
1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
DataStream<RawMessage> rawBusinessTransaction = env
.addSource(new FlinkKafkaConsumer<>("business",
new JSONKeyValueDeserializationSchema(false),
properties))
.map(new KafkaTransactionObjectMapOperator())
.assignTimestampsAndWatermarks(new
AssignerWithPeriodicWatermarks<RawMessage>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
@Override
public long extractTimestamp(RawMessage element, long
previousElementTimestamp) {
return element.messageCreationTime;
}
})
.name("Kafka Transaction Raw Data Source.");
messageStream
.keyBy(tradeKeySelector)
.window(EventTimeSessionWindows.withDynamicGap(new
TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/