Hey Arvid,

I'll try to repro this sometime in the next few weeks.  I need to make some
larger changes to get a full diff and see which records are being dropped.


On Thu, Apr 29, 2021 at 4:03 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Dan,
>
> could you check which records are missing? I'm suspecting it could be
> records that are emitted right before roll over of the bucket strategy from
> an otherwise idling partition.
>
> If so, it could indeed be connected to idleness. Idleness tells Flink not
> to wait on that particular partition to advance the watermark. If a record
> appears in a previously idle partition with an event timestamp before the
> watermark of the other partitions, that record would be deemed late and is
> discarded.
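
To make the scenario above concrete, here is a rough sketch in plain Java.
It is a toy model and not Flink's actual implementation: the class
IdleWatermarkSketch, its methods, and all timestamps are made up for
illustration. It assumes the combined watermark is the minimum over the
non-idle partitions, and that each partition's watermark trails its highest
timestamp by the out-of-orderness bound plus one.

```java
/** Toy model of watermark merging with idleness. NOT Flink's implementation. */
class IdleWatermarkSketch {
    static final long NONE = Long.MIN_VALUE;

    private final long[] maxTimestamp;   // highest event timestamp seen per partition
    private final boolean[] idle;        // true while a partition is marked idle
    private final long maxOutOfOrderness;
    private long watermark = NONE;       // combined watermark; never moves backwards

    IdleWatermarkSketch(int numPartitions, long maxOutOfOrderness) {
        this.maxTimestamp = new long[numPartitions];
        this.idle = new boolean[numPartitions];
        this.maxOutOfOrderness = maxOutOfOrderness;
        java.util.Arrays.fill(maxTimestamp, NONE);
    }

    /** Marks a partition idle so it no longer holds back the combined watermark. */
    void markIdle(int partition) {
        idle[partition] = true;
        advance();
    }

    /** Processes one record; returns true if it is behind the current watermark. */
    boolean onRecord(int partition, long timestamp) {
        idle[partition] = false;                    // a new record reactivates the partition
        boolean late = timestamp <= watermark;
        maxTimestamp[partition] = Math.max(maxTimestamp[partition], timestamp);
        advance();
        return late;
    }

    /** Combined watermark = minimum over active partitions (assumed model). */
    private void advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < maxTimestamp.length; i++) {
            if (idle[i]) {
                continue;
            }
            anyActive = true;
            long wm = maxTimestamp[i] == NONE ? NONE : maxTimestamp[i] - maxOutOfOrderness - 1;
            min = Math.min(min, wm);
        }
        if (anyActive && min > watermark) {
            watermark = min;
        }
    }

    /** Replays a fixed, made-up sequence; returns whether the final record is late. */
    static boolean replay(boolean withIdleness) {
        IdleWatermarkSketch s = new IdleWatermarkSketch(2, 5);
        s.onRecord(0, 100);
        s.onRecord(1, 90);
        if (withIdleness) {
            s.markIdle(1);               // partition 1 goes quiet and is marked idle
        }
        s.onRecord(0, 200);              // partition 0 keeps advancing
        return s.onRecord(1, 150);       // partition 1 wakes up with an older record
    }

    public static void main(String[] args) {
        System.out.println("with idleness:    late=" + replay(true));
        System.out.println("without idleness: late=" + replay(false));
    }
}
```

In this model the last record on partition 1 is only classified as late
when the partition was marked idle in between; without idleness the
watermark is still held back by partition 1 and the record is on time.
Whether a late record is actually dropped depends on the operators
downstream of the watermark assigner.
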
>
> On Tue, Apr 27, 2021 at 2:42 AM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Hey Robert.
>>
>> Nothing weird.  I was trying to find recent records (not the latest).  No
>> savepoints (the job had only been running for about 1 day).  No checkpoint
>> issues (all successes).  I don't know how many are missing.
>>
>> I removed the withIdleness. The other parts are very basic.  The text
>> logs look pretty useless.
>>
>> On Mon, Apr 26, 2021 at 11:07 AM Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hi Dan,
>>>
>>> Can you describe under which conditions you are missing records (after a
>>> machine failure, after a Kafka failure, after taking and restoring from a
>>> savepoint, ...)?
>>> Are many records missing? Are "the first records" or the "latest
>>> records" missing? Any individual records missing, or larger blocks of data?
>>>
>>> I don't think that there's a bug in Flink or the Kafka connector. Maybe
>>> it's just a configuration or systems design issue.
>>>
>>>
>>> On Sun, Apr 25, 2021 at 9:56 AM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> Have any other devs noticed issues with Flink missing Kafka records
>>>> with long-running Flink jobs?  When I re-run my Flink job and start from
>>>> the earliest Kafka offset, Flink processes the events correctly.  I'm using
>>>> Flink v1.11.1.
>>>>
>>>> I have a simple job that takes records (Requests) from Kafka and
>>>> serializes them to S3.  Pretty basic.  No related issues in the text logs.
>>>> I'm hoping I just have a configuration issue.  I'm guessing idleness is
>>>> working in a way that I'm not expecting.
>>>>
>>>> Any ideas?
>>>> - Dan
>>>>
>>>>
>>>> void createLogRequestJob(StreamExecutionEnvironment env) throws Exception {
>>>>   Properties kafkaSourceProperties = getKafkaSourceProperties("logrequest");
>>>>
>>>>   SingleOutputStreamOperator<Request> rawRequestInput = env.addSource(
>>>>       new FlinkKafkaConsumer(
>>>>           getInputRequestTopic(),
>>>>           getProtoDeserializationSchema(Request.class),
>>>>           kafkaSourceProperties))
>>>>     .uid("source-request")
>>>>     .name("Request")
>>>>     .assignTimestampsAndWatermarks(
>>>>         WatermarkStrategy.forBoundedOutOfOrderness(maxOutOfOrderness)
>>>>             .withIdleness(Duration.ofMinutes(1)));
>>>>
>>>>   executeLogRequest(rawRequestInput);
>>>>   env.execute("log-request");
>>>> }
>>>>
>>>>
>>>> void executeLogRequest(SingleOutputStreamOperator<Request> rawRequestInput) {
>>>>   AvroWriterFactory<Request> factory = getAvroWriterFactory(Request.class);
>>>>
>>>>   rawRequestInput.addSink(StreamingFileSink
>>>>       .forBulkFormat(new Path(getS3OutputDirectory(), "raw/request"), factory)
>>>>       .withBucketAssigner(new DateHourBucketAssigner<Request>(
>>>>           request -> request.getTiming().getEventApiTimestamp()))
>>>>       .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>>>       .withOutputFileConfig(createOutputFileConfig())
>>>>       .build())
>>>>     .uid("sink-s3-raw-request")
>>>>     .name("S3 Raw Request");
>>>> }
>>>>
>>>>
>>>>
>>>>
