Hi,
If you require only a single file per 5 minutes, then what you are missing is
that you need to window the data into fixed windows, so that a stateful DoFn
can store the elements per key per window.
Something like this (untested, just pseudocode):
import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer
from apache_beam.transforms.window import FixedWindows

class StatefulWriteToFileDoFn(beam.DoFn):
    # Pick a coder that matches your actual element type.
    BAG_STATE = BagStateSpec('bag_state', VarIntCoder())
    TIMER = TimerSpec('timer', TimeDomain.WATERMARK)

    def process(self, element, timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam,
                bag_state=beam.DoFn.StateParam(BAG_STATE),
                timer=beam.DoFn.TimerParam(TIMER)):
        # element is a (key, value) pair; buffer the value.
        _, value = element
        bag_state.add(value)
        # Fire once the watermark passes the end of the window.
        timer.set(window.end)

    @on_timer(TIMER)
    def flush(self, window=beam.DoFn.WindowParam,
              bag_state=beam.DoFn.StateParam(BAG_STATE)):
        # Here you can generate a filename based on the window's end time,
        # for example:
        window_end = window.end.to_utc_datetime().strftime("%Y%m%d%H%M%S")
        filename = f'output_{window_end}.txt'
        with open(filename, 'w') as f:
            for element in bag_state.read():
                f.write(f'{element}\n')
        bag_state.clear()

def run():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'ReadStream' >> beam.io.ReadFromPubSub(topic='projects/.../topics/...')
         | 'WindowIntoFixed' >> beam.WindowInto(FixedWindows(300))  # 5-minute windows
         | 'MapToSingleKey' >> beam.Map(lambda x: (1, x))  # Map to a single key
         | 'StatefulWriteToFile' >> beam.ParDo(StatefulWriteToFileDoFn())
        )
So the stateful DoFn will keep its state per key per window, and each window
should create its own output file. But note that this requires putting all of
the 5-minute data on the same worker, since only a single worker can be
responsible for creating the file, and hence the shuffle is required.
There is a workaround if the traffic might be too big, but it would mean
generating more files per 5-minute window (one file per worker).
The trick is to assign a key that uniquely represents the worker, not the
data, so that every worker mapping the key puts its own unique value into it.
See this example:
import threading
from typing import Iterable, TypeVar
from uuid import uuid4

from apache_beam import DoFn

InputT = TypeVar('InputT')

class _Key(DoFn):
    def setup(self):
        # A prefix unique to this worker instance.
        self.shard_prefix = str(uuid4())

    def process(self, x: InputT) -> Iterable[tuple[str, InputT]]:
        # Each worker (and each thread within it) creates its own batch.
        yield (self.shard_prefix + str(threading.get_ident()), x)
And then you can use it like
| "KeyPerWorker" >> ParDo(_Key())
instead of the constant key from the first approach. Also remember to make
sure the file names are unique if you use this approach (see the sketch below).
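One way to keep the file names unique is to embed the firing key (which now
identifies the worker) in the filename. A minimal sketch of the timer
callback, assuming the StatefulWriteToFileDoFn above; using beam.DoFn.KeyParam
to read the key is my assumption here, untested:

    @on_timer(TIMER)
    def flush(self, window=beam.DoFn.WindowParam,
              key=beam.DoFn.KeyParam,
              bag_state=beam.DoFn.StateParam(BAG_STATE)):
        # Embed both the window end and the per-worker key, so files
        # written by different workers never collide.
        window_end = window.end.to_utc_datetime().strftime("%Y%m%d%H%M%S")
        filename = f'output_{window_end}_{key}.txt'
        with open(filename, 'w') as f:
            for element in bag_state.read():
                f.write(f'{element}\n')
        bag_state.clear()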
Best
Wisniowski Piotr
On 25.08.2024 20:30, vamsikrishna korada wrote:
Hi everyone,
I'm new to Apache Beam and have a question regarding its usage.
I have a scenario where I need to read a stream of elements from a
PCollection and write them to a new file every 5 minutes.
Initially, I considered using Timers and state stores, but I
discovered that Timers are only applicable to KV pairs. If I convert
my PCollection into key-value pairs with a dummy key and then use
timers, I run into several issues:
1. It introduces an additional shuffle.
2. With all elements sharing the same key, they would be processed by
a single task in the Flink on Beam application. I prefer not to
manually define the number of keys based on load because I plan to
run multiple pipelines, each with varying loads.
One alternative I considered is using a custom executor thread within
my Writer DoFn to flush the records every 5 minutes. However, this
approach would require a lock to make sure that the process-element
and flush blocks never run at the same time.
Is there a more effective way to accomplish this?
Thanks,
Vamsi