Hi,

If you require only a single file per 5 minutes, then what you are missing is windowing the data into fixed windows, so that a stateful DoFn can store the elements per key per window.

Something like this (untested, just pseudocode):

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer
from apache_beam.transforms.window import FixedWindows

class StatefulWriteToFileDoFn(beam.DoFn):
    # Pick a coder that matches your actual element type.
    BAG_STATE = BagStateSpec('bag_state', VarIntCoder())
    TIMER = TimerSpec('timer', TimeDomain.WATERMARK)

    def process(self, element, timestamp=beam.DoFn.TimestampParam,
                window=beam.DoFn.WindowParam,
                bag_state=beam.DoFn.StateParam(BAG_STATE),
                timer=beam.DoFn.TimerParam(TIMER)):
        # Buffer the value and arm the timer for the end of the window.
        bag_state.add(element[1])
        timer.set(window.end)

    @on_timer(TIMER)
    def flush(self, window=beam.DoFn.WindowParam,
              bag_state=beam.DoFn.StateParam(BAG_STATE)):
        # Generate a filename based on the window's end time.
        window_end = window.end.to_utc_datetime().strftime('%Y%m%d%H%M%S')
        filename = f'output_{window_end}.txt'
        with open(filename, 'w') as f:
            for element in bag_state.read():
                f.write(f'{element}\n')

def run():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'ReadStream' >> beam.io.ReadFromPubSub(topic='projects/.../topics/...')
         | 'WindowIntoFixed' >> beam.WindowInto(FixedWindows(300))  # 5-minute windows
         | 'MapToSingleKey' >> beam.Map(lambda x: (1, x))  # map to a single key
         | 'StatefulWriteToFile' >> beam.ParDo(StatefulWriteToFileDoFn())
        )

So the stateful DoFn will keep its state per key per window, and each window should create its own output file. But note that this requires all the data for a 5-minute window to land on the same worker, since only a single worker can be responsible for creating the file, and hence the shuffle is required.

There is a workaround if the traffic might be too big, but it means generating more files per 5-minute window (one file per worker).

The trick is to assign a key that uniquely represents the worker, not the data, so every worker that maps the key puts its own unique value into it.
See this example:

import threading
from typing import Any, Iterable
from uuid import uuid4

from apache_beam import DoFn

input_type = Any  # placeholder for your actual element type

class _Key(DoFn):
    def setup(self):
        # One UUID per DoFn instance, so each worker gets its own prefix.
        self.shard_prefix = str(uuid4())

    def process(
        self,
        x: input_type,
    ) -> Iterable[tuple[str, input_type]]:
        yield (
            # each worker (and thread) may create its own batch
            self.shard_prefix + str(threading.get_ident()),
            x,
        )
And then you can use it like

    | 'KeyPerWorker' >> beam.ParDo(_Key())

instead of using a constant key as in the first approach. Also remember to make sure the file names are unique if you use this approach.
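For example (again untested; as far as I know beam.DoFn.KeyParam gives you the current key inside a timer callback), the flush callback from the first snippet could embed the key in the file name, since here the key is unique per worker:

    @on_timer(TIMER)
    def flush(self, key=beam.DoFn.KeyParam, window=beam.DoFn.WindowParam,
              bag_state=beam.DoFn.StateParam(BAG_STATE)):
        # The key is unique per worker, so names cannot collide across workers.
        window_end = window.end.to_utc_datetime().strftime('%Y%m%d%H%M%S')
        filename = f'output_{window_end}_{key}.txt'
        with open(filename, 'w') as f:
            for element in bag_state.read():
                f.write(f'{element}\n')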
Best
Wisniowski Piotr

On 25.08.2024 20:30, vamsikrishna korada wrote:

Hi everyone,

I'm new to Apache Beam and have a question regarding its usage.

I have a scenario where I need to read a stream of elements from a PCollection and write them to a new file every 5 minutes.

Initially, I considered using Timers and state stores, but I discovered that Timers are only applicable to KV pairs. When I convert my PCollection into key-value pairs with a dummy key and then use timers, I run into several issues:

 1. It introduces an additional shuffle.
 2. With all elements sharing the same key, they would be processed by
    a single task in the Flink on Beam application. I prefer not to
    manually define the number of keys based on load because I plan to
    run multiple pipelines, each with varying loads.

One alternative I considered is using a custom executor thread within my Writer DoFn to flush the records every 5 minutes. However, this approach would require a lock to make sure the element processing and the flush never run at the same time.
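For illustration, a minimal sketch of what I have in mind (untested; PeriodicFlushDoFn and _write_records are just placeholder names):

import threading
import apache_beam as beam

class PeriodicFlushDoFn(beam.DoFn):
    def setup(self):
        self._lock = threading.Lock()
        self._buffer = []
        self._schedule_flush()

    def _schedule_flush(self):
        # Re-arming timer that fires every 5 minutes on a background thread.
        self._timer = threading.Timer(300, self._flush)
        self._timer.daemon = True
        self._timer.start()

    def _flush(self):
        with self._lock:  # keep flush and process mutually exclusive
            records, self._buffer = self._buffer, []
        self._write_records(records)
        self._schedule_flush()

    def _write_records(self, records):
        pass  # open a new file and write `records` here

    def process(self, element):
        with self._lock:
            self._buffer.append(element)

    def teardown(self):
        self._timer.cancel()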

Is there a more effective way to accomplish this?



Thanks,

Vamsi

