In general, for others who may read the thread: if there is no natural key in your data and you would like to make use of state and timers, a simple pattern to use is:

    WithKeys(x -> randomInt(1000))

This allows the work to go parallel.
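In the Beam Java SDK this can be written roughly as follows. This is a minimal sketch: TableRow, the step name, and the key range of 1000 are illustrative, and the explicit key type is needed so Beam can infer a coder for the lambda's keys.

    import java.util.concurrent.ThreadLocalRandom;
    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    class RandomKeyExample {
      // Assigns each row one of 1000 random keys so a downstream stateful ParDo
      // (which always runs per key and window) can be parallelized.
      static PCollection<KV<Integer, TableRow>> assignRandomKeys(PCollection<TableRow> rows) {
        return rows.apply(
            "AssignRandomKeys",
            WithKeys.<Integer, TableRow>of(row -> ThreadLocalRandom.current().nextInt(1000))
                .withKeyType(TypeDescriptors.integers()));
      }
    }

The random key only needs to spread elements evenly; any state and timers attached downstream are then scoped to these artificial keys (and the window).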
On Fri, 24 Apr 2020, 04:45 Robert Bradshaw, <rober...@google.com> wrote:

> I may have misinterpreted your email, I thought you didn't have a need for keys at all. If this is actually the case, you don't need a GroupByKey, just have your DoFn take Rows as input and emit List<Row> as output. That is, it's a DoFn<Row, List<Row>>.
>
> You can buffer multiple Rows in an instance variable between process element calls. For example,
>
>     class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
>       List<T> buffer = new ArrayList<>();
>
>       @ProcessElement
>       public void processElement(T elt, OutputReceiver<List<T>> out) {
>         buffer.add(elt);
>         if (buffer.size() > 100) {
>           out.output(buffer);
>           buffer = new ArrayList<>();
>         }
>       }
>
>       @FinishBundle
>       public void finishBundle(OutputReceiver<List<T>> out) {
>         out.output(buffer);
>         buffer = new ArrayList<>();
>       }
>     }
>
> See https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html for more information on the lifetime of DoFns.
>
> As for why your GBK is taking so long: yes, this can be a bottleneck. However, it should be noted that Dataflow (like most other runners) executes this step in conjunction with other steps as part of a "fused stage." So if your pipeline looks like
>
>     Read -> DoFnA -> GBK -> DoFnB -> Write
>
> then Read, DoFnA, and GBK[part1] will execute concurrently (all starting up almost immediately), one element at a time, and when that's finished, GBK[part2], DoFnB, and Write will execute concurrently, one element at a time. So you can't just look at the last unfinished stage to determine where the bottleneck is. (One helpful tool, however, is looking at the amount of time spent on each step in the UI.)
>
> Hopefully that helps.
>
> - Robert
>
> On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma <asharma...@gmail.com> wrote:
>
>> Thanks Robert and Luke
>>
>> This approach seems good to me. I am trying that; I have to include a GroupBy to make an Iterable<Row> available to the ParDo function that does this. Now the GroupBy is a bottleneck: it has been working for the last 2 hours and has processed only 40 GB of data (still waiting for the rest of the 100s of GB).
>>
>> Currently I used GroupByKey.create().
>>
>> What's the recommended way to choose keys to make it execute faster: the same key for all rows, a different key for each row, or the same key for a group of rows?
>>
>> Thanks
>> Aniruddh
>>
>> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> As Robert suggested, what prevents you from doing:
>>>
>>>     ReadFromBQ -> ParDo(BatchInMemory) -> DLP
>>>
>>> where BatchInMemory stores elements in the @ProcessElement method in an in-memory list and produces output every time the list is large enough, with a final output in the @FinishBundle method?
>>>
>>> On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma <asharma...@gmail.com> wrote:
>>>
>>>> Hi Luke
>>>>
>>>> Sorry, I forgot to mention the functions. Dataflow adds the following, and ["PartitionKeys", new GroupByKeyAndSortValuesOnly] is super slow. How do I choose keys to make it faster?
>>>> >>>> .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>())) >>>> .setCoder( >>>> KvCoder.of( >>>> keyCoder, >>>> KvCoder.of(InstantCoder.of(), >>>> WindowedValue.getFullCoder(kvCoder, windowCoder)))) >>>> >>>> // Group by key and sort by timestamp, dropping windows as >>>> they are reified >>>> .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>()) >>>> >>>> // The GBKO sets the windowing strategy to the global default >>>> .setWindowingStrategyInternal(inputWindowingStrategy); >>>> >>>> THanks >>>> ANiruddh >>>> >>>> On 2020/04/23 16:35:58, Aniruddh Sharma <asharma...@gmail.com> wrote: >>>> > Thanks Luke for your response. >>>> > >>>> > My use case is following. >>>> > a) I read data from BQ (TableRow) >>>> > b) Convert it into (Table.Row) for DLP calls. >>>> > c) have to batch Table.Row collection up to a max size of 512 KB (i.e >>>> fit may rows from BQ into a single DLP table) and call DLP. >>>> > >>>> > Functionally, I don't have a need of key and window. As I just want >>>> to fit rows in DLP table up to a max size. >>>> > >>>> > In batch mode, when I call StateFulAPI, >>>> > it adds a "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" >>>> step and this step is super slow. Like it is running on 50 node cluster for >>>> 800 GB data for last 10 hours. >>>> > >>>> > This step is not added when I call Dataflow in streaming mode. But I >>>> can't call it in Streaming mode for other reasons. >>>> > >>>> > So I am trying to understand following >>>> > a) Either I give a hint somehow to Dataflow runner not to add this >>>> step "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" at all, >>>> then I don't have any issues. >>>> > b) if it adds this step, then how should I choose my ARTIFICIALLY >>>> created keys that step can execute as fast as possible. It does a SORT by >>>> on timestamps on records. As I don't have any functional key requirement, >>>> shall I choose same keys for all rows vs randomkey for some rows vs random >>>> key for each row; what timestamps shall I add same for all rows ? to make >>>> this function work faster. >>>> > >>>> > Thanks >>>> > Aniruddh >>>> > >>>> > On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote: >>>> > > Stateful & timely operations are always per key and window which is >>>> the >>>> > > GbkBeforeStatefulParDo is being added. Do you not need your >>>> stateful & >>>> > > timely operation to be done per key and window, if so can you >>>> explain >>>> > > further? >>>> > > >>>> > > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma < >>>> asharma...@gmail.com> >>>> > > wrote: >>>> > > >>>> > > > Hi Kenn >>>> > > > >>>> > > > Thanks for your guidance, I understand that batch mode waits for >>>> previous >>>> > > > stage. But the real issue in this particular case is not only >>>> this. >>>> > > > >>>> > > > Dataflow runner adds a step automatically >>>> > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" which not >>>> only waits >>>> > > > for previous stage but it waits for a very very very long time. >>>> Is there a >>>> > > > way to give hint to Dataflow runner not to add this step, as in >>>> my case I >>>> > > > functionally do not require this step. 
>>>> > > >
>>>> > > > Thanks for your suggestion, will create another thread to understand the BQ options.
>>>> > > >
>>>> > > > Thanks
>>>> > > > Aniruddh
>>>> > > >
>>>> > > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org> wrote:
>>>> > > > > The definition of batch mode for Dataflow is this: completely compute the result of one stage of computation before starting the next stage. There is no way around this. It does not have to do with using state and timers.
>>>> > > > >
>>>> > > > > If you are working with state & timers & triggers, and you are hoping for output before the pipeline has completely terminated, then you most likely want streaming mode. Perhaps it is best to investigate the BQ read performance issue.
>>>> > > > >
>>>> > > > > Kenn
>>>> > > > >
>>>> > > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma <asharma...@gmail.com> wrote:
>>>> > > > >
>>>> > > > > > Hi
>>>> > > > > >
>>>> > > > > > I am reading a bounded collection from BQ.
>>>> > > > > >
>>>> > > > > > I have to use a stateful & timely operation.
>>>> > > > > >
>>>> > > > > > a) I am invoking the job in batch mode. The Dataflow runner adds a step, "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which has a partitionBy. This partitionBy waits for all the data to arrive and becomes a bottleneck. When I read its documentation, it seems its objective is to be added when there are no windows.
>>>> > > > > >
>>>> > > > > > I tried adding windows and triggering them before the stateful step, but everything still comes to this partitionBy step and waits till all the data is here.
>>>> > > > > >
>>>> > > > > > Is there a way to write the code differently (windows etc.) or give Dataflow a hint not to add this step?
>>>> > > > > >
>>>> > > > > > b) I don't want to call this job in streaming mode. When I call it in streaming mode, the Dataflow runner does not add this step, but in streaming mode the BQ read becomes a bottleneck.
>>>> > > > > >
>>>> > > > > > So either I have to solve how to read BQ faster if I call the job in streaming mode, or how to bypass this partitionBy from "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I invoke the job in batch mode.
>>>> > > > > >
>>>> > > > > > Thanks
>>>> > > > > > Aniruddh
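A note for anyone adapting Robert's buffering sketch above: in the current Beam Java SDK the @FinishBundle method takes a FinishBundleContext (which requires an explicit timestamp and window) rather than an OutputReceiver, so a compilable version looks roughly like the sketch below. The element type, batch size, and class name are illustrative, not anything prescribed by Beam.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

    // Buffers elements in an instance variable and emits batches of up to
    // MAX_BATCH elements, flushing any remainder when the bundle finishes.
    class BufferingDoFn<T> extends DoFn<T, List<T>> {
      private static final int MAX_BATCH = 100;    // illustrative threshold
      private transient List<T> buffer;
      private transient BoundedWindow lastWindow;  // remembered so finishBundle can emit into it

      @StartBundle
      public void startBundle() {
        buffer = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(@Element T element, OutputReceiver<List<T>> out, BoundedWindow window) {
        buffer.add(element);
        lastWindow = window;
        if (buffer.size() >= MAX_BATCH) {
          out.output(buffer);
          buffer = new ArrayList<>();
        }
      }

      @FinishBundle
      public void finishBundle(FinishBundleContext ctx) {
        if (!buffer.isEmpty()) {
          // FinishBundleContext needs an explicit timestamp and window for the output.
          ctx.output(buffer, lastWindow.maxTimestamp(), lastWindow);
          buffer = new ArrayList<>();
        }
      }
    }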