I'll start from the end.
> I don't think we should be breaking language agnostic API layers (for
> example, definition of model coders) just to support
> "use_deprecated_read".
"Breaking" and "fixing" can only be a matter of the definition of the
object at hand. I don't think, that Coder can be totally language
agnostic - yes, the mapping between serialized form and deserialized
form can be _defined_ in a language agnostic way, but must
be_implemented_ in a specific language. If we choose the implementing
language, what makes us treat SDK-specific coders defined by the SDK of
the same language as "unknown"? It is only our decision, that seems to
have no practical benefits.
Moreover, including SDK-specific coders among the supported coders of
the SDK's runner-construction counterpart (that is,
core-construction-java for the Java SDK) is a necessary prerequisite
for unifying "classical" and "portable" runners, because the runner
needs to understand *all* SDK coders so that it can _inline_ the
complete Pipeline (when the Pipeline SDK has the same language as the
runner) instead of running it through the SDK harness. This need is
therefore not specific to supporting use_deprecated_read; it is a
generic requirement whose first manifestation happens to be the support
of a transform that the SDK harness does not support.
I think "use_deprecated_read" should be considered a stop-gap measure
for Flink (and Spark ?) till we have proper support for SDF. In fact I
don't think an arbitrary portable runner can support
"use_deprecated_read" due to the following.
There seems to be nothing special about Flink regarding support for
primitive Read. I think any Java-native runner could implement it in
pretty much the same way Flink does; the question is whether any other
runner would want to. The problem with Flink is that
1) portable SDF seems not to work [1]
2) even the classical Flink runner still has issues with SDF - there
are reports of the watermark being stuck when reading data via SDF,
which gets resolved by using use_deprecated_read
3) Flink actually gains nothing from SDF, because it cannot make use
of dynamic splitting, so SDF brings only an implementation burden
without any practical benefit
I think we should revisit the decision to deprecate Read - if we can
implement it via SDF, what is the reason to forbid a runner from using
the simpler implementation? The expansion of Read could be
runner-dependent; that is something we do all the time, or am I missing
something?
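As a sketch of what a runner-dependent expansion could look like (the
class and both translate* hooks below are hypothetical; only
PTransformTranslation.READ_TRANSFORM_URN and the proto accessors are
existing Beam API):

    import org.apache.beam.model.pipeline.v1.RunnerApi;
    import org.apache.beam.runners.core.construction.PTransformTranslation;

    class RunnerDependentRead {
      // Pick the expansion per runner capability: primitive Read where
      // the runner can execute it natively, SDF everywhere else.
      static void translate(RunnerApi.PTransform transform, boolean canRunPrimitiveRead) {
        String urn = transform.getSpec().getUrn();
        if (PTransformTranslation.READ_TRANSFORM_URN.equals(urn)
            && canRunPrimitiveRead) {
          // e.g. Flink wiring the source directly into the TaskManager
          translateNatively(transform);
        } else {
          // the portable default
          translateViaSdfExpansion(transform);
        }
      }

      static void translateNatively(RunnerApi.PTransform t) { /* runner-specific */ }
      static void translateViaSdfExpansion(RunnerApi.PTransform t) { /* runner-specific */ }
    }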
Jan
[1] https://issues.apache.org/jira/browse/BEAM-10940
On 7/25/21 1:38 AM, Chamikara Jayalath wrote:
I think we might be going down a bit of a rabbit hole with the support
for "use_deprecated_read" for portable Flink :)
I think "use_deprecated_read" should be considered a stop-gap measure
for Flink (and Spark ?) till we have proper support for SDF. In fact I
don't think an arbitrary portable runner can support
"use_deprecated_read" due to the following.
(1) The SDK Harness is not aware of BoundedSource/UnboundedSource. The
only source framework the SDK Harness is aware of is SDF.
(2) Invoking BoundedSource/UnboundedSource is not a part of the Fn API.
(3) A non-Java Beam portable runner will probably not be able to
directly invoke legacy Read transforms in the way Flink does today.
I don't think we should be breaking language agnostic API layers (for
example, definition of model coders) just to support
"use_deprecated_read".
Thanks,
Cham
On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský <je...@seznam.cz> wrote:
On 7/24/21 12:34 AM, Robert Bradshaw wrote:
> On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský <je...@seznam.cz> wrote:
>> Hi,
>>
>> this was a ride. But I managed to get that working. I'd like to
discuss two points, though:
>>
>> a) I had to push Java coders to ModelCoders for Java (which
makes sense to me, but is that correct?). See [1]. It is needed so
that the Read transform (executed directly in the TaskManager) can
correctly communicate with the Java SDK harness using custom coders
(which is tested here [2]).
> I think the intent was that ModelCoders represents the set of
> language-agnostic coders in the model, though I have to admit I've
> always been a bit fuzzy on when a coder must or must not be in that list.
I think that this definition works as long as the runner does not
itself interfere with the Pipeline. Once the runner starts producing
data (by itself, not via SdkHarnessClient), it becomes part of the
environment, and therefore it should understand its own Coders. I'd
propose defining "model coders" as the Coders that the SDK is able to
understand, which then works naturally for the ModelCoders located in
"core-construction-java": it should understand Java SDK Coders.
>
>> b) I'd strongly prefer if we moved the handling of
use_deprecated_read from outside of the Read PTransform directly
into its expand method, see [3]. Though this is not needed for the
Read on Flink to work, it seems cleaner.
>>
>> WDYT?
> The default value of use_deprecated_read should depend on the runner
> (e.g. some runners don't work well with it, others require it). As
> such it should not be visible to the PTransform's expand.
I think we should know what the expected outcome is. If a runner does
not support primitive Read (and therefore use_deprecated_read), what
should we do when that experiment is set? Should the Pipeline fail, or
should the experiment be silently ignored? I think we should fail,
because the user expects something that cannot be fulfilled. Therefore,
we have two options - handling the experiment explicitly in runners
that do not support it, or handling it explicitly in all cases (both
supported and unsupported). The latter is the case where we force
runners to call an explicit conversion method (convertPrimitiveRead....).
Every runner that does not support primitive Read must handle the
experiment either way, because otherwise the experiment would simply be
silently ignored, which is not exactly user-friendly.
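The fail-fast variant could be as simple as the following sketch
(ExperimentalOptions.hasExperiment is existing Beam API; the helper
class and the runnerSupportsPrimitiveRead flag are assumptions for
illustration):

    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptions;

    class PrimitiveReadValidation {
      // Fail loudly instead of silently ignoring the experiment.
      static void validate(PipelineOptions options, boolean runnerSupportsPrimitiveRead) {
        if (ExperimentalOptions.hasExperiment(options, "use_deprecated_read")
            && !runnerSupportsPrimitiveRead) {
          throw new UnsupportedOperationException(
              "use_deprecated_read is set, but this runner cannot execute primitive Read");
        }
      }
    }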
>
>> Jan
>>
>> [1]
https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
>>
>> [2]
https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
>>
>> [3]
https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
>>
>> On 7/18/21 6:29 PM, Jan Lukavský wrote:
>>
>> Hi,
>>
>> I was debugging the issue and it relates to pipeline fusion -
it seems that the primitive Read transform gets fused and then is
'missing' as a source. I'm a little lost in the code, but the
strangest parts are these:
>>
>> a) I tried to reject fusion of primitive Read by adding
GreedyPCollectionFusers::cannotFuse for
PTransformTranslation.READ_TRANSFORM_URN to
GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but that didn't
change the exception
>>
>> b) I tried adding Reshuffle.viaRandomKey between Read and
PAssert, but that didn't change it either
>>
>> c) when I run the portable Pipeline with use_deprecated_read on
Flink, it actually runs (though it fails as soon as it actually reads
any data; if the input is empty, the job runs), so it does not hit
the same issue, which is a mystery to me
>>
>> If anyone has any pointers that I can investigate, I'd be
really grateful.
>>
>> Thanks in advance,
>>
>> Jan
>>
>>
>>
>> On 7/16/21 2:00 PM, Jan Lukavský wrote:
>>
>> Hi,
>>
>> I hit another issue with the portable Flink runner. Long story
short - reading from Kafka is not working in portable Flink. I first
had to solve issues with the expansion service configuration (adding
the ability to pass the use_deprecated_read option, because the
portable Flink runner has issues with SDF [1], [2]). After being able
to inject use_deprecated_read into the expansion service I was able to
get an execution DAG that has the UnboundedSource, but then more and
more issues appeared (probably related to a missing LengthPrefixCoder
somewhere - maybe at the output of the primitive Read). I wanted to
create a test for it and found out that there actually is a
ReadSourcePortableTest in FlinkRunner, but _it tests nothing_. The
problem is that Read is transformed into SDF, so this test tests the
SDF, not the Read transform, and therefore does not catch that the
Read transform does not work.
>>
>> I tried using convertReadBasedSplittableDoFnsToPrimitiveReads
so that I could make the test fail and debug that, but I ran into
>>
>> java.lang.IllegalArgumentException: PCollectionNodes
[PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
PCollection=unique_name:
"PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
>> coder_id: "IterableCoder"
>> is_bounded: BOUNDED
>> windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
>> }] were consumed but never produced
>>
>>
>> which gave me the last knock-out. :)
>>
>> My current impression is that starting from Beam 2.25.0, the
portable FlinkRunner is not able to read from Kafka. Could someone
give me a hint about what is wrong with using
convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?
>>
>> Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-11991
>>
>> [2] https://issues.apache.org/jira/browse/BEAM-11998
>>
>> [3] https://github.com/apache/beam/pull/15181
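For reference, a hedged sketch of how such a test might force the
primitive Read path before execution; the conversion method is the one
named in this thread, but its exact location and signature in Beam
internals, as well as the placeholder source, are assumptions:

    import org.apache.beam.runners.core.construction.SplittableParDo;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class PrimitiveReadSmokeTest {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // SomeUnboundedSource is a placeholder for a real UnboundedSource.
        p.apply(Read.from(new SomeUnboundedSource()));
        // Replace the SDF expansion with primitive Read, so the pipeline
        // exercises the Read path rather than SDF.
        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p);
        p.run().waitUntilFinish();
      }
    }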