Hi Jan,

Thanks for taking a quick look! Yes, the "with" statement closes the writer after the write. In our use case we actually don't mind duplicate data being written; what we are more concerned about is the missing data, which is the issue we are facing right now.
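For reference, my understanding is that the nested "with" blocks in the DoFn quoted below are equivalent to the explicit ordering in this rough sketch (client, filename, schema_without_field_metadata and merged_tables are the names from that DoFn, so this is not a standalone program), meaning the Parquet footer should be flushed into the S3 stream before the upload is finalized:

s3_writer = beam.io.aws.s3io.S3IO(client=client).open(filename=filename, mode="w")
parquet_writer = ParquetWriter(
    s3_writer,
    schema_without_field_metadata,
    compression="SNAPPY",
    use_deprecated_int96_timestamps=True,
)
try:
    parquet_writer.write_table(merged_tables)
finally:
    parquet_writer.close()  # writes and flushes the Parquet footer into the S3 stream
    s3_writer.close()       # completes the S3 upload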
On Mon, Sep 23, 2024 at 11:54 AM Reuven Lax via user <[email protected]> wrote:

> Do you close the write afterwards? If not, I wonder if you could lose records due to unflushed data.
>
> Also - what you're doing here can definitely lead to duplicate data being written, since this DoFn can be run multiple times. The duplicates might also appear different if the Iterables are slightly different on retries, especially when Flink restarts from a checkpoint.
>
> Reuven
>
> On Mon, Sep 23, 2024 at 1:40 PM Lydian Lee <[email protected]> wrote:
>
>> Hi Jan,
>>
>> Thanks so much for your help. Here's our write to S3:
>>
>> import logging
>> import os
>> from typing import Dict, Iterable, List
>> from uuid import uuid4
>>
>> import apache_beam as beam
>> import pyarrow as pa
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from pyarrow.parquet import ParquetWriter
>>
>> class WriteBatchesToS3(beam.DoFn):
>>     def __init__(
>>         self,
>>         output_path: str,
>>         schema: pa.Schema,
>>         pipeline_options: PipelineOptions,
>>     ) -> None:
>>         self.output_path = output_path
>>         self.schema = schema
>>         self.pipeline_options = pipeline_options
>>
>>     def process(self, data: Iterable[List[Dict]]) -> None:
>>         """Write one batch per file to S3."""
>>         client = beam.io.aws.clients.s3.boto3_client.Client(options=self.pipeline_options)
>>
>>         # Strip field metadata so the written schema only carries the plain types.
>>         fields_without_metadata = [pa.field(f.name, f.type) for f in self.schema]
>>         schema_without_field_metadata = pa.schema(fields_without_metadata)
>>
>>         filename = os.path.join(
>>             self.output_path,
>>             f"uuid_{str(uuid4())}.parquet",
>>         )
>>
>>         tables = [
>>             pa.Table.from_pylist(batch, schema=schema_without_field_metadata)
>>             for batch in data
>>         ]
>>         if len(tables) == 0:
>>             logging.info(f"No data to write, the grouped contents are: {data}")
>>             return
>>
>>         with beam.io.aws.s3io.S3IO(client=client).open(filename=filename, mode="w") as s3_writer:
>>             with ParquetWriter(
>>                 s3_writer,
>>                 schema_without_field_metadata,
>>                 compression="SNAPPY",
>>                 use_deprecated_int96_timestamps=True,
>>             ) as parquet_writer:
>>                 merged_tables = pa.concat_tables(tables)
>>                 parquet_writer.write_table(merged_tables)
>>
>> On Fri, Sep 20, 2024 at 12:02 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi Lydian,
>>>
>>> because you do not specify 'timestamp_policy' it should use the default, which should be processing time, so this should not be the issue. The one transform still left as a potential cause is the sink transform, as Reuven mentioned. Can you share details of the implementation?
>>>
>>> Jan
>>>
>>> On 9/19/24 23:10, Lydian Lee wrote:
>>>
>>> Hi Jan,
>>>
>>> Here's how we do ReadFromKafka. The expansion service setup is just there so the cross-language transform works in k8s, so please ignore that part.
>>>
>>> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
>>>
>>> ReadFromKafka(
>>>     consumer_config={
>>>         "group.id": "group-name",
>>>         "auto.offset.reset": "latest",
>>>         "enable.auto.commit": "false",
>>>     },
>>>     topics=["topic-name"],
>>>     with_metadata=False,
>>>     expansion_service=default_io_expansion_service(
>>>         append_args=[
>>>             "--defaultEnvironmentType=PROCESS",
>>>             '--defaultEnvironmentConfig={"command": "/opt/apache/beam/java_boot"}',
>>>             "--experiments=use_deprecated_read",
>>>         ]
>>>     ),
>>>     commit_offset_in_finalize=True,
>>> )
>>>
>>> Do you know what would be the right approach for using processing time instead? I thought WindowInto was supposed to use the timestamp we appended to the event? Do you think it is still using the original Kafka event timestamp? Thanks!
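(For completeness: as Jan notes above, the default timestamp_policy is already processing time, but my understanding is that it can also be set explicitly on ReadFromKafka, using the policy constants exposed on the transform itself. A rough, untested sketch:)

ReadFromKafka(
    consumer_config={
        "group.id": "group-name",
        "auto.offset.reset": "latest",
        "enable.auto.commit": "false",
    },
    topics=["topic-name"],
    with_metadata=False,
    commit_offset_in_finalize=True,
    # Explicit policy instead of relying on the default; the alternatives are
    # ReadFromKafka.create_time_policy and ReadFromKafka.log_append_time, the
    # latter being the "log append time" Jan mentions further down the thread.
    timestamp_policy=ReadFromKafka.processing_time_policy,
    # expansion_service unchanged from the snippet above
)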
>>> On Thu, Sep 19, 2024 at 7:53 AM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Can you share the (relevant) parameters of the ReadFromKafka transform?
>>>>
>>>> This feels strange, and it might not do what you'd expect:
>>>>
>>>> | "Adding 'trigger_processing_time' timestamp" >> beam.Map(lambda event: window.TimestampedValue(event, time.time()))
>>>>
>>>> This does not change the assigned timestamp of an element; it creates a new element which contains the processing time. It will not be used for windowing, though.
>>>>
>>>> On 9/19/24 00:49, Lydian Lee wrote:
>>>>
>>>> Hi Reuven,
>>>>
>>>> Here's a quick look at our pipeline:
>>>>
>>>> (
>>>>     pipeline
>>>>     | "Reading message from Kafka" >> ReadFromKafka(...)
>>>>     | "Deserializing events" >> Deserialize(**deserializer_args)
>>>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(
>>>>         lambda event: window.TimestampedValue(event, time.time())
>>>>     )
>>>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>>>         beam.transforms.window.FixedWindows(fixed_window_size),  # fixed_window_size is 1 min
>>>>         # Although we configure lateness, we are using processing time,
>>>>         # so I don't expect any late events.
>>>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness),
>>>>     )
>>>>     | "Adding random integer partition key" >> beam.Map(
>>>>         # Add a dummy key so we reshuffle into fewer partitions. Kafka has 16
>>>>         # partitions, but we only want to generate a couple of files every minute.
>>>>         lambda event: (random.randint(1, 5), event)
>>>>     )
>>>>     | "Group by randomly-assigned integer key" >> beam.GroupByKey()
>>>>     | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val)
>>>>     # Calls boto3 to write the events into S3 in Parquet format.
>>>>     | "Writing event data batches to parquet" >> WriteBatchesToS3(...)
>>>> )
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Sep 18, 2024 at 3:16 PM Reuven Lax via user <[email protected]> wrote:
>>>>
>>>>> How are you doing this aggregation?
>>>>>
>>>>> On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee <[email protected]> wrote:
>>>>>
>>>>>> Hi Jan,
>>>>>>
>>>>>> Thanks for the recommendation. In our case we are windowing on processing time, which means there should be no late events at all.
>>>>>>
>>>>>> You've mentioned that GroupByKey is stateful and can potentially drop data. Given that after the reshuffle (adding a random shuffle id to the key) we then do the aggregation (combine the data and write it to S3), do you think the scenario I mentioned earlier could be the reason for the dropped data?
>>>>>>
>>>>>> If so, how does Beam prevent that in general? Are there any suggested approaches? Thanks
>>>>>>
>>>>>> On Wed, Sep 18, 2024 at 12:33 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Lydian,
>>>>>>>
>>>>>>> in that case, there is only generic advice you can look into. Reshuffle is a stateless operation that should not cause dropped data. A GroupByKey, on the other hand, is stateful and thus can - when dealing with late data - drop some of it. You should be able to confirm this by looking for the 'droppedDueToLateness' counter and/or the log here [1]. This happens when an element arrives after the watermark has already passed the end of the element's window plus the allowed lateness. If you see the log, you might need to either change how you assign timestamps to elements (e.g. use log append time) or increase the allowed lateness of your windowfn.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Jan
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/blob/f37795e326a75310828518464189440b14863834/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L132
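(If we end up going down the allowed-lateness route Jan describes above, I think the change in our pipeline would look roughly like the sketch below; the 600 seconds is only an illustrative number, not something we have validated:)

| "Window into Fixed Intervals" >> beam.WindowInto(
    beam.transforms.window.FixedWindows(60),  # 1-minute fixed windows, as before
    # Much larger allowed lateness, so elements that arrive behind the watermark
    # are still assigned to their window instead of being dropped as late data.
    allowed_lateness=beam.utils.timestamp.Duration(seconds=600),
)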
>>>>>>> On 9/18/24 08:53, Lydian Lee wrote:
>>>>>>>
>>>>>>> I would love to, but there are some limitations on our end, so the version bump won't happen soon. That's why I need to figure out what the root cause might be.
>>>>>>>
>>>>>>> On Tue, Sep 17, 2024 at 11:26 PM Jan Lukavský <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Lydian,
>>>>>>>>
>>>>>>>> 2.41.0 is quite old, can you please try the current version to see if this issue is still present? There were lots of changes between 2.41.0 and 2.59.0.
>>>>>>>>
>>>>>>>> Jan
>>>>>>>>
>>>>>>>> On 9/17/24 17:49, Lydian Lee wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We are using the Beam Python SDK with the Flink runner; the Beam version is 2.41.0 and the Flink version is 1.15.4.
>>>>>>>>
>>>>>>>> We have a pipeline that has 2 stages:
>>>>>>>> 1. read from Kafka and apply a 1-minute fixed window
>>>>>>>> 2. aggregate the data for the past minute, reshuffle so that we have a lower partition count, and write the results into S3
>>>>>>>>
>>>>>>>> We disabled enable.auto.commit and enabled commit_offset_in_finalize; auto.offset.reset is set to "latest".
>>>>>>>>
>>>>>>>> According to the log, the data is definitely being consumed from Kafka, because there are many
>>>>>>>> ```
>>>>>>>> Resetting offset for topic XXXX-<PARTITION> to offset <OFFSET>
>>>>>>>> ```
>>>>>>>> entries, and those partition/offset pairs do match the missing records. However, the data does not show up in S3.
>>>>>>>>
>>>>>>>> My current hypothesis is that the shuffling might be the reason for the issue. For example, say that for the past minute partition 1 in Kafka held offsets 1, 2 and 3. After the reshuffle they are distributed, for example, as:
>>>>>>>> - partition A: 1, 3
>>>>>>>> - partition B: 2
>>>>>>>>
>>>>>>>> If partition A succeeds but partition B fails, then because A succeeded it commits its offset to Kafka, so the committed offset is now 3. When the read retries, it will skip offset 2. However, I am not sure how exactly the offset commit works, and I wonder how it interacts with the checkpoints. If my hypothesis were correct we should be seeing many more missing records, yet this seems to happen only rarely. I'm wondering if anyone can help identify potential root causes? Thanks
