On Sun, Jul 25, 2021, 11:09 AM Jan Lukavský <je...@seznam.cz> wrote:

> In general, language-neutral APIs and protocols are a key feature of
> portable Beam.
>
> Yes, sure, that is well understood. But language-neutral APIs require a
> language-neutral environment. That is why the portable Pipeline
> representation is built around protocol buffers and gRPC. That is truly
> language-neutral. Once we implement something around that - like in the
> case of ModelCoders.java - we use a specific language for that and the
> language-neutral part is already gone. The decision to include
> same-language-SDK coders in such a language-specific object plays no role
> in the fact that it is already language-specific.
>

Note that gRPC is language neutral because of protocol buffers, which are
language neutral because they have a defined encoding format that is
implemented consistently in multiple languages, along with code generators
to go from the spec file to language-specific implementations.

In other words, Beam can achieve the same neutrality by implementing the
same encoding/decoding spec across multiple languages too. In Beam, it's
the Row standard encoder:
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L973

Along with Beam Schemas, this enables cross-language compatibility without
defining custom coders on each end that match a specific protocol buffer
message or similar.
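
To make the "defined encoding format" point concrete, here is a minimal
sketch in Python of the base-128 varint rule that protocol buffers use for
integers. The function names are illustrative only; this is not Beam or
protobuf library code. Any language that implements the same byte-level
rule produces and reads the same bytes, which is all that language
neutrality requires.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint (7 data bits per byte)."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative integers")
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # continuation bit set: more bytes follow
        else:
            out.append(low7)  # last byte: continuation bit clear
            return bytes(out)


def decode_varint(data: bytes) -> int:
    """Decode a single varint that occupies all of `data`."""
    result = 0
    shift = 0
    for i, byte in enumerate(data):
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            if i != len(data) - 1:
                raise ValueError("trailing bytes after varint")
            return result
        shift += 7
    raise ValueError("truncated varint")
```

For example, encode_varint(300) yields the two bytes 0xAC 0x02, and
decode_varint turns them back into 300.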


> Not all runners are implemented using Java. For example, the portable
> DirectRunner (FnAPI runner) is implemented using Python and Dataflow is
> implemented using C++. Such runners will not be able to do this.
>
> Yes, I'm aware of that and that is why I said "any Java native runner". It
> is true that non-Java runners *must* (as long as we don't include Read
> in the SDK harness) resort to expanding it to SDF. That is why
> use_deprecated_read is an invalid setting for such a runner and should be
> handled accordingly.
>
> Similarly, I think there were previous discussions related to using SDF as
> the source framework for portable runners.
>
> Don't get me wrong, I'm not trying to revoke this decision. On the other
> hand I still think that the decision to use SDF implementation of Read or
> not should be left to the runner.
>
> I understand that there are some bugs related to SDF and portable Flink
> currently. How much work do you think is needed here? Will it be better to
> focus our efforts on fixing remaining issues for SDF and portable runners
> instead of supporting "use_deprecated_read" for that path?
>
> I'm not sure. I don't know portability and the SDK harness well enough to
> be able to answer this. But we should really know why we do that. What
> exactly does SDF bring to the Flink runner (and, leaving Flink aside,
> what does it bring to runners that cannot make use of dynamic
> splitting, admittedly a very cool feature)? Yes, supporting Java
> Read makes it impossible to implement it in Python. But practically, I
> think that most of the Pipelines will use x-lang for that. It makes a lot
> of sense to offload IOs to a more performant environment.
>
>  Jan
> On 7/25/21 6:54 PM, Chamikara Jayalath wrote:
>
>
>
> On Sun, Jul 25, 2021 at 6:33 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> I'll start from the end.
>>
>> I don't think we should be breaking language agnostic API layers (for
>> example, definition of model coders) just to support "use_deprecated_read".
>>
>> "Breaking" and "fixing" can only be a matter of the definition of the
>> object at hand. I don't think that a Coder can be totally language agnostic
>> - yes, the mapping between serialized form and deserialized form can be
>> _defined_ in a language agnostic way, but it must be _implemented_ in a
>> specific language. If we choose the implementing language, what makes us
>> treat SDK-specific coders defined by the SDK of the same language as
>> "unknown"? It is only our decision, and it seems to have no practical
>> benefits.
>>
>
> In general, language-neutral APIs and protocols are a key feature of
> portable Beam. See here: https://beam.apache.org/roadmap/portability/
> (I did not look into all the old discussions and votes related to this but
> I'm sure they are there)
>
>
>> Moreover, including SDK-specific coders into supported coders of the SDK
>> runner construction counterpart (that is, runner core-construction-java for
>> Java SDK) is a necessary prerequisite for unifying "classical" and
>> "portable" runners, because the runner needs to understand *all* SDK coders
>> so that it can _inline_ the complete Pipeline (if the Pipeline SDK has the
>> same language as the runner), instead of running it through SDK harness.
>> This need therefore is not specific to supporting use_deprecated_read, but
>> is a generic requirement, which only has the first manifestation in the
>> support of a transform not supported by SDK harness.
>>
>> I think "use_deprecated_read" should be considered a stop-gap measure for
>> Flink (and Spark?) till we have proper support for SDF. In fact I don't
>> think an arbitrary portable runner can support "use_deprecated_read" due to
>> the following.
>>
>> There seems to be nothing special about Flink regarding the support of
>> primitive Read. I think any Java native runner can implement it pretty much
>> the same way as Flink does. The question is if any other runner might want
>> to do that. The problem with Flink is that
>>
>
> Not all runners are implemented using Java. For example, the portable
> DirectRunner (FnAPI runner) is implemented using Python and Dataflow is
> implemented using C++. Such runners will not be able to do this.
>
>>  1) portable SDF seems not to work [1]
>>
>>  2) even the classical Flink runner still has issues with SDF - there are
>> reports of the watermark being stuck when reading data via SDF, which gets
>> resolved by using use_deprecated_read
>>
>>  3) Flink actually does not get any benefit from SDF, because it cannot
>> make use of dynamic splitting, so this brings only implementation
>> burden without any practical benefit
>>
> Similarly, I think there were previous discussions related to using SDF as
> the source framework for portable runners.
> I understand that there are some bugs related to SDF and portable Flink
> currently. How much work do you think is needed here? Will it be better to
> focus our efforts on fixing remaining issues for SDF and portable runners
> instead of supporting "use_deprecated_read" for that path? Note that I'm
> fine with fixing any issues related to "use_deprecated_read" for classic
> (non-portable) Flink but I think you are trying to use x-lang hence
> probably need portable Flink.
>
> Thanks,
> Cham
>
>
>> I think that we should revisit the decision to deprecate Read - if
>> we can implement it via SDF, what is the reason to forbid a runner from
>> using a simpler implementation? The expansion of Read might be runner
>> dependent, that is something we do all the time, or am I missing something?
>>
>>  Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-10940
>> On 7/25/21 1:38 AM, Chamikara Jayalath wrote:
>>
>> I think we might be going down a bit of a rabbit hole with the support
>> for "use_deprecated_read" for portable Flink :)
>>
>> I think "use_deprecated_read" should be considered a stop-gap measure for
>> Flink (and Spark?) till we have proper support for SDF. In fact I don't
>> think an arbitrary portable runner can support "use_deprecated_read" due to
>> the following.
>>
>> (1) SDK Harness is not aware of BoundedSource/UnboundedSource. The only
>> source framework the SDK Harness is aware of is SDF.
>> (2) Invoking BoundedSource/UnboundedSource is not a part of the Fn API.
>> (3) A non-Java Beam portable runner will probably not be able to directly
>> invoke legacy Read transforms similar to the way Flink does today.
>>
>> I don't think we should be breaking language agnostic API layers (for
>> example, definition of model coders) just to support "use_deprecated_read".
>>
>> Thanks,
>> Cham
>>
>> On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> On 7/24/21 12:34 AM, Robert Bradshaw wrote:
>>>
>>> >   On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský <je...@seznam.cz>
>>> wrote:
>>> >> Hi,
>>> >>
>>> >> this was a ride. But I managed to get that working. I'd like to
>>> discuss two points, though:
>>> >>
>>> >>   a) I had to push Java coders to ModelCoders for Java (which makes
>>> sense to me, but is that correct?). See [1]. It is needed so that the Read
>>> transform (executed directly in TaskManager) can correctly communicate with
>>> Java SDK harness using custom coders (which is tested here [2]).
>>> > I think the intent was that ModelCoders represent the set of
>>> > language-agnostic coders in the model, though I have to admit I've always
>>> > been a bit fuzzy on when a coder must or must not be in that list.
>>> I think that this definition works as long as the runner does not itself
>>> interfere with the Pipeline. Once the runner starts (by itself, not via
>>> SdkHarnessClient) producing data, it starts to be part of the
>>> environment, and therefore it should understand its own Coders. I'd
>>> propose the definition of "model coders" to be Coders that the SDK is
>>> able to understand, which then works naturally for the ModelCoders
>>> located in "core-construction-java", which should understand Java SDK
>>> Coders.
>>> >
>>> >>   b) I'd strongly prefer if we moved the handling of
>>> use_deprecated_read from outside of the Read PTransform directly into
>>> expand method, see [3]. Though this is not needed for the Read on Flink to
>>> work, it seems cleaner.
>>> >>
>>> >> WDYT?
>>> > The default value of use_deprecated_read should depend on the runner
>>> > (e.g. some runners don't work well with it, others require it). As
>>> > such, it should not be visible to the PTransform's expand.
>>> I think we should know what the expected outcome is. If a runner does
>>> not support primitive Read (and therefore use_deprecated_read), what
>>> should we do if such an experiment is set? Should the Pipeline fail,
>>> or should the experiment be silently ignored? I think that we should fail,
>>> because the user expects something that cannot be fulfilled. Therefore, we
>>> have two options - handling the experiment explicitly in runners that do
>>> not support it, or handling it explicitly in all cases (both supported and
>>> unsupported). The latter case is when we force runners to call an explicit
>>> conversion method (convertPrimitiveRead....). Every runner that does not
>>> support primitive Read must handle the experiment either way, because
>>> otherwise the experiment would simply be silently ignored, which is not
>>> exactly user-friendly.
>>> >
>>> >>   Jan
>>> >>
>>> >> [1]
>>> https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
>>> >>
>>> >> [2]
>>> https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
>>> >>
>>> >> [3]
>>> https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
>>> >>
>>> >> On 7/18/21 6:29 PM, Jan Lukavský wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> I was debugging the issue and it relates to pipeline fusion - it
>>> seems that the primitive Read transform gets fused and then is 'missing' as
>>> a source. I'm a little lost in the code, but the strangest parts are that:
>>> >>
>>> >>   a) I tried to reject fusion of primitive Read by adding
>>> GreedyPCollectionFusers::cannotFuse for
>>> PTransformTranslation.READ_TRANSFORM_URN to
>>> GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but that didn't change the
>>> exception
>>> >>
>>> >>   b) I tried adding Reshuffle.viaRandomKey between Read and PAssert,
>>> but that didn't change it either
>>> >>
>>> >>   c) when I run portable Pipeline with use_deprecated_read on Flink
>>> it actually runs (though it fails when it actually reads any data, but if
>>> the input is empty, the job runs), so it does not hit the same issue, which
>>> is a mystery to me
>>> >>
>>> >> If anyone has any pointers that I can investigate, I'd be really
>>> grateful.
>>> >>
>>> >> Thanks in advance,
>>> >>
>>> >>   Jan
>>> >>
>>> >>
>>> >>
>>> >> On 7/16/21 2:00 PM, Jan Lukavský wrote:
>>> >>
>>> >> Hi,
>>> >>
>>> >> I hit another issue with the portable Flink runner. Long story short
>>> - reading from Kafka is not working in portable Flink. I first had to solve
>>> issues with the expansion service configuration (the ability to add the
>>> use_deprecated_read option), because the portable Flink runner has issues
>>> with SDF [1], [2]. After being able to inject use_deprecated_read into the
>>> expansion service I was able to get an execution DAG that has the
>>> UnboundedSource, but then more and more issues appeared (probably related
>>> to a missing LengthPrefixCoder somewhere - maybe at the output from the
>>> primitive Read). I wanted to create a test for it and I found out that
>>> there actually is ReadSourcePortableTest in FlinkRunner, but _it tests
>>> nothing_. The problem is that Read is transformed to SDF, so this test
>>> tests the SDF, not the Read transform. As a result, the Read transform
>>> does not work.
>>> >>
>>> >> I tried using convertReadBasedSplittableDoFnsToPrimitiveReads so that
>>> I could make the test fail and debug that, but I got into
>>> >>
>>> >> java.lang.IllegalArgumentException: PCollectionNodes
>>> [PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
>>> PCollection=unique_name:
>>> "PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
>>> >> coder_id: "IterableCoder"
>>> >> is_bounded: BOUNDED
>>> >> windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
>>> >> }] were consumed but never produced
>>> >>
>>> >>
>>> >> which gave me the last knock-out. :)
>>> >>
>>> >> My current impression is that starting from Beam 2.25.0, portable
>>> FlinkRunner is not able to read from Kafka. Could someone give me a hint
>>> about what is wrong with using
>>> convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?
>>> >>
>>> >>   Jan
>>> >>
>>> >> [1] https://issues.apache.org/jira/browse/BEAM-11991
>>> >>
>>> >> [2] https://issues.apache.org/jira/browse/BEAM-11998
>>> >>
>>> >> [3] https://github.com/apache/beam/pull/15181
>>>
>>
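
The fail-versus-ignore question quoted above (what a runner should do when
use_deprecated_read is set but primitive Read is not supported) can be
sketched roughly as follows. This is a hypothetical illustration in Python,
not Beam code: RunnerCapabilities, supports_primitive_read and
validate_experiments are invented names. It shows the "fail" option, where
the pipeline is rejected rather than the experiment being silently dropped.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RunnerCapabilities:
    # Hypothetical descriptor; no real runner exposes a class with this name.
    name: str
    supports_primitive_read: bool


def validate_experiments(runner, experiments):
    """Fail fast when an experiment asks for something the runner cannot do."""
    if "use_deprecated_read" in experiments and not runner.supports_primitive_read:
        raise ValueError(
            f"{runner.name} cannot execute primitive Read; "
            "remove the use_deprecated_read experiment"
        )
    # Nothing to reject: hand the experiments back unchanged.
    return list(experiments)
```

A runner that can execute primitive Read gets its experiments back
unchanged; one that cannot raises before the pipeline is submitted.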
