Hello Lydian,

Do you always observe data loss? Or does it happen only when you
restart your pipeline from a Flink savepoint? If you lose data only between
restarts, is your issue similar to
https://github.com/apache/beam/issues/26041 ?

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>

On Tue, 18 Apr 2023 at 18:58, Lydian <lydia...@gmail.com> wrote:

> Hi,
>
> We are using Beam (Python SDK + Flink Runner) to back up our streaming data
> from Kafka to S3. To avoid hitting the S3 request threshold, we use a 1-minute
> fixed window to group messages. We had a similar pipeline in Spark that we
> want to replace with this new pipeline. However, the Beam pipeline always
> seems to be missing events, which we think could be due to late events
> (the number of missing events gets lower when we set a higher
> allowed_lateness).
>
> We've tried the following approaches to avoid late events, but none of them
> work:
> 1. Use the processing timestamp instead of the event time. Ideally, if
> windowing uses the processing timestamp, no event should be considered late.
> But this doesn't seem to work at all.
> 2. Configure allowed_lateness to 12 hours. Given that approach 1 did not work
> as expected, we also configured allowed_lateness. But the pipeline still has
> missing events compared to our old Spark pipeline.
>
> Here's the simplified code we have:
> ```
> from typing import Any
>
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka
>
>
> def add_timestamp(event: Any) -> Any:
>     import time
>     from apache_beam import window
>
>     # Re-stamp each element with the current wall-clock (processing) time
>     return window.TimestampedValue(event, time.time())
>
>
> (pipeline
>     | "Kafka Read" >> ReadFromKafka(topics=["test-topic"],
>                                     consumer_config=consumer_config)
>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>     | "Window into Fixed Intervals" >> beam.WindowInto(
>         beam.window.FixedWindows(fixed_window_size),
>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>     )
>     | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path))
> )
> ```
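>
> If an explicit trigger is needed for late firings, this is roughly what we
> would try (an untested sketch; the AfterWatermark/AccumulationMode
> combination is our assumption, not something we have validated):
> ```
> from apache_beam.transforms.trigger import (
>     AccumulationMode,
>     AfterProcessingTime,
>     AfterWatermark,
> )
>
> # Same fixed windows, but with an explicit trigger that also fires for
> # data arriving after the watermark, within allowed_lateness
> windowing = beam.WindowInto(
>     beam.window.FixedWindows(fixed_window_size),
>     trigger=AfterWatermark(late=AfterProcessingTime(60)),  # extra pane(s) for late arrivals
>     accumulation_mode=AccumulationMode.DISCARDING,  # each element appears in one pane only
>     allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
> )
> ```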
>
> I am wondering:
> 1. Does the add_timestamp approach correctly mark elements so that windowing
> uses processing time? If so, why are there still late events, considering we
> are using processing time and not event time?
> 2. Are there any other approaches to avoid dropping late events besides
> `allowed_lateness`? In Flink you can send late events to a side output; can
> we do something similar in Beam? Could someone provide a code example? A
> rough sketch of what we have in mind follows below.
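> For illustration, this is the kind of routing we are hoping for, sketched
> from Beam's pane info (untested; SplitLatePanes is a hypothetical DoFn of
> ours, windowed_events stands for the windowed PCollection above, and we are
> assuming pane timing is exposed via beam.DoFn.PaneInfoParam):
> ```
> import apache_beam as beam
> from apache_beam.utils.windowed_value import PaneInfoTiming
>
>
> class SplitLatePanes(beam.DoFn):
>     """Route elements that arrive in LATE panes to a tagged side output."""
>     def process(self, element, pane_info=beam.DoFn.PaneInfoParam):
>         if pane_info.timing == PaneInfoTiming.LATE:
>             yield beam.pvalue.TaggedOutput("late", element)
>         else:
>             yield element
>
>
> results = windowed_events | beam.ParDo(SplitLatePanes()).with_outputs(
>     "late", main="on_time")
> # results.on_time -> normal S3 write; results.late -> e.g. a quarantine prefix
> ```
> (We realize this would only catch late panes that actually fire; anything
> dropped for exceeding allowed_lateness would never reach the DoFn.)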
>
> Could someone help us debug this?  Thanks!
>
> ---
> * Flink's documentation on late events as a side output:
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>
>
> Sincerely,
> Lydian Lee
>
>