Thanks all for the suggestions. I've implemented the suggested changes, but I don't have a good way to reproduce the error, so I'll have to wait and see.
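For reference, the suggestions below (drop the manual timestamp re-assignment, raise allowed lateness while testing, and optionally add a late-firing trigger as Piotr proposes) amount to something like the following sketch. The window size matches the pipeline discussed in the thread; the lateness value and the trigger choice are illustrative assumptions, not what was actually deployed:

import apache_beam as beam
from apache_beam.transforms import trigger, window

windowed = (
    events  # PCollection from ReadFromKafka; timestamps left as assigned by the source
    | "Window into Fixed Intervals" >> beam.WindowInto(
        window.FixedWindows(60),  # 1-minute fixed windows, as in the original pipeline
        # Re-emit the window result whenever a late element arrives (Piotr's suggestion).
        trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
        allowed_lateness=3600,  # seconds; set generously while testing (Reuven's suggestion)
    )
)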
On Wed, Sep 25, 2024 at 12:33 AM Jan Lukavský <[email protected]> wrote:

> Hi Piotr,

> answer inline.

> On 9/24/24 09:53, Piotr Wiśniowski wrote:

> Subject: Input on Timestamps and Late Events in Beam Pipelines

> Hi team,

> I'd like to contribute to this discussion as I find it quite interesting.

> Regarding the element timestamps mentioned by Jan, I can confirm that it's accurate - users can reassign element timestamps in the same way described. This should be sufficient for the timestamps to be recognized by downstream aggregation. Additionally, clock synchronization issues could indeed be causing late events, as Jan suggested.

> It's also worth noting that, by default, triggers output the aggregated result when they estimate that all data for a window has arrived, discarding any subsequent data for that window (as referenced in the same documentation Jan mentioned). I noticed that while your code defines allowed lateness, it doesn't specify a trigger to handle late events. As a result, these late events will likely be ignored. You might want to consider adding a trigger to the windowing function to re-output the results when late events arrive. This could help confirm the hypothesis, though in production it's generally better to rely on the timestamps assigned by the source rather than reassigning them, as they should already be processing timestamps.

> I also have a question for the Beam developers, or anyone who might know:

> Assuming that Lydian does not reassign processing timestamps but instead reassigns data timestamps (which are not directly tied to processing time), what heuristics are used to determine when to close the downstream window in streaming mode? Does Beam track the minimum timestamp seen and maintain state for this? What would the time window for such a heuristic be? Or, in this case, would the pipeline behave like it does in batch mode, halting while waiting for all data to arrive? I understand that the answer likely depends on the runner - I'm particularly interested in how this works in both Dataflow and Flink.

> Beam creates watermarks propagating from sources to sinks. PTransforms have two watermarks - an input watermark and an output watermark. The output watermark might be _held back_ by some logic (typically buffers and/or timers). Reassigning timestamps is a stateless process, which means it does not interfere with watermark propagation and as such can, without additional care, cause late data. SDKs have access to a "watermark hold state" by which a stateful transform can control how the input watermark propagates to the output watermark. But this is not (directly) exposed to users. Users can control the watermark hold only through timers and their output timestamp, which seems to be sufficient under the Beam model.

> Best regards,
> Piotr Wiśniowski
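One way to check the clock-skew hypothesis discussed above is to log, downstream of the re-assignment, the timestamp Beam has actually attached to each element next to the local clock. A minimal sketch for the Python SDK; the DoFn name is illustrative:

import logging
import time

import apache_beam as beam

class LogElementTimestamp(beam.DoFn):
    """Debugging aid: compare the element's assigned timestamp with the local clock."""
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        skew = time.time() - timestamp.micros / 1e6
        # A noticeably negative skew means the assigned timestamp is ahead of this
        # worker's clock; a large positive skew means it lags and may end up behind
        # the watermark computed at the source.
        logging.info("element timestamp=%s, local-clock skew=%.3fs", timestamp, skew)
        yield element

Inserted right after the beam.Map that re-assigns timestamps (or right after ReadFromKafka if the re-assignment is removed), this shows whether assigned timestamps ever drift relative to the clock that drives the watermark.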
> On Tue, Sep 24, 2024 at 08:36 Jan Lukavský <[email protected]> wrote:

>> Hi,

>> I do not use the Python SDK, but it seems that - as opposed to the Java SDK - using a simple lambda returning a TimestampedValue can really change the timestamp of an element [1]. Maybe some more experienced user of the Python SDK can confirm this?

>> Assuming this is the case, then we have two factors at play:

>> a) watermarks are computed at the source transform (ReadFromKafka) using Java millisecond precision

>> b) timestamps are later reassigned using Python's time.time()

>> Both calls use the system clock to compute the timestamp and thus can be influenced by clock synchronization (e.g. NTP). This can (at least in theory) cause the second call to time.time() to return a _smaller_ timestamp than the one used to compute the watermark, which could cause the element to become a late event. If this is the issue, you can either increase the allowed lateness, or (maybe more conveniently) not reassign the timestamps, because a processing-time timestamp should already be assigned.

>> Let us know if any of this works for you!
>> Best,
>> Jan

>> [1] https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements

>> On 9/24/24 02:09, marc hurabielle wrote:

>> Hello,

>> I am jumping on this, because we are doing the same thing as Lydian. In our case, we are using the default timestamp strategy in Kafka (so processing timestamp). We were also adding the processing timestamp manually, the same as Lydian.

>> However, we have late data. It mainly happens in our integration tests with Flink (parallelism 1), and happens really rarely in production.

>> So it means we can't control the timestamp of an item even with `window.TimestampedValue(event, time.time())`?

>> Best,

>> Marc

>> On Tue, Sep 24, 2024, 04:23 Reuven Lax via user <[email protected]> wrote:

>>> Also as I said, the duplicate files might not appear like duplicates, which can be quite confusing.

>>> Out of curiosity, I would try - just for testing - removing the line where you "add" processing time, and also setting allowed_lateness to the largest allowed value. This will help determine whether late data is causing the dropped records.

>>> Reuven

>>> On Mon, Sep 23, 2024 at 2:10 PM Lydian Lee <[email protected]> wrote:

>>>> Hi Jan,

>>>> Thanks for taking a quick look! Yes, the "with" statement would close after the write. In our use case, we actually don't mind if there are duplicates of data written, but we are more concerned about the missing data, which is the issue we are facing right now.

>>>> On Mon, Sep 23, 2024 at 11:54 AM Reuven Lax via user <[email protected]> wrote:

>>>>> Do you close the write afterwards? If not, I wonder if you could lose records due to unflushed data.

>>>>> Also - what you're doing here can definitely lead to duplicate data being written, since this DoFn can be run multiple times. The duplicates might also appear different if the Iterables are slightly different on retries, especially in the case when Flink restarts a checkpoint.

>>>>> Reuven
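On the duplicates point: a common way to make retried writes idempotent, sketched here purely as an illustration rather than as what this pipeline does, is to derive the object name from the window and key instead of a fresh uuid4, so a retried bundle overwrites the same file. This assumes the dummy key is still attached when the write runs:

import apache_beam as beam

class WriteKeyedBatchesToS3(beam.DoFn):
    """Sketch: deterministic file name per (window, key) so retries overwrite the
    same object instead of producing look-alike duplicates."""
    def __init__(self, output_path: str):
        self.output_path = output_path

    def process(self, keyed_batches, window=beam.DoFn.WindowParam):
        key, batches = keyed_batches  # output of GroupByKey: (dummy key, iterable of batches)
        window_start = window.start.micros // 1_000_000  # window start as unix seconds
        filename = f"{self.output_path}/window_{window_start}_key_{key}.parquet"
        # ... write `batches` to `filename` the same way the WriteBatchesToS3 DoFn
        # shown in the next message does ...
        yield filename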
>>>>> On Mon, Sep 23, 2024 at 1:40 PM Lydian Lee <[email protected]> wrote:

>>>>>> Hi Jan,

>>>>>> Thanks so much for your help. Here's our write to S3:

>>>>>> import logging
>>>>>> import os
>>>>>> from typing import Dict, Iterable, List
>>>>>> from uuid import uuid4
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> import pyarrow as pa
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>> from pyarrow.parquet import ParquetWriter
>>>>>>
>>>>>> class WriteBatchesToS3(beam.DoFn):
>>>>>>     def __init__(
>>>>>>         self,
>>>>>>         output_path: str,
>>>>>>         schema: pa.schema,
>>>>>>         pipeline_options: PipelineOptions,
>>>>>>     ) -> None:
>>>>>>         self.output_path = output_path
>>>>>>         self.schema = schema
>>>>>>         self.pipeline_options = pipeline_options
>>>>>>
>>>>>>     def process(self, data: Iterable[List[Dict]]) -> None:
>>>>>>         """Write one batch per file to S3."""
>>>>>>         client = beam.io.aws.clients.s3.boto3_client.Client(options=self.pipeline_options)
>>>>>>         fields_without_metadata = [pa.field(f.name, f.type) for f in self.schema]
>>>>>>         schema_without_field_metadata = pa.schema(fields_without_metadata)
>>>>>>         filename = os.path.join(
>>>>>>             self.output_path,
>>>>>>             f"uuid_{str(uuid4())}.parquet",
>>>>>>         )
>>>>>>         tables = [pa.Table.from_pylist(batch, schema=schema_without_field_metadata) for batch in data]
>>>>>>         if len(tables) == 0:
>>>>>>             logging.info(f"No data to write for key: {partition_date}, the grouped contents are: {data}")
>>>>>>             return
>>>>>>         with beam.io.aws.s3io.S3IO(client=client).open(filename=filename, mode="w") as s3_writer:
>>>>>>             with ParquetWriter(
>>>>>>                 s3_writer, schema_without_field_metadata, compression="SNAPPY",
>>>>>>                 use_deprecated_int96_timestamps=True
>>>>>>             ) as parquet_writer:
>>>>>>                 merged_tables = pa.concat_tables(tables)
>>>>>>                 parquet_writer.write_table(merged_tables)

>>>>>> On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský <[email protected]> wrote:

>>>>>>> Hi Lydian,

>>>>>>> because you do not specify 'timestamp_policy' it should use the default, which should be processing time, so this should not be the issue. The one transform potentially left is the sink transform, as Reuven mentioned. Can you share details of the implementation?

>>>>>>> Jan

>>>>>>> On 9/19/24 23:10, Lydian Lee wrote:

>>>>>>> Hi, Jan

>>>>>>> Here's how we do ReadFromKafka; the expansion service is just to ensure we can work with xlang in k8s, so please ignore that part.

>>>>>>> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>>>>>>>
>>>>>>> ReadFromKafka(
>>>>>>>     consumer_config={
>>>>>>>         "group.id": "group-name",
>>>>>>>         "auto.offset.reset": "latest",
>>>>>>>         "enable.auto.commit": "false",
>>>>>>>     },
>>>>>>>     topics=["topic-name"],
>>>>>>>     with_metadata=False,
>>>>>>>     expansion_service=default_io_expansion_service(
>>>>>>>         append_args=[
>>>>>>>             "--defaultEnvironmentType=PROCESS",
>>>>>>>             '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
>>>>>>>             "--experiments=use_deprecated_read",
>>>>>>>         ]
>>>>>>>     ),
>>>>>>>     commit_offset_in_finalize=True,
>>>>>>> )

>>>>>>> Do you know what would be the right approach for using processing time instead? I thought WindowInto was supposed to use the timestamp we appended to the event? Do you think it is still using the original Kafka event timestamp? Thanks!
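Related to the timestamp_policy Jan mentions above: ReadFromKafka takes it as a constructor argument, so timestamps can come from the Kafka broker rather than being re-assigned inside the pipeline. A sketch only; the policy string and other values are illustrative, and the ReadFromKafka class also exposes constants for the supported policies:

from apache_beam.io.kafka import ReadFromKafka

read = ReadFromKafka(
    consumer_config={"group.id": "group-name", "auto.offset.reset": "latest"},
    topics=["topic-name"],
    # The default policy stamps elements with processing time at the source;
    # 'LogAppendTime' uses the broker's append time instead (the "log append time"
    # Jan suggests further down the thread).
    timestamp_policy="LogAppendTime",
    commit_offset_in_finalize=True,
)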
>>>>>>> On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <[email protected]> wrote:

>>>>>>>> Can you share the (relevant) parameters of the ReadFromKafka transform?

>>>>>>>> This feels strange, and it might not do what you'd expect:

>>>>>>>> | "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda event: window.TimestampedValue(event, time.time()))

>>>>>>>> This does not change the assigned timestamp of an element, but creates a new element which contains the processing time. It will not be used for windowing, though.

>>>>>>>> On 9/19/24 00:49, Lydian Lee wrote:

>>>>>>>> Hi Reuven,

>>>>>>>> Here's a quick look at our pipeline:

>>>>>>>> (
>>>>>>>>     pipeline
>>>>>>>>     | "Reading message from Kafka" >> ReadFromKafka(...)
>>>>>>>>     | "Deserializing events" >> Deserialize(**deserializer_args)
>>>>>>>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
>>>>>>>>         lambda event: window.TimestampedValue(event, time.time())
>>>>>>>>     )
>>>>>>>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>>>>>>>         # fixed_window_size is 1 min.
>>>>>>>>         beam.transforms.window.FixedWindows(fixed_window_size),
>>>>>>>>         # although we configured lateness, because we are using processing time
>>>>>>>>         # I don't expect any late events
>>>>>>>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>>>>>>>>     )
>>>>>>>>     # add a dummy key to reshuffle to fewer partitions. Kafka has 16 partitions,
>>>>>>>>     # but we only want to generate 2 files every minute
>>>>>>>>     | "Adding random integer partition key" >> beam.Map(
>>>>>>>>         lambda event: (random.randint(1, 5), event)
>>>>>>>>     )
>>>>>>>>     | "Group by randomly-assigned integer key" >> beam.GroupByKey()
>>>>>>>>     | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
>>>>>>>>     # call boto3 to write the events into S3 in parquet format
>>>>>>>>     | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
>>>>>>>> )

>>>>>>>> Thanks!

>>>>>>>> On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <[email protected]> wrote:

>>>>>>>>> How are you doing this aggregation?

>>>>>>>>> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <[email protected]> wrote:

>>>>>>>>>> Hi Jan,

>>>>>>>>>> Thanks for the recommendation. In our case, we are windowing with processing time, which means that there should be no late events at all.

>>>>>>>>>> You've mentioned that GroupByKey is stateful and can potentially drop the data. Given that after the reshuffle (adding a random shuffle id to the key) we then do the aggregation (combine the data and write it to S3), do you think the example I mentioned earlier could potentially be the reason for the dropped data?

>>>>>>>>>> If so, in general how is Beam able to prevent that? Are there any suggested approaches? Thanks

>>>>>>>>>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <[email protected]> wrote:

>>>>>>>>>>> Hi Lydian,

>>>>>>>>>>> in that case, there is only generic advice you can look into. Reshuffle is a stateless operation that should not cause dropping data. A GroupByKey, on the other hand, is stateful and thus can - when dealing with late data - drop some of it. You should be able to confirm this by looking for the 'droppedDueToLateness' counter and/or the log in here [1]. This happens when an element arrives after the watermark has passed the element's timestamp plus the allowed lateness. If you see the log, you might need to either change how you assign timestamps to elements (e.g. use log append time) or increase the allowed lateness of your WindowFn.

>>>>>>>>>>> Best,

>>>>>>>>>>> Jan

>>>>>>>>>>> [1] https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
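If the log line from [1] is hard to spot, the counter can also be queried from the pipeline result, at least on runners that report it over the metrics API; whether the Flink runner in this setup surfaces it is not guaranteed. A rough sketch for a bounded test run:

from apache_beam.metrics.metric import MetricsFilter

result = pipeline.run()  # `pipeline` is the constructed Pipeline object
result.wait_until_finish()

# Look for counters named like the one incremented by the late-data dropping step [1].
dropped = result.metrics().query(MetricsFilter().with_name("droppedDueToLateness"))
for counter in dropped["counters"]:
    print(counter.key, counter.committed)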
>>>>>>>>>>> On 9/18/24 08:53, Lydian Lee wrote:

>>>>>>>>>>> I would love to, but there are some limitations on our end, so the version bump won't happen soon. Thus I need to figure out what the root cause might be.

>>>>>>>>>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <[email protected]> wrote:

>>>>>>>>>>>> Hi Lydian,

>>>>>>>>>>>> 2.41.0 is quite old, can you please try the current version to see if this issue is still present? There were lots of changes between 2.41.0 and 2.59.0.

>>>>>>>>>>>> Jan

>>>>>>>>>>>> On 9/17/24 17:49, Lydian Lee wrote:

>>>>>>>>>>>> Hi,

>>>>>>>>>>>> We are using the Beam Python SDK with the Flink runner; the Beam version is 2.41.0 and the Flink version is 1.15.4.

>>>>>>>>>>>> We have a pipeline that has 2 stages:
>>>>>>>>>>>> 1. read from Kafka and apply a fixed window of 1 minute
>>>>>>>>>>>> 2. aggregate the data for the past 1 minute, reshuffle so that we have a lower partition count, and write the results into S3.

>>>>>>>>>>>> We disabled enable.auto.commit and enabled commit_offset_in_finalize. Also, auto.offset.reset is set to "latest".
>>>>>>>>>>>> [image: image.png]

>>>>>>>>>>>> According to the log, I can definitely see that the data is consumed from Kafka, because there are many
>>>>>>>>>>>> ```
>>>>>>>>>>>> Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
>>>>>>>>>>>> ```
>>>>>>>>>>>> and that partition/offset pair does match the missing records. However, they don't show up in the final S3 output.

>>>>>>>>>>>> My current hypothesis is that the shuffling might be the reason for the issue. For example, originally in Kafka for the past minute in partition 1, I have records at offsets 1, 2, 3. After the reshuffle, they are now distributed, for example:
>>>>>>>>>>>> - partition A: 1, 3
>>>>>>>>>>>> - partition B: 2

>>>>>>>>>>>> If partition A is done successfully but partition B fails, then given that A succeeded, it will commit its offset to Kafka, and thus Kafka now has the offset at 3. When the read is retried, it will skip offset 2. However, I am not sure exactly how the offset commit works and how it interacts with the checkpoints. It does seem like, if my hypothesis were correct, we should be seeing more missing records, yet this happens rarely. Wondering if anyone can help identify potential root causes?
>>>>>>>>>>>> Thanks
