Thanks Jan. Two points:

- I was running all the experiments I reported with `--targetParallelism=1` to make sure concurrent threads do not mess up the logs.
- I have been tracking bundles too (see @StartBundle log messages in the mini-example in my previous reply to Kenn).
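For concreteness, this is roughly what those two points look like in code. It is only a sketch (the class names and log messages here are just illustrative, and the real DoFns do more work), but it shows the @StartBundle/@FinishBundle logging I use to track bundle boundaries, and how the same parallelism limit can also be set programmatically through `DirectOptions` instead of the `--targetParallelism=1` flag:

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BundleLoggingExample {
  private static final Logger LOG = LoggerFactory.getLogger(BundleLoggingExample.class);

  // Pass-through DoFn that only logs bundle boundaries and the elements it sees.
  static class BundleLoggingFn extends DoFn<String, String> {
    @StartBundle
    public void startBundle() {
      LOG.info("Started a new bundle");
    }

    @ProcessElement
    public void processElement(@Element String line, OutputReceiver<String> out) {
      LOG.info("DEBUG {}", line);
      out.output(line);
    }

    @FinishBundle
    public void finishBundle() {
      LOG.info("Finished the bundle");
    }
  }

  public static void main(String[] args) {
    // Same effect as passing --targetParallelism=1 on the command line.
    DirectOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DirectOptions.class);
    options.setTargetParallelism(1);
    // ... create the Pipeline with these options and apply ParDo.of(new BundleLoggingFn()) ...
  }
}

The @StartBundle messages I referred to above come from exactly this kind of logging.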
So I don't think bundles alone describe what I see. In the mini-example, processing of INPUT bundles and NEXT bundles is interleaved, e.g., 3 INPUT bundles are processed, then the outputs of those go through NEXT, then a few other INPUT bundles, and so on. Now, if we go back to my original example with S1->S2A->GBK->S2B->S3, the input to S2B also has many bundles. However, in this case *all* of those bundles are processed first, and then they all go through the next stages, e.g., the logging S2B' that I mentioned. So there is no interleaving of log messages.

Regards,

-B

On Mon, May 17, 2021 at 3:50 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Bashir,
>
> the behavior you describe should be expected. DirectRunner splits the input work into bundles, processing each bundle might result in zero, one or more new bundles. The executor executes the work associated with these bundles, enqueuing new bundles into a queue, until there are no unprocessed bundles left in the queue (that is, the work has been completely done). It uses a fixed-size thread pool to consume and execute work associated with these bundles (the size of which is defined by --targetParallelism), so what happens is that the processing of bundles of "Sleep" transform and "Next" transform are interleaved, but not due to fusion, but due to limited parallelism. If you increase the parallelism beyond the total number of bundles in your `lines` PCollection, then I think you would see the result you expect.
>
> Best,
>
> Jan
>
> On 5/12/21 7:35 PM, Bashir Sadjad wrote:
>
> Thanks Kenn.
>
> On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:
>>
>>> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which only prints some log messages for each record and passes the record to output, then it seems S2 and S2' are fused. Because the log messages are interleaved with fetches.
>>>
>>> *Q1*: Does DirectRunner do any fusion optimization (e.g., like DataflowRunner)? If not by default, is there any way to enable it?
>>
>> The Java DirectRunner does not do any fusion optimization. There's no code to enable :-). It should affect performance only, not semantics. The DirectRunner is known to have poor performance, but mostly no one is working on speeding it up because it is really just for small-scale testing.
>
> Here is a minimal pipeline (with no windowing) that demonstrates what I mean; maybe I am using the wrong terminology, but when I run this pipeline with DirectRunner (and with `--targetParallelism=1`) the `DEBUG INPUT` and `DEBUG NEXT` messages are interleaved.
> While if there was no fusion, I would have expected to see all `DEBUG INPUT` messages first and then all of `DEBUG NEXT`:
>
> Pipeline pipeline = Pipeline.create(options);
> PCollection<String> lines =
>     pipeline.apply(TextIO.read().from(options.getInputFile()));
>
> PCollection<String> linesDelayed = lines.apply("Sleep", ParDo.of(new DoFn<String, String>() {
>   @StartBundle
>   public void startBundle() {
>     log.info("INPUT: Started a new bundle");
>   }
>
>   @ProcessElement
>   public void ProcessElement(@Element String line, OutputReceiver<String> out)
>       throws InterruptedException {
>     log.info(String.format("DEBUG INPUT %s", line));
>     Thread.sleep(3000);
>     out.output(line);
>   }
> }));
>
> PCollection<String> linesDebug = linesDelayed.apply("Next", ParDo.of(new DoFn<String, String>() {
>   @StartBundle
>   public void startBundle() {
>     log.info("NEXT: Started a new bundle");
>   }
>
>   @ProcessElement
>   public void ProcessElement(@Element String line, OutputReceiver<String> out) {
>     log.info(String.format("DEBUG NEXT %s", line));
>     out.output(line);
>   }
> }));
>
> linesDebug.apply(TextIO.write().to(options.getOutputFile()).withNumShards(1));
>
> PipelineResult result = pipeline.run();
> result.waitUntilFinish();
>
> It seems that a few bundles are processed by `Sleep` transform then they all go through `Next`. Again a few more bundles go through `Sleep` then `Next` and so on.
>
>>> The other issue is with triggers and creating panes. I have an extended version of this pipeline where a simplified view of it is:
>>> S1->S2A->GBK->S2B->S3
>>>
>>> S1: Like before
>>> S2A: Add a key to the output of S1
>>> GBK: Groups output of S2A to remove duplicate keys
>>> S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro records
>>> S3: Same as before
>>>
>>> *Q2*: In this case, if I add a dummy S2B' after S2', the log messages are *not* interleaved with resource fetches, i.e., no fusion is happening. Why? What is different here?
>>
>> I don't quite understand what the problem is here.
>
> The same log message interleaving does not happen in this case. So back to my original example sketch, log messages of S2' are interleaved with S2 (which I thought is because of fusion) but all of the log messages of S2B' are printed after all messages of S2B.
>
>>> *Q3*: Even if I add a similar trigger to the output of S2B, the Parquet file generation does not start until all of the fetches are done. Again, what is different here and why intermediate panes are not fired while the output of S2B is being generated?
>>
>> I think it would help to see how you have configured the ParquetIO write transform.
>
> I think this is related to the difference between the behaviour of the two examples above (i.e., S2' vs. S2B'). If it turns out that is not the case, I will create a minimal example including ParquetIO too.
>
> Thanks again
>
> -B
>
>> Kenn
>
>>> Thanks
>>>
>>> -B
>>>
>>> P.S. I need this pipeline to work both on a distributed runner and also on a local machine with many cores. That's why the performance of DirectRunner is important to me.
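To make Q2/Q3 easier to discuss, here is a stripped-down sketch of the S1->S2A->GBK->S2B->S3 shape I described. The names, file paths, and the body of S2B are placeholders rather than my real pipeline (the real S2B fetches URLs and produces Avro records, and S3 writes Parquet via ParquetIO), and I have left out the trigger I add on the output of S2B:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class DedupFetchSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // S1: read the input; here simply a text file with one URL per line.
    PCollection<String> urls = p.apply("S1", TextIO.read().from("/tmp/urls.txt"));

    // S2A + GBK: key each URL by itself and group, so each distinct URL comes out once.
    PCollection<KV<String, Iterable<String>>> deduped =
        urls
            .apply("S2A",
                WithKeys.of((String url) -> url).withKeyType(TypeDescriptors.strings()))
            .apply("GBK", GroupByKey.create());

    // S2B: process each deduped URL; the body is a placeholder for the real
    // fetch-and-convert-to-Avro logic.
    PCollection<String> fetched =
        deduped.apply("S2B", ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
          @ProcessElement
          public void processElement(
              @Element KV<String, Iterable<String>> kv, OutputReceiver<String> out) {
            out.output("fetched: " + kv.getKey());
          }
        }));

    // S3: write the results; the real pipeline writes Parquet files instead.
    fetched.apply("S3", TextIO.write().to("/tmp/fetched").withNumShards(1));

    p.run().waitUntilFinish();
  }
}

The point of the GBK here is only to deduplicate the keys produced by S2A, so S2B sees each URL exactly once; the interleaving (or lack of it) I described happens in the S2B stage and the dummy S2B' after it.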