On 7/24/21 12:34 AM, Robert Bradshaw wrote:

  On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

this was a ride, but I managed to get it working. I'd like to discuss two
points, though:

  a) I had to add the Java coders to ModelCoders (which makes sense to me, but
is that correct?). See [1]. This is needed so that the Read transform (executed
directly in the TaskManager) can correctly communicate with the Java SDK
harness using custom coders (which is tested here [2]).
I think the intent was that ModelCoders represent the set of
language-agnostic coders in the model, though I have to admit I've
always been a bit fuzzy on when a coder must or must not be in that list.
I think that this definition works as long as the runner does not itself interfere with the Pipeline. Once the runner starts producing data by itself (not via the SdkHarnessClient), it becomes part of the environment, and therefore it has to understand its own Coders. I'd propose defining "model coders" as the Coders that the SDK is able to understand, which then works naturally for the ModelCoders located in "core-construction-java": it should understand the Java SDK Coders.
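
For illustration, a minimal sketch of that definition in code (assuming the
ModelCoders class in "core-construction-java" and its urns() accessor; the
wrapper class here is hypothetical):

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ModelCoders;

class CoderCheck {
  // A coder is a "model coder" iff its URN is in the set that the
  // runner/SDK pair is expected to understand without wrapping it
  // in a LengthPrefixCoder.
  static boolean isModelCoder(RunnerApi.Coder coder) {
    return ModelCoders.urns().contains(coder.getSpec().getUrn());
  }
}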

  b) I'd strongly prefer moving the handling of use_deprecated_read from
outside of the Read PTransform directly into its expand method, see [3]. This
is not needed for the Read on Flink to work, but it seems cleaner.
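
Roughly what I have in mind, as a sketch (the experiment check via
ExperimentalOptions.hasExperiment is real; the two expand helpers are
hypothetical names for the existing code paths):

@Override
public PCollection<T> expand(PBegin input) {
  PipelineOptions options = input.getPipeline().getOptions();
  if (ExperimentalOptions.hasExperiment(options, "use_deprecated_read")) {
    // keep the primitive Read
    return expandViaPrimitiveRead(input);
  }
  // default: expand into the SDF-based wrapper
  return expandViaSdf(input);
}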

WDYT?
The default value of use_deprecated_read should depend on the runner
(e.g. some runners don't work well with it, others require it). As
such, it should not be visible to the PTransform's expand.
I think we should decide what the expected outcome is. If a runner does not support the primitive Read (and therefore use_deprecated_read), what should we do when the experiment is set? Should the Pipeline fail, or should the experiment be silently ignored? I think we should fail, because the user expects something that cannot be fulfilled. That leaves two options: handle the experiment explicitly in the runners that do not support it, or handle it explicitly in all cases (both supported and unsupported). The latter is what forcing runners to call an explicit conversion method (convertPrimitiveRead....) amounts to. Either way, every runner that does not support the primitive Read must handle the experiment, because otherwise it would simply be silently ignored, which is not exactly user-friendly.
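
Concretely, a runner that cannot execute the primitive Read could fail fast
with something like this sketch (ExperimentalOptions.hasExperiment is real;
where exactly this check lives in the runner's translation is up to the
runner):

PipelineOptions options = pipeline.getOptions();
if (ExperimentalOptions.hasExperiment(options, "use_deprecated_read")) {
  // Fail loudly instead of silently ignoring the experiment.
  throw new UnsupportedOperationException(
      "use_deprecated_read is set, but this runner does not support "
          + "the primitive Read transform");
}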

  Jan

[1] https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375

[2] https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201

[3] https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb

On 7/18/21 6:29 PM, Jan Lukavský wrote:

Hi,

I was debugging the issue, and it relates to pipeline fusion: it seems that the
primitive Read transform gets fused and is then 'missing' as a source. I'm a
little lost in the code, but the strangest parts are:

  a) I tried to reject fusion of the primitive Read by adding
GreedyPCollectionFusers::cannotFuse for
PTransformTranslation.READ_TRANSFORM_URN to
GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS (sketched below, after this
list), but that didn't change the exception

  b) I tried adding Reshuffle.viaRandomKey between the Read and the PAssert
(also sketched below), but that didn't change it either

  c) when I run a portable Pipeline with use_deprecated_read on Flink, it
actually runs (it fails once it reads any data, but with empty input the job
runs), so it does not hit the same issue, which is a mystery to me
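
For reference, the change from a) was essentially this (sketch; I'm eliding
the map's other entries and assuming its value type is the FusibilityChecker
interface from that class):

private static final ImmutableMap<String, FusibilityChecker>
    URN_FUSIBILITY_CHECKERS =
        ImmutableMap.<String, FusibilityChecker>builder()
            // ... existing entries ...
            .put(
                PTransformTranslation.READ_TRANSFORM_URN,
                // never fuse the primitive Read into a stage
                GreedyPCollectionFusers::cannotFuse)
            .build();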
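
And the attempt from b) looked roughly like this (sketch; TestSource stands in
for the test's actual source, and the asserted values are made up):

PCollection<Long> values =
    pipeline
        .apply(Read.from(new TestSource()))   // placeholder source
        .apply(Reshuffle.viaRandomKey());     // try to break fusion
PAssert.that(values).containsInAnyOrder(0L, 1L, 2L);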

If anyone has any pointers that I can investigate, I'd be really grateful.

Thanks in advance,

  Jan



On 7/16/21 2:00 PM, Jan Lukavský wrote:

Hi,

I hit another issue with the portable Flink runner. Long story short: reading
from Kafka is not working in portable Flink. Because the portable Flink runner
has issues with SDF [1], [2], I first had to solve the expansion service
configuration (the ability to add the use_deprecated_read option). After being
able to inject use_deprecated_read into the expansion service, I got an
execution DAG that has the UnboundedSource, but then more and more issues
appeared (probably related to a missing LengthPrefixCoder somewhere, maybe at
the output of the primitive Read). I wanted to create a test for this and
found out that there actually is a ReadSourcePortableTest in FlinkRunner, but
_it tests nothing_: because Read is transformed to an SDF, the test exercises
the SDF, not the Read transform, so it never catches that the Read transform
does not work.

I tried using convertReadBasedSplittableDoFnsToPrimitiveReads so that I could
make the test fail and debug that, but I ran into

java.lang.IllegalArgumentException: PCollectionNodes 
[PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
 PCollection=unique_name: 
"PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
coder_id: "IterableCoder"
is_bounded: BOUNDED
windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
}] were consumed but never produced


which gave me the last knock-out. :)
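
For context, the test change that triggers the exception is roughly this
(sketch; Source stands in for the test's source class, and I'm assuming the
static helper on SplittableParDo from "core-construction-java"):

Pipeline p = Pipeline.create(options);
p.apply(Read.from(new Source(10)));  // placeholder source
// Rewrite the SDF-based expansion back into the primitive Read
// before translating, so the test exercises the Read path:
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p);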

My current impression is that starting from Beam 2.25.0, portable FlinkRunner 
is not able to read from Kafka. Could someone give me a hint about what is 
wrong with using convertReadBasedSplittableDoFnsToPrimitiveReads in the test 
[3]?

  Jan

[1] https://issues.apache.org/jira/browse/BEAM-11991

[2] https://issues.apache.org/jira/browse/BEAM-11998

[3] https://github.com/apache/beam/pull/15181
