[
https://issues.apache.org/jira/browse/BEAM-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Leiyi Zhang updated BEAM-10617:
-------------------------------
Description:
Not only is there more than one result per window, the results for each window are duplicated as well.
Here is some code I made to reproduce the issue; just run it with and without {{*.with_fanout*}}.
If running with the Dataflow runner, add an appropriate {{*gs://path/*}} in {{*WriteToText*}}.
{code:python}
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp


class ListFn(beam.CombineFn):
    def create_accumulator(self):
        return []

    def add_input(self, mutable_accumulator, element):
        return mutable_accumulator + [element]

    def merge_accumulators(self, accumulators):
        res = []
        for accu in accumulators:
            res = res + accu
        return res

    def extract_output(self, accumulator):
        return accumulator


p = beam.Pipeline()
(
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405))])
    | beam.WindowInto(window.SlidingWindows(10, 5))
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    | beam.io.WriteToText("py-test-result", file_name_suffix='.json',
                          num_shards=1))
p.run()
{code}
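For reference, the correct per-window output can be worked out by hand. Below is a minimal plain-Python sketch (no Beam dependency; the window math is my own illustration of {{SlidingWindows(size=10, period=5)}} semantics, under which an element with timestamp t falls into every window [start, start + 10) whose start is a multiple of 5):

```python
def sliding_windows(t, size=10, period=5):
    """Return the start times of every sliding window covering timestamp t."""
    start = (t // period) * period  # latest window start at or before t
    starts = []
    while start > t - size:
        starts.append(start)
        start -= period
    return sorted(starts)

# value -> timestamp, matching the repro pipeline above
values = {
    1: 1596216396, 2: 1596216397, 3: 1596216398, 4: 1596216399,
    5: 1596216400, 6: 1596216402, 7: 1596216403, 8: 1596216405,
}

# Assign each value to its windows and collect the per-window element lists.
windows = {}
for value, ts in values.items():
    for start in sliding_windows(ts):
        windows.setdefault(start, []).append(value)

for start in sorted(windows):
    print(start, windows[start])
# 1596216390 [1, 2, 3, 4]
# 1596216395 [1, 2, 3, 4, 5, 6, 7]
# 1596216400 [5, 6, 7, 8]
# 1596216405 [8]
```

So without duplication there should be exactly four result lines, one per window; any repeated window result in the {{py-test-result*.json}} output indicates the bug.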
> python CombineGlobally().with_fanout() causes duplicate combine results for
> sliding windows
> ------------------------------------------------------------------------------------------
>
> Key: BEAM-10617
> URL: https://issues.apache.org/jira/browse/BEAM-10617
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow, runner-direct, sdk-py-core
> Reporter: Leiyi Zhang
> Priority: P2
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)