Hi,

Problem: Watermark does not move within Dynamic Alert Function
I am implementing the ideas (as is) from this article:
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html

Code: https://github.com/afedulov/fraud-detection-demo

Pipeline: Kafka -> Dynamic Key Function -> Dynamic Alert Function -> Kafka sink

Adapted code for Flink 1.14.3:

1. Init transaction source:
----------------------------------------------
KafkaSource<String> transactionSource = TransactionsSource.createTransactionsSource(config);
int sourceParallelism = config.get(SOURCE_PARALLELISM);
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.ofMillis(config.get(OUT_OF_ORDERNESS)))
    .withTimestampAssigner((event, timestamp) -> System.currentTimeMillis());
DataStream<String> transactionsStringsStream = env
    .fromSource(transactionSource, watermarkStrategy, "KafkaTransactions")
    .name("Transactions Source")
    .setParallelism(sourceParallelism);
DataStream<Transaction> transactionsStream = TransactionsSource
    .stringsStreamToTransactions(transactionsStringsStream);
return transactionsStream.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<Transaction>forBoundedOutOfOrderness(Duration.ofMillis(config.get(OUT_OF_ORDERNESS)))
        .withTimestampAssigner((event, timestamp) -> event.getEventTime()));

DataStream<Rule> rulesUpdateStream = getRulesUpdateStream(env);
BroadcastStream<Rule> rulesStream = rulesUpdateStream.broadcast(Descriptors.rulesDescriptor);

// Processing pipeline setup
DataStream<Alert> alerts = transactions
    .connect(rulesStream)
    .process(new DynamicKeyFunction())
    .uid("DynamicKeyFunction")
    .name("Dynamic Partitioning Function")
    .keyBy((keyed) -> keyed.getKey())
    .connect(rulesStream)
    .process(new DynamicAlertFunction())
    .uid("DynamicAlertFunction")
    .name("Dynamic Rule Evaluation Function");

private DataStream<Rule> getRulesUpdateStream(StreamExecutionEnvironment env) throws IOException {
    RulesSource.Type rulesSourceEnumType = getRulesSourceType();
    KafkaSource<String> rulesSource = RulesSource.createRulesSource(config);
    DataStream<String> rulesStrings = env
        .fromSource(rulesSource, WatermarkStrategy.noWatermarks(), "KafkaRules")
        .name(rulesSourceEnumType.getName())
        .setParallelism(1);
    return RulesSource.stringsStreamToRules(rulesStrings);
}
----------------------------------------------

The watermark stays at -9223372036854775808, and DynamicAlertFunction's onTimer() does not fire.

In the Web UI, I see "No Watermark (Watermarks are only available if EventTime is used)".

Please help.

Thanks
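
P.S. In case it helps narrow this down: as far as I understand, an operator with two inputs (here, transactions connected with the broadcast rules stream) advances its event-time clock to the minimum of the watermarks seen on its inputs, and -9223372036854775808 is Long.MIN_VALUE. Below is a plain-Java sketch of that rule, without any Flink dependency. The class and method names and the sample timestamp are made up for illustration, and treating the noWatermarks() rules input as permanently at Long.MIN_VALUE is my assumption, not something I have confirmed in the job.

```java
public class WatermarkSketch {

    // Hypothetical helper, not a Flink API: models a two-input operator whose
    // event-time clock is the minimum of the watermarks of its two inputs.
    public static long combinedWatermark(long transactionsWatermark, long rulesWatermark) {
        return Math.min(transactionsWatermark, rulesWatermark);
    }

    public static void main(String[] args) {
        // Made-up watermark value for the transactions input.
        long transactionsWatermark = 1_642_000_000_000L;

        // Assumption: an input that never emits watermarks stays at Long.MIN_VALUE.
        long rulesWatermark = Long.MIN_VALUE;

        // The stalled input pins the combined watermark.
        System.out.println(combinedWatermark(transactionsWatermark, rulesWatermark));
        // prints -9223372036854775808

        // If the rules input reported Long.MAX_VALUE instead, the transactions
        // watermark alone would decide.
        System.out.println(combinedWatermark(transactionsWatermark, Long.MAX_VALUE));
        // prints 1642000000000
    }
}
```

If this model is right, the rules input alone would be enough to keep the timers in DynamicAlertFunction from firing, regardless of how the transaction watermarks are assigned.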