On 8/30/21 10:57 PM, Luke Cwik wrote:
If you want to say that Flink understands all Java SDK encodings then you want to declare that the portable Flink runner understands the beam:coders:javasdk:0.1 specification, since this is the default URN for things that aren't portable. Note that "beam:coders:javasdk:0.1" is not really specified beyond "a Java serializable object using the classpath of the environment in which this coder is used", which isn't really a concrete spec.
There are problems with that. I changed ModelCoders to understand beam:coders:javasdk:0.1, but that is wired in multiple places and causes a bunch of apparently unrelated tests to fail (some of which do not fail, but are simply "stuck" without any error or any progress). There seems to be no simple way for a runner to independently specify a set of coders it understands - the code relies directly on ModelCoders in multiple places. Moreover, it makes little sense for a runner to specify a set of coders it understands, because that is not defined by the runner but by the SDK the runner is written in.
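
For illustration, this is roughly what "understanding" beam:coders:javasdk:0.1 amounts to on the Java side - the payload of such a coder is just the Java-serialized Coder object, so a Java-based runner can rehydrate it directly (a minimal sketch; the actual wiring in CoderTranslation may differ in detail):

    import org.apache.beam.model.pipeline.v1.RunnerApi;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.util.SerializableUtils;

    /** Rehydrate a non-portable Java coder from its proto representation (sketch). */
    static Coder<?> fromJavaSdkCoderProto(RunnerApi.Coder proto) {
      // Only valid for the Java-serialized URN; anything else needs a real translator.
      if (!"beam:coders:javasdk:0.1".equals(proto.getSpec().getUrn())) {
        throw new IllegalArgumentException(
            "Not a Java SDK coder: " + proto.getSpec().getUrn());
      }
      return (Coder<?>)
          SerializableUtils.deserializeFromByteArray(
              proto.getSpec().getPayload().toByteArray(), "Coder");
    }
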
Will you solve the class loading issue now (since the Flink instance may be running multiple pipelines each with different versions of the SDK and different dependencies)?
Flink solves that out of the box; it uses a separate classloader for every submitted job.

Note that each SDK instance and runner instance have their own set of "encodings" they understand. Right now, for convenience, Java-based runners are effectively using the "known" coders list from the Java SDK, but that may not be the case in the future, and is definitely not the case if you assume that a user may try to use a newer Beam Java SDK version against an existing Java-based portable runner that has been deployed somewhere.
This is probably a deeper discussion about what requirements we need to fulfill in order to be able to unify portable and classical runners. Classical runners are effectively portable runners that inline all the transform expansion into the EMBEDDED environment and use all the possible optimizations that come from the knowledge that the SDK of the pipeline is the same as the SDK of the runner. The fact that all SDK coders must be known to the runner follows immediately from that.

On Sun, Aug 22, 2021 at 1:47 PM Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    looks like I iterated to a solution [1]. The change should be
    minimal; there seem to be no (relevant) changes needed in core.
    Almost everything is located in the code of the FlinkRunner. There
    is still something weird, which probably signals a bug somewhere.
    Without this statement [2] the test fails with the already
    mentioned exception of "PCollection being consumed but never
    produced".

    Could anyone help with both reviewing and possibly suggesting what
    could be causing the exception?

      Jan

    [1] https://github.com/apache/beam/pull/15370

    [2] https://github.com/apache/beam/pull/15370/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R111

    On 8/20/21 5:27 PM, Jan Lukavský wrote:
    > Hi,
    >
    > I've tried to build a better understanding of what is really
    > happening and how - could someone validate my lines of thinking?
    >
    >  a) under normal circumstances ExecutableStage has two pieces - a
    > runner side and an SDK side; passing data between the two is done
    > over a gRPC channel. The runner side is not supposed to understand
    > the data, and therefore runners-core-construction-java replaces
    > coders for passing data between the SDK harness and the runner
    > with LengthPrefixCoder(ByteArrayCoder) - that means that the data
    > is passed as opaque bytes
    >

    >  b) the proto representation of the Pipeline contains the actual
    > coders, without specifying how the data should be passed between
    > the SDK harness and the runner (which seems correct - only the
    > runner knows the environment, and it is therefore the runner's
    > duty to build the actual wire coders)
    >
    >  c) because of that, there are utility classes that inject
    > LengthPrefixCoder where appropriate - most of the code is in
    > WireCoders, but unfortunately ProcessBundleDescriptors does some
    > work in this regard as well
    >


a, b, and c are spot on.
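
To make (a)-(c) concrete, here is a minimal sketch of the two wire coders built around the gRPC boundary (a hand-written illustration, not the actual WireCoders code):

    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;

    // SDK-harness side: the real element coder, length-prefixed.
    static <T> Coder<T> sdkWireCoder(Coder<T> elementCoder) {
      return LengthPrefixCoder.of(elementCoder);
    }

    // Runner side: byte-compatible layout, but the payload stays opaque bytes.
    static Coder<byte[]> runnerWireCoder() {
      return LengthPrefixCoder.of(ByteArrayCoder.of());
    }

Both sides produce the same byte stream; the length prefix is what lets the runner know where each opaque payload ends without understanding its contents.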

Just to rephrase, the idea here is that there are certain requirements in the graph and encodings the runner needs to be able to handle in order to perform certain operations (e.g. it needs to be able to split the "key" and "value" from a KV for a GBK). You could imagine that a runner would want to optimize a SQL transform and then would need to possibly understand the row encoding fully. The more coders the runner understands, the more things it can do; everything else it needs to abstract away by treating it as "unknown" data. See https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.j8zpiclux4id for slightly more details. The SDK has to state that it knows length prefix encoding v1, otherwise the runner doesn't know it can use it, so the SDK declares all the encodings that have specifications that it understands, even though they may not be used within the pipeline (these appear in the environment capabilities list).
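
On the wire that declaration is just a list of URNs on the Environment proto. A minimal sketch (partial builder; the URNs shown are standard model coder URNs, and the exact set an SDK declares will differ):

    import org.apache.beam.model.pipeline.v1.RunnerApi;

    RunnerApi.Environment env =
        RunnerApi.Environment.newBuilder()
            // coder URNs this SDK knows how to encode/decode
            .addCapabilities("beam:coder:bytes:v1")
            .addCapabilities("beam:coder:kv:v1")
            .addCapabilities("beam:coder:length_prefix:v1")
            .build();
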

    >  d) the problem arises when a runner decides to inline a
    > PTransform that was supposed to be an ExecutableStage and run it
    > within the context of the runner - that is the case of Flink's
    > primitive Read. In that case the coder the runner uses to encode
    > the PCollection on the output of Read and the coder used when it
    > is then consumed in a (non-inlined) ExecutableStage do not match.
    >

The runner is able to modify the pipeline proto before it becomes fused and converted into ExecutableStages. The idea is that the runner should look at the graph and replace any transforms it wants to execute directly with its own internal URN like "beam:flink:read:v1", extracting whatever it needs from the model pipeline. This should make it to the fuser, which then figures out what needs to execute in the SDK and what executes in the runner. Finally the executable version is created via the URN -> runner specific implementation mapping.
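
A minimal sketch of that pre-fusion rewrite ("beam:flink:read:v1" is the illustrative internal URN from above, not an existing one; a real implementation would also carry the ReadPayload along):

    import org.apache.beam.model.pipeline.v1.RunnerApi;

    static RunnerApi.Pipeline replaceReads(RunnerApi.Pipeline pipeline) {
      RunnerApi.Pipeline.Builder out = pipeline.toBuilder();
      pipeline.getComponents().getTransformsMap().forEach((id, transform) -> {
        // "beam:transform:read:v1" is the URN of the primitive Read transform.
        if ("beam:transform:read:v1".equals(transform.getSpec().getUrn())) {
          out.getComponentsBuilder().putTransforms(id,
              transform.toBuilder()
                  .setSpec(transform.getSpec().toBuilder()
                      .setUrn("beam:flink:read:v1"))
                  .build());
        }
      });
      return out.build();
    }
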

    > I tried to modify ModelCoders, or to patch
    > LengthPrefixUnknownCoders.addLengthPrefixedCoder, so that it can
    > work with the case when both ends (SDK and runner) are Java, but I
    > always hit an issue somewhere. I think that it is because the
    > decision of which "wire coder" to use is in this case no longer a
    > function of the pair (coderId, SDK-side or runner-side), but of a
    > triple (coderId, producer side, consumer side). That is, if the
    > collection is both produced and consumed in the runner
    > environment, the coder should be different than if it is produced
    > in the runner and consumed in the SDK harness.
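    >
    > As a sketch, the decision I have in mind could look like this
    > (pseudo-Java; the names are mine, not an existing Beam API):
    >
    >     enum Side { RUNNER, SDK_HARNESS }
    >
    >     static String wireCoderId(String coderId, Side producer, Side consumer) {
    >       if (producer == Side.RUNNER && consumer == Side.RUNNER) {
    >         // stays inside the runner environment - use the coder as-is
    >         return coderId;
    >       }
    >       // crosses the runner <-> SDK-harness boundary - length-prefix it
    >       return lengthPrefixed(coderId);
    >     }
    >
    >     static String lengthPrefixed(String coderId) {
    >       // stand-in for registering a LengthPrefixCoder around coderId
    >       return "length_prefixed:" + coderId;
    >     }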
    >
    > Another option seems to be that when a PCollection is produced
    > directly in the runner, the runner should wrap its coder in a
    > LengthPrefixCoder (unless the coder used is already a ModelCoder),
    > which is what I'll try next. I'll be grateful if someone verified
    > that I understand the problem correctly and that the solution with
    > a LengthPrefixCoder on the output of Read should work. The
    > solution is somewhat suboptimal regarding performance, because it
    > wraps the coder with LengthPrefixCoder in the case where all
    > coders on the way are known and therefore the length prefix should
    > not be needed. But I think that we could live with this right now,
    > at least until some finer control of the in-out coders of
    > ExecutableStage is introduced.
    >
    > Thanks for any thoughts on this!
    >
    >  Jan
    >
    > On 8/1/21 8:33 PM, Jan Lukavský wrote:
    >> Hi,
    >>
    >> I have figured out another way of fixing the problem without
    >> modifying ModelCoders. It consists of creating a
    >> JavaSDKCoderTranslatorRegistrar [1] and fixing
    >> LengthPrefixUnknownCoders [2]. Would this be a better approach?
    >>
    >>  Jan
    >>
    >> [1]
    >> https://github.com/apache/beam/pull/15181/files#diff-e4df94a4e799e14a76ada42506aacb8cb7567c84349acacd6126c64ed03de062R27
    >>
    >> [2]
    >> https://github.com/apache/beam/pull/15181/files#diff-64103a1eabf2872230e5df56cf02d535c4146f5a3f67c51c261433e4caa9a972R63
    >>
    >> On 7/29/21 7:54 PM, Jan Lukavský wrote:
    >>> On 7/29/21 6:45 PM, Robert Bradshaw wrote:
    >>>
    >>>>> On Thu, Jul 29, 2021 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>>>> Hi,
    >>>>>
    >>>>> I'd like to move the discussion of this topic further. Because
    >>>>> it seems that fixing the portable SDF is a larger work, I think
    >>>>> there are two options:
    >>>> +1
    >>>>
    >>>>>   a) extend the definition of model coders to include SDK
    >>>>> coders of the language that implements the model (that would
    >>>>> mean that the definition of model coder is not "language
    >>>>> agnostic coders", but "coders that a given SDK can
    >>>>> instantiate"), or
    >>>>>
    >>>>>   b) make the model coders extensible so that a runner can
    >>>>> modify it - that would make it possible for each runner to have
    >>>>> a slightly different definition of these model coders
    >>>>>
    >>>>> I'm strongly in favor of a), but I can live with b) as well.
    >>>> We should probably just rename "ModelCoders" to
    >>>> "JavaCoders[Registrar]" and stick everything there. ModelCoders
    >>>> is not understood or used by anything but Java. (That or we just
    >>>> discard the whole ModelCoders thing and just let Coders define
    >>>> their own portable representations, possibly with a registration
    >>>> system.)
    >>> Coders must be Serializable, so it seems to me that all Java
    >>> Coders are quite easily serialized and a registration system is
    >>> not exactly needed for that. Renaming ModelCoders to
    >>> Java(Portable)Coders looks good to me.
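    >>>
    >>> A quick sketch of that round trip using the SDK's
    >>> SerializableUtils (error handling omitted):
    >>>
    >>>     Coder<String> coder = StringUtf8Coder.of();
    >>>     byte[] bytes = SerializableUtils.serializeToByteArray(coder);
    >>>     Coder<?> roundTripped =
    >>>         (Coder<?>) SerializableUtils.deserializeFromByteArray(bytes, "Coder");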
    >>>>
    >>>>
    >>>>> Thanks in advance for any comments on this.
    >>>>>
    >>>>>   Jan
    >>>>>
    >>>>> On 7/25/21 8:59 PM, Jan Lukavský wrote:
    >>>>>
    >>>>> I didn't want to say that Flink should not support SDF. I only
    >>>>> do not see any benefits of it for a native streaming source -
    >>>>> like Kafka - without the ability to use dynamic splitting. The
    >>>>> potential benefits of composability and extensibility do not
    >>>>> apply here. Yes, it would be good to have as low a number of
    >>>>> source transforms as possible. And another yes, there probably
    >>>>> isn't anything that would fundamentally prevent Flink from
    >>>>> correctly supporting SDF. On the other hand, the current state
    >>>>> is such that we cannot use KafkaIO in Flink. I think we should
    >>>>> fix this by the shortest possible path, because the technically
    >>>>> correct solution is currently unknown (at least to me; if
    >>>>> anyone can give pointers about how to fix the SDF, I'd be
    >>>>> grateful).
    >>>>>
    >>>>> I still think that enabling a runner to support Read natively,
    >>>>> when appropriate, has value by itself. And it requires SDK
    >>>>> Coders to be 'known' to the runner, at least that was the
    >>>>> result of my tests.
    >>>>>
    >>>>> On 7/25/21 8:31 PM, Chamikara Jayalath wrote:
    >>>>>
    >>>>>
    >>>>>
    >>>>> On Sun, Jul 25, 2021 at 11:09 AM Jan Lukavský <je...@seznam.cz>
    >>>>> wrote:
    >>>>>> In general, language-neutral APIs and protocols are a key
    >>>>>> feature of portable Beam.
    >>>>>>
    >>>>>> Yes, sure, that is well understood. But - language neutral
    >>>>>> APIs require a language neutral environment. That is why the
    >>>>>> portable Pipeline representation is built around protocol
    >>>>>> buffers and gRPC. That is truly language-neutral. Once we
    >>>>>> implement something around that - like in the case of
    >>>>>> ModelCoders.java - we use a specific language for it and the
    >>>>>> language-neutral part is already gone. The decision to include
    >>>>>> same-language-SDK coders in such a language-specific object
    >>>>>> plays no role in the fact that it already is language-specific.
    >>>>>>
    >>>>>> Not all runners are implemented using Java. For example, the
    >>>>>> portable DirectRunner (FnAPI runner) is implemented using
    >>>>>> Python and Dataflow is implemented using C++. Such runners
    >>>>>> will not be able to do this.
    >>>>>>
    >>>>>> Yes, I'm aware of that, and that is why I said "any Java
    >>>>>> native runner". It is true that non-Java runners *must* (as
    >>>>>> long as we don't include Read in the SDK harness) resort to
    >>>>>> expanding it to SDF. That is why use_deprecated_read is an
    >>>>>> invalid setting for such runners and should be handled
    >>>>>> accordingly.
    >>>>>>
    >>>>>> Similarly, I think there were previous discussions related to
    >>>>>> using SDF as the source framework for portable runners.
    >>>>>>
    >>>>>> Don't get me wrong, I'm not trying to revoke this decision. On
    >>>>>> the other hand I still think that the decision to use SDF
    >>>>>> implementation of Read or not should be left to the runner.
    >>>>>>
    >>>>>> I understand that there are some bugs related to SDF and
    >>>>>> portable Flink currently. How much work do you think is needed
    >>>>>> here? Will it be better to focus our efforts on fixing
    >>>>>> remaining issues for SDF and portable runners instead of
    >>>>>> supporting "use_deprecated_read" for that path?
    >>>>>>
    >>>>>> I'm not sure. I don't know portability and the SDK harness
    >>>>>> well enough to be able to answer this. But we should really
    >>>>>> know why we do that. What exactly does SDF bring to the Flink
    >>>>>> runner (and let's leave Flink aside of this - what does it
    >>>>>> bring to runners that cannot make use of dynamic splitting, it
    >>>>>> being admittedly a very cool feature)? Yes, supporting Java
    >>>>>> Read makes it impossible to implement in Python. But
    >>>>>> practically, I think that most of the Pipelines will use
    >>>>>> x-lang for that. It makes very much sense to offload IOs to a
    >>>>>> more performant environment.
    >>>>>
    >>>>> A bit old, but please see the following for the benefits of SDF
    >>>>> and the motivation for it.
    >>>>>
    >>>>> https://beam.apache.org/blog/splittable-do-fn/
    >>>>> https://s.apache.org/splittable-do-fn
    >>>>>
    >>>>> Thanks,
    >>>>> Cham
    >>>>>
    >>>>>>   Jan
    >>>>>>
    >>>>>> On 7/25/21 6:54 PM, Chamikara Jayalath wrote:
    >>>>>>
    >>>>>>
    >>>>>>
    >>>>>> On Sun, Jul 25, 2021 at 6:33 AM Jan Lukavský <je...@seznam.cz>
    >>>>>> wrote:
    >>>>>>> I'll start from the end.
    >>>>>>>
    >>>>>>> I don't think we should be breaking language agnostic API
    >>>>>>> layers (for example, the definition of model coders) just to
    >>>>>>> support "use_deprecated_read".
    >>>>>>>
    >>>>>>> "Breaking" and "fixing" can only be a matter of the
    definition
    >>>>>>> of the object at hand. I don't think, that Coder can be
    totally
    >>>>>>> language agnostic - yes, the mapping between serialized
    form and
    >>>>>>> deserialized form can be _defined_ in a language agnostic
    way,
    >>>>>>> but must be_implemented_ in a specific language. If we choose
    >>>>>>> the implementing language, what makes us treat SDK-specific
    >>>>>>> coders defined by the SDK of the same language as
    "unknown"? It
    >>>>>>> is only our decision, that seems to have no practical
    benefits.
    >>>>>>
    >>>>>> In general, language-neutral APIs and protocols are a key
    >>>>>> feature of portable Beam. See here:
    >>>>>> https://beam.apache.org/roadmap/portability/
    >>>>>> (I did not look into all the old discussions and votes related
    >>>>>> to this but I'm sure they are there)
    >>>>>>
    >>>>>>> Moreover, including SDK-specific coders into the supported
    >>>>>>> coders of the SDK's runner construction counterpart (that is,
    >>>>>>> runners-core-construction-java for the Java SDK) is a
    >>>>>>> necessary prerequisite for unifying "classical" and
    >>>>>>> "portable" runners, because the runner needs to understand
    >>>>>>> *all* SDK coders so that it can _inline_ the complete
    >>>>>>> Pipeline (if the Pipeline SDK has the same language as the
    >>>>>>> runner), instead of running it through the SDK harness. This
    >>>>>>> need is therefore not specific to supporting
    >>>>>>> use_deprecated_read, but is a generic requirement, which
    >>>>>>> merely has its first manifestation in the support of a
    >>>>>>> transform not supported by the SDK harness.
    >>>>>>>
    >>>>>>> I think "use_deprecated_read" should be considered a stop-gap
    >>>>>>> measure for Flink (and Spark ?) till we have proper
    support for
    >>>>>>> SDF. In fact I don't think an arbitrary portable runner can
    >>>>>>> support "use_deprecated_read" due to the following.
    >>>>>>>
    >>>>>>> There seems to be nothing special about Flink regarding the
    >>>>>>> support of primitive Read. I think any Java native runner can
    >>>>>>> implement it pretty much the same way as Flink does. The
    >>>>>>> question is if any other runner might want to do that. The
    >>>>>>> problem with Flink is that
    >>>>>>
    >>>>>> Not all runners are implemented using Java. For example, the
    >>>>>> portable DirectRunner (FnAPI runner) is implemented using
    >>>>>> Python and Dataflow is implemented using C++. Such runners
    >>>>>> will not be able to do this.
    >>>>>>>   1) portable SDF seems not to work [1]
    >>>>>>>
    >>>>>>>   2) even the classical Flink runner still has issues with
    >>>>>>> SDF - there are reports of the watermark being stuck when
    >>>>>>> reading data via SDF; this gets resolved using
    >>>>>>> use_deprecated_read
    >>>>>>>
    >>>>>>>   3) Flink actually does not have any benefits from SDF,
    >>>>>>> because it cannot make use of dynamic splitting, so this
    >>>>>>> actually brings only implementation burden without any
    >>>>>>> practical benefit
    >>>>>> Similarly, I think there were previous discussions related to
    >>>>>> using SDF as the source framework for portable runners.
    >>>>>> I understand that there are some bugs related to SDF and
    >>>>>> portable Flink currently. How much work do you think is needed
    >>>>>> here? Will it be better to focus our efforts on fixing
    >>>>>> remaining issues for SDF and portable runners instead of
    >>>>>> supporting "use_deprecated_read" for that path? Note that I'm
    >>>>>> fine with fixing any issues related to "use_deprecated_read"
    >>>>>> for classic (non-portable) Flink but I think you are trying to
    >>>>>> use x-lang, hence probably need portable Flink.
    >>>>>>
    >>>>>> Thanks,
    >>>>>> Cham
    >>>>>>
    >>>>>>> I think that we should revisit the decision of deprecating
    >>>>>>> Read - if we can implement it via SDF, what is the reason to
    >>>>>>> forbid a runner to make use of a simpler implementation? The
    >>>>>>> expansion of Read might be runner dependent; that is
    >>>>>>> something we do all the time, or am I missing something?
    >>>>>>>
    >>>>>>>   Jan
    >>>>>>>
    >>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-10940
    >>>>>>>
    >>>>>>> On 7/25/21 1:38 AM, Chamikara Jayalath wrote:
    >>>>>>>
    >>>>>>> I think we might be going down a bit of a rabbit hole with
    >>>>>>> the support for "use_deprecated_read" for portable Flink :)
    >>>>>>>
    >>>>>>> I think "use_deprecated_read" should be considered a stop-gap
    >>>>>>> measure for Flink (and Spark?) till we have proper support
    >>>>>>> for SDF. In fact I don't think an arbitrary portable runner
    >>>>>>> can support "use_deprecated_read" due to the following.
    >>>>>>>
    >>>>>>> (1) The SDK Harness is not aware of
    >>>>>>> BoundedSource/UnboundedSource. The only source framework the
    >>>>>>> SDK Harness is aware of is SDF.
    >>>>>>> (2) Invoking BoundedSource/UnboundedSource is not a part of
    >>>>>>> the Fn API.
    >>>>>>> (3) A non-Java Beam portable runner will probably not be able
    >>>>>>> to directly invoke legacy Read transforms similar to the way
    >>>>>>> Flink does today.
    >>>>>>>
    >>>>>>> I don't think we should be breaking language agnostic API
    >>>>>>> layers (for example, the definition of model coders) just to
    >>>>>>> support "use_deprecated_read".
    >>>>>>>
    >>>>>>> Thanks,
    >>>>>>> Cham
    >>>>>>>
    >>>>>>> On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský <je...@seznam.cz>
    >>>>>>> wrote:
    >>>>>>>> On 7/24/21 12:34 AM, Robert Bradshaw wrote:
    >>>>>>>>
    >>>>>>>>>    On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský
    >>>>>>>>> <je...@seznam.cz> wrote:
    >>>>>>>>>> Hi,
    >>>>>>>>>>
    >>>>>>>>>> this was a ride. But I managed to get that working. I'd
    >>>>>>>>>> like to discuss two points, though:
    >>>>>>>>>>
    >>>>>>>>>>    a) I had to push Java coders to ModelCoders for Java
    >>>>>>>>>> (which makes sense to me, but is that correct?). See [1].
    >>>>>>>>>> It is needed so that the Read transform (executed directly
    >>>>>>>>>> in the TaskManager) can correctly communicate with the
    >>>>>>>>>> Java SDK harness using custom coders (which is tested here
    >>>>>>>>>> [2]).
    >>>>>>>>> I think the intent was that ModelCoders represent the set
    >>>>>>>>> of language-agnostic coders in the model, though I have to
    >>>>>>>>> admit I've always been a bit fuzzy on when a coder must or
    >>>>>>>>> must not be in that list.
    >>>>>>>> I think that this definition works as long as the runner
    >>>>>>>> does not itself interfere with the Pipeline. Once the runner
    >>>>>>>> starts (by itself, not via SdkHarnessClient) producing data,
    >>>>>>>> it starts to be part of the environment, and therefore it
    >>>>>>>> should understand its own Coders. I'd propose the definition
    >>>>>>>> of "model coders" to be Coders that the SDK is able to
    >>>>>>>> understand, which then works naturally for the ModelCoders
    >>>>>>>> located in "core-construction-java": it should understand
    >>>>>>>> Java SDK Coders.
    >>>>>>>>>>    b) I'd strongly prefer if we moved the handling of
    >>>>>>>>>> use_deprecated_read from outside of the Read PTransform
    >>>>>>>>>> directly into the expand method, see [3]. Though this is
    >>>>>>>>>> not needed for the Read on Flink to work, it seems cleaner.
    >>>>>>>>>>
    >>>>>>>>>> WDYT?
    >>>>>>>>> The default value of use_deprecated_read should depend on
    >>>>>>>>> the runner (e.g. some runners don't work well with it,
    >>>>>>>>> others require it). As such, it should not be visible to
    >>>>>>>>> the PTransform's expand.
    >>>>>>>> I think we should know what the expected outcome is. If a
    >>>>>>>> runner does not support primitive Read (and therefore
    >>>>>>>> use_deprecated_read), what should we do if we have such an
    >>>>>>>> experiment set? Should the Pipeline fail, or should it be
    >>>>>>>> silently ignored? I think that we should fail, because the
    >>>>>>>> user expects something that cannot be fulfilled. Therefore,
    >>>>>>>> we have two options - handling the experiment explicitly in
    >>>>>>>> runners that do not support it, or handling it explicitly in
    >>>>>>>> all cases (both supported and unsupported). The latter case
    >>>>>>>> is when we force runners to call an explicit conversion
    >>>>>>>> method (convertPrimitiveRead....). Every runner that does
    >>>>>>>> not support primitive Read must handle the experiment either
    >>>>>>>> way, because otherwise the experiment would be simply
    >>>>>>>> silently ignored, which is not exactly user-friendly.
    >>>>>>>>>>    Jan
    >>>>>>>>>>
    >>>>>>>>>> [1]
    >>>>>>>>>> https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
    >>>>>>>>>>
    >>>>>>>>>> [2]
    >>>>>>>>>> https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
    >>>>>>>>>>
    >>>>>>>>>> [3]
    >>>>>>>>>> https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
    >>>>>>>>>>
    >>>>>>>>>> On 7/18/21 6:29 PM, Jan Lukavský wrote:
    >>>>>>>>>>
    >>>>>>>>>> Hi,
    >>>>>>>>>>
    >>>>>>>>>> I was debugging the issue and it relates to pipeline
    >>>>>>>>>> fusion - it seems that the primitive Read transform gets
    >>>>>>>>>> fused and then is 'missing' as a source. I'm a little lost
    >>>>>>>>>> in the code, but the strangest parts are that:
    >>>>>>>>>>
    >>>>>>>>>>    a) I tried to reject fusion of primitive Read by adding
    >>>>>>>>>> GreedyPCollectionFusers::cannotFuse for
    >>>>>>>>>> PTransformTranslation.READ_TRANSFORM_URN to
    >>>>>>>>>> GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but that
    >>>>>>>>>> didn't change the exception
    >>>>>>>>>>
    >>>>>>>>>>    b) I tried adding Reshuffle.viaRandomKey between Read
    >>>>>>>>>> and PAssert, but that didn't change it either
    >>>>>>>>>>
    >>>>>>>>>>    c) when I run the portable Pipeline with
    >>>>>>>>>> use_deprecated_read on Flink it actually runs (though it
    >>>>>>>>>> fails when it actually reads any data; if the input is
    >>>>>>>>>> empty, the job runs), so it does not hit the same issue,
    >>>>>>>>>> which is a mystery to me
    >>>>>>>>>>
    >>>>>>>>>> If anyone has any pointers that I can investigate, I'd be
    >>>>>>>>>> really grateful.
    >>>>>>>>>>
    >>>>>>>>>> Thanks in advance,
    >>>>>>>>>>
    >>>>>>>>>>    Jan
    >>>>>>>>>>
    >>>>>>>>>>
    >>>>>>>>>>
    >>>>>>>>>> On 7/16/21 2:00 PM, Jan Lukavský wrote:
    >>>>>>>>>>
    >>>>>>>>>> Hi,
    >>>>>>>>>>
    >>>>>>>>>> I hit another issue with the portable Flink runner. Long
    >>>>>>>>>> story short - reading from Kafka is not working in
    >>>>>>>>>> portable Flink. I first had to solve issues with the
    >>>>>>>>>> expansion service configuration (the ability to add the
    >>>>>>>>>> use_deprecated_read option), because the portable Flink
    >>>>>>>>>> runner has issues with SDF [1], [2]. After being able to
    >>>>>>>>>> inject use_deprecated_read into the expansion service I
    >>>>>>>>>> was able to get an execution DAG that has the
    >>>>>>>>>> UnboundedSource, but then more and more issues appeared
    >>>>>>>>>> (probably related to a missing LengthPrefixCoder somewhere
    >>>>>>>>>> - maybe at the output of the primitive Read). I wanted to
    >>>>>>>>>> create a test for it and I found out that there actually
    >>>>>>>>>> is a ReadSourcePortableTest in FlinkRunner, but _it tests
    >>>>>>>>>> nothing_. The problem is that Read is transformed to SDF,
    >>>>>>>>>> so this test tests the SDF, not the Read transform. As a
    >>>>>>>>>> result, the Read transform does not work.
    >>>>>>>>>>
    >>>>>>>>>> I tried using convertReadBasedSplittableDoFnsToPrimitiveReads
    >>>>>>>>>> so that I could make the test fail and debug that, but I
    >>>>>>>>>> got into
    >>>>>>>>>>
    >>>>>>>>>> java.lang.IllegalArgumentException: PCollectionNodes
    >>>>>>>>>> [PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
    >>>>>>>>>> PCollection=unique_name:
    >>>>>>>>>> "PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
    >>>>>>>>>> coder_id: "IterableCoder"
    >>>>>>>>>> is_bounded: BOUNDED
    >>>>>>>>>> windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
    >>>>>>>>>> }] were consumed but never produced
    >>>>>>>>>>
    >>>>>>>>>>
    >>>>>>>>>> which gave me the last knock-out. :)
    >>>>>>>>>>
    >>>>>>>>>> My current impression is that starting from Beam 2.25.0,
    >>>>>>>>>> the portable FlinkRunner is not able to read from Kafka.
    >>>>>>>>>> Could someone give me a hint about what is wrong with
    >>>>>>>>>> using convertReadBasedSplittableDoFnsToPrimitiveReads in
    >>>>>>>>>> the test [3]?
    >>>>>>>>>>
    >>>>>>>>>>    Jan
    >>>>>>>>>>
    >>>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-11991
    >>>>>>>>>>
    >>>>>>>>>> [2] https://issues.apache.org/jira/browse/BEAM-11998
    >>>>>>>>>>
    >>>>>>>>>> [3] https://github.com/apache/beam/pull/15181
