Yi Hu created BEAM-14497:
----------------------------

             Summary: Python Reshuffle holds elements
                 Key: BEAM-14497
                 URL: https://issues.apache.org/jira/browse/BEAM-14497
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
            Reporter: Yi Hu


Python Reshuffle holds elements while the pipeline is running and likely 
releases them in a batch. In contrast, Java Reshuffle triggers on every element, 
as noted in its documentation: "the trigger used with {@link Reshuffle} which 
triggers on every element and never buffers state."
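
The difference between the two behaviors can be modeled in plain Python, 
without Beam. This is a hypothetical sketch, not Beam code: upstream, 
emit_per_element, emit_in_batch and run are illustrative names, and generators 
stand in for pipeline stages. The event log shows that with a per-element policy 
each element is consumed right after it is produced, whereas with a holding 
policy nothing reaches the consumer until the upstream stage has produced 
everything.
{code:python}
from typing import Callable, Iterable, Iterator, List


def upstream(n: int, log: List[str]) -> Iterator[int]:
  # Stand-in for the slow ParDo: records when each element is produced.
  for i in range(n):
    log.append(f"produce {i}")
    yield i


def emit_per_element(elements: Iterable[int]) -> Iterator[int]:
  # Models a trigger that fires on every element and never buffers state.
  for e in elements:
    yield e


def emit_in_batch(elements: Iterable[int]) -> Iterator[int]:
  # Models holding every element and releasing them together at the end.
  held = list(elements)
  yield from held


def run(policy: Callable[[Iterable[int]], Iterator[int]], n: int) -> List[str]:
  # Drives the model and records the order of produce/consume events.
  log: List[str] = []
  for e in policy(upstream(n, log)):
    log.append(f"consume {e}")
  return log


print(run(emit_per_element, 3))
# ['produce 0', 'consume 0', 'produce 1', 'consume 1', 'produce 2', 'consume 2']
print(run(emit_in_batch, 3))
# ['produce 0', 'produce 1', 'produce 2', 'consume 0', 'consume 1', 'consume 2']
{code}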

Here is a working example:
{code:python}
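import time

import apache_beam as beam
from apache_beam import Pipeline


def test(p: Pipeline):
  class SlowProcessFn(beam.DoFn):
    def process(self, element):
      time.sleep(0.5)  # simulate slow per-element work
      yield element

  result = (p
    | beam.Create(range(100))
    | beam.ParDo(SlowProcessFn())
    | beam.Reshuffle()  # HERE
    | beam.Map(lambda x: print(x, time.time())))
  return result


if __name__ == '__main__':
  with beam.Pipeline() as p:
    test(p)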
{code}
Tested on the local runner and the Flink runner (1.14): with Reshuffle, the 
elements are printed only after about 50 seconds (100 elements at 0.5 s each). 
With Reshuffle commented out, one element is printed every half second.
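
The 50-second figure matches a simple back-of-the-envelope model. The sketch 
below assumes a single worker processing elements one after another with no 
overhead other than the 0.5 s sleep; the function names are hypothetical. Under 
a per-element trigger the i-th element (counting from 0) is printed at 
0.5 * (i + 1) seconds, while under batch release every element is printed at 
0.5 * 100 = 50 seconds.
{code:python}
from typing import List

PER_ELEMENT_DELAY_S = 0.5  # time.sleep(0.5) in SlowProcessFn
NUM_ELEMENTS = 100         # beam.Create(range(100))


def per_element_emit_times(n: int, delay: float) -> List[float]:
  # Assumed model: element i is printed as soon as it has been processed.
  return [delay * (i + 1) for i in range(n)]


def batch_emit_times(n: int, delay: float) -> List[float]:
  # Assumed model: every element is printed only after the last one finishes.
  done = delay * n
  return [done] * n


per_element = per_element_emit_times(NUM_ELEMENTS, PER_ELEMENT_DELAY_S)
batched = batch_emit_times(NUM_ELEMENTS, PER_ELEMENT_DELAY_S)
print(per_element[0], per_element[-1])  # 0.5 50.0
print(batched[0], batched[-1])          # 50.0 50.0
{code}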

This behavior causes problems when a downstream PTransform performs a 
time-sensitive operation, for example receiving a list of updated files as input 
and reading them with the filebasedsource.ReadAllFiles transform. Because there 
is a Reshuffle inside ReadAllFiles, the actual reads are blocked.
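
The effect on a time-sensitive reader can be modeled the same way. This is a 
hedged sketch that assumes the holding step only releases elements once its 
input is exhausted, which is only suspected above ("likely release them in a 
batch"); updated_files, read_file, pass_through and hold_until_done are 
hypothetical names, not Beam or filebasedsource APIs. With an input that keeps 
producing new file names, the per-element policy lets reads begin immediately, 
while the holding policy would never hand anything to the reader.
{code:python}
import itertools
from typing import Iterator, List


def updated_files() -> Iterator[str]:
  # Hypothetical endless stream of updated file names.
  for i in itertools.count():
    yield f"file-{i}.txt"


def read_file(name: str) -> str:
  # Hypothetical stand-in for the per-file read done downstream.
  return f"contents of {name}"


def pass_through(names: Iterator[str]) -> Iterator[str]:
  # Per-element policy: each name is forwarded as soon as it arrives.
  for name in names:
    yield name


def hold_until_done(names: Iterator[str]) -> Iterator[str]:
  # Holding policy: list() returns only when the input is exhausted,
  # which never happens for updated_files().
  yield from list(names)


# With the per-element policy, the first reads happen right away.
first_reads: List[str] = [
    read_file(n) for n in itertools.islice(pass_through(updated_files()), 3)]
print(first_reads)
# Calling next(hold_until_done(updated_files())) would never return.
{code}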



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
