Yi Hu created BEAM-14497:
----------------------------
Summary: Python Reshuffle holds elements
Key: BEAM-14497
URL: https://issues.apache.org/jira/browse/BEAM-14497
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Reporter: Yi Hu
Python Reshuffle holds elements while the pipeline is running and likely releases
them in a batch. In contrast, Java Reshuffle triggers on every element, as noted
in its documentation:
"the trigger used with {@link Reshuffle} which triggers on every element and
never buffers state."
Here is a working example:
{code:python}
import time

import apache_beam as beam
from apache_beam import Pipeline


def test(p: Pipeline):
    class SlowProcessFn(beam.DoFn):
        def process(self, element):
            # Simulate slow per-element work: 0.5 s per element.
            time.sleep(0.5)
            yield element

    result = (p
              | beam.Create(range(100))
              | beam.ParDo(SlowProcessFn())
              | beam.Reshuffle()  # HERE
              | beam.Map(lambda x: print(x, time.time())))
    return result
{code}
Tested on the local runner and the Flink runner (1.14): the elements are printed
only after 50 seconds. With the Reshuffle commented out, an element is printed
every half second.
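The 50-second delay matches the example's numbers: 100 elements at 0.5 s each. Below is a toy timing model in plain Python (not Beam code, and not how either SDK implements Reshuffle) that contrasts a shuffle which fires on every element with one that buffers until its input is exhausted. It assumes the slow ParDo processes elements sequentially on a single worker and ignores shuffle latency. All function names are hypothetical.

{code:python}
from typing import List

PROCESS_SECONDS = 0.5  # per-element sleep in SlowProcessFn
NUM_ELEMENTS = 100     # beam.Create(range(100))


def upstream_finish_times(n: int, cost: float) -> List[float]:
    """Time each element leaves the slow ParDo, assuming (hypothetically)
    sequential processing on a single worker."""
    return [cost * (i + 1) for i in range(n)]


def per_element_emit_times(finish: List[float]) -> List[float]:
    """Trigger fires on every element: each element is passed downstream
    as soon as it arrives (shuffle latency ignored)."""
    return list(finish)


def buffered_emit_times(finish: List[float]) -> List[float]:
    """Buffer everything until the input is exhausted, then release all
    elements in one batch."""
    release = max(finish)
    return [release] * len(finish)


if __name__ == "__main__":
    finish = upstream_finish_times(NUM_ELEMENTS, PROCESS_SECONDS)
    eager = per_element_emit_times(finish)
    batched = buffered_emit_times(finish)
    print("first element, per-element trigger:", eager[0])  # prints 0.5
    print("first element, buffered:", batched[0])           # prints 50.0
{code}

Under these assumptions the first downstream element appears after 0.5 s with a per-element trigger, but only after 50 s when the shuffle buffers, which is the gap reported above.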
This behavior causes problems when a downstream PTransform involves a
time-sensitive operation, such as receiving a list of updated files as input and
reading them with the filebasedsource.ReadAllFiles transform. Because ReadAll
contains a Reshuffle, the actual reads are blocked.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)