Hi Bashir,

the behavior you describe is expected. DirectRunner splits the input into bundles, and processing a bundle may produce zero, one, or more new bundles. The executor runs the work associated with these bundles, enqueuing any new bundles, until no unprocessed bundles remain in the queue (that is, until all work is done). It consumes the queue with a fixed-size thread pool (whose size is set by --targetParallelism), so the processing of "Sleep" bundles and "Next" bundles is interleaved not because of fusion, but because of limited parallelism. If you increase the parallelism beyond the total number of bundles in your `lines` PCollection, then I think you would see the result you expect.
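To make the "fixed-size thread pool draining a bundle queue" picture concrete, here is a minimal, self-contained sketch using plain java.util.concurrent. This is not actual DirectRunner code and all names are made up; it only illustrates the scheduling idea. With as many workers as bundles, every "Sleep" bundle finishes before any "Next" bundle is dequeued, which is the non-interleaved output you expected:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Toy model of the executor described above: a fixed-size pool drains a
 * queue of bundles, and finishing a "Sleep" bundle enqueues the matching
 * "Next" bundle, like a committed bundle scheduling downstream work.
 */
public class BundleExecutorSketch {

  public static List<String> run(int parallelism, int numBundles) throws InterruptedException {
    List<String> log = new CopyOnWriteArrayList<>();
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    CountDownLatch allNextDone = new CountDownLatch(numBundles);

    // Seed the queue with the "Sleep" bundles; each one enqueues its
    // downstream "Next" bundle when it finishes.
    for (int i = 0; i < numBundles; i++) {
      final int b = i;
      queue.add(() -> {
        log.add("DEBUG INPUT " + b);
        try {
          Thread.sleep(200); // stands in for the 3s sleep in the pipeline
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        queue.add(() -> {
          log.add("DEBUG NEXT " + b);
          allNextDone.countDown();
        });
      });
    }

    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    for (int i = 0; i < parallelism; i++) {
      pool.submit(() -> {
        try {
          while (true) {
            // A worker that finds the queue empty for a while just exits;
            // good enough for this sketch.
            Runnable bundle = queue.poll(500, TimeUnit.MILLISECONDS);
            if (bundle == null) {
              return;
            }
            bundle.run();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }

    allNextDone.await();
    pool.shutdownNow();
    return log;
  }

  public static void main(String[] args) throws InterruptedException {
    // With as many workers as bundles, all "DEBUG INPUT" lines come
    // before any "DEBUG NEXT" line.
    System.out.println(run(4, 4));
  }
}
```

(The real scheduler differs in details, e.g. the source produces bundles incrementally rather than all up front, which is presumably why you still see the batch-wise interleaving at --targetParallelism=1.)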

Best,

 Jan

On 5/12/21 7:35 PM, Bashir Sadjad wrote:
Thanks Kenn.

On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:


    On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:

        However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3)
        which only prints some log messages for each record and passes
        the record to output, then it seems S2 and S2' are fused,
        because the log messages are interleaved with fetches.


        *Q1*: Does DirectRunner do any fusion optimization (e.g., like
        DataflowRunner)? If not by default, is there any way to enable it?


    The Java DirectRunner does not do any fusion optimization. There's
    no code to enable :-). It should affect performance only, not
    semantics. The DirectRunner is known to have poor performance, but
    mostly no one is working on speeding it up because it is really
    just for small-scale testing.


Here is a minimal pipeline (with no windowing) that demonstrates what I mean; maybe I am using the wrong terminology, but when I run this pipeline with DirectRunner (and with `--targetParallelism=1`), the `DEBUG INPUT` and `DEBUG NEXT` messages are interleaved. If there were no fusion, I would have expected to see all `DEBUG INPUT` messages first and then all `DEBUG NEXT` messages:

Pipeline pipeline = Pipeline.create(options);
PCollection<String> lines = pipeline.apply(TextIO.read().from(options.getInputFile()));

PCollection<String> linesDelayed = lines.apply("Sleep", ParDo.of(new DoFn<String, String>() {
  @StartBundle
  public void startBundle() {
    log.info("INPUT: Started a new bundle");
  }

  @ProcessElement
  public void processElement(@Element String line, OutputReceiver<String> out) throws InterruptedException {
    log.info(String.format("DEBUG INPUT %s", line));
    Thread.sleep(3000);
    out.output(line);
  }
}));

PCollection<String> linesDebug = linesDelayed.apply("Next", ParDo.of(new DoFn<String, String>() {
  @StartBundle
  public void startBundle() {
    log.info("NEXT: Started a new bundle");
  }

  @ProcessElement
  public void processElement(@Element String line, OutputReceiver<String> out) {
    log.info(String.format("DEBUG NEXT %s", line));
    out.output(line);
  }
}));

linesDebug.apply(TextIO.write().to(options.getOutputFile()).withNumShards(1));

PipelineResult result = pipeline.run();
result.waitUntilFinish();

It seems that a few bundles are processed by the `Sleep` transform, then they all go through `Next`; again a few more bundles go through `Sleep`, then `Next`, and so on.

        The other issue is with triggers and creating panes. I have an
        extended version of this pipeline where a simplified view of
        it is: S1->S2A->GBK->S2B->S3

        S1: Like before
        S2A: Add a key to the output of S1
        GBK: Groups output of S2A to remove duplicate keys
        S2B: Similar to S2 above, i.e., fetch deduped URLs and create
        Avro records
        S3: Same as before

        *Q2*: In this case, if I add a dummy S2B' after S2B, the log
        messages are /not/ interleaved with resource fetches, i.e., no
        fusion is happening. Why? What is different here?


    I don't quite understand what the problem is here.


The same log-message interleaving does not happen in this case. So back to my original example sketch: the log messages of S2' are interleaved with those of S2 (which I thought was because of fusion), but all of the log messages of S2B' are printed after all messages of S2B.

        *Q3*: Even if I add a similar trigger to the output of S2B,
        the Parquet file generation does not start until all of the
        fetches are done. Again, what is different here, and why are
        intermediate panes not fired while the output of S2B is
        being generated?


    I think it would help to see how you have configured the ParquetIO
    write transform.


I think this is related to the difference between the behaviour of the two examples above (i.e., S2' vs. S2B'). If it turns out that is not the case, I will create a minimal example including ParquetIO too.

Thanks again

-B


    Kenn


        Thanks

        -B
        P.S. I need this pipeline to work both on a distributed runner
        and also on a local machine with many cores. That's why the
        performance of DirectRunner is important to me.
