I'll start from the end.

> I don't think we should be breaking language agnostic API layers (for example, definition of model coders) just to support "use_deprecated_read".
Whether something is "breaking" or "fixing" depends only on how we define the object at hand. I don't think a Coder can be totally language agnostic: yes, the mapping between serialized form and deserialized form can be _defined_ in a language-agnostic way, but it must be _implemented_ in a specific language. Once we choose the implementing language, what makes us treat SDK-specific coders defined by the SDK of the same language as "unknown"? It is only our decision, and it seems to have no practical benefits.

Moreover, including SDK-specific coders among the supported coders of the SDK's runner construction counterpart (that is, runner core-construction-java for the Java SDK) is a necessary prerequisite for unifying "classical" and "portable" runners. The runner needs to understand *all* SDK coders so that it can _inline_ the complete Pipeline (if the Pipeline SDK has the same language as the runner) instead of running it through the SDK harness. This need is therefore not specific to supporting use_deprecated_read; it is a generic requirement, which merely has its first manifestation in the support of a transform not supported by the SDK harness.
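
To illustrate the idea, here is a minimal sketch in plain Java. It is not Beam code: `KnownCoders`, `knownTo` and `canInline` are hypothetical names, the `example:` URNs are made up, and the model coder set is only a small illustrative subset. It shows a runner that always understands the language-agnostic coders and additionally understands the SDK-specific ones when it shares a language with the SDK, which is what allows it to inline a transform.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration only; this is not Beam's ModelCoders class. */
final class KnownCoders {
  // Language-agnostic coder URNs (a small illustrative subset).
  static final Set<String> MODEL =
      Set.of("beam:coder:bytes:v1", "beam:coder:varint:v1", "beam:coder:kv:v1");

  // Made-up URNs standing in for coders that only the Java SDK defines.
  static final Set<String> JAVA_SDK_ONLY =
      Set.of("example:coder:java_serializable:v1", "example:coder:java_custom:v1");

  /** Coders understood by a runner in runnerLanguage for a pipeline built by an SDK in sdkLanguage. */
  static Set<String> knownTo(String runnerLanguage, String sdkLanguage) {
    Set<String> known = new HashSet<>(MODEL);
    // Assumption of this sketch: only a Java runner paired with the Java SDK gains extra coders.
    if ("java".equals(runnerLanguage) && runnerLanguage.equals(sdkLanguage)) {
      known.addAll(JAVA_SDK_ONLY);
    }
    return known;
  }

  /** A transform can be inlined only if the runner understands every coder it uses. */
  static boolean canInline(Set<String> coderUrns, String runnerLanguage, String sdkLanguage) {
    return knownTo(runnerLanguage, sdkLanguage).containsAll(coderUrns);
  }
}
```

With this definition a Java runner could inline a Java pipeline that uses a Java-only coder, while the same pipeline built by another SDK would still have to go through the SDK harness.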

> I think "use_deprecated_read" should be considered a stop-gap measure for Flink (and Spark ?) till we have proper support for SDF. In fact I don't think an arbitrary portable runner can support "use_deprecated_read" due to the following.
There seems to be nothing special about Flink regarding the support of primitive Read. I think any Java native runner can implement it pretty much the same way Flink does. The question is whether any other runner would want to do that. The problem with Flink is that:

 1) portable SDF seems not to work [1]

 2) even the classical Flink runner still has issues with SDF: there are reports of the watermark getting stuck when reading data via SDF, which is resolved by using use_deprecated_read

 3) Flink does not actually benefit from SDF, because it cannot make use of dynamic splitting, so SDF brings only an implementation burden without any practical benefit

I think that we should revisit the decision to deprecate Read. Even if we can implement it via SDF, what is the reason to forbid a runner from making use of a simpler implementation? The expansion of Read might be runner dependent, and that is something we do all the time. Or am I missing something?
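
A minimal sketch of what a runner-dependent expansion could look like, in plain Java without any Beam dependencies. `ReadExpansion`, `Capabilities` and `choose` are hypothetical names used only for illustration; the only thing taken from the discussion is the experiment name use_deprecated_read. The sketch assumes that a runner asked for a primitive Read it cannot provide should fail rather than silently ignore the request.

```java
import java.util.List;

/** Hypothetical illustration only; not part of Beam. */
final class ReadExpansion {
  enum Kind { PRIMITIVE_READ, SPLITTABLE_DOFN }

  /** What a given runner is able to execute. */
  static final class Capabilities {
    final boolean primitiveRead;
    final boolean splittableDoFn;

    Capabilities(boolean primitiveRead, boolean splittableDoFn) {
      this.primitiveRead = primitiveRead;
      this.splittableDoFn = splittableDoFn;
    }
  }

  /** Picks the expansion of Read for one runner, honouring the user's experiment. */
  static Kind choose(Capabilities runner, List<String> experiments) {
    if (experiments.contains("use_deprecated_read")) {
      if (!runner.primitiveRead) {
        // Fail loudly: the user asked for something this runner cannot fulfil.
        throw new UnsupportedOperationException(
            "use_deprecated_read is set, but this runner does not support primitive Read");
      }
      return Kind.PRIMITIVE_READ;
    }
    if (runner.splittableDoFn) {
      return Kind.SPLITTABLE_DOFN;
    }
    if (runner.primitiveRead) {
      return Kind.PRIMITIVE_READ;
    }
    throw new IllegalStateException("runner supports neither primitive Read nor SDF");
  }
}
```

A runner that supports both would pick SDF by default and the primitive Read when the experiment is set; a runner that supports only SDF would reject the experiment.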

 Jan

[1] https://issues.apache.org/jira/browse/BEAM-10940

On 7/25/21 1:38 AM, Chamikara Jayalath wrote:
I think we might be going down a bit of a rabbit hole with the support for "use_deprecated_read" for portable Flink :)

I think "use_deprecated_read" should be considered a stop-gap measure for Flink (and Spark ?) till we have proper support for SDF. In fact I don't think an arbitrary portable runner can support "use_deprecated_read" due to the following.

(1) SDK Harness is not aware of BoundedSource/UnboundedSource. The only source framework the SDK Harness is aware of is SDF.
(2) Invoking BoundedSource/UnboundedSource is not a part of the Fn API
(3) A non-Java Beam portable runner will probably not be able to directly invoke legacy Read transforms similar to the way Flink does today.

I don't think we should be breaking language agnostic API layers (for example, definition of model coders) just to support "use_deprecated_read".

Thanks,
Cham

On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský <je...@seznam.cz> wrote:

    On 7/24/21 12:34 AM, Robert Bradshaw wrote:

    >   On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský <je...@seznam.cz> wrote:
    >> Hi,
    >>
    >> this was a ride. But I managed to get that working. I'd like to discuss two points, though:
    >>
    >>   a) I had to push Java coders to ModelCoders for Java (which makes sense to me, but is that correct?). See [1]. It is needed so that the Read transform (executed directly in TaskManager) can correctly communicate with Java SDK harness using custom coders (which is tested here [2]).
    > I think the intent was that ModelCoders represent the set of
    > language-agnostic in the model, though I have to admit I've always
    > been a bit fuzzy on when a coder must or must not be in that list.
    I think this definition works as long as the runner does not itself interfere with the Pipeline. Once the runner starts producing data by itself (not via SdkHarnessClient), it becomes part of the environment, and therefore it should understand its own Coders. I'd propose defining "model coders" as the Coders that the SDK is able to understand, which then works naturally for the ModelCoders located in "core-construction-java": it should understand Java SDK Coders.
    >
    >>   b) I'd strongly prefer if we moved the handling of use_deprecated_read from outside of the Read PTransform directly into expand method, see [3]. Though this is not needed for the Read on Flink to work, it seems cleaner.
    >>
    >> WDYT?
    > The default value of use_deprecated_read should depend on the runner
    > (e.g. some runners don't work well with it, others require it). As
    > such should not be visible to the PTransform's expand.
    I think we should know what the expected outcome is. If a runner does not support primitive Read (and therefore use_deprecated_read), what should we do when that experiment is set? Should the Pipeline fail, or should the experiment be silently ignored? I think we should fail, because the user expects something that cannot be fulfilled. Therefore, we have two options: handle the experiment explicitly in runners that do not support it, or handle it explicitly in all cases (both supported and unsupported). The latter case is when we force runners to call an explicit conversion method (convertPrimitiveRead....). Every runner that does not support primitive Read must handle the experiment either way, because otherwise the experiment would simply be silently ignored, which is not exactly user-friendly.
    >
    >>   Jan
    >>
    >> [1] https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
    >>
    >> [2] https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
    >>
    >> [3] https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
    >>
    >> On 7/18/21 6:29 PM, Jan Lukavský wrote:
    >>
    >> Hi,
    >>
    >> I was debugging the issue and it relates to pipeline fusion - it seems that the primitive Read transform gets fused and then is 'missing' as source. I'm a little lost in the code, but the most strange parts are that:
    >>
    >>   a) I tried to reject fusion of primitive Read by adding GreedyPCollectionFusers::cannotFuse for PTransformTranslation.READ_TRANSFORM_URN to GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but that didn't change the exception
    >>
    >>   b) I tried adding Reshuffle.viaRandomKey between Read and PAssert, but that didn't change it either
    >>
    >>   c) when I run portable Pipeline with use_deprecated_read on Flink it actually runs (though it fails when it actually reads any data, but if the input is empty, the job runs), so it does not hit the same issue, which is a mystery to me
    >>
    >> If anyone has any pointers that I can investigate, I'd be really grateful.
    >>
    >> Thanks in advance,
    >>
    >>   Jan
    >>
    >>
    >>
    >> On 7/16/21 2:00 PM, Jan Lukavský wrote:
    >>
    >> Hi,
    >>
    >> I hit another issue with the portable Flink runner. Long story short: reading from Kafka is not working in portable Flink. I first had to solve issues with the expansion service configuration (the ability to add the use_deprecated_read option), because the portable Flink runner has issues with SDF [1], [2]. Once I was able to inject use_deprecated_read into the expansion service, I got an execution DAG that has the UnboundedSource, but then more and more issues appeared (probably related to a missing LengthPrefixCoder somewhere, maybe at the output of the primitive Read). I wanted to create a test for it and found out that there actually is a ReadSourcePortableTest in FlinkRunner, but _it tests nothing_. The problem is that Read is transformed to SDF, so this test tests the SDF, not the Read transform. As a result, the Read transform does not work.
    >>
    >> I tried using convertReadBasedSplittableDoFnsToPrimitiveReads so that I could make the test fail and debug that, but I ran into
    >>
    >> java.lang.IllegalArgumentException: PCollectionNodes [PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output, PCollection=unique_name: "PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
    >> coder_id: "IterableCoder"
    >> is_bounded: BOUNDED
    >> windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
    >> }] were consumed but never produced
    >>
    >>
    >> which gave me the last knock-out. :)
    >>
    >> My current impression is that starting from Beam 2.25.0, portable FlinkRunner is not able to read from Kafka. Could someone give me a hint about what is wrong with using convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?
    >>
    >>   Jan
    >>
    >> [1] https://issues.apache.org/jira/browse/BEAM-11991
    >>
    >> [2] https://issues.apache.org/jira/browse/BEAM-11998
    >>
    >> [3] https://github.com/apache/beam/pull/15181
