Hi Rainie,

From the code, it seems the current job does not use time-related functionality
such as windows or timers. If so, the problem would be independent of the time
characteristic used (processing time vs. event time).

Also, it is unlikely to be caused by rebalance(), since the network layer checks
sequence numbers. If any records were missed, there would be a failover.
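To illustrate why a round-robin redistribution like rebalance() should not lose records by itself: it only decides which downstream channel each record goes to, and never filters. The sketch below is a simplified model under that assumption. The class and method names are hypothetical, it is not Flink's partitioner code, and it always starts at channel 0 for readability.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of round-robin redistribution in the spirit of
 * rebalance(). Not Flink's implementation; it only shows that every record is
 * assigned to exactly one downstream channel.
 */
public class RoundRobinSketch {

    /** Assigns records to numChannels channels in round-robin order, starting at channel 0. */
    static <T> List<List<T>> distribute(List<T> records, int numChannels) {
        List<List<T>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        int next = 0;
        for (T record : records) {
            channels.get(next).add(record);
            next = (next + 1) % numChannels; // move to the next channel, wrapping around
        }
        return channels;
    }

    public static void main(String[] args) {
        List<List<Integer>> channels = distribute(List.of(1, 2, 3, 4, 5, 6, 7), 3);
        int total = channels.stream().mapToInt(List::size).sum();
        System.out.println(channels);           // [[1, 4, 7], [2, 5], [3, 6]]
        System.out.println("total = " + total); // total = 7
    }
}
```

The sum of the channel sizes always equals the input size, which is the property that matters here: redistribution changes where records go, not how many there are.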

Since the current logic does not seem to rely on any complex functionality, could
there be some inconsistency between the Flink implementation and the Presto one?
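One way to narrow down such an inconsistency is to export a record identifier from both sides and diff the two sets. The sketch below assumes each record carries a stable string ID and that both ID sets can be loaded into memory. How they are exported from the Flink job and from Presto is left open, and all names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical diagnostic: compare record IDs from two outputs (e.g. the Flink job's
 * output and a Presto query result) to find records present in one but not the other.
 * Assumes every record has a stable, unique string ID.
 */
public class OutputDiffSketch {

    /** Returns the IDs in {@code expected} that are absent from {@code actual}, sorted. */
    static Set<String> missing(Set<String> expected, Set<String> actual) {
        Set<String> result = new TreeSet<>(expected);
        result.removeAll(actual);
        return result;
    }

    public static void main(String[] args) {
        Set<String> prestoIds = Set.of("a", "b", "c", "d");
        Set<String> flinkIds = Set.of("a", "c", "e");
        System.out.println("missing in Flink output: " + missing(prestoIds, flinkIds)); // [b, d]
        System.out.println("extra in Flink output: " + missing(flinkIds, prestoIds));   // [e]
    }
}
```

Looking at a handful of the "missing" records individually (their timestamps, which filters they should match) is usually more informative than comparing total counts.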

Best,
Yun


------------------------------------------------------------------
Sender:Rainie Li<raini...@pinterest.com>
Date:2021/03/08 17:14:30
Recipient:Smile<letters_sm...@163.com>
Cc:user<user@flink.apache.org>
Theme:Re: Flink application has slightly data loss using Processing Time

Thanks for the quick response, Smile.
I don't use window operators or flatMap.
Here is the core logic of my filter; it only iterates over the filters list. Could
rebalance() cause it?

Thanks again.
Best regards
Rainie
SingleOutputStreamOperator<SplitterIntermediateRecord<T>> matchedRecordsStream =
    eventStream
        .rebalance()
        .process(new ProcessFunction<T, SplitterIntermediateRecord<T>>() {
          @Override
          public void processElement(
              T element,
              ProcessFunction<T, SplitterIntermediateRecord<T>>.Context context,
              Collector<SplitterIntermediateRecord<T>> collector) {
            // Emit one intermediate record for every filter the element matches.
            for (StreamFilter filter : filters) {
              if (filter.match(element)) {
                SubstreamConfig substreamConfig = filter.getSubstreamConfig();
                SplitterIntermediateRecord<T> result = new SplitterIntermediateRecord<>(
                    substreamConfig.getKafkaCluster(),
                    substreamConfig.getKafkaTopic(),
                    substreamConfig.getCutoverKafkaTopic(),
                    substreamConfig.getCutoverTimestampInMs(),
                    element);
                collector.collect(result);
              }
            }
          }
        })
        .name("Process-" + eventClass.getSimpleName());
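A property of the loop above that is worth checking when counts differ: it emits one record per matching filter, so an element that matches none of the filters produces no output at all, while an element matching several filters is emitted several times. The minimal plain-Java model below shows this. java.util.function.Predicate stands in for the StreamFilter type, whose match() logic is not shown in this thread, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical model of the loop inside processElement(): Predicates stand in
 * for StreamFilter.match(), and the returned list stands in for the Collector.
 */
public class FilterFanOutSketch {

    static <T> List<T> emit(T element, List<Predicate<T>> filters) {
        List<T> out = new ArrayList<>();
        for (Predicate<T> filter : filters) {
            if (filter.test(element)) {
                out.add(element); // one output per matching filter
            }
        }
        return out; // empty when no filter matched: nothing is emitted for this element
    }

    public static void main(String[] args) {
        Predicate<String> startsWithA = s -> s.startsWith("a");
        Predicate<String> longerThan3 = s -> s.length() > 3;
        List<Predicate<String>> filters = List.of(startsWithA, longerThan3);
        System.out.println(emit("apple", filters).size()); // 2 (matches both filters)
        System.out.println(emit("ant", filters).size());   // 1
        System.out.println(emit("bee", filters).size());   // 0 (no output)
    }
}
```

So "missing" records in a downstream comparison may simply be elements for which every filter.match() call returned false, which would point at the filter definitions rather than at the runtime.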

On Mon, Mar 8, 2021 at 1:03 AM Smile <letters_sm...@163.com> wrote:
Hi Rainie, 

 Could you please provide more information about your processing logic?
 Do you use window operators?
 If there's no time-based operator in your logic, late-arriving data won't be
 dropped by default, and there might be something wrong with your flatMap or
 filter operator. Otherwise, you can use sideOutputLateData() to collect the
 window's late data and inspect it. See [1] for more information about
 sideOutputLateData().
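As a rough illustration of when an event-time window treats an element as late (and therefore drops it, or routes it to sideOutputLateData()): the element is late once the last timestamp of its window plus the allowed lateness is at or behind the current watermark. The sketch below is a simplified model assuming tumbling windows with zero offset and non-negative millisecond timestamps. The names are hypothetical and it is not Flink's WindowOperator code. Processing-time windows have no watermark-based lateness, so they do not drop data this way.

```java
/**
 * Hypothetical model of the late-data rule for event-time tumbling windows
 * (zero offset, non-negative timestamps in milliseconds).
 */
public class LateDataSketch {

    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** True if the element's window, extended by allowedLateness, is already behind the watermark. */
    static boolean isLate(long timestamp, long size, long allowedLateness, long watermark) {
        long windowMaxTimestamp = windowStart(timestamp, size) + size - 1;
        return windowMaxTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long size = 10_000;      // 10-second windows
        long watermark = 25_000; // current event-time watermark
        System.out.println(isLate(12_000, size, 0, watermark));      // true: window [10000, 20000) is closed
        System.out.println(isLate(21_000, size, 0, watermark));      // false: window [20000, 30000) is open
        System.out.println(isLate(12_000, size, 10_000, watermark)); // false: 19999 + 10000 > 25000
    }
}
```

Raising the allowed lateness keeps windows open longer at the cost of holding window state longer, which is why inspecting the side output first is a cheaper way to see whether late data is the cause.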

 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
 

 Regards,
 Smile


