Hi James,

I believe you have encountered a bug that we have already fixed [1]. The catch is that fixing it required changes to some `@PublicEvolving` interfaces, so we could not backport the fix to earlier minor releases. As a result, it is only fixed starting from 1.14.x.
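The linked issue concerns how a two-input operator, such as the one running a KeyedCoProcessFunction, combines the watermarks of its inputs when one input is idle. The plain-Java sketch below is a simplified model, not Flink's actual code, and all class and method names in it are hypothetical.

It contrasts two behaviours:

- Taking the minimum over all inputs keeps the watermark at Long.MIN_VALUE while the idle Kafka input has never emitted one.
- Skipping idle inputs is the behaviour James expected, and roughly what the fix is meant to provide.

An event-time timer for timestamp t fires once the watermark reaches t.

```java
/**
 * Simplified model (not Flink's implementation) of a two-input operator
 * combining the watermarks of its inputs. All names are hypothetical.
 */
public class CombinedWatermarkModel {

    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] inputIdle = {false, false};
    private final boolean ignoreIdleInputs;
    private long combined = Long.MIN_VALUE;

    CombinedWatermarkModel(boolean ignoreIdleInputs) {
        this.ignoreIdleInputs = ignoreIdleInputs;
    }

    /** A watermark arrives on input 0 or 1; in this model it also marks the input active. */
    void onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        inputIdle[input] = false;
        recompute();
    }

    /** The input's source has been marked idle, e.g. via withIdleness(). */
    void markIdle(int input) {
        inputIdle[input] = true;
        recompute();
    }

    private void recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (ignoreIdleInputs && inputIdle[i]) {
                continue; // idle inputs do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, inputWatermarks[i]);
        }
        // If every input is idle, hold the current value; never move it backwards.
        if (anyActive && min > combined) {
            combined = min;
        }
    }

    long currentWatermark() {
        return combined;
    }

    /** An event-time timer registered for timestamp t fires once the watermark reaches t. */
    boolean timerFires(long timerTimestamp) {
        return combined >= timerTimestamp;
    }

    public static void main(String[] args) {
        long dbWatermark = 1_000L; // illustrative value: last event time read from the DB
        for (boolean ignoreIdle : new boolean[] {false, true}) {
            CombinedWatermarkModel op = new CombinedWatermarkModel(ignoreIdle);
            op.onWatermark(0, dbWatermark); // input 0: DB stream, fully read
            op.markIdle(1);                 // input 1: empty Kafka stream, marked idle
            System.out.printf("ignoreIdleInputs=%b watermark=%d timerFires=%b%n",
                    ignoreIdle, op.currentWatermark(), op.timerFires(dbWatermark));
        }
    }
}
```

Running main prints the following:

- For the min-over-all-inputs case: `ignoreIdleInputs=false watermark=-9223372036854775808 timerFires=false`.
- When idle inputs are skipped: `ignoreIdleInputs=true watermark=1000 timerFires=true`.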
Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18934

On Fri, 8 Oct 2021 at 11:55, James Sandys-Lumsdaine <jas...@hotmail.com> wrote:

> Hi everyone,
>
> I'm putting together a Flink workflow that needs to merge historic data
> from a custom JDBC source with a Kafka flow (for the real-time data). I have
> successfully written the custom JDBC source, which emits a watermark for the
> last event time after all the DB data has been emitted, but now I face a
> problem when joining with data from the Kafka stream.
>
> I register a timer in the KeyedCoProcessFunction that joins the DB stream
> with the live Kafka stream, so that I can emit all the "batch" data from the
> DB in one go once it has been read completely up to the watermark. However,
> the timer never fires, because the Kafka stream is empty and therefore
> doesn't emit a watermark. The Kafka stream is allowed to be empty, since all
> the historic data will have been retrieved by the DB call; I only expect new
> events to appear over Kafka. Note that if I replace the Kafka input with a
> simple env.fromCollection(...) of an empty list, the timer triggers fine, as
> Flink seems to detect that it doesn't need to wait for any input from
> stream 2. So something related to the status of the Kafka stream seems to be
> preventing the watermark from advancing in the KeyedCoProcessFunction.
>
> I have tried configuring the timestamp and watermark strategy of the Kafka
> stream so that the source is marked as idle after 10 seconds, but the
> watermark in the join operator combining these two streams still does not
> seem to advance (see the example code below).
>
> Maybe I have misunderstood, but I thought that if an input stream into a
> KeyedCoProcessFunction is idle, the operator would ignore it when
> forwarding the watermark, i.e. it would forward the non-idle input stream's
> watermark instead of computing min(stream1WM, stream2WM).
>
> With the example below, onTimer never fires, and the only effect of the
> withIdleness() strategy is that the print statements in onPeriodicEmit()
> stop after 5 seconds (the periodic watermark interval is the default 200 ms,
> so I see 25 rows before they stop).
>
> The only way I can get the timer in my KeyedCoProcessFunction to fire is to
> force the watermark I want in onPeriodicEmit() after a number of attempts to
> advance an initial watermark, i.e. if onPeriodicEmit() has been called 100
> times and "latestWatermark" is still Long.MIN_VALUE, I emit the watermark I
> want so the join can progress. This seems like a nasty hack to me, but
> perhaps something like this is actually necessary?
>
> I am currently using Flink 1.12.3 with the Confluent Kafka client 6.1.1.
> Any pointers would be appreciated.
>
> Thanks in advance,
>
> James.
>
> FlinkKafkaConsumer<Position> positionsFlinkKafkaConsumer = new FlinkKafkaConsumer<>(
>         "poc.positions",
>         ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class, SchemaRegistryURL),
>         kafkaConsumerProperties);
> positionsFlinkKafkaConsumer.setStartFromEarliest();
> positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
>         new WatermarkStrategy<Position>() {
>             @Override
>             public TimestampAssigner<Position> createTimestampAssigner(
>                     TimestampAssignerSupplier.Context context) {
>                 // Use the position's physical-from time as the event timestamp
>                 return (event, recordTimestamp) -> event.getPhysicalFrom();
>             }
>
>             @Override
>             public WatermarkGenerator<Position> createWatermarkGenerator(
>                     WatermarkGeneratorSupplier.Context context) {
>                 return new WatermarkGenerator<Position>() {
>                     private long latestWatermark = Long.MIN_VALUE;
>
>                     @Override
>                     public void onEvent(Position event, long timestamp, WatermarkOutput output) {
>                         // Track the highest event time seen so far
>                         latestWatermark = Math.max(latestWatermark, event.getPhysicalFrom());
>                     }
>
>                     @Override
>                     public void onPeriodicEmit(WatermarkOutput output) {
>                         System.out.printf("Emitting watermark %d%n", latestWatermark);
>                         output.emitWatermark(new Watermark(latestWatermark));
>                     }
>                 };
>             }
>         }.withIdleness(Duration.ofSeconds(5)));
>
> DataStream<Position> positionKafkaInputStream =
>         env.addSource(positionsFlinkKafkaConsumer, "Kafka-Source");
>
> // assignTimestampsAndWatermarks() returns a new stream; keep that stream,
> // otherwise the watermark strategy is silently discarded
> DataStream<Position> otherPositionStream = env
>         .fromCollection(Lists.newArrayList(new Position(..., timestamp.getMillis())),
>                 TypeInformation.of(Position.class))
>         .assignTimestampsAndWatermarks(
>                 WatermarkStrategy.<Position>forBoundedOutOfOrderness(Duration.ofSeconds(10))
>                         .withTimestampAssigner((e, t) -> e.getPhysicalFrom()));
>
> KeyedStream<Position, String> keyedPositionKafkaInputStream =
>         positionKafkaInputStream.keyBy(p -> p.getMarketName());
> KeyedStream<Position, String> keyedOtherPositionStream =
>         otherPositionStream.keyBy(p -> p.getMarketName());
>
> DataStream<Position> joinedStream = keyedOtherPositionStream
>         .connect(keyedPositionKafkaInputStream)
>         .process(new JoinProcessFunction());
> ...
>
> private static class JoinProcessFunction
>         extends KeyedCoProcessFunction<String, Position, Position, Position> {
>
>     private static final Logger logger = LoggerFactory.getLogger(JoinProcessFunction.class);
>
>     @Override
>     public void processElement1(Position position, Context context, Collector<Position> collector) {
>         logger.info("processPosition1: key: {}, ts: {}, watermark: {}",
>                 context.getCurrentKey(), context.timestamp(),
>                 context.timerService().currentWatermark());
>         // Fires once the operator's watermark reaches this event time
>         context.timerService().registerEventTimeTimer(position.getPhysicalFrom());
>         collector.collect(position);
>     }
>
>     @Override
>     public void processElement2(Position position, Context context, Collector<Position> collector) {
>         logger.info("processPosition2: key: {}, ts: {}, watermark: {}",
>                 context.getCurrentKey(), context.timestamp(),
>                 context.timerService().currentWatermark());
>         context.timerService().registerEventTimeTimer(position.getPhysicalFrom());
>         collector.collect(position);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Position> out) {
>         logger.info("Timer triggered for timestamp {} and key '{}' - current Watermark: {}.",
>                 timestamp, ctx.getCurrentKey(), ctx.timerService().currentWatermark());
>     }
> }
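For reference, the workaround James describes (forcing a watermark after a number of periodic emits that saw no events) can be modelled in plain Java as below. This is a hypothetical sketch, not a Flink class. It mirrors the onEvent()/onPeriodicEmit() shape of Flink's WatermarkGenerator, but it returns the watermark instead of calling WatermarkOutput, so it runs without Flink on the classpath.

The names fallbackWatermark and maxEmptyEmits are assumptions. With the default 200 ms periodic interval, maxEmptyEmits = 100 corresponds to roughly 20 seconds. Given the fix mentioned above, withIdleness() is expected to make this unnecessary on 1.14.x and later.

```java
/**
 * Hypothetical plain-Java model of a periodic watermark generator that falls
 * back to a caller-chosen watermark when no events arrive. Not a Flink API.
 */
public class FallbackWatermarkGenerator {

    private final long fallbackWatermark; // e.g. the last event time read from the DB (assumption)
    private final int maxEmptyEmits;      // periodic calls to wait before forcing the fallback
    private long latestTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;
    private int emptyEmits = 0;

    public FallbackWatermarkGenerator(long fallbackWatermark, int maxEmptyEmits) {
        this.fallbackWatermark = fallbackWatermark;
        this.maxEmptyEmits = maxEmptyEmits;
    }

    /** Track the highest event timestamp seen so far. */
    public void onEvent(long eventTimestamp) {
        latestTimestamp = Math.max(latestTimestamp, eventTimestamp);
    }

    /** Returns the watermark this periodic call would emit. */
    public long onPeriodicEmit() {
        long candidate = latestTimestamp;
        // Only count calls while no event has been seen yet
        if (latestTimestamp == Long.MIN_VALUE && ++emptyEmits >= maxEmptyEmits) {
            candidate = fallbackWatermark;
        }
        // Never let the emitted watermark move backwards
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }

    public static void main(String[] args) {
        FallbackWatermarkGenerator gen = new FallbackWatermarkGenerator(1_000L, 3);
        for (int i = 1; i <= 4; i++) {
            System.out.printf("emit %d -> %d%n", i, gen.onPeriodicEmit());
        }
        gen.onEvent(2_000L); // a real Kafka event arrives later
        System.out.printf("after event -> %d%n", gen.onPeriodicEmit());
    }
}
```

The monotonic lastEmitted guard matters here. Without it, an event older than the forced fallback would make the generator emit a lower watermark than it already has.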