Hi Bashir,
the behavior you describe is expected. DirectRunner splits the
input work into bundles; processing each bundle may produce zero,
one, or more new bundles. The executor runs the work associated with
these bundles, enqueuing new bundles into a queue, until no
unprocessed bundles remain in the queue (that is, until the work has
been completely done). It uses a fixed-size thread pool (whose size is
defined by --targetParallelism) to consume and execute the work
associated with these bundles. So the processing of bundles of the
"Sleep" transform and the "Next" transform is interleaved, not because
of fusion, but because of limited parallelism. If you increase the
parallelism beyond the total number of bundles in your `lines`
PCollection, then I think you would see the result you expect.
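For example, something like the following should raise the parallelism
programmatically; this is just a sketch (the value 64 is illustrative),
equivalent to passing --targetParallelism=64 on the command line:

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// DirectOptions is the DirectRunner's options interface.
DirectOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DirectOptions.class);
// Size of the DirectRunner's thread pool; defaults to the available cores.
options.setTargetParallelism(64);
Pipeline pipeline = Pipeline.create(options);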
Best,
Jan
On 5/12/21 7:35 PM, Bashir Sadjad wrote:
Thanks Kenn.
On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles <k...@apache.org> wrote:
On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad <bas...@google.com> wrote:
However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3)
which only prints some log messages for each record and passes
the record through to the output, then it seems S2 and S2' are fused,
because the log messages are interleaved with the fetches.
*Q1*: Does DirectRunner do any fusion optimization (e.g., like
DataflowRunner)? If not by default, is there any way to enable it?
The Java DirectRunner does not do any fusion optimization. There's
no code to enable :-). It should affect performance only, not
semantics. The DirectRunner is known to have poor performance, but
mostly no one is working on speeding it up because it is really
just for small-scale testing.
Here is a minimal pipeline (with no windowing) that demonstrates what
I mean; maybe I am using the wrong terminology, but when I run this
pipeline with DirectRunner (and with `--targetParallelism=1`), the
`DEBUG INPUT` and `DEBUG NEXT` messages are interleaved. If there
were no fusion, I would have expected to see all `DEBUG INPUT`
messages first and then all `DEBUG NEXT` messages:
// Imports assumed by this snippet:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// "log" is assumed to be an SLF4J logger defined elsewhere in the class.
Pipeline pipeline = Pipeline.create(options);
PCollection<String> lines =
    pipeline.apply(TextIO.read().from(options.getInputFile()));

PCollection<String> linesDelayed =
    lines.apply("Sleep", ParDo.of(new DoFn<String, String>() {
      @StartBundle
      public void startBundle() {
        log.info("INPUT: Started a new bundle");
      }

      @ProcessElement
      public void processElement(@Element String line,
          OutputReceiver<String> out) throws InterruptedException {
        log.info(String.format("DEBUG INPUT %s", line));
        Thread.sleep(3000); // Simulates a slow fetch per element.
        out.output(line);
      }
    }));

PCollection<String> linesDebug =
    linesDelayed.apply("Next", ParDo.of(new DoFn<String, String>() {
      @StartBundle
      public void startBundle() {
        log.info("NEXT: Started a new bundle");
      }

      @ProcessElement
      public void processElement(@Element String line,
          OutputReceiver<String> out) {
        log.info(String.format("DEBUG NEXT %s", line));
        out.output(line);
      }
    }));

linesDebug.apply(TextIO.write().to(options.getOutputFile()).withNumShards(1));
PipelineResult result = pipeline.run();
result.waitUntilFinish();
It seems that a few bundles are processed by the `Sleep` transform and
then they all go through `Next`; then a few more bundles go through
`Sleep`, then `Next`, and so on.
The other issue is with triggers and creating panes. I have an
extended version of this pipeline; a simplified view of
it is S1->S2A->GBK->S2B->S3 (see the sketch after this list):
S1: Like before
S2A: Add a key to the output of S1
GBK: Groups the output of S2A to remove duplicate keys
S2B: Similar to S2 above, i.e., fetches the deduped URLs and creates
Avro records
S3: Same as before
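A minimal sketch of that S2A->GBK shape, for concreteness (the
transform names are hypothetical, and it assumes the URL itself is
used as the dedup key):

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// S2A: key each URL by itself so that duplicates share a key.
PCollection<KV<String, String>> keyed = lines.apply("S2A",
    MapElements.into(
            TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via((String url) -> KV.of(url, url)));

// GBK, then keep one element per key: the dedup step.
PCollection<String> dedupedUrls = keyed
    .apply(GroupByKey.create())
    .apply("KeepOnePerKey",
        MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Iterable<String>> kv) -> kv.getKey()));
// S2B (fetching the deduped URLs) and S3 would follow from here.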
*Q2*: In this case, if I add a dummy S2B' after S2B, the log
messages are /not/ interleaved with resource fetches, i.e., no
fusion is happening. Why? What is different here?
I don't quite understand what the problem is here.
The same log-message interleaving does not happen in this case. So,
back to my original example sketch: log messages of S2' are
interleaved with those of S2 (which I thought was because of fusion),
but all of the log messages of S2B' are printed after all messages of S2B.
*Q3*: Even if I add a similar trigger to the output of S2B,
the Parquet file generation does not start until all of the
fetches are done. Again, what is different here, and why are
intermediate panes not fired while the output of S2B is
being generated?
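(For context, the kind of trigger meant here is roughly the following;
this is only a sketch, with an illustrative element count, and
`s2bOutput` is a hypothetical name for the output of S2B:)

import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Ask for a pane to be emitted after every 100 elements instead of
// waiting for the whole input to be processed.
PCollection<String> triggered = s2bOutput.apply("EarlyPanes",
    Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100)))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO));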
I think it would help to see how you have configured the ParquetIO
write transform.
I think this is related to the difference between the behaviour of the
two examples above (i.e., S2' vs. S2B'). If it turns out that is not
the case, I will create a minimal example including ParquetIO too.
Thanks again
-B
Kenn
Thanks
-B
P.S. I need this pipeline to work both on a distributed runner
and on a local machine with many cores. That's why the
performance of DirectRunner is important to me.