[ 
https://issues.apache.org/jira/browse/BEAM-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leiyi Zhang updated BEAM-10617:
-------------------------------
    Description: 
Not only is there more than one result per window, the results for each window are
duplicated as well.

Here is some code to reproduce the issue; run it with and without
{{*.with_fanout*}}.
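For context, {{with_fanout(n)}} roughly turns the single global combine into a two-stage combine: elements are scattered across n intermediate keys and pre-combined per key, then the partial accumulators are merged into one result. A plain-Python sketch of that idea (an illustration only, not the actual Beam implementation):

```python
import random

def combine_with_fanout(elements, fanout=5):
    # Stage 1: scatter elements across `fanout` keys and pre-combine per key.
    buckets = {}
    for e in elements:
        key = random.randrange(fanout)
        buckets.setdefault(key, []).append(e)
    # Stage 2: strip the keys and merge the partial accumulators
    # (the equivalent of merge_accumulators in the CombineFn below).
    merged = []
    for partial in buckets.values():
        merged += partial
    return merged
```

A correct fanout should be invisible to the caller: the merged result contains each input element exactly once, regardless of how the scatter happened. The bug reported here is that with sliding windows this invariant breaks and results get duplicated.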

If running with the Dataflow runner, supply an appropriate {{*gs://path/*}} in
{{*WriteToText*}}.

 
{code:python}
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp

class ListFn(beam.CombineFn):
  """CombineFn that collects all input elements of a window into one list."""

  def create_accumulator(self):
    return []

  def add_input(self, mutable_accumulator, element):
    return mutable_accumulator + [element]

  def merge_accumulators(self, accumulators):
    res = []
    for accu in accumulators:
      res = res + accu
    return res

  def extract_output(self, accumulator):
    return accumulator


p = beam.Pipeline()

(
    p
    | beam.Create([
      window.TimestampedValue(1, Timestamp(seconds=1596216396)),
      window.TimestampedValue(2, Timestamp(seconds=1596216397)),
      window.TimestampedValue(3, Timestamp(seconds=1596216398)),
      window.TimestampedValue(4, Timestamp(seconds=1596216399)),
      window.TimestampedValue(5, Timestamp(seconds=1596216400)),
      window.TimestampedValue(6, Timestamp(seconds=1596216402)),
      window.TimestampedValue(7, Timestamp(seconds=1596216403)),
      window.TimestampedValue(8, Timestamp(seconds=1596216405))])
    | beam.WindowInto(window.SlidingWindows(10, 5))
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    | beam.io.WriteToText("py-test-result", file_name_suffix='.json', 
num_shards=1))

p.run()
{code}
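As a sanity check on what the pipeline above should produce, the per-window contents can be computed directly from the sliding-window semantics. The assignment rule below mirrors what I believe {{SlidingWindows(10, 5)}} does with the default offset of 0 (a plain-Python sketch, no Beam dependency):

```python
# Expected sliding-window assignment for size=10s, period=5s windows.
# Each timestamp belongs to every window [start, start + size) whose
# start is a multiple of the period and covers the timestamp.
def sliding_windows(ts, size=10, period=5):
    last_start = ts - ts % period
    starts = range(last_start - size + period, last_start + 1, period)
    return [(s, s + size) for s in starts]

# Same values and timestamps as in the reproduction pipeline.
data = {1: 1596216396, 2: 1596216397, 3: 1596216398, 4: 1596216399,
        5: 1596216400, 6: 1596216402, 7: 1596216403, 8: 1596216405}

windows = {}
for value, ts in data.items():
    for w in sliding_windows(ts):
        windows.setdefault(w, []).append(value)

for w in sorted(windows):
    print(w, windows[w])
```

Under these semantics each window's combined list should appear exactly once in the output; the bug is that with {{with_fanout}} the per-window results show up multiple times.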
 



> python CombineGlobally().with_fanout() cause duplicate combine results for 
> sliding windows
> ------------------------------------------------------------------------------------------
>
>                 Key: BEAM-10617
>                 URL: https://issues.apache.org/jira/browse/BEAM-10617
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow, runner-direct, sdk-py-core
>            Reporter: Leiyi Zhang
>            Priority: P2
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
