Thanks all for the suggestions. I’ve implemented the suggested changes, but I
don’t have a good way to reproduce the error, so I'll have to wait and see.

On Wed, Sep 25, 2024 at 12:33 AM Jan Lukavský <[email protected]> wrote:

> Hi Piotr,
>
> answer inline.
> On 9/24/24 09:53, Piotr Wiśniowski wrote:
>
> Subject: Input on Timestamps and Late Events in Beam Pipelines
>
> Hi team,
>
> I’d like to contribute to this discussion as I find it quite interesting.
>
> Regarding the element timestamps mentioned by Jan, I can confirm that it's
> accurate—users can reassign element timestamps in the same way described.
> This should be sufficient for the timestamps to be recognized by downstream
> aggregation. Additionally, clock synchronization issues could indeed be
> causing late events, as Jan suggested.
>
> It’s also worth noting that, by default, triggers output the aggregated
> result when they estimate that all data for a window has arrived,
> discarding any subsequent data for that window (as referenced in the same
> documentation Jan mentioned). I noticed that while your code defines
> allowed lateness, it doesn't specify a trigger to handle late events. As a
> result, these late events will likely be ignored. You might want to
> consider adding a trigger to the windowing function to re-output the
> results when late events arrive. This could help confirm the hypothesis,
> though in production, it's generally better to rely on the timestamps
> assigned by the source rather than reassigning them, as they should already
> be processing timestamps.
>
> I also have a question for the Beam developers, or anyone who might know:
>
> Assuming that Lydian does not reassign processing timestamps but instead
> reassigns data timestamps (which are not directly tied to processing time),
> what heuristics are used to determine when to close the downstream window
> in stream mode? Does Beam track the minimal timestamp seen and maintain
> state for this? What would the time window for such a heuristic be? Or, in
> this case, would the pipeline behave like it does in batch mode, halting
> while waiting for all data to arrive? I understand that the answer likely
> depends on the runner—I'm particularly interested in how this works in both
> Dataflow and Flink.
>
> Beam creates watermarks that propagate from sources to sinks. Each PTransform
> has two watermarks: an input watermark and an output watermark. The output
> watermark might be _held back_ by some logic (typically buffers and/or
> timers). Reassigning timestamps is a stateless process, which means it does
> not interfere with watermark propagation and as such can, without additional
> care, cause late data. SDKs have access to a "watermark hold state" by which a
> stateful transform can control how the input watermark propagates to the
> output watermark, but this is not (directly) exposed to users. Users can
> control the watermark hold only through timers and their output timestamps,
> which seems to be sufficient under the Beam model.
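
The watermark propagation described above can be modelled in a few lines of plain Python. This is only a toy sketch of the idea, not Beam's implementation: the function name `output_watermark` and the integer-millisecond timestamps are assumptions made for illustration.

```python
from typing import Iterable


def output_watermark(input_watermark: int, holds: Iterable[int] = ()) -> int:
    """Toy model: the output watermark is the input watermark, held back by any holds."""
    return min([input_watermark, *holds])


# A stateless step (such as reassigning timestamps) sets no hold, so the
# watermark passes straight through it.
stateless_out = output_watermark(input_watermark=10_000)

# A stateful step buffering an element with timestamp 7_000 holds the
# output watermark back until that element is emitted.
stateful_out = output_watermark(input_watermark=10_000, holds=[7_000])

# An element re-stamped to 9_500 by the stateless step is already behind
# the watermark that was propagated downstream.
reassigned_timestamp = 9_500
is_behind_watermark = reassigned_timestamp < stateless_out

print(stateless_out, stateful_out, is_behind_watermark)
```

The point of the model is that nothing in the stateless step can slow the watermark down, so any timestamp it assigns below the current watermark is late from the start.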
>
> Best regards,
> Piotr Wiśniowski
>
> On Tue, Sep 24, 2024, 08:36 Jan Lukavský <[email protected]> wrote:
>
>> Hi,
>>
>> I do not use the Python SDK, but it seems that, as opposed to the Java SDK,
>> a simple lambda returning a TimestampedValue really can change the
>> timestamp of an element [1]. Maybe a more experienced user of the Python SDK
>> can confirm this?
>>
>> Assuming this is the case, then we have two factors at play:
>>
>>  a) watermarks are computed at the source transform (ReadFromKafka) using
>> Java millisecond precision
>>
>>  b) timestamps are later reassigned using Python's time.time()
>>
>> Both calls use the system clock to compute the timestamp and thus can be
>> influenced by clock synchronization (e.g. NTP). This can (at least in
>> theory) cause the second call, time.time(), to return a _smaller_ timestamp
>> than the one used to compute the watermark, which could cause the element
>> to become a late event. If this is the issue, you can either increase the
>> allowed lateness or (maybe more conveniently) not reassign the timestamps,
>> because processing time should already be assigned.
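
The clock-skew scenario above can be illustrated without a cluster. The sketch below is a simplified model, not Beam code: `to_millis` and `is_behind_watermark` are hypothetical helpers, and the two clock readings are made-up values chosen to show a clock that stepped back by 20 ms between the source computing its watermark and the later time.time() call.

```python
def to_millis(seconds: float) -> int:
    """Convert a time.time()-style float (seconds) to integer milliseconds."""
    return round(seconds * 1000)


def is_behind_watermark(reassigned_seconds: float, watermark_millis: int) -> bool:
    """True if the reassigned timestamp is older than the watermark."""
    return to_millis(reassigned_seconds) < watermark_millis


# Hypothetical readings: the source computed its watermark at ...500 ms,
# then a clock adjustment stepped the system clock back by 20 ms before
# the downstream step called time.time().
watermark_millis = 1_727_000_000_500
reassigned_seconds = 1_727_000_000.480

print(is_behind_watermark(reassigned_seconds, watermark_millis))
```

Whether an element that is behind the watermark is actually dropped still depends on its window and the allowed lateness, so this only shows how the precondition for late data can arise.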
>>
>> Let us know if any of this works for you!
>> Best,
>>  Jan
>>
>> [1]
>> https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
>> On 9/24/24 02:09, marc hurabielle wrote:
>>
>> Hello,
>>
>> I am jumping in on this because we are doing the same thing as Lydian.
>> In our case, we are using the default timestamp strategy in Kafka (so
>> processing timestamp).
>> We were also adding the processing timestamp manually, the same way Lydian does.
>>
>>
>> However, we have late data. It mainly happens in our integration tests with
>> Flink (parallelism 1), and happens really rarely in production.
>>
>> So does this mean we can't control the timestamp of an item even with
>> `window.TimestampedValue(event, time.time())`?
>>
>> Best,
>>
>> Marc
>>
>>
>> On Tue, Sep 24, 2024, 04:23 Reuven Lax via user <[email protected]>
>> wrote:
>>
>>> Also as I said, the duplicate files might not appear like duplicates,
>>> which can be quite confusing.
>>>
>>> Out of curiosity, I would try (just for testing) removing the line where
>>> you "add" processing time, and also setting allowed_lateness to the largest
>>> allowed value. This will help determine whether late data is causing the
>>> dropped records.
>>>
>>> Reuven
>>>
>>>
>>> On Mon, Sep 23, 2024 at 2:10 PM Lydian Lee <[email protected]>
>>> wrote:
>>>
>>>> Hi Jan,
>>>>
>>>> Thanks for taking a quick look!  Yes, the "with" statement would close
>>>> after the write.  In our use cases, we actually don't mind if there are
>>>> duplicates of data written, but we are more concerned about the missing
>>>> data, which is the issue we are facing right now.
>>>>
>>>> On Mon, Sep 23, 2024 at 11:54 AM Reuven Lax via user <
>>>> [email protected]> wrote:
>>>>
>>>>> Do you close the write afterwards? If not, I wonder if you could lose
>>>>> records due to unflushed data.
>>>>>
>>>>> Also - what you're doing here can definitely lead to duplicate data
>>>>> written, since this DoFn can be run multiple times. The duplicates might
>>>>> also appear different if the Iterables are slightly different on retries,
>>>>> especially in the case when Flink restarts a checkpoint.
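
One common way to make a retried write overwrite its earlier attempt, instead of producing a second file, is to derive the file name from something stable across retries rather than from a random UUID. The sketch below is not from the thread: `deterministic_filename` is a hypothetical helper, the bucket path is a placeholder, and it assumes the window bounds and the shard key are available to the writing step.

```python
import posixpath


def deterministic_filename(
    output_path: str, window_start: int, window_end: int, shard_key: int
) -> str:
    """Build a file name that is identical on every retry of the same window and shard."""
    name = f"window_{window_start}_{window_end}_shard_{shard_key}.parquet"
    return posixpath.join(output_path, name)


# Placeholder values: a one-minute window (epoch seconds) and shard key 3.
first_attempt = deterministic_filename("s3://bucket/events", 1727000040, 1727000100, 3)
retry = deterministic_filename("s3://bucket/events", 1727000040, 1727000100, 3)

print(first_attempt == retry)
```

Because the name does not depend on the batch contents, it stays the same even when the iterable differs slightly between attempts.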
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Sep 23, 2024 at 1:40 PM Lydian Lee <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Jan,
>>>>>>
>>>>>> Thanks so much for your help. Here's our write to s3:
>>>>>>
>>>>>> import logging
>>>>>> import os
>>>>>> from typing import Dict, Iterable, List
>>>>>> from uuid import uuid4
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> import pyarrow as pa
>>>>>> from apache_beam.io.aws import s3io
>>>>>> from apache_beam.io.aws.clients.s3 import boto3_client
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>> from pyarrow.parquet import ParquetWriter
>>>>>>
>>>>>>
>>>>>> class WriteBatchesToS3(beam.DoFn):
>>>>>>     def __init__(
>>>>>>         self,
>>>>>>         output_path: str,
>>>>>>         schema: pa.Schema,
>>>>>>         pipeline_options: PipelineOptions,
>>>>>>     ) -> None:
>>>>>>         self.output_path = output_path
>>>>>>         self.schema = schema
>>>>>>         self.pipeline_options = pipeline_options
>>>>>>
>>>>>>     def process(self, data: Iterable[List[Dict]]) -> None:
>>>>>>         """Write one batch per file to S3."""
>>>>>>         client = boto3_client.Client(options=self.pipeline_options)
>>>>>>         # Rebuild the schema from bare fields to drop field-level metadata.
>>>>>>         fields_without_metadata = [pa.field(f.name, f.type) for f in self.schema]
>>>>>>         schema_without_field_metadata = pa.schema(fields_without_metadata)
>>>>>>         filename = os.path.join(
>>>>>>             self.output_path,
>>>>>>             f"uuid_{uuid4()}.parquet",
>>>>>>         )
>>>>>>         tables = [
>>>>>>             pa.Table.from_pylist(batch, schema=schema_without_field_metadata)
>>>>>>             for batch in data
>>>>>>         ]
>>>>>>         if len(tables) == 0:
>>>>>>             logging.info("No data to write; the grouped contents are: %s", data)
>>>>>>             return
>>>>>>         # Both context managers close (and flush) on exit from the block.
>>>>>>         with s3io.S3IO(client=client).open(filename=filename, mode="w") as s3_writer:
>>>>>>             with ParquetWriter(
>>>>>>                 s3_writer,
>>>>>>                 schema_without_field_metadata,
>>>>>>                 compression="SNAPPY",
>>>>>>                 use_deprecated_int96_timestamps=True,
>>>>>>             ) as parquet_writer:
>>>>>>                 merged_tables = pa.concat_tables(tables)
>>>>>>                 parquet_writer.write_table(merged_tables)
>>>>>>
>>>>>> On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Lydian,
>>>>>>>
>>>>>>> because you do not specify 'timestamp_policy', it should use the
>>>>>>> default, which should be processing time, so this should not be the
>>>>>>> issue.
>>>>>>> The one transform potentially left is the sink transform, as Reuven
>>>>>>> mentioned. Can you share details of the implementation?
>>>>>>>
>>>>>>>  Jan
>>>>>>> On 9/19/24 23:10, Lydian Lee wrote:
>>>>>>>
>>>>>>> Hi, Jan
>>>>>>>
>>>>>>> Here's how we do ReadFromKafka. The expansion service is just there to
>>>>>>> ensure we can work with xlang in k8s, so please ignore it.
>>>>>>> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>>>>>>>
>>>>>>> ReadFromKafka(
>>>>>>>     consumer_config={
>>>>>>>         "group.id": "group-name",
>>>>>>>         "auto.offset.reset": "latest",
>>>>>>>         "enable.auto.commit": "false",
>>>>>>>     },
>>>>>>>     topics=["topic-name"],
>>>>>>>     with_metadata=False,
>>>>>>>     expansion_service=default_io_expansion_service(
>>>>>>>         append_args=[
>>>>>>>             "--defaultEnvironmentType=PROCESS",
>>>>>>>             # Plain string, not an f-string: the braces are literal JSON.
>>>>>>>             '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
>>>>>>>             "--experiments=use_deprecated_read",
>>>>>>>         ]
>>>>>>>     ),
>>>>>>>     commit_offset_in_finalize=True,
>>>>>>> )
>>>>>>>
>>>>>>> Do you know what the right approach for using processing time would be
>>>>>>> instead? I thought WindowInto was supposed to use the timestamp we
>>>>>>> appended to the event. Do you think it is still using the original Kafka
>>>>>>> event timestamp? Thanks!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Can you share the (relevant) parameters of the ReadFromKafka
>>>>>>>> transform?
>>>>>>>>
>>>>>>>> This feels strange, and it might not do what you'd expect:
>>>>>>>> | "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda
>>>>>>>> event: window.TimestampedValue(event, time.time()))
>>>>>>>>
>>>>>>>> This does not change the assigned timestamp of an element, but
>>>>>>>> creates a new element which contains processing time. It will not be
>>>>>>>> used for windowing, though.
>>>>>>>> On 9/19/24 00:49, Lydian Lee wrote:
>>>>>>>>
>>>>>>>> Hi Reuven,
>>>>>>>>
>>>>>>>> Here's a quick look at our pipeline:
>>>>>>>> (
>>>>>>>>     pipeline
>>>>>>>>     | "Reading message from Kafka" >> ReadFromKafka(...)
>>>>>>>>     | "Deserializing events" >> Deserialize(**deserializer_args)
>>>>>>>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
>>>>>>>>         lambda event: window.TimestampedValue(event, time.time())
>>>>>>>>     )
>>>>>>>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>>>>>>>         # fixed_window_size is 1 min.
>>>>>>>>         beam.transforms.window.FixedWindows(fixed_window_size),
>>>>>>>>         # Although we configured lateness, we are using processing time,
>>>>>>>>         # so I don't expect any late events.
>>>>>>>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>>>>>>>>     )
>>>>>>>>     | "Adding random integer partition key" >> beam.Map(
>>>>>>>>         # Add a dummy key to reshuffle into fewer partitions. Kafka has 16
>>>>>>>>         # partitions, but we only want to generate 2 files every minute.
>>>>>>>>         lambda event: (random.randint(1, 5), event)
>>>>>>>>     )
>>>>>>>>     | "Group by randomly-assigned integer key" >> beam.GroupByKey()
>>>>>>>>     | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
>>>>>>>>     # Call boto3 to write the events into S3 in parquet format.
>>>>>>>>     | "Writing event data batches to parquet" >> beam.ParDo(WriteBatchesToS3(...))
>>>>>>>> )
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> How are you doing this aggregation?
>>>>>>>>>
>>>>>>>>> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Jan,
>>>>>>>>>>
>>>>>>>>>> Thanks for the recommendation. In our case, we are windowing with
>>>>>>>>>> processing time, which means that there should be no late events
>>>>>>>>>> at all.
>>>>>>>>>>
>>>>>>>>>> You’ve mentioned that GroupByKey is stateful and can potentially
>>>>>>>>>> drop data. After the reshuffle (adding a random shuffle id to the
>>>>>>>>>> key), we do the aggregation (combine the data and write it to
>>>>>>>>>> S3). Do you think the example I mentioned earlier could potentially
>>>>>>>>>> be the reason for the dropped data?
>>>>>>>>>>
>>>>>>>>>> If so, how is Beam able to prevent that in general? Are
>>>>>>>>>> there any suggested approaches? Thanks
>>>>>>>>>>
>>>>>>>>>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Lydian,
>>>>>>>>>>>
>>>>>>>>>>> in that case, there is only generic advice you can look into.
>>>>>>>>>>> Reshuffle is a stateless operation that should not cause dropped
>>>>>>>>>>> data. A GroupByKey, on the other hand, is stateful and thus can, when
>>>>>>>>>>> dealing with late data, drop some of it. You should be able to confirm
>>>>>>>>>>> this by looking for the 'droppedDueToLateness' counter and/or the log
>>>>>>>>>>> in here [1]. This happens when elements arrive after the watermark
>>>>>>>>>>> passes the end of the element's window plus allowed lateness. If you
>>>>>>>>>>> see the log, you might need to either change how you assign timestamps
>>>>>>>>>>> to elements (e.g. use log append time) or increase the allowed
>>>>>>>>>>> lateness of your windowfn.
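
As a simplified reading of the check in the linked runner code, an element is dropped once the input watermark has moved past the last timestamp of its window plus the allowed lateness. The sketch below models that for fixed windows in plain Python. The helper names `window_end` and `is_dropped` are hypothetical, all values are in milliseconds, and the real runner logic has more cases than this.

```python
def window_end(timestamp: int, size: int) -> int:
    """Exclusive end of the fixed window that contains `timestamp`."""
    return timestamp - (timestamp % size) + size


def is_dropped(timestamp: int, watermark: int, size: int, allowed_lateness: int) -> bool:
    """True if the element's window has expired relative to the watermark."""
    max_timestamp = window_end(timestamp, size) - 1  # last instant inside the window
    return max_timestamp + allowed_lateness < watermark


ONE_MINUTE = 60_000

# Window [0, 60_000) with no allowed lateness: expired once the watermark is past it.
print(is_dropped(59_000, watermark=61_000, size=ONE_MINUTE, allowed_lateness=0))

# The same element survives when 5 s of lateness is allowed.
print(is_dropped(59_000, watermark=61_000, size=ONE_MINUTE, allowed_lateness=5_000))

# An element behind the watermark but inside a still-open window is kept.
print(is_dropped(59_000, watermark=59_500, size=ONE_MINUTE, allowed_lateness=0))
```

Under this model, setting the allowed lateness very high (as a test) makes the drop condition practically unreachable, which is one way to check whether lateness explains the missing records.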
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>>  Jan
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>>>>>>>>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>>>>>>>>>
>>>>>>>>>>> I would love to, but there are some limitations on our end, so
>>>>>>>>>>> the version bump won’t happen soon. Thus I still need to figure out
>>>>>>>>>>> what the root cause might be.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Lydian,
>>>>>>>>>>>>
>>>>>>>>>>>> 2.41.0 is quite old. Can you please try the current version to see
>>>>>>>>>>>> if this issue is still present? There were lots of changes between
>>>>>>>>>>>> 2.41.0 and 2.59.0.
>>>>>>>>>>>>
>>>>>>>>>>>>  Jan
>>>>>>>>>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> We are using the Beam Python SDK with the Flink Runner; the Beam
>>>>>>>>>>>> version is 2.41.0 and the Flink version is 1.15.4.
>>>>>>>>>>>>
>>>>>>>>>>>> We have a pipeline that has 2 stages:
>>>>>>>>>>>> 1. read from Kafka and apply a fixed window of 1 minute
>>>>>>>>>>>> 2. aggregate the data for the past 1 minute and reshuffle so
>>>>>>>>>>>> that we have a lower partition count, then write it to S3.
>>>>>>>>>>>>
>>>>>>>>>>>> We disabled enable.auto.commit and enabled
>>>>>>>>>>>> commit_offset_in_finalize. Also, auto.offset.reset is set to
>>>>>>>>>>>> "latest".
>>>>>>>>>>>> [image: image.png]
>>>>>>>>>>>>
>>>>>>>>>>>> According to the log, I can definitely see that the data is being
>>>>>>>>>>>> consumed from the Kafka offset, because there are many lines like
>>>>>>>>>>>> ```
>>>>>>>>>>>> Resetting offset for topic XXXX-<PARTITION>  to offset <OFFSET>
>>>>>>>>>>>> ```
>>>>>>>>>>>> and that partition/offset pair does match the missing records.
>>>>>>>>>>>> However, they don't show up in the final S3 output.
>>>>>>>>>>>>
>>>>>>>>>>>> My current hypothesis is that the shuffling might be the reason
>>>>>>>>>>>> for the issue. For example, suppose that in Kafka, for the past
>>>>>>>>>>>> minute, partition 1 has records at offsets 1, 2, and 3. After the
>>>>>>>>>>>> reshuffle, they are distributed as, for example:
>>>>>>>>>>>> - partition A: 1, 3
>>>>>>>>>>>> - partition B: 2
>>>>>>>>>>>>
>>>>>>>>>>>> Now suppose partition A completes successfully but partition B fails.
>>>>>>>>>>>> Given that A succeeded, it will commit its offset to Kafka, and
>>>>>>>>>>>> thus Kafka now has a committed offset of 3. When Kafka retries, it
>>>>>>>>>>>> will skip offset 2. However, I am not sure how exactly the offset
>>>>>>>>>>>> commit works, and I'm wondering how it interacts with the checkpoints.
>>>>>>>>>>>> It does seem that if my hypothesis were correct, we should be seeing
>>>>>>>>>>>> more missing records; however, this seems to happen rarely. Can anyone
>>>>>>>>>>>> help identify potential root causes? Thanks
>>>>>>>>>>>>
