Hey Arvid, I'll try to repro sometime in the next few weeks. I need to make
some larger changes to get a full diff to see what is being dropped.
On Thu, Apr 29, 2021 at 4:03 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Dan,
>
> could you check which records are missing? I'm suspecting it could be
> records that are emitted right before rollover of the bucketing strategy
> from an otherwise idling partition.
>
> If so, it could indeed be connected to idleness. Idleness tells Flink not
> to wait on that particular partition before advancing the watermark. If a
> record appears in a previously idle partition with an event timestamp
> before the watermark of the other partitions, that record would be deemed
> late and is discarded.
>
> On Tue, Apr 27, 2021 at 2:42 AM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Hey Robert.
>>
>> Nothing weird. I was trying to find recent records (not the latest). No
>> savepoints (the job had only been running for about a day). No
>> checkpoint issues (all successes). I don't know how many records are
>> missing.
>>
>> I removed the withIdleness. The other parts are very basic. The text
>> logs look pretty useless.
>>
>> On Mon, Apr 26, 2021 at 11:07 AM Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hi Dan,
>>>
>>> Can you describe under which conditions you are missing records (after
>>> a machine failure, after a Kafka failure, after taking and restoring
>>> from a savepoint, ...)?
>>> Are many records missing? Are "the first records" or the "latest
>>> records" missing? Are individual records missing, or larger blocks of
>>> data?
>>>
>>> I don't think there's a bug in Flink or the Kafka connector. Maybe it's
>>> just a configuration or systems design issue.
>>>
>>>
>>> On Sun, Apr 25, 2021 at 9:56 AM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> Have any other devs noticed issues with Flink missing Kafka records
>>>> in long-running Flink jobs? When I re-run my Flink job and start from
>>>> the earliest Kafka offset, Flink processes the events correctly. I'm
>>>> using Flink v1.11.1.
>>>>
>>>> I have a simple job that takes records (Requests) from Kafka and
>>>> serializes them to S3. Pretty basic. No related issues in the text
>>>> logs. I'm hoping I just have a configuration issue. I'm guessing
>>>> idleness is working in a way that I'm not expecting.
>>>>
>>>> Any ideas?
>>>>
>>>> - Dan
>>>>
>>>>
>>>> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>>>>   Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");
>>>>
>>>>   // Kafka source for Request protos, with bounded-out-of-orderness
>>>>   // watermarks and a 1-minute idleness timeout.
>>>>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>>>>       new FlinkKafkaConsumer<>(getInputRequestTopic(),
>>>>           getProtoDeserializationSchema(Request.class), kafkaSourceProperties))
>>>>     .uid("source-request")
>>>>     .name("Request")
>>>>     .assignTimestampsAndWatermarks(
>>>>         WatermarkStrategy.<Request>forBoundedOutOfOrderness(maxOutOfOrderness)
>>>>             .withIdleness(Duration.ofMinutes(1)));
>>>>
>>>>   executeLogRequest(rawRequestInput);
>>>>   env.execute("log-request");
>>>> }
>>>>
>>>> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>>>>   AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);
>>>>
>>>>   // Bulk-format S3 sink: buckets by the event's date/hour and rolls
>>>>   // part files on each checkpoint.
>>>>   rawRequestInput.addSink(StreamingFileSink
>>>>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>>>>       .withBucketAssigner(new DateHourBucketAssigner<Request>(
>>>>           request -> request.getTiming().getEventApiTimestamp()))
>>>>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>       .withOutputFileConfig(createOutputFileConfig())
>>>>       .build())
>>>>     .uid("sink-s3-raw-request")
>>>>     .name("S3 Raw Request");
>>>> }
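
One possible way to verify Arvid's hypothesis before a full repro: a
ProcessFunction can compare each record's event timestamp with the current
watermark and divert records that are already behind it to a side output.
This is only a sketch against the Flink 1.11 DataStream API; the class and
tag names (TagLateRequests, "late-requests") are made up here, and Request /
rawRequestInput refer to the job quoted above.

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class TagLateRequests extends ProcessFunction<Request, Request> {
      // Anonymous subclass so Flink can capture the Request type information.
      public static final OutputTag<Request> LATE =
          new OutputTag<Request>("late-requests") {};

      @Override
      public void processElement(Request value, Context ctx, Collector<Request> out) {
        long watermark = ctx.timerService().currentWatermark();
        Long timestamp = ctx.timestamp();
        if (timestamp != null && timestamp < watermark) {
          // The record's event timestamp is already behind the watermark:
          // the situation Arvid describes for records arriving on a
          // previously idle partition.
          ctx.output(LATE, value);
        } else {
          out.collect(value);
        }
      }
    }

Wired into the job, the side output can be printed or counted while the main
stream continues to the S3 sink:

    SingleOutputStreamOperator<Request> tagged =
        rawRequestInput.process(new TagLateRequests());
    tagged.getSideOutput(TagLateRequests.LATE).print("LATE");
    executeLogRequest(tagged);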