> P.S. I need this pipeline to work both on a distributed runner and on a local machine with many cores. That's why the performance of DirectRunner is important to me.
IIUC the DirectRunner has intentionally made some trade-offs that make it less performant, so that it better verifies pipelines under test. I wonder if there's another runner you can use that is more performant for local execution on a machine with many cores, like starting up a local Flink or Spark cluster?

Brian

On Mon, May 17, 2021 at 7:12 AM Jan Lukavský <je...@seznam.cz> wrote:

> On 5/17/21 3:46 PM, Bashir Sadjad wrote:
>
> > Thanks Jan. Two points:
> >
> > - I was running all the experiments I reported with `--targetParallelism=1` to make sure concurrent threads do not mess up the logs.
>
> I think that is what causes what you see. Try increasing the parallelism to a number higher than the number of input bundles.
>
> > - I have been tracking bundles too (see @StartBundle log messages in the mini-example in my previous reply to Kenn).
>
> I see the code, but not the log output. My suspicion would be that you see "Start bundle" -> "Debug Input" OR "Debug NEXT", right? If yes, then this is expected: processing a bundle produces an "output bundle", which is queued into the work queue and is then processed as soon as there is a free worker to work on it. Fetching new outputs produces new bundles, which are also added to this queue, which is what causes the interleaving.
>
> > So I don't think bundles alone describe what I see. In the mini-example, processing of INPUT bundles and NEXT bundles is interleaved, e.g., 3 INPUT bundles are processed, then the output of those goes through NEXT, then a few other INPUT bundles, and so on.
> >
> > Now, if we go back to my original example with S1->S2A->GBK->S2B->S3, the input to S2B also has many bundles. However, in this case *all* of those bundles are processed first, then they all go through the next stages, e.g., the logging S2B' that I mentioned. So there is no interleaving of log messages.
>
> GBK is a stateful operation that has to wait for a trigger - in the simple batch case the trigger is the end of input, which is why you cannot see outputs of GBK interleaved with reading inputs. All inputs have to be read before GBK can proceed and output any bundle downstream.
>
> > Regards,
> >
> > -B
> >
> > On Mon, May 17, 2021 at 3:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Bashir,
>>
>> the behavior you describe should be expected. DirectRunner splits the input work into bundles; processing each bundle might result in zero, one, or more new bundles. The executor executes the work associated with these bundles, enqueuing new bundles into a queue, until there are no unprocessed bundles left in the queue (that is, the work has been completely done). It uses a fixed-size thread pool to consume and execute the work associated with these bundles (the size of which is defined by --targetParallelism), so what happens is that the processing of bundles of the "Sleep" transform and the "Next" transform is interleaved, not due to fusion but due to limited parallelism. If you increase the parallelism beyond the total number of bundles in your `lines` PCollection, then I think you would see the result you expect.
>>
>> Best,
>>
>> Jan
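To make Jan's suggestion concrete, a minimal sketch (mine, not code from this thread) of raising the DirectRunner's parallelism via DirectOptions could look like the following; the class name and the value 32 are arbitrary, the point is only to use something larger than the number of input bundles:

    import org.apache.beam.runners.direct.DirectOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class HighParallelismDirect {
      public static void main(String[] args) {
        DirectOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(DirectOptions.class);
        // Equivalent to passing --targetParallelism=32 on the command line; pick a
        // value larger than the expected number of input bundles.
        options.setTargetParallelism(32);

        Pipeline pipeline = Pipeline.create(options);
        // ... apply the same "Sleep" / "Next" transforms as in the example below ...
        pipeline.run().waitUntilFinish();
      }
    }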
>> On 5/12/21 7:35 PM, Bashir Sadjad wrote:
>>
>> Thanks Kenn.
>>
>> On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:
>>>
>>>> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3), which only prints some log messages for each record and passes the record to the output, then it seems S2 and S2' are fused, because the log messages are interleaved with fetches.
>>>>
>>>> *Q1*: Does DirectRunner do any fusion optimization (e.g., like DataflowRunner)? If not by default, is there any way to enable it?
>>>
>>> The Java DirectRunner does not do any fusion optimization. There's no code to enable :-). It should affect performance only, not semantics. The DirectRunner is known to have poor performance, but mostly no one is working on speeding it up because it is really just for small-scale testing.
>>
>> Here is a minimal pipeline (with no windowing) that demonstrates what I mean; maybe I am using the wrong terminology, but when I run this pipeline with DirectRunner (and with `--targetParallelism=1`) the `DEBUG INPUT` and `DEBUG NEXT` messages are interleaved. If there were no fusion, I would have expected to see all `DEBUG INPUT` messages first and then all of the `DEBUG NEXT` ones:
>>
>> Pipeline pipeline = Pipeline.create(options);
>> PCollection<String> lines =
>>     pipeline.apply(TextIO.read().from(options.getInputFile()));
>>
>> PCollection<String> linesDelayed = lines.apply("Sleep", ParDo.of(new DoFn<String, String>() {
>>   @StartBundle
>>   public void startBundle() {
>>     log.info("INPUT: Started a new bundle");
>>   }
>>
>>   @ProcessElement
>>   public void processElement(@Element String line, OutputReceiver<String> out)
>>       throws InterruptedException {
>>     log.info(String.format("DEBUG INPUT %s", line));
>>     Thread.sleep(3000);
>>     out.output(line);
>>   }
>> }));
>>
>> PCollection<String> linesDebug = linesDelayed.apply("Next", ParDo.of(new DoFn<String, String>() {
>>   @StartBundle
>>   public void startBundle() {
>>     log.info("NEXT: Started a new bundle");
>>   }
>>
>>   @ProcessElement
>>   public void processElement(@Element String line, OutputReceiver<String> out) {
>>     log.info(String.format("DEBUG NEXT %s", line));
>>     out.output(line);
>>   }
>> }));
>>
>> linesDebug.apply(TextIO.write().to(options.getOutputFile()).withNumShards(1));
>>
>> PipelineResult result = pipeline.run();
>> result.waitUntilFinish();
>>
>> It seems that a few bundles are processed by the `Sleep` transform, then they all go through `Next`; then a few more bundles go through `Sleep`, then `Next`, and so on.
>>
>>>> The other issue is with triggers and creating panes. I have an extended version of this pipeline; a simplified view of it is:
>>>> S1->S2A->GBK->S2B->S3
>>>>
>>>> S1: Like before
>>>> S2A: Add a key to the output of S1
>>>> GBK: Groups output of S2A to remove duplicate keys
>>>> S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro records
>>>> S3: Same as before
>>>>
>>>> *Q2*: In this case, if I add a dummy S2B' after S2B, the log messages are *not* interleaved with resource fetches, i.e., no fusion is happening. Why? What is different here?
>>>
>>> I don't quite understand what the problem is here.
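For concreteness, a rough, hypothetical skeleton of the S1->S2A->GBK->S2B shape being discussed might look like the sketch below; `s1Output`, the choice of keying by the URL itself, and the placeholder fetch are assumptions of mine, not Bashir's actual code:

    // Needs org.apache.beam.sdk.transforms.{DoFn, GroupByKey, ParDo} and
    // org.apache.beam.sdk.values.{KV, PCollection}; s1Output stands in for the
    // PCollection<String> of URLs produced by S1.
    PCollection<KV<String, String>> keyed = s1Output.apply("S2A (add key)",
        ParDo.of(new DoFn<String, KV<String, String>>() {
          @ProcessElement
          public void processElement(@Element String url, OutputReceiver<KV<String, String>> out) {
            // Key by the URL itself so duplicate URLs end up under the same key.
            out.output(KV.of(url, url));
          }
        }));

    PCollection<String> fetched = keyed
        // With the default trigger in batch mode, GBK emits only after the whole
        // input has been read, so everything downstream of it starts late.
        .apply("GBK (dedupe)", GroupByKey.create())
        .apply("S2B (fetch deduped URLs)",
            ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
              @ProcessElement
              public void processElement(@Element KV<String, Iterable<String>> kv,
                  OutputReceiver<String> out) {
                // Placeholder for the real fetch and Avro record creation in S2B.
                out.output("fetched: " + kv.getKey());
              }
            }));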
>> The same log message interleaving does not happen in this case. So back to my original example sketch: log messages of S2' are interleaved with S2 (which I thought was because of fusion), but all of the log messages of S2B' are printed after all messages of S2B.
>>
>>>> *Q3*: Even if I add a similar trigger to the output of S2B, the Parquet file generation does not start until all of the fetches are done. Again, what is different here, and why are intermediate panes not fired while the output of S2B is being generated?
>>>
>>> I think it would help to see how you have configured the ParquetIO write transform.
>>
>> I think this is related to the difference between the behaviour of the two examples above (i.e., S2' vs. S2B'). If it turns out that is not the case, I will create a minimal example including ParquetIO too.
>>
>> Thanks again
>>
>> -B
>>
>>> Kenn
>>>
>>>> Thanks
>>>>
>>>> -B
>>>>
>>>> P.S. I need this pipeline to work both on a distributed runner and on a local machine with many cores. That's why the performance of DirectRunner is important to me.
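Coming back to the P.S. and the suggestion at the top of this thread: for fast local execution on a many-core machine, one option is to run the same pipeline on an embedded local Flink cluster. A rough sketch, assuming a beam-runners-flink artifact matching your Beam version is on the classpath (the class name is arbitrary):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class LocalFlinkExample {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        // "[local]" starts an embedded Flink mini-cluster inside this JVM.
        options.setFlinkMaster("[local]");
        // Use roughly one task slot per core on the local machine.
        options.setParallelism(Runtime.getRuntime().availableProcessors());

        Pipeline pipeline = Pipeline.create(options);
        // ... apply the same transforms as in the DirectRunner version above ...
        pipeline.run().waitUntilFinish();
      }
    }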