On Mon, May 17, 2021 at 1:07 PM Brian Hulette <bhule...@google.com> wrote:

> > P.S. I need this pipeline to work both on a distributed runner and also
> on a local machine with many cores. That's why the performance of
> DirectRunner is important to me.
>
> IIUC the DirectRunner has intentionally made some trade-offs to make it
> less performant, so that it better verifies pipelines under test. I wonder
> if there's another runner you can use that is more performant for local
> execution on a machine with many cores, like starting up a local Flink or
> Spark cluster?
>

Thanks; yes I have managed to get the behaviour I expect with Flink Runner
in `--streaming` mode. I was hoping to keep local runs simpler by only
using Direct Runner. Please note that the main performance bottleneck in
this case is the external system from which the resources are fetched. The
problem is that no matter how many threads I use, Direct Runner keeps all
the resources in memory and then writes them all to file (instead of
flushing triggered panes).

Regards

-B


>
> Brian
>
> On Mon, May 17, 2021 at 7:12 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> On 5/17/21 3:46 PM, Bashir Sadjad wrote:
>>
>> Thanks Jan. Two points:
>>
>> - I was running all the experiments I reported with
>> `--targetParallelism=1` to make sure concurrent threads do not mess up the
>> logs.
>>
>> I think that is what causes what you see. Try increasing the parallelism
>> to a number higher than the number of input bundles.
>>
>> - I have been tracking bundles too (see @StartBundle log messages in the
>> mini-example in my previous reply to Kenn).
>>
>> I see the code, but not the log output. My suspicion is that you see
>> "Start bundle" -> "Debug Input" OR "Debug NEXT", right? If so, then this
>> is expected: processing a bundle produces an "output bundle", which is
>> queued into the work queue and then processed as soon as there is a free
>> worker to work on it. Fetching new outputs produces new bundles, which are
>> also queued to this queue, which is what causes the interleaving.
>>
>>
>> So I don't think bundles alone describe what I see. In the mini-example,
>> the processing of INPUT bundles and NEXT bundles is interleaved, e.g., 3
>> INPUT bundles are processed, then the output of those goes through NEXT,
>> then a few other INPUT bundles, and so on.
>>
>> Now, if we go back to my original example with S1->S2A->GBK->S2B->S3, the
>> input to S2B also has many bundles. However in this case *all* of those
>> bundles are processed first, then they all go through the next stages,
>> e.g., the logging S2B' that I mentioned. So there is no interleaving of log
>> messages.
>>
>> GBK is a stateful operation that has to wait for a trigger. In the simple
>> batch case the trigger is the end of input, which is why you cannot see
>> outputs of GBK interleaved with reading inputs: all inputs have to be
>> read before GBK can proceed and output any bundle downstream.
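
The point above can be illustrated with a small sketch in plain Java. This is not Beam code: the class name `GroupByKeySketch`, the `S2A`/`S2B` log labels, and the use of a map as the grouping buffer are assumptions made only to show why nothing downstream of a batch GBK can run before the input is exhausted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a batch group-by-key (hypothetical, not Beam code). */
public class GroupByKeySketch {

  /** Feeds the URLs through "S2A -> GBK -> S2B" and returns the log lines. */
  public static List<String> run(List<String> urls) {
    List<String> log = new ArrayList<>();
    Map<String, List<String>> groups = new LinkedHashMap<>();

    // Upstream stage: every element is only buffered under its key.
    for (String url : urls) {
      log.add("S2A " + url);
      groups.computeIfAbsent(url, k -> new ArrayList<>()).add(url);
    }

    // The loop above has ended, i.e. end of input: this plays the role of
    // the trigger. Only now can the grouped (deduplicated) keys be emitted.
    for (String key : groups.keySet()) {
      log.add("S2B " + key);
    }
    return log;
  }

  public static void main(String[] args) {
    run(List.of("a", "b", "a")).forEach(System.out::println);
  }
}
```

In this sketch every `S2A` line precedes every `S2B` line regardless of how the input is split up, which matches the non-interleaved logs described for the S1->S2A->GBK->S2B->S3 pipeline.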
>>
>>
>> Regards,
>>
>> -B
>>
>> On Mon, May 17, 2021 at 3:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Bashir,
>>>
>>> the behavior you describe is to be expected. DirectRunner splits the
>>> input work into bundles; processing each bundle might result in zero,
>>> one, or more new bundles. The executor executes the work associated with
>>> these bundles, enqueuing new bundles into a queue, until there are no
>>> unprocessed bundles left in the queue (that is, until the work is
>>> complete). It uses a fixed-size thread pool to consume and execute this
>>> work (the size of the pool is defined by --targetParallelism). So the
>>> processing of bundles of the "Sleep" transform and the "Next" transform
>>> is interleaved not because of fusion but because of limited parallelism.
>>> If you increase the parallelism beyond the total number of bundles in
>>> your `lines` PCollection, then I think you would see the result you
>>> expect.
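
The queue-based execution described above can be sketched with plain JDK classes. This is a toy model, not the DirectRunner's implementation: the class name `BundleQueueSketch`, the single worker loop (standing in for a parallelism of 1), and in particular the modelling of the source as a read task that emits one input bundle and then re-enqueues itself are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of one worker draining a shared work queue (not Beam code). */
public class BundleQueueSketch {

  /** Runs the toy executor over the given number of input bundles. */
  public static List<String> run(int inputBundles) {
    List<String> log = new ArrayList<>();
    Deque<Runnable> queue = new ArrayDeque<>();
    int[] nextBundle = {1};

    // Assumption: reading the source is itself a work item. It emits one
    // input bundle and re-enqueues itself while input remains.
    Runnable[] read = new Runnable[1];
    read[0] = () -> {
      int id = nextBundle[0]++;
      queue.add(() -> {
        // "Sleep" stage: processing the bundle enqueues an output bundle.
        log.add("INPUT bundle " + id);
        queue.add(() -> log.add("NEXT bundle " + id));
      });
      if (id < inputBundles) {
        queue.add(read[0]);
      }
    };
    if (inputBundles > 0) {
      queue.add(read[0]);
    }

    // A single worker: take the oldest work item and run it to completion.
    while (!queue.isEmpty()) {
      queue.poll().run();
    }
    return log;
  }

  public static void main(String[] args) {
    run(3).forEach(System.out::println);
  }
}
```

Under these assumptions the log alternates between INPUT and NEXT bundles even though the two stages are never fused: the interleaving comes purely from both kinds of work sharing one queue and one worker.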
>>>
>>> Best,
>>>
>>>  Jan
>>> On 5/12/21 7:35 PM, Bashir Sadjad wrote:
>>>
>>> Thanks Kenn.
>>>
>>> On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org>
>>> wrote:
>>>
>>>>
>>>> On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com>
>>>> wrote:
>>>>
>>>>> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which
>>>>> only prints some log messages for each record and passes the record to
>>>>> output, then it seems S2 and S2' are fused, because the log messages are
>>>>> interleaved with fetches.
>>>>>
>>>>
>>>>> *Q1*: Does DirectRunner do any fusion optimization (e.g., like
>>>>> DataflowRunner)? If not by default, is there any way to enable it?
>>>>>
>>>>
>>>> The Java DirectRunner does not do any fusion optimization. There's no
>>>> code to enable :-). It should affect performance only, not semantics. The
>>>> DirectRunner is known to have poor performance, but mostly no one is
>>>> working on speeding it up because it is really just for small-scale 
>>>> testing.
>>>>
>>>
>>> Here is a minimal pipeline (with no windowing) that demonstrates what I
>>> mean. Maybe I am using the wrong terminology, but when I run this pipeline
>>> with DirectRunner (and with `--targetParallelism=1`) the `DEBUG INPUT` and
>>> `DEBUG NEXT` messages are interleaved. If there were no fusion, I would
>>> have expected to see all `DEBUG INPUT` messages first and then all of the
>>> `DEBUG NEXT` messages:
>>>
>>> Pipeline pipeline = Pipeline.create(options);
>>> PCollection<String> lines =
>>>     pipeline.apply(TextIO.read().from(options.getInputFile()));
>>>
>>> PCollection<String> linesDelayed = lines.apply("Sleep", ParDo.of(
>>>     new DoFn<String, String>() {
>>>       @StartBundle
>>>       public void startBundle() {
>>>         log.info("INPUT: Started a new bundle");
>>>       }
>>>
>>>       @ProcessElement
>>>       public void processElement(
>>>           @Element String line, OutputReceiver<String> out)
>>>           throws InterruptedException {
>>>         log.info(String.format("DEBUG INPUT %s", line));
>>>         Thread.sleep(3000);
>>>         out.output(line);
>>>       }
>>>     }));
>>>
>>> PCollection<String> linesDebug = linesDelayed.apply("Next", ParDo.of(
>>>     new DoFn<String, String>() {
>>>       @StartBundle
>>>       public void startBundle() {
>>>         log.info("NEXT: Started a new bundle");
>>>       }
>>>
>>>       @ProcessElement
>>>       public void processElement(
>>>           @Element String line, OutputReceiver<String> out) {
>>>         log.info(String.format("DEBUG NEXT %s", line));
>>>         out.output(line);
>>>       }
>>>     }));
>>>
>>> linesDebug.apply(
>>>     TextIO.write().to(options.getOutputFile()).withNumShards(1));
>>>
>>> PipelineResult result = pipeline.run();
>>> result.waitUntilFinish();
>>>
>>> It seems that a few bundles are processed by `Sleep` transform then they
>>> all go through `Next`. Again a few more bundles go through `Sleep` then
>>> `Next` and so on.
>>>
>>>
>>>>
>>>>
>>>>
>>>>> The other issue is with triggers and creating panes. I have an
>>>>> extended version of this pipeline where a simplified view of it is:
>>>>> S1->S2A->GBK->S2B->S3
>>>>>
>>>>> S1: Like before
>>>>> S2A: Add a key to the output of S1
>>>>> GBK: Groups output of S2A to remove duplicate keys
>>>>> S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro
>>>>> records
>>>>> S3: Same as before
>>>>>
>>>>> *Q2*: In this case, if I add a dummy S2B' after S2B, the log messages
>>>>> are *not* interleaved with resource fetches, i.e., no fusion is
>>>>> happening. Why? What is different here?
>>>>>
>>>>
>>>> I don't quite understand what the problem is here.
>>>>
>>>
>>> The same log message interleaving does not happen in this case. So back
>>> to my original example sketch, log messages of S2' are interleaved with S2
>>> (which I thought is because of fusion) but all of the log messages of S2B'
>>> are printed after all messages of S2B.
>>>
>>>
>>>>
>>>>
>>>>
>>>>> *Q3*: Even if I add a similar trigger to the output of S2B, the
>>>>> Parquet file generation does not start until all of the fetches are done.
>>>>> Again, what is different here, and why are intermediate panes not fired
>>>>> while the output of S2B is being generated?
>>>>>
>>>>
>>>> I think it would help to see how you have configured the ParquetIO
>>>> write transform.
>>>>
>>>
>>> I think this is related to the difference between the behaviour of the
>>> two examples above (i.e., S2' vs. S2B'). If it turns out that is not the
>>> case, I will create a minimal example including ParquetIO too.
>>>
>>> Thanks again
>>>
>>> -B
>>>
>>>
>>>>
>>>> Kenn
>>>>
>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> -B
>>>>> P.S. I need this pipeline to work both on a distributed runner and
>>>>> also on a local machine with many cores. That's why the performance of
>>>>> DirectRunner is important to me.
>>>>>
>>>>
