> P.S. I need this pipeline to work both on a distributed runner and also
on a local machine with many cores. That's why the performance of
DirectRunner is important to me.

IIUC the DirectRunner has intentionally made some trade-offs that make it
less performant, so that it better verifies pipelines under test. I wonder
if there's another runner you could use that is more performant for local
execution on a machine with many cores, like starting up a local Flink or
Spark cluster?
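
If the local-cluster route sounds reasonable, here is a hypothetical config
sketch (an assumption on my side - it requires the `beam-runners-flink`
artifact on the classpath, and `args` is whatever you already pass to
PipelineOptionsFactory):

```java
// Sketch: run the same pipeline on an embedded local Flink cluster, which
// can use all local cores without DirectRunner's testing-oriented checks.
FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// Leaving flinkMaster at its default "[auto]" starts an in-process cluster.
options.setParallelism(Runtime.getRuntime().availableProcessors());
Pipeline pipeline = Pipeline.create(options);
```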

Brian

On Mon, May 17, 2021 at 7:12 AM Jan Lukavský <je...@seznam.cz> wrote:

> On 5/17/21 3:46 PM, Bashir Sadjad wrote:
>
> Thanks Jan. Two points:
>
> - I was running all the experiments I reported with
> `--targetParallelism=1` to make sure concurrent threads do not mess up the
> logs.
>
> I think that is what causes what you see. Try increasing the parallelism
> to a number higher than the number of input bundles.
>
> - I have been tracking bundles too (see @StartBundle log messages in the
> mini-example in my previous reply to Kenn).
>
> I see the code, but not the log output. My suspicion is that you see
> "Start bundle" -> "DEBUG INPUT" OR "DEBUG NEXT", right? If yes, then this
> is expected - processing a bundle produces an "output bundle", which is
> queued into the work queue and is then processed as soon as there is a
> free worker to work on it. Fetching new outputs produces new bundles,
> which are also queued to this queue, which is what causes the interleaving.
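
This executor model can be sketched in plain Java (a toy model, not Beam's
actual code - it assumes a single worker, i.e. --targetParallelism=1, and a
source that emits its bundles incrementally and re-enqueues itself):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BundleQueueSketch {

  // Drain a shared work queue with one worker; processing an INPUT bundle
  // enqueues both its NEXT (output) bundle and the continuation of the read.
  static List<String> run(int inputBundles) {
    List<String> log = new ArrayList<>();
    Queue<Runnable> queue = new ArrayDeque<>();
    class Read implements Runnable {
      int next = 1;
      @Override
      public void run() {
        if (next > inputBundles) {
          return; // source exhausted
        }
        int id = next++;
        log.add("INPUT bundle " + id);
        queue.add(() -> log.add("NEXT bundle " + id)); // output bundle
        queue.add(this); // read the following input bundle later
      }
    }
    queue.add(new Read());
    while (!queue.isEmpty()) {
      queue.poll().run(); // the single free worker takes the next bundle
    }
    return log;
  }

  public static void main(String[] args) {
    // INPUT and NEXT bundles interleave even with a single worker.
    System.out.println(run(3));
  }
}
```

With three input bundles this prints `[INPUT bundle 1, NEXT bundle 1, INPUT
bundle 2, NEXT bundle 2, INPUT bundle 3, NEXT bundle 3]` - the interleaving
comes purely from the shared queue, not from fusion.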
>
>
> So I don't think bundles alone describe what I see. In the mini-example,
> processing of INPUT bundles and NEXT bundles are interleaved, e.g., 3 INPUT
> bundles are processed, then the output of those go through NEXT, then a few
> other INPUT bundles and so on.
>
> Now, if we go back to my original example with S1->S2A->GBK->S2B->S3, the
> input to S2B also has many bundles. However in this case *all* of those
> bundles are processed first, then they all go through the next stages,
> e.g., the logging S2B' that I mentioned. So there is no interleaving of log
> messages.
>
> GBK is a stateful operation that has to wait for a trigger - in the simple
> batch case the trigger is the end of input, which is why you cannot see
> outputs of GBK interleaved with the reading of inputs. All inputs must
> have been read before GBK can proceed and output any bundle downstream.
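
As a toy illustration in plain Java (not Beam code - just to show why GBK
acts as a barrier in batch):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GbkBarrierSketch {

  // A batch GroupByKey must buffer the WHOLE input before emitting anything:
  // the very last element could still belong to any group.
  static Map<String, List<Integer>> groupByKey(
      List<Map.Entry<String, Integer>> input) {
    Map<String, List<Integer>> groups = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> kv : input) { // phase 1: read everything
      groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
          .add(kv.getValue());
    }
    return groups; // phase 2: only now can downstream bundles be produced
  }

  public static void main(String[] args) {
    System.out.println(groupByKey(List.of(
        Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3))));
  }
}
```

No group is complete until the end of input, which is exactly the
end-of-input trigger in the simple batch case.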
>
>
> Regards,
>
> -B
>
> On Mon, May 17, 2021 at 3:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Bashir,
>>
>> the behavior you describe is expected. DirectRunner splits the input
>> work into bundles; processing each bundle might result in zero, one, or
>> more new bundles. The executor executes the work associated with these
>> bundles, enqueuing new bundles into a queue, until there are no unprocessed
>> bundles left in the queue (that is, the work has been completely done). It
>> uses a fixed-size thread pool to consume and execute the work associated
>> with these bundles (the size of which is defined by --targetParallelism),
>> so the processing of bundles of the "Sleep" and "Next" transforms is
>> interleaved - not due to fusion, but due to limited parallelism. If you
>> increase the parallelism beyond the total number of bundles in your
>> `lines` PCollection, then I think you would see the result you expect.
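
For example (a config sketch, assuming the pipeline already builds its
options with PipelineOptionsFactory and Beam's DirectRunner is on the
classpath):

```java
// Sketch: raise DirectRunner's worker pool above the expected bundle count.
DirectOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(DirectOptions.class);
options.setTargetParallelism(64); // > total number of bundles in `lines`
Pipeline pipeline = Pipeline.create(options);
```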
>>
>> Best,
>>
>>  Jan
>> On 5/12/21 7:35 PM, Bashir Sadjad wrote:
>>
>> Thanks Kenn.
>>
>> On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>>
>>> On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:
>>>
>>>> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which
>>>> only prints some log messages for each record and passes the record to
>>>> output, then it seems S2 and S2' are fused. Because the log messages are
>>>> interleaved with fetches.
>>>>
>>>
>>>> *Q1*: Does DirectRunner do any fusion optimization (e.g., like
>>>> DataflowRunner)? If not by default, is there any way to enable it?
>>>>
>>>
>>> The Java DirectRunner does not do any fusion optimization. There's no
>>> code to enable :-). It should affect performance only, not semantics. The
>>> DirectRunner is known to have poor performance, but mostly no one is
>>> working on speeding it up because it is really just for small-scale testing.
>>>
>>
>> Here is a minimal pipeline (with no windowing) that demonstrates what I
>> mean; maybe I am using the wrong terminology but when I run this pipeline
>> with DirectRunner (and with `--targetParallelism=1`) the `DEBUG INPUT` and
>> `DEBUG NEXT` messages are interleaved. If there were no fusion, I would
>> have expected to see all `DEBUG INPUT` messages first and then all of the
>> `DEBUG NEXT` messages:
>>
>> Pipeline pipeline = Pipeline.create(options);
>> PCollection<String> lines =
>>     pipeline.apply(TextIO.read().from(options.getInputFile()));
>>
>> PCollection<String> linesDelayed = lines.apply("Sleep",
>>     ParDo.of(new DoFn<String, String>() {
>>       @StartBundle
>>       public void startBundle() {
>>         log.info("INPUT: Started a new bundle");
>>       }
>>
>>       @ProcessElement
>>       public void processElement(@Element String line,
>>           OutputReceiver<String> out) throws InterruptedException {
>>         log.info(String.format("DEBUG INPUT %s", line));
>>         Thread.sleep(3000);
>>         out.output(line);
>>       }
>>     }));
>>
>> PCollection<String> linesDebug = linesDelayed.apply("Next",
>>     ParDo.of(new DoFn<String, String>() {
>>       @StartBundle
>>       public void startBundle() {
>>         log.info("NEXT: Started a new bundle");
>>       }
>>
>>       @ProcessElement
>>       public void processElement(@Element String line,
>>           OutputReceiver<String> out) {
>>         log.info(String.format("DEBUG NEXT %s", line));
>>         out.output(line);
>>       }
>>     }));
>>
>> linesDebug.apply(
>>     TextIO.write().to(options.getOutputFile()).withNumShards(1));
>>
>> PipelineResult result = pipeline.run();
>> result.waitUntilFinish();
>>
>> It seems that a few bundles are processed by the `Sleep` transform, then
>> they all go through `Next`. Then a few more bundles go through `Sleep`,
>> then `Next`, and so on.
>>
>>
>>>
>>>
>>>
>>>> The other issue is with triggers and creating panes. I have an extended
>>>> version of this pipeline where a simplified view of it is:
>>>> S1->S2A->GBK->S2B->S3
>>>>
>>>> S1: Like before
>>>> S2A: Add a key to the output of S1
>>>> GBK: Groups output of S2A to remove duplicate keys
>>>> S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro
>>>> records
>>>> S3: Same as before
>>>>
>>>> *Q2*: In this case, if I add a dummy S2B' after S2', the log messages
>>>> are *not* interleaved with resource fetches, i.e., no fusion is
>>>> happening. Why? What is different here?
>>>>
>>>
>>> I don't quite understand what the problem is here.
>>>
>>
>> The same log-message interleaving does not happen in this case. So back
>> to my original example sketch: log messages of S2' are interleaved with
>> those of S2 (which I thought was because of fusion), but all of the log
>> messages of S2B' are printed after all messages of S2B.
>>
>>
>>>
>>>
>>>
>>>> *Q3*: Even if I add a similar trigger to the output of S2B, the
>>>> Parquet file generation does not start until all of the fetches are done.
>>>> Again, what is different here and why intermediate panes are not fired
>>>> while the output of S2B is being generated?
>>>>
>>>
>>> I think it would help to see how you have configured the ParquetIO write
>>> transform.
>>>
>>
>> I think this is related to the difference between the behaviour of the
>> two examples above (i.e., S2' vs. S2B'). If it turns out that is not the
>> case, I will create a minimal example including ParquetIO too.
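
In case it helps while putting that example together, here is a hypothetical
sketch of what an early-firing write could look like (`records`, `schema`,
and `outputDir` are assumptions on my part, not names from the actual
pipeline; it requires Beam's FileIO and ParquetIO):

```java
// Sketch: without an early-firing trigger, a batch write only happens once
// the (global) window closes, i.e. after all upstream input is processed.
records
    .apply(Window.<GenericRecord>configure()
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to(outputDir)
        .withNumShards(1));
```

Note that a trigger downstream of the GBK still cannot fire before the GBK
itself emits, so in batch the fetches upstream of the GBK will complete
first either way.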
>>
>> Thanks again
>>
>> -B
>>
>>
>>>
>>> Kenn
>>>
>>>
>>>>
>>>> Thanks
>>>>
>>>> -B
>>>> P.S. I need this pipeline to work both on a distributed runner and also
>>>> on a local machine with many cores. That's why the performance of
>>>> DirectRunner is important to me.
>>>>
>>>
