Hi!

Have any other devs noticed issues with Flink missing Kafka records with
long-running Flink jobs?  When I re-run my Flink job and start from the
earliest Kafka offset, Flink processes the events correctly.  I'm using
Flink v1.11.1.

I have a simple job that takes records (Requests) from Kafka and serializes
them to S3.  Pretty basic.  No related issues in the text logs.  I'm hoping
I just have a configuration issue.  I'm guessing idleness is working in a
way that I'm not expecting.

Any ideas?
- Dan


void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {

  Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");

  SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(

    new FlinkKafkaConsumer(getInputRequestTopic(),
getProtoDeserializationSchema(Request.class), kafkaSourceProperties))

      .uid("source-request")

      .name("Request")

      .assignTimestampsAndWatermarks(


WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness).withIdleness(Duration.ofMinutes(1)));


  executeLogRequest(rawRequestInput);

  env.execute("log-request");

}


void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput)
{

  AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);

  rawRequestInput.addSink(StreamingFileSink

      .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"),
factory)

      .withBucketAssigner(new DateHourBucketAssigner<Request>(request ->
request.getTiming().getEventApiTimestamp()))

      .withRollingPolicy(OnCheckpointRollingPolicy.build())

      .withOutputFileConfig(createOutputFileConfig())

      .build())

    .uid("sink-s3-raw-request")

    .name("S3 Raw Request");

}

Reply via email to