Hi Rainie,

From the code, it seems the current job does not use any time-related functionality such as windows or timers? If so, the problem should be independent of the time type (processing time or event time) used.
Also, it is unlikely to be caused by rebalance(), since the network layer checks sequence numbers: if a record were missed, the job would fail over. Since the current logic does not seem to rely on much complex functionality, could there be some inconsistency between the Flink implementation and the Presto one?

Best,
Yun

------------------------------------------------------------------
Sender: Rainie Li <raini...@pinterest.com>
Date: 2021/03/08 17:14:30
Recipient: Smile <letters_sm...@163.com>
Cc: user <user@flink.apache.org>
Subject: Re: Flink application has slightly data loss using Processing Time

Thanks for the quick response, Smile.

I don't use window operators or flatMap. Here is the core logic of my filter; it only iterates over the list of filters. Could rebalance() cause the loss?

Thanks again.

Best regards
Rainie

    SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
        eventStream
            .rebalance()
            .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
              public void processElement(
                  T element,
                  ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
                  Collector<SplitterIntermediateRecord<T>> collector) {
                for (StreamFilter filter : filters) {
                  if (filter.match(element)) {
                    SubstreamConfig substreamConfig = filter.getSubstreamConfig();
                    SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
                        substreamConfig.getKafkaCluster(),
                        substreamConfig.getKafkaTopic(),
                        substreamConfig.getCutoverKafkaTopic(),
                        substreamConfig.getCutoverTimestampInMs(),
                        element);
                    collector.collect(result);
                  }
                }
              }
            })
            .name("Process-" + eventClass.getSimpleName());

On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:

Hi Rainie,

Could you please provide more information about your processing logic? Do you use window operators?

If there is no time-based operator in your logic, late-arriving data won't be dropped by default, and there might be something wrong with your flat map or filter operator.
Otherwise, you can use sideOutputLateData() to capture the window's late data and inspect it. See [1] for more information about sideOutputLateData().

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

Regards,
Smile
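
One way to follow up on Yun's question about a possible inconsistency between the Flink implementation and the Presto one is to replay a sample of input events through the same filter loop outside Flink and compare the per-filter match counts with what the Presto query reports. The sketch below is only an illustration in plain Java, not the job's actual code: NamedFilter and MatchAudit are hypothetical names, and NamedFilter stands in for the job's StreamFilter, whose real interface is not shown in this thread.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for the job's StreamFilter: a name plus a match rule.
final class NamedFilter<T> {
    final String name;
    final Predicate<T> rule;

    NamedFilter(String name, Predicate<T> rule) {
        this.name = name;
        this.rule = rule;
    }
}

// Hypothetical helper: mirrors the loop in processElement, but counts
// matches per filter instead of emitting records.
final class MatchAudit {
    static <T> Map<String, Long> countMatches(List<T> events, List<NamedFilter<T>> filters) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (NamedFilter<T> filter : filters) {
            counts.put(filter.name, 0L);
        }
        for (T event : events) {
            // An event may match several filters, or none, as in the original loop.
            for (NamedFilter<T> filter : filters) {
                if (filter.rule.test(event)) {
                    counts.merge(filter.name, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<NamedFilter<String>> filters = Arrays.asList(
            new NamedFilter<String>("click", s -> s.startsWith("click")),
            new NamedFilter<String>("all", s -> true));
        List<String> sample = Arrays.asList("click:1", "view:2", "click:3");
        System.out.println(countMatches(sample, filters));
    }
}
```

If the per-filter counts from such a replay agree with the Presto side but the job's output topics do not, the gap is more likely in the source or sink than in the filter logic; if they disagree, the two implementations are probably not applying the same predicate.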