Vamsi,
Actually, I do not think it is possible to avoid the shuffle, given that
you want to put the data into 5-minute fixed windows and there is no
ordering on the incoming input data.
I am not sure what your upstream source or transformation is that
creates the PCollection you want to save, but there is no assumption
that the elements are correlated in time. The only guarantee is that no
element will be earlier than the watermark.
So in your case the source PCollection can receive messages with these
timestamps at the same moment on different workers:
- e1, 2024-01-01T00:00:10
- e2, 2024-01-01T00:03:10
- e3, 2024-01-01T00:08:10
- e4, 2024-01-01T00:04:10
- e5, 2024-01-01T00:06:10
- e6, 2024-01-01T00:09:10
Since this is a distributed computation, there is no way to order the
events globally. There is also no guarantee that all workers will be
processing all time-window buckets.
What can be done is that the worker that receives a message checks
whether it is responsible for computing that message's window and
either stores it on disk or forwards the message to the worker that is
actually responsible for that window (which is really a shuffle operation).
What could be done to optimize the sharding-prefix code further is to
try `GroupIntoBatches.WithShardedKey` (implementation reference:
https://github.com/apache/beam/blob/bcee5d081d05841bb52f0851f94a04f9c8968b88/sdks/python/apache_beam/transforms/util.py#L1113),
as some runners optimize this shuffle so that a minimal amount of data
is actually sent over the network.
You may try that transform (if it meets your requirements) or take a
look there at how `ShardedKey` is used.
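Something along these lines is what I have in mind (a rough, untested
sketch; the batch_size and max_buffering_duration_secs values are only
placeholders, and please double-check the exact `WithShardedKey`
signature against your SDK version). The idea, as I understand it, is
that you keep a single logical key and let the runner decide the
sharding instead of building the shard prefix yourself:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


def write_batch(keyed_batch):
    # keyed_batch is (sharded_key, iterable_of_elements); the runner decides the sharding.
    sharded_key, elements = keyed_batch
    # ... write `elements` to a uniquely named file here ...
    return sharded_key


def run():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'ReadStream' >> beam.io.ReadFromPubSub(topic='projects/.../topics/...')
         | 'WindowIntoFixed' >> beam.WindowInto(FixedWindows(300))  # 5-minute windows
         | 'MapToSingleKey' >> beam.Map(lambda x: (1, x))  # single key; runner shards it
         | 'BatchPerShardedKey' >> beam.GroupIntoBatches.WithShardedKey(
             batch_size=1000,                  # placeholder value
             max_buffering_duration_secs=300)  # placeholder value
         | 'WriteBatches' >> beam.Map(write_batch)
        )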
On the idea of a custom executor thread: this is asking for trouble.
Beam (like all other distributed computation engines) is designed to
handle parallelization for you, and if you try to do something with
threads on a worker yourself, things typically end badly (random
failures, missing data, hard debugging). I really doubt it would be more
performant than an actual shuffle done the Beam way.
Best,
Wisniowski Piotr
On 27.08.2024 20:35, vamsikrishna korada wrote:
Thanks for the reply Wisniowski Piotr.
As you mentioned, the data can be big and I am okay with writing
multiple files every 5 minutes.
If I assign a key that uniquely represents a worker, that makes sure
all the data that is processed together stays together. But there can
still be a shuffle, and the data can collectively be moved across nodes
when we are assigning the keys, right? Is there any way I can avoid the
extra shuffle?
Do you think it is a good idea to instead have a separate scheduler
thread within the DoFn that flushes the data from each worker every 5
minutes?
Thanks,
Vamsi
On Tue, 27 Aug 2024 at 17:49, Wiśniowski Piotr
<[email protected]> wrote:
Hi,
If you require only a single file per 5 minutes, then what you are
missing is that you need to window the data into fixed windows, so
that a stateful DoFn can store the elements per key per window.
Something like this (I did not test this code, just pseudo-code):
import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import userstate
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.window import FixedWindows


class StatefulWriteToFileDoFn(beam.DoFn):
    # Note: use a coder that matches your element type (VarIntCoder assumes integers).
    BAG_STATE = userstate.BagStateSpec('bag_state', VarIntCoder())
    TIMER = userstate.TimerSpec('timer', TimeDomain.WATERMARK)

    def process(self, element, timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam,
                bag_state=beam.DoFn.StateParam(BAG_STATE),
                timer=beam.DoFn.TimerParam(TIMER)):
        # Buffer the element and ask to be called back at the end of the window.
        bag_state.add(element)
        timer.set(window.end)

    @userstate.on_timer(TIMER)
    def on_timer(self, window=beam.DoFn.WindowParam,
                 bag_state=beam.DoFn.StateParam(BAG_STATE)):
        # Here you can generate a filename based on the window's end time, for example:
        filename = f'output_{window.end.to_utc_datetime().strftime("%Y%m%d%H%M%S")}.txt'
        with open(filename, 'w') as f:
            for element in bag_state.read():
                f.write(f'{element}\n')
        bag_state.clear()


def run():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'ReadStream' >> beam.io.ReadFromPubSub(topic='projects/.../topics/...')
         | 'WindowIntoFixed' >> beam.WindowInto(FixedWindows(300))  # 5-minute windows
         | 'MapToSingleKey' >> beam.Map(lambda x: (1, x))  # map to a single key
         | 'StatefulWriteToFile' >> beam.ParDo(StatefulWriteToFileDoFn())
        )
So the stateful DoFn will keep its state per key per window, and each
window should create its own output file. But note that this requires
all of the 5-minute data to land on the same worker, as only a single
worker can be responsible for creating the file, and hence the
shuffling is required.
There is a workaround if the traffic is too big, but it would mean
generating more files per 5-minute window (one file per worker).
The trick is to assign a key that uniquely represents the worker, not
the data, so every worker that maps the key puts its own unique value
into that key.
See this example:
import threading
from typing import Any, Iterable, Tuple
from uuid import uuid4

from apache_beam import DoFn


class _Key(DoFn):
    def setup(self):
        # A prefix unique to this DoFn instance / worker process.
        self.shard_prefix = str(uuid4())

    def process(
        self,
        x: Any,  # replace Any with your actual element type
    ) -> Iterable[Tuple[str, Any]]:
        yield (
            self.shard_prefix + str(threading.get_ident()),  # each worker thread may create its own batch
            x,
        )
And then you can use it like
| "KeyPerWorker" >> ParDo(_Key())
instead of using a constant key as in the first approach. Also
remember to make sure the file names are unique if you use this approach.
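If it helps, here is a rough, untested sketch of how the on_timer
callback from the StatefulWriteToFileDoFn above could include the key
in the file name so that files from different workers do not collide
(I believe beam.DoFn.KeyParam is available in timer callbacks, but
please verify against your SDK version):

    @userstate.on_timer(TIMER)
    def on_timer(self,
                 key=beam.DoFn.KeyParam,
                 window=beam.DoFn.WindowParam,
                 bag_state=beam.DoFn.StateParam(BAG_STATE)):
        # Name the file after both the window end and the per-worker key,
        # so each worker writes its own file for the window.
        window_end = window.end.to_utc_datetime().strftime("%Y%m%d%H%M%S")
        filename = f'output_{window_end}_{key}.txt'
        with open(filename, 'w') as f:
            for element in bag_state.read():
                f.write(f'{element}\n')
        bag_state.clear()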
Best
Wisniowski Piotr
On 25.08.2024 20:30, vamsikrishna korada wrote:
Hi everyone,
I'm new to Apache Beam and have a question regarding its usage.
I have a scenario where I need to read a stream of elements from
a PCollection and write them to a new file every 5 minutes.
Initially, I considered using Timers and state stores, but I
discovered that Timers are only applicable to KV pairs. When I
converted my PCollection into a key-value pair with a dummy key and
then used timers, I encountered several issues:
1. It introduces an additional shuffle.
2. With all elements sharing the same key, they would be
processed by a single task in the Flink on Beam application.
I prefer not to manually define the number of keys based on
load because I plan to run multiple pipelines, each with
varying loads.
One alternative I considered is using a custom executor thread
within my Writer DoFn to flush the records every 5 minutes.
However, this approach would require me to use a lock to make
sure only one of the process-element and flush blocks is
running at a time.
Is there a more effective way to accomplish this?
Thanks,
Vamsi