Thanks Jan. Two points:

- I was running all the experiments I reported with `--targetParallelism=1`
to make sure that concurrent threads do not mess up the logs.
- I have been tracking bundles too (see the @StartBundle log messages in
the mini-example in my previous reply to Kenn).

So I don't think bundles alone explain what I see. In the mini-example, the
processing of INPUT bundles and NEXT bundles is interleaved, e.g., 3 INPUT
bundles are processed, then their outputs go through NEXT, then a few more
INPUT bundles, and so on.

Now, if we go back to my original example with S1->S2A->GBK->S2B->S3, the
input to S2B also consists of many bundles. However, in this case *all* of
those bundles are processed first; only then do they go through the next
stages, e.g., the logging S2B' that I mentioned. So there is no
interleaving of log messages.
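
For concreteness, here is a rough sketch of that shape (illustrative only:
the fetch logic of S2B is omitted and only the logging S2B' step is shown;
`lines` and `log` are as in the mini-example):

PCollection<KV<String, String>> keyed = lines.apply("S2A-AddKey",
    ParDo.of(new DoFn<String, KV<String, String>>() {
      @ProcessElement
      public void processElement(
          @Element String line, OutputReceiver<KV<String, String>> out) {
        // Key each record by the line/URL itself so that GBK removes duplicates.
        out.output(KV.of(line, line));
      }
    }));

PCollection<KV<String, Iterable<String>>> grouped =
    keyed.apply("GBK", GroupByKey.create());

grouped.apply("S2B-Prime-Log",
    ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
      @StartBundle
      public void startBundle() {
        log.info("S2B': Started a new bundle");
      }

      @ProcessElement
      public void processElement(
          @Element KV<String, Iterable<String>> kv, OutputReceiver<String> out) {
        log.info(String.format("DEBUG S2B' %s", kv.getKey()));
        out.output(kv.getKey());
      }
    }));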

Regards,

-B

On Mon, May 17, 2021 at 3:50 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Bashir,
>
> the behavior you describe is expected. The DirectRunner splits the input
> work into bundles; processing each bundle might result in zero, one, or
> more new bundles. The executor executes the work associated with these
> bundles, enqueuing new bundles into a queue, until there are no
> unprocessed bundles left in the queue (that is, until the work has been
> completely done). It uses a fixed-size thread pool to consume and execute
> the work associated with these bundles (the size of the pool is defined
> by --targetParallelism), so the processing of bundles of the "Sleep"
> transform and the "Next" transform is interleaved, not because of fusion,
> but because of the limited parallelism. If you increase the parallelism
> beyond the total number of bundles in your `lines` PCollection, then I
> think you would see the result you expect.
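>
> For illustration, a minimal sketch of raising the parallelism
> programmatically (equivalent to passing --targetParallelism on the command
> line; the value 16 here is arbitrary):
>
> import org.apache.beam.runners.direct.DirectOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> // View the parsed pipeline options as DirectOptions in order to size the
> // DirectRunner's thread pool; choose a value larger than the number of
> // bundles in the input so that bundle execution is not serialized by the
> // pool size.
> DirectOptions directOptions =
>     PipelineOptionsFactory.fromArgs(args).withValidation().as(DirectOptions.class);
> directOptions.setTargetParallelism(16);
> Pipeline pipeline = Pipeline.create(directOptions);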
>
> Best,
>
>  Jan
> On 5/12/21 7:35 PM, Bashir Sadjad wrote:
>
> Thanks Kenn.
>
> On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>
>>
>> On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:
>>
>>> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) that
>>> only prints some log messages for each record and passes the record to
>>> the output, then it seems S2 and S2' are fused, because the log messages
>>> are interleaved with the fetches.
>>>
>>
>>> *Q1*: Does DirectRunner do any fusion optimization (e.g., like
>>> DataflowRunner)? If not by default, is there any way to enable it?
>>>
>>
>> The Java DirectRunner does not do any fusion optimization. There's no
>> code to enable :-). It should affect performance only, not semantics. The
>> DirectRunner is known to have poor performance, but mostly no one is
>> working on speeding it up because it is really just for small-scale testing.
>>
>
> Here is a minimal pipeline (with no windowing) that demonstrates what I
> mean; maybe I am using the wrong terminology, but when I run this pipeline
> with DirectRunner (and with `--targetParallelism=1`), the `DEBUG INPUT` and
> `DEBUG NEXT` messages are interleaved. If there were no fusion, I would
> have expected to see all of the `DEBUG INPUT` messages first and then all
> of the `DEBUG NEXT` messages:
>
> Pipeline pipeline = Pipeline.create(options);
> PCollection<String> lines =
>     pipeline.apply(TextIO.read().from(options.getInputFile()));
>
> PCollection<String> linesDelayed = lines.apply("Sleep",
>     ParDo.of(new DoFn<String, String>() {
>       @StartBundle
>       public void startBundle() {
>         log.info("INPUT: Started a new bundle");
>       }
>
>       @ProcessElement
>       public void processElement(@Element String line, OutputReceiver<String> out)
>           throws InterruptedException {
>         log.info(String.format("DEBUG INPUT %s", line));
>         Thread.sleep(3000);
>         out.output(line);
>       }
>     }));
>
> PCollection<String> linesDebug = linesDelayed.apply("Next",
>     ParDo.of(new DoFn<String, String>() {
>       @StartBundle
>       public void startBundle() {
>         log.info("NEXT: Started a new bundle");
>       }
>
>       @ProcessElement
>       public void processElement(@Element String line, OutputReceiver<String> out) {
>         log.info(String.format("DEBUG NEXT %s", line));
>         out.output(line);
>       }
>     }));
>
> linesDebug.apply(
>     TextIO.write().to(options.getOutputFile()).withNumShards(1));
>
> PipelineResult result = pipeline.run();
> result.waitUntilFinish();
>
> It seems that a few bundles are processed by the `Sleep` transform, then
> they all go through `Next`; then a few more bundles go through `Sleep`,
> then `Next`, and so on.
>
>
>>
>>
>>
>>> The other issue is with triggers and creating panes. I have an extended
>>> version of this pipeline, a simplified view of which is:
>>> S1->S2A->GBK->S2B->S3
>>>
>>> S1: Like before
>>> S2A: Adds a key to the output of S1
>>> GBK: Groups the output of S2A to remove duplicate keys
>>> S2B: Similar to S2 above, i.e., fetches the deduplicated URLs and
>>> creates Avro records
>>> S3: Same as before
>>>
>>> *Q2*: In this case, if I add a dummy S2B' after S2B, the log messages
>>> are *not* interleaved with the resource fetches, i.e., no fusion is
>>> happening. Why? What is different here?
>>>
>>
>> I don't quite understand what the problem is here.
>>
>
> The same log message interleaving does not happen in this case. So, back
> to my original example sketch: the log messages of S2' are interleaved
> with those of S2 (which I thought was because of fusion), but all of the
> log messages of S2B' are printed after all of the messages of S2B.
>
>
>>
>>
>>
>>> *Q3*: Even if I add a similar trigger to the output of S2B, the Parquet
>>> file generation does not start until all of the fetches are done. Again,
>>> what is different here, and why are intermediate panes not fired while
>>> the output of S2B is being generated?
>>>
>>
>> I think it would help to see how you have configured the ParquetIO write
>> transform.
>>
>
> I think this is related to the difference between the behaviour of the two
> examples above (i.e., S2' vs. S2B'). If it turns out that is not the case,
> I will create a minimal example including ParquetIO too.
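>
> For what it's worth, a rough sketch of the kind of triggered ParquetIO
> write I have in mind is below (illustrative only: it assumes a
> PCollection<GenericRecord> named `records` with an Avro `schema`, and a
> hypothetical output directory; it is not my actual configuration):
>
> records
>     .apply("Window", Window.<GenericRecord>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100)))
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes())
>     .apply("WriteParquet", FileIO.<GenericRecord>write()
>         .via(ParquetIO.sink(schema))
>         .to("/tmp/parquet-output/") // hypothetical output path
>         .withNumShards(1));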
>
> Thanks again
>
> -B
>
>
>>
>> Kenn
>>
>>
>>>
>>> Thanks
>>>
>>> -B
>>> P.S. I need this pipeline to work both on a distributed runner and on a
>>> local machine with many cores. That's why the performance of
>>> DirectRunner is important to me.
>>>
>>
