Thanks! I've created BEAM-5392 <https://issues.apache.org/jira/browse/BEAM-5392> to track the issue.
On Fri, Sep 14, 2018 at 4:46 PM Robert Bradshaw <[email protected]> wrote:

> On Fri, Sep 14, 2018 at 4:22 PM David Morávek <[email protected]> wrote:
>
>> Hello Robert,
>>
>> thanks for the answer! Spark allows us to sort a single partition
>> (after repartition) by a user-provided comparator, so it is definitely
>> possible to do a secondary sort by timestamp. The "more intelligent
>> ReduceFnRunner" you are talking about, is it part of the Beam codebase
>> already (I guess it would lower the contribution investment if we tried
>> to fix this)?
>
> No, but it is part of the Dataflow worker codebase we're trying to donate
> (being discussed on the other thread on this very list today), so hopefully
> soon.
>
>> This would definitely work for our use case (we used the exact same
>> approach in our custom SparkRunner).
>>
>> Although, there is one thing to consider. This approach would solve the
>> scaling issue, but it would probably be less effective at smaller scale.
>> I think this could be solved by providing support for multiple
>> translators for a single operator and letting the user "hint" the
>> translation layer to decide which one to use. What do you think?
>
> If at all possible, I generally prefer to avoid hints like this that the
> user needs to use to get decent performance, as that approach simply
> doesn't scale (in many directions). Fortunately in this case, though you
> have to be a bit more careful about things, it is not less efficient.
>
>> On Fri, Sep 14, 2018 at 4:10 PM Robert Bradshaw <[email protected]> wrote:
>>
>>> If Spark supports producing grouped elements in timestamp order, a more
>>> intelligent ReduceFnRunner can be used. (We take advantage of that in
>>> Dataflow, for example.)
>>>
>>> For non-merging windows, you could also put the window itself (or some
>>> subset thereof) into the key, resulting in smaller groupings. I'm not
>>> sure I understand your output requirements enough to know if this
>>> would work.
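The secondary sort discussed above can be sketched in plain Java without any Spark dependency. The idea: build a composite key, partition on the user key only, and give `repartitionAndSortWithinPartitions` a comparator that orders by key, then window, then timestamp, so each group's values arrive in timestamp order. The `SortKey` class and its fields below are illustrative assumptions, not Beam or Spark API:

```java
import java.util.Comparator;

/**
 * Illustrative composite key for a secondary sort: partition by the user
 * key alone, but sort within each partition by (key, window, timestamp).
 * Assumes non-merging windows identified by their start timestamp.
 */
final class SortKey {
  final String key;       // user key; also what the partitioner hashes on
  final long windowStart; // window start in millis (non-merging windows)
  final long timestamp;   // element timestamp, for the secondary sort

  SortKey(String key, long windowStart, long timestamp) {
    this.key = key;
    this.windowStart = windowStart;
    this.timestamp = timestamp;
  }

  /** Comparator that a repartition-and-sort style primitive would use. */
  static final Comparator<SortKey> COMPARATOR =
      Comparator.<SortKey, String>comparing(k -> k.key)
          .thenComparingLong(k -> k.windowStart)
          .thenComparingLong(k -> k.timestamp);
}
```

With this ordering, a runner can stream each (key, window) group sequentially instead of materializing all values for a key in memory.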
>>> On Fri, Sep 14, 2018 at 3:28 PM David Morávek <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> currently we are trying to move one of our large-scale batch jobs
>>>> (~100TB inputs) from our Euphoria <http://github.com/seznam/euphoria>
>>>> based SparkRunner to Beam's Spark runner, and we came across the
>>>> following issue.
>>>>
>>>> Because we rely on the Hadoop ecosystem, we need to group outputs by
>>>> TaskAttemptID in order to use OutputFormats based on FileOutputFormat.
>>>>
>>>> We do this by using *GroupByKey*, but we ran into the known problem
>>>> that all values for any single key need to fit in memory at once.
>>>>
>>>> I did some quick research and I think the following needs to be
>>>> addressed:
>>>> a) We cannot use Spark's *groupByKey*, because it requires all values
>>>> for a single key to fit in memory (it is implemented as a "list
>>>> combiner").
>>>> b) *ReduceFnRunner* iterates over values multiple times in order to
>>>> also group by window.
>>>>
>>>> In our Euphoria-based runner, we solved this for *non-merging*
>>>> windowing by using Spark's *repartitionAndSortWithinPartitions*, where
>>>> we sorted the output by key and window so it could be processed
>>>> sequentially.
>>>>
>>>> Did anyone run into the same issue? Is there currently any workaround
>>>> for this? How should we approach it?
>>>>
>>>> Thanks,
>>>> David
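The window-in-the-key workaround mentioned in the thread can be sketched as a small helper: for non-merging fixed windows, fold the window start into the grouping key so that each (key, window) pair becomes its own, smaller group. The names below are hypothetical, not Beam API, and the sketch assumes non-negative millisecond timestamps and fixed (tumbling) windows:

```java
/**
 * Sketch of putting a non-merging window into the grouping key, so a
 * GroupByKey produces one group per (key, window) rather than one large
 * group per key. Illustrative only; not Beam or Spark API.
 */
final class WindowedKeys {
  /** Start of the fixed window containing the given timestamp. */
  static long fixedWindowStart(long timestampMillis, long windowSizeMillis) {
    // Assumes non-negative timestamps; align down to the window boundary.
    return timestampMillis - (timestampMillis % windowSizeMillis);
  }

  /** Composite grouping key: the original key plus its window start. */
  static String compositeKey(String key, long timestampMillis, long windowSizeMillis) {
    return key + "@" + fixedWindowStart(timestampMillis, windowSizeMillis);
  }
}
```

Grouping on such a composite key bounds each group to one window's worth of values, which is often enough to keep a single group in memory; it does not help with merging windows such as sessions, where window assignment is not known up front.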
