Great idea!

On Fri, 24 Apr 2020, 22:33 Ismaël Mejía, <ieme...@gmail.com> wrote:
> Sounds like a good addition to the Beam patterns page Reza :)
>
> On Fri, Apr 24, 2020 at 3:22 AM Aniruddh Sharma <asharma...@gmail.com> wrote:
> >
> > Thanks Robert,
> >
> > This is a life saver and a great help :). It works like a charm.
> >
> > Thanks
> > Aniruddh
> >
> > On Thu, Apr 23, 2020 at 4:45 PM Robert Bradshaw <rober...@google.com> wrote:
> >>
> >> I may have misinterpreted your email; I thought you didn't have a need
> >> for keys at all. If that is the case, you don't need a GroupByKey:
> >> just have your DoFn take Rows as input and emit List<Row> as output.
> >> That is, it's a DoFn<Row, List<Row>>.
> >>
> >> You can buffer multiple Rows in an instance variable between process
> >> element calls. For example:
> >>
> >> class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
> >>   private List<T> buffer = new ArrayList<>();
> >>
> >>   @ProcessElement
> >>   public void processElement(@Element T elt, OutputReceiver<List<T>> out) {
> >>     buffer.add(elt);
> >>     if (buffer.size() > 100) {
> >>       out.output(buffer);
> >>       buffer = new ArrayList<>();
> >>     }
> >>   }
> >>
> >>   @FinishBundle
> >>   public void finishBundle(OutputReceiver<List<T>> out) {
> >>     out.output(buffer);
> >>     buffer = new ArrayList<>();
> >>   }
> >> }
> >>
> >> See
> >> https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
> >> for more information on the lifetime of DoFns.
> >>
> >> As for why your GBK is taking so long: yes, this can be a bottleneck.
> >> However, it should be noted that Dataflow (like most other runners)
> >> executes this step in conjunction with other steps as part of a "fused
> >> stage." So if your pipeline looks like
> >>
> >>   Read -> DoFnA -> GBK -> DoFnB -> Write
> >>
> >> then Read, DoFnA, and GBK[part1] will execute concurrently (all
> >> starting up almost immediately), one element at a time, and when that's
> >> finished, GBK[part2], DoFnB, and Write will execute concurrently, one
> >> element at a time, so you can't just look at the last unfinished stage
> >> to determine where the bottleneck is.
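Stripped of the Beam SDK types, the buffering pattern sketched above is just "collect into an in-memory list, emit when the list hits a threshold, flush the remainder at the end." A minimal plain-Java sketch of that core (class and method names are illustrative, with the Beam annotations replaced by ordinary method calls):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of the buffering DoFn: collect elements into an
// in-memory list and hand the list downstream whenever it reaches a
// fixed count, with a final flush for any leftovers.
class CountBatcher<T> {
    private final int maxSize;
    private final Consumer<List<T>> out;
    private List<T> buffer = new ArrayList<>();

    CountBatcher(int maxSize, Consumer<List<T>> out) {
        this.maxSize = maxSize;
        this.out = out;
    }

    // Corresponds to @ProcessElement: buffer, emit when full.
    void process(T elt) {
        buffer.add(elt);
        if (buffer.size() >= maxSize) {
            out.accept(buffer);
            buffer = new ArrayList<>();
        }
    }

    // Corresponds to @FinishBundle: flush whatever is left, so the
    // tail of each bundle is not lost.
    void finish() {
        if (!buffer.isEmpty()) {
            out.accept(buffer);
            buffer = new ArrayList<>();
        }
    }
}
```

The final flush is the important detail: without it, any elements buffered after the last full batch would silently disappear when the bundle ends.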
> >> (One helpful tool, however, is looking at the amount of time spent on
> >> each step in the UI.)
> >>
> >> Hopefully that helps.
> >>
> >> - Robert
> >>
> >> On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma <asharma...@gmail.com> wrote:
> >>>
> >>> Thanks Robert and Luke
> >>>
> >>> This approach seems good to me. I am trying it; I had to include a
> >>> GroupBy to make Iterable<Row> available to the ParDo function. Now the
> >>> GroupBy is the bottleneck: it has been running for the last 2 hours and
> >>> has processed only 40 GB of data (still waiting for the rest of the
> >>> 100s of GB).
> >>>
> >>> Currently I used GroupByKey.create().
> >>>
> >>> What is the recommended way to choose keys to make it execute faster:
> >>> the same key for all rows, a different key for each row, or the same
> >>> key for a group of rows?
> >>>
> >>> Thanks
> >>> Aniruddh
> >>>
> >>> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik <lc...@google.com> wrote:
> >>>>
> >>>> As Robert suggested, what prevents you from doing:
> >>>>
> >>>>   ReadFromBQ -> ParDo(BatchInMemory) -> DLP
> >>>>
> >>>> where BatchInMemory stores elements in the @ProcessElement method in
> >>>> an in-memory list and produces output every time the list is large
> >>>> enough, with a final output in the @FinishBundle method?
> >>>>
> >>>> On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma <asharma...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Luke
> >>>>>
> >>>>> Sorry, I forgot to mention the functions. Dataflow adds the following
> >>>>> steps, and ["PartitionKeys", new GroupByKeyAndSortValuesOnly] is
> >>>>> super slow. How do I choose keys to make it faster?
> >>>>>
> >>>>>     .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
> >>>>>     .setCoder(
> >>>>>         KvCoder.of(
> >>>>>             keyCoder,
> >>>>>             KvCoder.of(InstantCoder.of(),
> >>>>>                 WindowedValue.getFullCoder(kvCoder, windowCoder))))
> >>>>>
> >>>>>     // Group by key and sort by timestamp, dropping windows as
> >>>>>     // they are reified
> >>>>>     .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
> >>>>>
> >>>>>     // The GBKO sets the windowing strategy to the global default
> >>>>>     .setWindowingStrategyInternal(inputWindowingStrategy);
> >>>>>
> >>>>> Thanks
> >>>>> Aniruddh
> >>>>>
> >>>>> On 2020/04/23 16:35:58, Aniruddh Sharma <asharma...@gmail.com> wrote:
> >>>>> > Thanks Luke for your response.
> >>>>> >
> >>>>> > My use case is the following:
> >>>>> > a) I read data from BQ (TableRow).
> >>>>> > b) I convert it into Table.Row for DLP calls.
> >>>>> > c) I have to batch the Table.Row collection up to a max size of
> >>>>> >    512 KB (i.e. fit many BQ rows into a single DLP table) and
> >>>>> >    call DLP.
> >>>>> >
> >>>>> > Functionally, I don't need a key or a window, as I just want to
> >>>>> > fit rows into a DLP table up to a max size.
> >>>>> >
> >>>>> > In batch mode, when I call a stateful API, it adds a
> >>>>> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step,
> >>>>> > and this step is super slow: it has been running on a 50-node
> >>>>> > cluster over 800 GB of data for the last 10 hours.
> >>>>> >
> >>>>> > This step is not added when I run Dataflow in streaming mode, but
> >>>>> > I can't use streaming mode for other reasons.
> >>>>> >
> >>>>> > So I am trying to understand the following:
> >>>>> > a) Can I somehow give the Dataflow runner a hint not to add the
> >>>>> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at
> >>>>> > all? Then I don't have any issues.
> >>>>> > b) If it adds this step, how should I choose my artificially
> >>>>> > created keys so that the step executes as fast as possible? It
> >>>>> > does a sort by timestamp on the records.
> >>>>> > As I don't have any functional key requirement, shall I choose
> >>>>> > the same key for all rows, a random key for some rows, or a random
> >>>>> > key for each row? And what timestamps shall I add (the same for
> >>>>> > all rows?) to make this function work faster?
> >>>>> >
> >>>>> > Thanks
> >>>>> > Aniruddh
> >>>>> >
> >>>>> > On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote:
> >>>>> > > Stateful & timely operations are always per key and window,
> >>>>> > > which is why the GbkBeforeStatefulParDo is being added. Do you
> >>>>> > > not need your stateful & timely operation to be done per key and
> >>>>> > > window? If so, can you explain further?
> >>>>> > >
> >>>>> > > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma <asharma...@gmail.com> wrote:
> >>>>> > > >
> >>>>> > > > Hi Kenn
> >>>>> > > >
> >>>>> > > > Thanks for your guidance. I understand that batch mode waits
> >>>>> > > > for the previous stage, but that is not the only issue in this
> >>>>> > > > particular case.
> >>>>> > > >
> >>>>> > > > The Dataflow runner automatically adds a step,
> >>>>> > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which
> >>>>> > > > not only waits for the previous stage but waits for a very,
> >>>>> > > > very long time. Is there a way to give the Dataflow runner a
> >>>>> > > > hint not to add this step, as in my case I functionally do not
> >>>>> > > > require it?
> >>>>> > > >
> >>>>> > > > Thanks for your suggestion; I will create another thread to
> >>>>> > > > understand the BQ options.
> >>>>> > > >
> >>>>> > > > Thanks
> >>>>> > > > Aniruddh
> >>>>> > > >
> >>>>> > > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org> wrote:
> >>>>> > > > > The definition of batch mode for Dataflow is this:
> >>>>> > > > > completely compute the result of one stage of computation
> >>>>> > > > > before starting the next stage. There is no way around this.
> >>>>> > > > > It does not have to do with using state and timers.
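On the artificial-key question above, a common middle ground (an illustrative sketch, not something prescribed in this thread) is to spread rows across a small fixed number of shard keys: one key for all rows serializes the grouping onto a single worker, while a unique key per row produces groups of size 1 and pure overhead. Two usual ways to pick such a shard key, in plain Java:

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative shard-key helpers for an artificial GroupByKey key.
// A small fixed range of keys lets the grouping step parallelize
// across roughly numShards workers while keeping group sizes bounded.
class ShardKeys {
    // Random shard: spreads load evenly; no determinism needed.
    static int randomShard(int numShards) {
        return ThreadLocalRandom.current().nextInt(numShards);
    }

    // Hash-based shard: deterministic per element, similar spread
    // on average (floorMod keeps the result non-negative).
    static int hashShard(Object element, int numShards) {
        return Math.floorMod(element.hashCode(), numShards);
    }
}
```

In a Beam pipeline this logic would sit in a DoFn emitting `KV.of(randomShard(n), row)` just before the grouping step, with `n` tuned to roughly the available worker parallelism; that placement is an assumption about the surrounding pipeline, not code from this thread.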
> >>>>> > > > >
> >>>>> > > > > If you are working with state & timers & triggers, and you
> >>>>> > > > > are hoping for output before the pipeline has completely
> >>>>> > > > > terminated, then you most likely want streaming mode.
> >>>>> > > > > Perhaps it is best to investigate the BQ read performance
> >>>>> > > > > issue.
> >>>>> > > > >
> >>>>> > > > > Kenn
> >>>>> > > > >
> >>>>> > > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma <asharma...@gmail.com> wrote:
> >>>>> > > > > >
> >>>>> > > > > > Hi
> >>>>> > > > > >
> >>>>> > > > > > I am reading a bounded collection from BQ.
> >>>>> > > > > >
> >>>>> > > > > > I have to use a stateful & timely operation.
> >>>>> > > > > >
> >>>>> > > > > > a) I am invoking the job in batch mode. The Dataflow
> >>>>> > > > > > runner adds a step,
> >>>>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo",
> >>>>> > > > > > which has a partitionBy. This partitionBy waits for all
> >>>>> > > > > > the data to arrive and becomes a bottleneck. When I read
> >>>>> > > > > > its documentation, it seems it is meant to be added when
> >>>>> > > > > > there are no windows.
> >>>>> > > > > >
> >>>>> > > > > > I tried adding windows and triggering them before the
> >>>>> > > > > > stateful step, but everything still comes to this
> >>>>> > > > > > partitionBy step and waits till all the data is here.
> >>>>> > > > > >
> >>>>> > > > > > Is there a way to write the code (with windows, etc.) or
> >>>>> > > > > > give Dataflow a hint not to add this step?
> >>>>> > > > > >
> >>>>> > > > > > b) I don't want to run this job in streaming mode. When I
> >>>>> > > > > > run it in streaming mode, the Dataflow runner does not add
> >>>>> > > > > > this step, but the BQ read becomes a bottleneck.
> >>>>> > > > > >
> >>>>> > > > > > So either I have to solve how to read from BQ faster if I
> >>>>> > > > > > run the job in streaming mode, or how to bypass the
> >>>>> > > > > > partitionBy from
> >>>>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I
> >>>>> > > > > > invoke the job in batch mode.
> >>>>> > > > > >
> >>>>> > > > > > Thanks
> >>>>> > > > > > Aniruddh
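The batching constraint discussed in this thread is a byte budget (fit rows into a DLP table up to 512 KB), not a row count, so the count-based buffering DoFn above would need a size-aware variant. A plain-Java sketch of that variant, under the assumption that the caller can measure each row's encoded size (all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;

// Sketch of size-bounded batching: accumulate elements until adding
// the next one would push the batch past maxBytes (e.g. a 512 KB
// request limit), then emit the batch and start a fresh one. The
// per-element size function is supplied by the caller, e.g. the
// element's serialized size.
class SizeBatcher<T> {
    private final int maxBytes;
    private final ToIntFunction<T> sizeOf;
    private final Consumer<List<T>> out;
    private List<T> buffer = new ArrayList<>();
    private int bufferBytes = 0;

    SizeBatcher(int maxBytes, ToIntFunction<T> sizeOf, Consumer<List<T>> out) {
        this.maxBytes = maxBytes;
        this.sizeOf = sizeOf;
        this.out = out;
    }

    // Emit the current batch first if the new element would overflow
    // it, so every emitted batch stays within maxBytes (assuming no
    // single element exceeds the budget on its own).
    void add(T elt) {
        int size = sizeOf.applyAsInt(elt);
        if (!buffer.isEmpty() && bufferBytes + size > maxBytes) {
            flush();
        }
        buffer.add(elt);
        bufferBytes += size;
    }

    // Corresponds to the final @FinishBundle output.
    void flush() {
        if (!buffer.isEmpty()) {
            out.accept(buffer);
            buffer = new ArrayList<>();
            bufferBytes = 0;
        }
    }
}
```

Checking the budget before adding, rather than after, is what keeps each batch under the limit; a row larger than maxBytes by itself would still go out as a singleton batch and would need separate handling.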