On Sun, Jul 25, 2021 at 6:33 AM Jan Lukavský <je...@seznam.cz> wrote:
I'll start from the end.
I don't think we should be breaking language agnostic API layers
(for example, definition of model coders) just to support
"use_deprecated_read".
"Breaking" and "fixing" can only be a matter of the definition of
the object at hand. I don't think, that Coder can be totally
language agnostic - yes, the mapping between serialized form and
deserialized form can be _defined_ in a language agnostic way, but
must be_implemented_ in a specific language. If we choose the
implementing language, what makes us treat SDK-specific coders
defined by the SDK of the same language as "unknown"? It is only
our decision, that seems to have no practical benefits.
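(For concreteness, this is the kind of check I have in mind - a
minimal sketch only, assuming the runner-side construction code
consults ModelCoders.urns() as its registry of "known" coders:)

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.ModelCoders;

// Sketch: a coder is treated as "known" only if its URN is in the model
// coder set. Java-SDK coders (e.g. SerializableCoder) fall outside this
// set, even though a Java runner could instantiate them directly.
static boolean isKnownCoder(RunnerApi.Coder coder) {
  return ModelCoders.urns().contains(coder.getSpec().getUrn());
}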
In general, language-neutral APIs and protocols are a key feature of
portable Beam. See here: https://beam.apache.org/roadmap/portability/
(I did not look into all the old discussions and votes related to this
but I'm sure they are there)
Moreover, including SDK-specific coders among the supported coders of
the SDK's runner-construction counterpart (that is,
runners/core-construction-java for the Java SDK) is a necessary
prerequisite for unifying "classical" and "portable" runners, because
the runner needs to understand *all* SDK coders so that it can
_inline_ the complete Pipeline (if the Pipeline SDK has the same
language as the runner) instead of running it through the SDK
harness. This need is therefore not specific to supporting
use_deprecated_read; it is a generic requirement, which merely has
its first manifestation in the support of a transform not supported
by the SDK harness.
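(Again just a sketch of what "understand *all* SDK coders" means in
practice - I'm assuming the existing RehydratedComponents from
core-construction-java here; the inlining decision itself is
hypothetical:)

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.RehydratedComponents;

// Sketch: a Java runner can only inline the complete Pipeline if it can
// rehydrate every coder in the pipeline proto into a Java Coder instance.
static boolean canInline(RunnerApi.Pipeline pipeline) {
  RehydratedComponents components =
      RehydratedComponents.forComponents(pipeline.getComponents());
  for (String coderId : pipeline.getComponents().getCodersMap().keySet()) {
    try {
      components.getCoder(coderId);
    } catch (Exception e) {
      return false; // unknown coder => must run through the SDK harness
    }
  }
  return true;
}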
I think "use_deprecated_read" should be considered a stop-gap
measure for Flink (and Spark ?) till we have proper support for
SDF. In fact I don't think an arbitrary portable runner can
support "use_deprecated_read" due to the following.
There seems to be nothing special about Flink regarding the
support of primitive Read. I think any Java-native runner can
implement it pretty much the same way Flink does. The question
is whether any other runner might want to do that. The problem
with Flink is that
Not all runners are implemented using Java. For example, the portable
DirectRunner (FnAPI runner) is implemented using Python and Dataflow
is implemented using C++. Such runners will not be able to do this.
1) portable SDF seems not to work [1]
2) even the classical Flink runner still has issues with SDF -
there are reports of the watermark getting stuck when reading data
via SDF, which get resolved by use_deprecated_read
3) Flink actually gains nothing from SDF, because it cannot make
use of dynamic splitting, so SDF brings only an implementation
burden without any practical benefit
Similarly, I think there were previous discussions related to using
SDF as the source framework for portable runners.
I understand that there are currently some bugs related to SDF and
portable Flink. How much work do you think is needed here? Would it
be better to focus our efforts on fixing the remaining issues for SDF
and portable runners instead of supporting "use_deprecated_read" for
that path? Note that I'm fine with fixing any issues related to
"use_deprecated_read" for classic (non-portable) Flink, but I think
you are trying to use x-lang, hence you probably need portable Flink.
Thanks,
Cham
I think we should revisit the decision to deprecate Read - if we
can implement it via SDF, what is the reason to forbid a runner
from making use of the simpler implementation? The expansion of
Read might be runner-dependent; that is something we do all the
time, or am I missing something?
Jan
[1] https://issues.apache.org/jira/browse/BEAM-10940
On 7/25/21 1:38 AM, Chamikara Jayalath wrote:
I think we might be going down a bit of a rabbit hole with the
support for "use_deprecated_read" for portable Flink :)
I think "use_deprecated_read" should be considered a stop-gap
measure for Flink (and Spark ?) till we have proper support for
SDF. In fact I don't think an arbitrary portable runner can
support "use_deprecated_read" due to the following.
(1) The SDK Harness is not aware of BoundedSource/UnboundedSource.
The only source framework the SDK Harness is aware of is SDF.
(2) Invoking BoundedSource/UnboundedSource is not a part of the
Fn API.
(3) A non-Java Beam portable runner will probably not be able to
directly invoke legacy Read transforms similar to the way Flink
does today.
I don't think we should be breaking language agnostic API layers
(for example, definition of model coders) just to support
"use_deprecated_read".
Thanks,
Cham
On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský <je...@seznam.cz> wrote:
On 7/24/21 12:34 AM, Robert Bradshaw wrote:
> On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>> Hi,
>>
>> this was a ride. But I managed to get that working. I'd like to
>> discuss two points, though:
>>
>> a) I had to push Java coders to ModelCoders for Java (which makes
>> sense to me, but is that correct?). See [1]. It is needed so that
>> the Read transform (executed directly in the TaskManager) can
>> correctly communicate with the Java SDK harness using custom
>> coders (which is tested here [2]).
> I think the intent was that ModelCoders represent the set of
> language-agnostic coders in the model, though I have to admit I've
> always been a bit fuzzy on when a coder must or must not be in that
> list.
I think that this definition works as long as the runner does not
itself interfere with the Pipeline. Once the runner starts producing
data (by itself, not via the SdkHarnessClient), it becomes part of
the environment, and therefore it should understand its own Coders.
I'd propose defining "model coders" as the Coders that the SDK is
able to understand; for the ModelCoders located in
"core-construction-java" it then follows naturally that they should
include the Java SDK Coders.
>
>> b) I'd strongly prefer if we moved the handling of
>> use_deprecated_read from outside of the Read PTransform directly
>> into its expand method; see [3]. Though this is not needed for the
>> Read on Flink to work, it seems cleaner.
>>
>> WDYT?
> The default value of use_deprecated_read should depend on the
> runner (e.g. some runners don't work well with it, others require
> it). As such it should not be visible to the PTransform's expand.
I think we should define what the expected outcome is. If a runner
does not support primitive Read (and therefore use_deprecated_read),
what should we do if such an experiment is set? Should the Pipeline
fail, or should it be silently ignored? I think that we should fail,
because the user expects something that cannot be fulfilled.
Therefore, we have two options - handling the experiment explicitly
in the runners that do not support it, or handling it explicitly in
all cases (both supported and unsupported). The latter is the case
where we force runners to call an explicit conversion method
(convertPrimitiveRead....). Every runner that does not support
primitive Read must handle the experiment either way, because
otherwise the experiment would simply be silently ignored, which is
not exactly user-friendly.
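(What I have in mind concretely is that every runner lacking
primitive Read support would end up with a check like this - a
sketch only; the fail-fast behavior is my proposal, not existing
code:)

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;

// Sketch: a runner that does NOT support primitive Read should fail fast
// instead of silently ignoring the experiment.
static void validateReadExperiment(PipelineOptions options) {
  if (ExperimentalOptions.hasExperiment(options, "use_deprecated_read")) {
    throw new IllegalArgumentException(
        "This runner does not support use_deprecated_read; remove the"
            + " experiment or use a runner that supports primitive Read.");
  }
}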
>
>> Jan
>>
>> [1] https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
>>
>> [2] https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
>>
>> [3] https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
>>
>> On 7/18/21 6:29 PM, Jan Lukavský wrote:
>>
>> Hi,
>>
>> I was debugging the issue, and it relates to pipeline fusion - it
>> seems that the primitive Read transform gets fused and is then
>> 'missing' as a source. I'm a little lost in the code, but the
>> strangest parts are these:
>>
>> a) I tried to reject fusion of the primitive Read by adding
>> GreedyPCollectionFusers::cannotFuse for
>> PTransformTranslation.READ_TRANSFORM_URN to
>> GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but that didn't
>> change the exception
>>
>> b) I tried adding Reshuffle.viaRandomKey between the Read and the
>> PAssert (a sketch of this attempt follows after this list), but
>> that didn't change it either
>>
>> c) when I run the portable Pipeline with use_deprecated_read on
>> Flink, it actually runs (though it fails as soon as it actually
>> reads any data; if the input is empty, the job runs), so it does
>> not hit the same issue, which is a mystery to me
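>>
>> (The attempt from b) was roughly the following sketch -
>> CountingSource stands in here for the actual source used in the
>> test:)
>>
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.io.CountingSource;
>> import org.apache.beam.sdk.io.Read;
>> import org.apache.beam.sdk.testing.PAssert;
>> import org.apache.beam.sdk.transforms.Reshuffle;
>> import org.apache.beam.sdk.values.PCollection;
>>
>> Pipeline p = Pipeline.create();
>> // Reshuffle between the primitive Read and the PAssert to force a
>> // fusion break, which I hoped would avoid the 'missing source' issue.
>> PCollection<Long> input =
>>     p.apply(Read.from(CountingSource.upTo(3L)))
>>      .apply(Reshuffle.viaRandomKey());
>> PAssert.that(input).containsInAnyOrder(0L, 1L, 2L);
>> p.run().waitUntilFinish();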
>>
>> If anyone has any pointers that I can investigate, I'd be really
>> grateful.
>>
>> Thanks in advance,
>>
>> Jan
>>
>>
>>
>> On 7/16/21 2:00 PM, Jan Lukavský wrote:
>>
>> Hi,
>>
>> I hit another issue with the portable Flink runner. Long story
>> short - reading from Kafka is not working in portable Flink. First
>> I had to solve issues with the expansion service configuration
>> (the ability to add the use_deprecated_read option), because the
>> portable Flink runner has issues with SDF [1], [2]. After being
>> able to inject use_deprecated_read into the expansion service, I
>> was able to get an execution DAG that has the UnboundedSource, but
>> then more and more issues appeared (probably related to a missing
>> LengthPrefixCoder somewhere - maybe at the output of the primitive
>> Read). I wanted to create a test for it and found out that there
>> actually is a ReadSourcePortableTest in FlinkRunner, but _it tests
>> nothing_. The problem is that Read is transformed into SDF, so
>> this test tests the SDF, not the Read transform. As a result, the
>> Read transform does not work.
>>
>> I tried using convertReadBasedSplittableDoFnsToPrimitiveReads so
>> that I could make the test fail and debug it, but I ran into
>>
>> java.lang.IllegalArgumentException: PCollectionNodes
>> [PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
>> PCollection=unique_name:
>> "PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
>> coder_id: "IterableCoder"
>> is_bounded: BOUNDED
>> windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
>> }] were consumed but never produced
>>
>>
>> which gave me the last knock-out. :)
>>
>> My current impression is that starting from Beam 2.25.0, the
>> portable FlinkRunner is not able to read from Kafka. Could someone
>> give me a hint about what is wrong with using
>> convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?
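>>
>> (For reference, the way I call it in the test is roughly this - a
>> sketch, assuming the helper keeps the name and the Pipeline
>> argument it has in the PR:)
>>
>> import org.apache.beam.runners.core.construction.SplittableParDo;
>> import org.apache.beam.sdk.Pipeline;
>>
>> Pipeline p = Pipeline.create();
>> // ... build the pipeline with the source under test ...
>> // Rewrite the SDF-based expansion of Read back into primitive Reads
>> // before translation, so the test exercises the primitive Read path.
>> SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p);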
>>
>> Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-11991
>>
>> [2] https://issues.apache.org/jira/browse/BEAM-11998
>>
>> [3] https://github.com/apache/beam/pull/15181