Hi! Have any other devs noticed Flink missing Kafka records in long-running jobs? When I re-run my Flink job starting from the earliest Kafka offset, Flink processes the events correctly. I'm using Flink v1.11.1.
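(By "starting from the earliest offset" I mean re-running the job with the consumer reset to the beginning of the topic rather than resuming from the group's committed offsets, along these lines; sketch only, using the same helpers as the job code below:)

FlinkKafkaConsumer<Request> consumer = new FlinkKafkaConsumer<>(
        getInputRequestTopic(),
        getProtoDeserializationSchema(Request.class),
        kafkaSourceProperties);
// Ignore committed group offsets and re-read the topic from the beginning.
consumer.setStartFromEarliest();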
I have a simple job that takes records (Requests) from Kafka and serializes them to S3. Pretty basic. There are no related errors in the logs, so I'm hoping this is just a configuration issue; my guess is that idleness is working in a way I'm not expecting. Any ideas?

- Dan

void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
    Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");

    SingleOutputStreamOperator<Request> rawRequestInput = env
            .addSource(new FlinkKafkaConsumer<>(
                    getInputRequestTopic(),
                    getProtoDeserializationSchema(Request.class),
                    kafkaSourceProperties))
            .uid("source-request")
            .name("Request")
            // Bounded out-of-orderness; the stream is marked idle after 1 minute
            // without records.
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
                            .withIdleness(Duration.ofMinutes(1)));

    executeLogRequest(rawRequestInput);
    env.execute("log-request");
}

void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
    AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);

    // Bulk-format S3 sink; part files roll on every checkpoint.
    rawRequestInput
            .addSink(StreamingFileSink
                    .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
                    .withBucketAssigner(new DateHourBucketAssigner<Request>(
                            request -> request.getTiming().getEventApiTimestamp()))
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .withOutputFileConfig(createOutputFileConfig())
                    .build())
            .uid("sink-s3-raw-request")
            .name("S3 Raw Request");
}
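P.S. One thing I'm considering trying (untested sketch, same helpers as above): assigning the watermark strategy on the FlinkKafkaConsumer itself instead of on the stream after addSource(), so that timestamps, watermarks, and idleness are tracked per Kafka partition inside the source:

FlinkKafkaConsumer<Request> consumer = new FlinkKafkaConsumer<>(
        getInputRequestTopic(),
        getProtoDeserializationSchema(Request.class),
        kafkaSourceProperties);

// With the strategy on the consumer, each Kafka partition gets its own
// watermark generator, so a single idle partition can be marked idle on
// its own instead of affecting the watermark for the whole subtask.
consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
                .withIdleness(Duration.ofMinutes(1)));

SingleOutputStreamOperator<Request> rawRequestInput = env
        .addSource(consumer)
        .uid("source-request")
        .name("Request");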