Hello Lydian, Do you always observe data loss? Or does it happen only when you restart your pipeline from a Flink savepoint? If you lose data only across restarts - is your issue similar to https://github.com/apache/beam/issues/26041 ?
Best Regards,
Pavel Solomin
Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>

On Tue, 18 Apr 2023 at 18:58, Lydian <lydia...@gmail.com> wrote:
> Hi,
>
> We are using Beam (Python SDK + Flink Runner) to back up our streaming data
> from Kafka to S3. To avoid hitting the S3 threshold, we use a 1-minute fixed
> window to group messages. We had a similar pipeline in Spark that we want to
> replace with this new one. However, the Beam pipeline always seems to be
> missing events, which we think could be due to late events (the number of
> missing events gets lower with a higher allowed_lateness).
>
> We've tried the following approaches to avoid late events, but none of them
> work:
> 1. Use the processing timestamp instead of event time. Ideally, if windowing
> uses the processing timestamp, it shouldn't consider any event late. But
> this doesn't seem to work at all.
> 2. Configure allowed_lateness to 12 hours. Given that approach 1 didn't work
> as expected, we also configured allowed_lateness. But the pipeline still has
> missing events compared to our old Spark pipeline.
>
> Here's the simplified code we have:
> ```
> def add_timestamp(event: Any) -> Any:
>     import time
>     from apache_beam import window
>     return window.TimestampedValue(event, time.time())
>
> (pipeline
>     | "Kafka Read" >> ReadFromKafka(topics=["test-topic"],
>                                     consumer_config=consumer_config)
>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>     | "Window into Fixed Intervals" >> beam.WindowInto(
>         beam.window.FixedWindows(fixed_window_size),
>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness))
>     | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path)))
> ```
>
> I am wondering:
> 1. Does the add_timestamp approach correctly mark events to use processing
> time for windowing?
> If so, why are there still late events, considering we are using processing
> time and not event time?
> 2. Are there any other approaches to avoid dropping late events besides
> `allowed_lateness`? In Flink you can output late events as a side output;
> can we do something similar in Beam? Could someone provide a code example?
>
> Could someone help us debug this? Thanks!
>
> ---
> * Flink's documentation about late events as a side output:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>
> Sincerely,
> Lydian Lee