Vamsi,

Actually, I do not think it is possible to avoid the shuffle, precisely because you want to put data into 5 minute fixed windows and there is no ordering on the incoming input data. I am not sure what your upstream source or transformation is that creates the PCollection you want to save, but there is no assumption that the elements are correlated with time. The only guarantee is that no element will arrive with a timestamp earlier than the watermark.

So, in your problem, the source PCollection can deliver messages with these timestamps to different workers at the same moment:
- e1, 2024-01-01T00:00:10
- e2, 2024-01-01T00:03:10
- e3, 2024-01-01T00:08:10
- e4, 2024-01-01T00:04:10
- e5, 2024-01-01T00:06:10
- e6, 2024-01-01T00:09:10

Since this is a distributed computation, there is no way to order the events globally. There is also no guarantee that every worker will process every time window bucket.

What can be done is that the worker that receives a message checks whether it is responsible for computing that message's window, and either stores it on disk or forwards the message to the worker that is actually responsible for that window (which is really a shuffle operation).

What could be done to optimize the sharding-prefix code further is to try `GroupIntoBatches.WithShardedKey` (implementation reference: https://github.com/apache/beam/blob/bcee5d081d05841bb52f0851f94a04f9c8968b88/sdks/python/apache_beam/transforms/util.py#L1113), as some runners might optimize this shuffle so that the minimum amount of data is actually sent over the network. You may try that transform (if it meets your requirements) or take a look there at how `ShardedKey` is used.
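
For illustration, here is a minimal sketch (untested; the batch size, the print placeholder and the pipeline shape are my own assumptions, not taken from your pipeline) of how `GroupIntoBatches.WithShardedKey` could slot in after windowing and keying. Its output elements are (sharded key, batch of values) pairs, which a downstream step could write out as one file per batch:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

def run():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'ReadStream' >> beam.io.ReadFromPubSub(topic='projects/.../topics/...')
         | 'WindowIntoFixed' >> beam.WindowInto(FixedWindows(300))  # 5-minute windows
         | 'MapToSingleKey' >> beam.Map(lambda x: (1, x))
         # The runner may shard the key internally to keep the shuffle cheap.
         | 'BatchPerShard' >> beam.GroupIntoBatches.WithShardedKey(1000)
         | 'WriteBatches' >> beam.Map(print))  # replace with your file-writing DoFn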

On the idea of a custom executor thread - this is asking for trouble. Beam (like all other distributed computation engines) is designed to handle parallelization for you, and if you try to do your own threading on a worker, things typically end badly (random failures, missing data, hard debugging). I really doubt it would be more performant than an actual shuffle done the Beam way.

Best,

Wisniowski Piotr


On 27.08.2024 20:35, vamsikrishna korada wrote:
Thanks for the reply Wisniowski Piotr.

As you mentioned, the data can be big and I am okay with writing multiple files every 5 minutes. If I assign a key that uniquely represents a worker, that makes sure all the data that is processed together stays together. But there can still be a shuffle, and the data can collectively be moved across nodes when we assign the keys, right? Is there any way I can avoid the extra shuffle?

Do you think it is a good idea to instead have a separate scheduler thread within the DoFn which will periodically flush the data every 5 minutes from each worker?
Thanks,
Vamsi

On Tue, 27 Aug 2024 at 17:49, Wiśniowski Piotr <[email protected]> wrote:

    Hi,

    If you require only a single file per 5 minutes, then what you are
    missing is that you need to window the data into fixed windows, so
    that a stateful DoFn can store the elements per key per window.

    Something like this (I did not test this code; it is just pseudocode):

    import apache_beam as beam
    from apache_beam.coders import VarIntCoder
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer
    from apache_beam.transforms.window import FixedWindows

    class StatefulWriteToFileDoFn(beam.DoFn):
        # Per-key, per-window buffer of elements. VarIntCoder assumes integer
        # elements; use a coder matching your actual element type.
        BAG_STATE = BagStateSpec('bag_state', VarIntCoder())
        # Event-time timer that fires when the watermark passes the window end.
        TIMER = TimerSpec('timer', TimeDomain.WATERMARK)

        def process(self, element, timestamp=beam.DoFn.TimestampParam,
                    window=beam.DoFn.WindowParam,
                    bag_state=beam.DoFn.StateParam(BAG_STATE),
                    timer=beam.DoFn.TimerParam(TIMER)):
            # Buffer the element and schedule a flush at the end of its window.
            bag_state.add(element)
            timer.set(window.end)

        @on_timer(TIMER)
        def flush(self, window=beam.DoFn.WindowParam,
                  bag_state=beam.DoFn.StateParam(BAG_STATE)):
            # Generate a filename based on the window's end time, for example:
            filename = f'output_{window.end.to_utc_datetime().strftime("%Y%m%d%H%M%S")}.txt'
            with open(filename, 'w') as f:
                for element in bag_state.read():
                    f.write(f'{element}\n')
            bag_state.clear()

    def run():
        with beam.Pipeline(options=PipelineOptions()) as p:
            (p
             | 'ReadStream' >> beam.io.ReadFromPubSub(topic='projects/.../topics/...')
             | 'WindowIntoFixed' >> beam.WindowInto(FixedWindows(300))  # 5-minute windows
             | 'MapToSingleKey' >> beam.Map(lambda x: (1, x))  # map to a single key
             | 'StatefulWriteToFile' >> beam.ParDo(StatefulWriteToFileDoFn()))

    So the stateful DoFn will keep its state per key per window, and each
    window should create its own output file. But note that this requires
    putting all of the 5 minute data on the same worker, as only a single
    worker can be responsible for creating the file, and hence the
    shuffling is required.

    There is a workaround if the traffic is too big, but it would mean
    generating more files per 5 minute window (one file per worker).

    The trick is to assign a key that uniquely represents the worker, not
    the data. So every worker that maps the key should put its own unique
    value into this key.
    See this example:

    import threading
    from typing import Iterable, Tuple
    from uuid import uuid4

    from apache_beam import DoFn, ParDo

    class _Key(DoFn):
        def setup(self):
            # One unique prefix per worker instance of this DoFn.
            self.shard_prefix = str(uuid4())

        def process(self, x) -> Iterable[Tuple[str, object]]:
            # Each worker (and thread) creates its own batch under its own key.
            yield (self.shard_prefix + str(threading.get_ident()), x)
    And then you can use it like
    | "KeyPerWorker" >> ParDo(_Key())
    instead of the constant key from the first approach. Also remember to
    make sure file names are unique if you use this approach.
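
    For example, a tiny sketch (the helper name is mine, not from the code
    above) that appends a random suffix to the window-based file name so
    two workers flushing the same window cannot collide:

    from uuid import uuid4

    def unique_filename(window_end_str: str) -> str:
        # window_end_str e.g. window.end.to_utc_datetime().strftime("%Y%m%d%H%M%S")
        return f'output_{window_end_str}_{uuid4().hex}.txt'
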
    Best
    Wisniowski Piotr

    On 25.08.2024 20:30, vamsikrishna korada wrote:

    Hi everyone,

    I'm new to Apache Beam and have a question regarding its usage.

    I have a scenario where I need to read a stream of elements from
    a PCollection and write them to a new file every 5 minutes.

    Initially, I considered using Timers and state stores, but I
    discovered that Timers are only applicable to KV pairs. If I
    convert my PCollection into a key-value pair with a dummy key and
    then use timers, I encountered several issues:

     1. It introduces an additional shuffle.
     2. With all elements sharing the same key, they would be
        processed by a single task in the Flink on Beam application.
        I prefer not to manually define the number of keys based on
        load because I plan to run multiple pipelines, each with
        varying loads.

    One alternative I considered is using a custom executor thread
    within my Writer DoFn to flush the records every 5 minutes.
    However, this approach would require me to use a lock to make
    sure that only one of the process-element and flush blocks runs
    at a time.

    Is there a more effective way to accomplish this?



    Thanks,

    Vamsi

