Hello,

I am jumping in on this, because we are doing the same thing as Lydian.
In our case, we are using the default timestamp strategy in Kafka (so
processing time).
We were also adding a processing timestamp manually, the same as Lydian.


However, we do have late data. It mainly happens in our integration tests
with Flink (parallelism 1), and very rarely in production.

So does that mean we can't control the timestamp of an element even with
`window.TimestampedValue(event, time.time())`?

Best,

Marc


On Tue, Sep 24, 2024, 04:23 Reuven Lax via user <[email protected]>
wrote:

> Also as I said, the duplicate files might not appear like duplicates,
> which can be quite confusing.
>
> Out of curiosity, I would try - just for testing - removing the line where
> you "add" processing time, and also setting allowed_lateness to the largest
> allowed value. This will help determine whether late data is causing the
> dropped records.
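>
> Just to illustrate what I mean, something roughly like the following
> (untested sketch, reusing the names from your pipeline snippet; the exact
> value doesn't matter as long as it is effectively "never drop"):
>
> | "Window into Fixed Intervals" >> beam.WindowInto(
>     beam.transforms.window.FixedWindows(fixed_window_size),
>     # effectively unbounded lateness, only for this experiment
>     allowed_lateness=beam.utils.timestamp.Duration(seconds=365 * 24 * 3600),
> )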
>
> Reuven
>
>
> On Mon, Sep 23, 2024 at 2:10 PM Lydian Lee <[email protected]>
> wrote:
>
>> Hi Jan,
>>
>> Thanks for taking a quick look!  Yes, the "with" statement would close
>> after the write.  In our use cases, we actually don't mind if there are
>> duplicates of data written, but we are more concerned about the missing
>> data, which is the issue we are facing right now.
>>
>> On Mon, Sep 23, 2024 at 11:54 AM Reuven Lax via user <
>> [email protected]> wrote:
>>
>>> Do you close the write afterwards? If not, I wonder if you could lose
>>> records due to unflushed data.
>>>
>>> Also - what you're doing here can definitely lead to duplicate data
>>> being written, since this DoFn can be run multiple times. The duplicates
>>> might also appear different if the Iterables are slightly different on
>>> retries, especially when Flink restarts from a checkpoint.
>>>
>>> Reuven
>>>
>>> On Mon, Sep 23, 2024 at 1:40 PM Lydian Lee <[email protected]>
>>> wrote:
>>>
>>>> Hi Jan,
>>>>
>>>> Thanks so much for your help. Here's our write to s3:
>>>>
>>>> import logging
>>>> import os
>>>> from typing import Dict, Iterable, List
>>>> from uuid import uuid4
>>>>
>>>> import apache_beam as beam
>>>> import pyarrow as pa
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>> from pyarrow.parquet import ParquetWriter
>>>>
>>>> class WriteBatchesToS3(beam.DoFn):
>>>>     def __init__(
>>>>         self,
>>>>         output_path: str,
>>>>         schema: pa.Schema,
>>>>         pipeline_options: PipelineOptions,
>>>>     ) -> None:
>>>>         self.output_path = output_path
>>>>         self.schema = schema
>>>>         self.pipeline_options = pipeline_options
>>>>
>>>>     def process(self, data: Iterable[List[Dict]]) -> None:
>>>>         """Write one batch per file to S3."""
>>>>         client = beam.io.aws.clients.s3.boto3_client.Client(
>>>>             options=self.pipeline_options
>>>>         )
>>>>
>>>>         # Strip field-level metadata from the schema before writing.
>>>>         fields_without_metadata = [pa.field(f.name, f.type) for f in self.schema]
>>>>         schema_without_field_metadata = pa.schema(fields_without_metadata)
>>>>
>>>>         filename = os.path.join(
>>>>             self.output_path,
>>>>             f"uuid_{str(uuid4())}.parquet",
>>>>         )
>>>>
>>>>         tables = [
>>>>             pa.Table.from_pylist(batch, schema=schema_without_field_metadata)
>>>>             for batch in data
>>>>         ]
>>>>         if len(tables) == 0:
>>>>             logging.info(f"No data to write, the grouped contents are: {data}")
>>>>             return
>>>>
>>>>         with beam.io.aws.s3io.S3IO(client=client).open(
>>>>             filename=filename, mode="w"
>>>>         ) as s3_writer:
>>>>             with ParquetWriter(
>>>>                 s3_writer,
>>>>                 schema_without_field_metadata,
>>>>                 compression="SNAPPY",
>>>>                 use_deprecated_int96_timestamps=True,
>>>>             ) as parquet_writer:
>>>>                 merged_tables = pa.concat_tables(tables)
>>>>                 parquet_writer.write_table(merged_tables)
>>>>
>>>> On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> Hi Lydian,
>>>>>
>>>>> because you do not specify 'timestamp_policy', it should use the
>>>>> default, which should be processing time, so this should not be the
>>>>> issue. The one transform potentially left is the sink, as Reuven
>>>>> mentioned. Can you share details of its implementation?
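>>>>>
>>>>> If you ever want to be explicit rather than rely on the default, I
>>>>> believe ReadFromKafka also accepts a timestamp_policy argument. Roughly
>>>>> like this (untested sketch, keeping the rest of your arguments as they
>>>>> are; please double-check the constant name against your SDK version):
>>>>>
>>>>> ReadFromKafka(
>>>>>     consumer_config={...},  # same consumer config as in your snippet
>>>>>     topics=["topic-name"],
>>>>>     # pin the element timestamps to processing time explicitly
>>>>>     timestamp_policy=ReadFromKafka.processing_time_policy,
>>>>>     commit_offset_in_finalize=True,
>>>>> )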
>>>>>
>>>>>  Jan
>>>>> On 9/19/24 23:10, Lydian Lee wrote:
>>>>>
>>>>> Hi, Jan
>>>>>
>>>>> Here's how we do ReadFromKafka. The expansion service is just to
>>>>> ensure we can work with xlang in k8s, so please ignore that part.
>>>>> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>>>>>
>>>>> ReadFromKafka(
>>>>>     consumer_config={
>>>>>         "group.id": "group-name",
>>>>>         "auto.offset.reset": "latest",
>>>>>         "enable.auto.commit": "false",
>>>>>     },
>>>>>     topics=["topic-name"],
>>>>>     with_metadata=False,
>>>>>     expansion_service=default_io_expansion_service(
>>>>>         append_args=[
>>>>>             "--defaultEnvironmentType=PROCESS",
>>>>>             '--defaultEnvironmentConfig={"command":"/opt/apache/beam/java_boot"}',
>>>>>             "--experiments=use_deprecated_read",
>>>>>         ]
>>>>>     ),
>>>>>     commit_offset_in_finalize=True,
>>>>> )
>>>>>
>>>>> Do you know what the right approach would be for using processing time
>>>>> instead? I thought WindowInto was supposed to use the timestamp we
>>>>> appended to the event. Do you think it is still using the original
>>>>> Kafka event timestamp? Thanks!
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> Can you share the (relevant) parameters of the ReadFromKafka
>>>>>> transform?
>>>>>>
>>>>>> This feels strange, and it might not do what you'd expect:
>>>>>> | "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda
>>>>>> event: window.TimestampedValue(event, time.time()))
>>>>>>
>>>>>> This does not change the assigned timestamp of an element, but
>>>>>> creates a new element which contains processing time. It will not be used
>>>>>> for windowing, though.
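>>>>>>
>>>>>> If you want to check which timestamp the runner actually assigned to
>>>>>> each element (and thus which one windowing will use), one option is to
>>>>>> log beam.DoFn.TimestampParam just before the WindowInto. Something
>>>>>> like this untested sketch:
>>>>>>
>>>>>> import logging
>>>>>> import apache_beam as beam
>>>>>>
>>>>>> def log_timestamp(event, timestamp=beam.DoFn.TimestampParam):
>>>>>>     # 'timestamp' is the element timestamp the runner will window on
>>>>>>     logging.info("element timestamp: %s", timestamp.to_utc_datetime())
>>>>>>     return event
>>>>>>
>>>>>> ... | "Log element timestamps" >> beam.Map(log_timestamp) | ...
>>>>>>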
>>>>>> On 9/19/24 00:49, Lydian Lee wrote:
>>>>>>
>>>>>> Hi Reuven,
>>>>>>
>>>>>> Here's a quick look for our pipeline:
>>>>>> (
>>>>>>     pipeline
>>>>>>     | "Reading message from Kafka" >> ReadFromKafka(...)
>>>>>>     | "Deserializing events" >> Deserialize(**deserializer_args)
>>>>>>     | "Adding 'trigger_processing_time' timestamp"
>>>>>>     >> beam.Map(lambda event: window.TimestampedValue(event, time.time()))
>>>>>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>>>>>         # fixed_window_size is 1 min
>>>>>>         beam.transforms.window.FixedWindows(fixed_window_size),
>>>>>>         # although we configured lateness, because we are using
>>>>>>         # processing time I don't expect any late events
>>>>>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>>>>>>     )
>>>>>>     | "Adding random integer partition key" >> beam.Map(
>>>>>>         # add a dummy key to reshuffle to fewer partitions; Kafka has 16
>>>>>>         # partitions, but we only want to generate 2 files every minute
>>>>>>         lambda event: (random.randint(1, 5), event)
>>>>>>     )
>>>>>>     | "Group by randomly-assigned integer key" >> beam.GroupByKey()
>>>>>>     | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
>>>>>>     # call boto3 to write the events into S3 in parquet format
>>>>>>     | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
>>>>>> )
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>> On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> How are you doing this aggregation?
>>>>>>>
>>>>>>> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jan,
>>>>>>>>
>>>>>>>> Thanks for the recommendation. In our case, we are windowing on
>>>>>>>> processing time, which means that there should be no late events at
>>>>>>>> all.
>>>>>>>>
>>>>>>>> You've mentioned that GroupByKey is stateful and can potentially
>>>>>>>> drop data. Given that after the reshuffle (adding a random shuffle id
>>>>>>>> to the key) we then do the aggregation (combine the data and write it
>>>>>>>> to S3), do you think the scenario I mentioned earlier could be the
>>>>>>>> reason for the dropped data?
>>>>>>>>
>>>>>>>> If so, how is Beam able to prevent that in general? Are there any
>>>>>>>> suggested approaches? Thanks
>>>>>>>>
>>>>>>>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Lydian,
>>>>>>>>>
>>>>>>>>> in that case, there is only generic advice you can look into.
>>>>>>>>> Reshuffle is a stateless operation that should not cause dropped
>>>>>>>>> data. A GroupByKey, on the other hand, is stateful and thus can -
>>>>>>>>> when dealing with late data - drop some of it. You should be able to
>>>>>>>>> confirm this by looking for the 'droppedDueToLateness' counter
>>>>>>>>> and/or the log emitted here [1]. This happens when an element's
>>>>>>>>> timestamp is earlier than the watermark minus the allowed lateness.
>>>>>>>>> If you see the log, you might need to either change how you assign
>>>>>>>>> timestamps to elements (e.g. use log append time) or increase the
>>>>>>>>> allowed lateness of your windowing.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>>  Jan
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
>>>>>>>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>>>>>>>
>>>>>>>>> I would love to, but there are some limitations on our end, so the
>>>>>>>>> version bump won't happen soon. In the meantime I need to figure out
>>>>>>>>> what the root cause might be.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Lydian,
>>>>>>>>>>
>>>>>>>>>> 2.41.0 is quite old, can you please try a current version to see
>>>>>>>>>> if this issue is still present? There were lots of changes between
>>>>>>>>>> 2.41.0 and 2.59.0.
>>>>>>>>>>
>>>>>>>>>>  Jan
>>>>>>>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> We are using Beam Python SDK with Flink Runner, the Beam version
>>>>>>>>>> is 2.41.0 and the Flink version is 1.15.4.
>>>>>>>>>>
>>>>>>>>>> We have a pipeline that has 2 stages:
>>>>>>>>>> 1. read from Kafka and apply a fixed window of 1 minute
>>>>>>>>>> 2. aggregate the data for the past minute, reshuffle so that we
>>>>>>>>>> have a lower partition count, and write the results into S3.
>>>>>>>>>>
>>>>>>>>>> We disabled enable.auto.commit and enabled
>>>>>>>>>> commit_offset_in_finalize; auto.offset.reset is set to "latest".
>>>>>>>>>> [image: image.png]
>>>>>>>>>>
>>>>>>>>>> According to the logs, I can definitely see the data being
>>>>>>>>>> consumed from Kafka, because there are many
>>>>>>>>>> ```
>>>>>>>>>> Resetting offset for topic XXXX-<PARTITION>  to offset <OFFSET>
>>>>>>>>>> ```
>>>>>>>>>> lines, and the partition/offset pairs do match the missing records.
>>>>>>>>>> However, those records don't show up in the final S3 output.
>>>>>>>>>>
>>>>>>>>>> My current hypothesis is that the shuffling might be the reason
>>>>>>>>>> for the issue. For example, suppose that for the past minute,
>>>>>>>>>> partition 1 in Kafka has records at offsets 1, 2 and 3. After the
>>>>>>>>>> reshuffle they are distributed, for example, as:
>>>>>>>>>> - partition A: 1, 3
>>>>>>>>>> - partition B: 2
>>>>>>>>>>
>>>>>>>>>> If partition A finishes successfully but partition B fails, then
>>>>>>>>>> because A succeeded it will commit its offset to Kafka, so Kafka
>>>>>>>>>> now has the offset at 3. When the read is retried, it will skip
>>>>>>>>>> offset 2. However, I am not sure how exactly the offset commit
>>>>>>>>>> works, and I wonder how it interacts with the checkpoints. If my
>>>>>>>>>> hypothesis were correct, it also seems we should be seeing far more
>>>>>>>>>> missing records, yet this happens only rarely. Wondering if anyone
>>>>>>>>>> can help identify potential root causes? Thanks
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
