On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:
> Hi Beam-users,
>
> *TL;DR:* I wonder if DirectRunner does any fusion optimization
> <https://beam.apache.org/contribute/ptransform-style-guide/#performance>
> and whether this has any impact on triggers/panes?
>
> *Details* (the context for everything below is *DirectRunner*, and this
> is a *batch* job):
> I have a batch pipeline that roughly looks like this: S1->S2->S3
>
> S1: Create URLs (from DB)
> S2: Fetch those URLs (output of S1) and create Avro records
> S3: Write those records to Parquet files
>
> S2 and S3 could be fused to generate Parquet files while the records are
> being fetched/created. However, that does not seem to happen: there is no
> [temp] file while the resources are being fetched, and the writer log
> messages appear only after all fetches are done.
>
> If I add a trigger to the output PCollection of S2 (i.e., `records`
> below), then I get intermediate Parquet output:
> ```
> records.apply(
>     Window.<T>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(
>             AfterProcessingTime.pastFirstElementInPane()
>                 .plusDelayOf(Duration.standardSeconds(5))))
>         .discardingFiredPanes());
> ```
>
> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which only
> prints a log message for each record and passes the record through, then
> S2 and S2' do seem to be fused, because the log messages are interleaved
> with the fetches.
>
> *Q1*: Does DirectRunner do any fusion optimization (e.g., like
> DataflowRunner)? If not by default, is there any way to enable it?

The Java DirectRunner does not do any fusion optimization. There's no code
to enable :-). Fusion should affect performance only, not semantics. The
DirectRunner is known to have poor performance, but mostly no one is working
on speeding it up because it is really just for small-scale testing.

> The other issue is with triggers and creating panes.
> I have an extended version of this pipeline; a simplified view of it is:
> S1->S2A->GBK->S2B->S3
>
> S1: Like before
> S2A: Add a key to the output of S1
> GBK: Group the output of S2A to remove duplicate keys
> S2B: Similar to S2 above, i.e., fetch the deduped URLs and create Avro
> records
> S3: Same as before
>
> *Q2*: In this case, if I add a dummy S2B' after S2B, the log messages are
> *not* interleaved with resource fetches, i.e., no fusion is happening.
> Why? What is different here?

I don't quite understand what the problem is here.

> *Q3*: Even if I add a similar trigger to the output of S2B, the Parquet
> file generation does not start until all of the fetches are done. Again,
> what is different here, and why are intermediate panes not fired while
> the output of S2B is being generated?

I think it would help to see how you have configured the ParquetIO write
transform.

Kenn

> Thanks
>
> -B
>
> P.S. I need this pipeline to work both on a distributed runner and also on
> a local machine with many cores. That's why the performance of DirectRunner
> is important to me.
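For reference, a triggered Parquet write in the Beam Java SDK might look
roughly like the sketch below. This is an illustration, not the poster's
actual code: `records` and `schema` stand in for their
`PCollection<GenericRecord>` and Avro schema, the paths and step names are
made up, and the 5-second delay mirrors the trigger quoted earlier. It is a
fragment, not a complete program.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// `records` is assumed to be a PCollection<GenericRecord>;
// `schema` is assumed to be its Avro schema.
records
    .apply("TriggerEvery5s",
        Window.<GenericRecord>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(5))))
            .discardingFiredPanes())
    .apply("WriteParquet",
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(schema))
            .to("/tmp/records-output")
            .withSuffix(".parquet")
            // An explicit shard count is often needed when a window fires
            // multiple panes, so output can materialize per pane.
            .withNumShards(1));
```

Whether intermediate panes actually reach disk also depends on how the file
sink handles sharding and finalization, which is why seeing the real write
configuration matters.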