Hi,
it looks like I have iterated to a solution [1]. The change should be
minimal; there seem to be no (relevant) changes needed in core. Almost
everything is located in the code of the FlinkRunner. There is still
something weird, which probably signals a bug somewhere. Without this
statement [2] the test fails with the already mentioned exception of
"PCollection being consumed but never produced".
Could anyone help with both reviewing and possibly suggesting what could
be causing the exception?
Jan
[1] https://github.com/apache/beam/pull/15370
[2]
https://github.com/apache/beam/pull/15370/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R111
On 8/20/21 5:27 PM, Jan Lukavský wrote:
Hi,
I've tried to build a better understanding of what is really happening
and how. Could someone validate my line of thinking?
a) under normal circumstances an ExecutableStage has two pieces - a
runner side and an SDK side. Passing data between these two is done
over a gRPC channel. The runner side is not supposed to understand the
data, and therefore runners-core-construction-java replaces the coders
used for passing data between the SDK harness and the runner with
LengthPrefixCoder(ByteArrayCoder) - that means that the data is passed
as opaque bytes
b) the proto representation of the Pipeline contains the actual
coders, without specifying how the data should be passed between the
SDK harness and the runner (which seems correct, only the runner knows
the environment, and it is therefore the runner's duty to build the
actual wire harness coders)
c) because of that, there are utility classes that inject
LengthPrefixCoder where appropriate - most of the code is in
WireCoders, but unfortunately ProcessBundleDescriptors does some work
in this regard as well
d) the problem arises when a runner decides to inline a PTransform
that was supposed to be an ExecutableStage and run it within the
context of the runner - that is the case of Flink's primitive Read. In
that case the coder the runner uses to encode the PCollection on output
from Read does not match the coder used when the PCollection is
consumed in a (non-inlined) ExecutableStage.
I tried to modify the ModelCoders, or to patch
LengthPrefixUnknownCoders.addLengthPrefixedCoder, so that it can work
with the case when both ends (SDK and runner) are Java, but I always
hit an issue somewhere. I think that it is because the decision of
which "wire coder" to use is in this case no longer a function of a
pair (coderId, SDK-side or runner-side), but of a triple (coderId,
producer side, consumer side). That is, if the collection is both
produced and consumed in the runner environment, the coder should be
different than if it is produced in the runner and consumed in the SDK
harness.
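To make the "triple" argument concrete, here is a minimal sketch in plain Java. It does not use any Beam classes: `WireCoderChoice`, `Side` and `runnerSideCoder` are hypothetical names, coders are represented only by their ids, and the rule itself is just one reading of the problem described above, not what WireCoders or LengthPrefixUnknownCoders actually do.

```java
import java.util.Set;

// Illustration only: hypothetical names, no Beam classes involved.
final class WireCoderChoice {

  /** Where a PCollection is produced or consumed. */
  enum Side { RUNNER, SDK_HARNESS }

  /**
   * Returns the id of the coder the runner would use for a PCollection.
   * Assumption: the runner can use the original coder when the data never
   * leaves the runner, or when the coder is one the runner knows anyway.
   */
  static String runnerSideCoder(
      String coderId, Side producer, Side consumer, Set<String> runnerKnownCoders) {
    if (producer == Side.RUNNER && consumer == Side.RUNNER) {
      // Produced and consumed inside the runner: no wire format is needed.
      return coderId;
    }
    if (runnerKnownCoders.contains(coderId)) {
      // The runner understands this coder, so no length prefix is required.
      return coderId;
    }
    // The data crosses the runner/SDK boundary with a coder the runner does
    // not understand, so it has to be treated as opaque bytes.
    return "LengthPrefixCoder(ByteArrayCoder)";
  }
}
```

With this rule the same coder id gives two different answers depending on where the PCollection is consumed, which a (coderId, side) pair alone cannot express.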
Another option seems to be that when a PCollection is produced directly
in a runner, the runner should wrap its coder using LengthPrefixCoder
(unless the coder used is already a ModelCoder), which is what I'll try
next. I'll be
grateful if someone verified that I understand the problem correctly
and that the solution with LengthPrefixCoder on output of Read should
work. The solution is somewhat suboptimal regarding performance,
because it wraps the coder with LengthPrefixCoder in the case where
all coders on the way are known and therefore the length prefix should
not be needed. But I think that we could live with this right now, at
least until some finer control of the in-out coders of ExecutableStage
is introduced.
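For readers less familiar with what the length prefix buys us, the sketch below mimics the idea in plain Java: each element is written as a varint length followed by its encoded bytes, so the receiving side can split the stream into elements without understanding the element coder. This is an illustration only, not Beam's LengthPrefixCoder; `LengthPrefixSketch` and its two methods are hypothetical names.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Illustration only: hypothetical class, not Beam's LengthPrefixCoder.
final class LengthPrefixSketch {

  /** Writes the payload length as a base-128 varint, then the payload itself. */
  static void writeLengthPrefixed(byte[] payload, OutputStream out) throws IOException {
    int length = payload.length;
    while ((length & ~0x7F) != 0) {
      out.write((length & 0x7F) | 0x80); // low 7 bits, continuation bit set
      length >>>= 7;
    }
    out.write(length);
    out.write(payload);
  }

  /** Reads one element back as opaque bytes, without decoding its content. */
  static byte[] readLengthPrefixed(InputStream in) throws IOException {
    int length = 0;
    int shift = 0;
    while (true) {
      int b = in.read();
      if (b < 0) {
        throw new EOFException("stream ended inside the length prefix");
      }
      length |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        break;
      }
      shift += 7;
      if (shift > 28) {
        throw new IOException("length prefix is too long");
      }
    }
    if (length < 0) {
      throw new IOException("negative length");
    }
    byte[] payload = new byte[length];
    int offset = 0;
    while (offset < length) {
      int n = in.read(payload, offset, length - offset);
      if (n < 0) {
        throw new EOFException("stream ended inside the payload");
      }
      offset += n;
    }
    return payload;
  }
}
```

The extra bytes written per element are the performance cost mentioned above: they are paid even when both ends could have decoded the element without a prefix.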
Thanks for any thoughts on this!
Jan
On 8/1/21 8:33 PM, Jan Lukavský wrote:
Hi,
I have figured out another way of fixing the problem without
modifying ModelCoders. It consists of creating a
JavaSDKCoderTranslatorRegistrar [1] and fixing
LengthPrefixUnknownCoders [2]. Would this be a better approach?
Jan
[1]
https://github.com/apache/beam/pull/15181/files#diff-e4df94a4e799e14a76ada42506aacb8cb7567c84349acacd6126c64ed03de062R27
[2]
https://github.com/apache/beam/pull/15181/files#diff-64103a1eabf2872230e5df56cf02d535c4146f5a3f67c51c261433e4caa9a972R63
On 7/29/21 7:54 PM, Jan Lukavský wrote:
On 7/29/21 6:45 PM, Robert Bradshaw wrote:
On Thu, Jul 29, 2021 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
I'd like to move the discussion of this topic further. Because it
seems that fixing the portable SDF is a larger piece of work, I think
there are two options:
+1
a) extend the definition of model coders to include the SDK coders
of the language that implements the model (that would mean that the
definition of a model coder is not "language agnostic coders", but
"coders that a given SDK can instantiate"), or
b) make the model coders extensible so that a runner can modify
them - that would make it possible for each runner to have a
slightly different definition of these model coders
I'm strongly in favor of a), but I can live with b) as well.
We should probably just rename "ModelCoders" to
"JavaCoders[Registrar]" and stick everything there. ModelCoders is not
understood or used by anything but Java. (That or we just discard the
whole ModelCoders thing and just let Coders define their own portable
representations, possibly with a registration system.)
Coders must be Serializable, so it seems to me that all Java Coders
are quite easily serialized and a registration is not exactly needed
for that. Renaming ModelCoders to Java(Portable)Coders looks good to
me.
Thanks in advance for any comments on this.
Jan
On 7/25/21 8:59 PM, Jan Lukavský wrote:
I didn't want to say that Flink should not support SDF. I only do
not see any benefits of it for a native streaming source - like
Kafka - without the ability to use dynamic splitting. The
potential benefits of composability and extensibility do not apply
here. Yes, it would be good to have as few source transforms as
possible. And another yes, there probably isn't anything that would
fundamentally prevent Flink from correctly supporting SDF. On the
other hand, the current state is such that we cannot use KafkaIO in
Flink. I think we should fix this by the
shortest possible path, because the technically correct solution
is currently unknown (at least to me, if anyone can give pointers
about how to fix the SDF, I'd be grateful).
I still think that enabling a runner to support Read natively,
when appropriate, has value by itself. And it requires SDK Coders
to be 'known' to the runner, at least that was the result of my
tests.
On 7/25/21 8:31 PM, Chamikara Jayalath wrote:
On Sun, Jul 25, 2021 at 11:09 AM Jan Lukavský <je...@seznam.cz>
wrote:
In general, language-neutral APIs and protocols are a key feature
of portable Beam.
Yes, sure, that is well understood. But - language-neutral APIs
require a language-neutral environment. That is why the portable
Pipeline representation is built around protocol buffers and
gRPC. That is truly language-neutral. Once we implement something
around that - like in the case of ModelCoders.java - we use a
specific language for that and the language-neutral part is
already gone. The decision to include same-language-SDK coders
in such a language-specific object plays no role in the fact that
it already is language-specific.
Not all runners are implemented using Java. For example, the
portable DirectRunner (FnAPI runner) is implemented using Python
and Dataflow is implemented using C++. Such runners will not be
able to do this.
Yes, I'm aware of that and that is why I said "any Java native
runner". It is true that non-Java runners *must* (as long as we
don't include Read in the SDK harness) resort to expanding it to
SDF. That is why use_deprecated_read is an invalid setting for such
a runner and should be handled accordingly.
Similarly, I think there were previous discussions related to
using SDF as the source framework for portable runners.
Don't get me wrong, I'm not trying to revoke this decision. On
the other hand I still think that the decision to use SDF
implementation of Read or not should be left to the runner.
I understand that there are some bugs related to SDF and portable
Flink currently. How much work do you think is needed here? Will
it be better to focus our efforts on fixing remaining issues for
SDF and portable runners instead of supporting
"use_deprecated_read" for that path?
I'm not sure. I don't know portability and the SDK harness well
enough to be able to answer this. But we should really know why
we do that. What exactly does SDF bring to the Flink runner (and
let's leave Flink aside - what does it bring to runners that
cannot make use of dynamic splitting, admittedly a very cool
feature)? Yes, supporting Java Read makes it impossible to
implement it in Python. But practically, I think that most
Pipelines will use x-lang for that. It makes a lot of sense
to offload IOs to a more performant environment.
A bit old, but please see the following for the benefits of SDF
and the motivation for it.
https://beam.apache.org/blog/splittable-do-fn/
https://s.apache.org/splittable-do-fn
Thanks,
Cham
Jan
On 7/25/21 6:54 PM, Chamikara Jayalath wrote:
On Sun, Jul 25, 2021 at 6:33 AM Jan Lukavský <je...@seznam.cz>
wrote:
I'll start from the end.
I don't think we should be breaking language agnostic API layers
(for example, definition of model coders) just to support
"use_deprecated_read".
"Breaking" and "fixing" can only be a matter of the definition
of the object at hand. I don't think that a Coder can be totally
language agnostic - yes, the mapping between serialized form and
deserialized form can be _defined_ in a language agnostic way,
but it must be _implemented_ in a specific language. If we choose
the implementing language, what makes us treat SDK-specific
coders defined by the SDK of the same language as "unknown"? It
is only our decision, and it seems to have no practical benefit.
In general, language-neutral APIs and protocols are a key feature
of portable Beam. See here:
https://beam.apache.org/roadmap/portability/
(I did not look into all the old discussions and votes related to
this but I'm sure they are there)
Moreover, including SDK-specific coders in the supported coders of
the SDK's runner construction counterpart (that is,
runners-core-construction-java for the Java SDK) is a necessary
prerequisite for unifying "classical" and "portable" runners, because
the runner needs to understand *all* SDK coders so that it can
_inline_ the complete Pipeline (if the Pipeline SDK has the same
language as the runner), instead of running it through the SDK
harness. This need is therefore not specific to supporting
use_deprecated_read, but is a generic requirement, which merely
has its first manifestation in the support of a transform not
supported by the SDK harness.
I think "use_deprecated_read" should be considered a stop-gap
measure for Flink (and Spark?) till we have proper support for
SDF. In fact I don't think an arbitrary portable runner can
support "use_deprecated_read" due to the following.
There seems to be nothing special about Flink regarding the
support of primitive Read. I think any Java native runner can
implement it pretty much the same way as Flink does. The
question is if any other runner might want to do that. The
problem with Flink is that
Not all runners are implemented using Java. For example, the
portable DirectRunner (FnAPI runner) is implemented using Python
and Dataflow is implemented using C++. Such runners will not be
able to do this.
1) portable SDF seems not to work [1]
2) even the classical Flink runner still has issues with SDF -
there are reports of the watermark being stuck when reading data
via SDF, which gets resolved by using use_deprecated_read
3) Flink actually does not gain any benefit from SDF, because
it cannot make use of dynamic splitting, so this actually
brings only implementation burden without any practical benefit
Similarly, I think there were previous discussions related to
using SDF as the source framework for portable runners.
I understand that there are some bugs related to SDF and portable
Flink currently. How much work do you think is needed here? Will
it be better to focus our efforts on fixing remaining issues for
SDF and portable runners instead of supporting
"use_deprecated_read" for that path? Note that I'm fine with
fixing any issues related to "use_deprecated_read" for classic
(non-portable) Flink but I think you are trying to use x-lang
hence probably need portable Flink.
Thanks,
Cham
I think that we should revisit the decision to deprecate
Read - if we can implement it via SDF, what is the reason to
forbid a runner from making use of a simpler implementation? The
expansion of Read might be runner dependent, that is something
we do all the time, or am I missing something?
Jan
[1] https://issues.apache.org/jira/browse/BEAM-10940
On 7/25/21 1:38 AM, Chamikara Jayalath wrote:
I think we might be going down a bit of a rabbit hole with the
support for "use_deprecated_read" for portable Flink :)
I think "use_deprecated_read" should be considered a stop-gap
measure for Flink (and Spark?) till we have proper support for
SDF. In fact I don't think an arbitrary portable runner can
support "use_deprecated_read" due to the following.
(1) The SDK Harness is not aware of BoundedSource/UnboundedSource.
The only source framework the SDK Harness is aware of is SDF.
(2) Invoking BoundedSource/UnboundedSource is not a part of the
Fn API
(3) A non-Java Beam portable runner will probably not be able to
directly invoke legacy Read transforms similar to the way Flink
does today.
I don't think we should be breaking language agnostic API layers
(for example, definition of model coders) just to support
"use_deprecated_read".
Thanks,
Cham
On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský <je...@seznam.cz>
wrote:
On 7/24/21 12:34 AM, Robert Bradshaw wrote:
On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský
<je...@seznam.cz> wrote:
Hi,
this was a ride. But I managed to get that working. I'd like
to discuss two points, though:
a) I had to push Java coders to ModelCoders for Java
(which makes sense to me, but is that correct?). See [1]. It
is needed so that the Read transform (executed directly in
TaskManager) can correctly communicate with Java SDK harness
using custom coders (which is tested here [2]).
I think the intent was that ModelCoders represent the set of
language-agnostic coders in the model, though I have to admit I've
always been a bit fuzzy on when a coder must or must not be in that
list.
I think that this definition works as long as the runner does not
itself interfere with the Pipeline. Once the runner starts (by itself,
not via SdkHarnessClient) producing data, it starts to be part of the
environment, and therefore it should understand its own Coders. I'd
propose the definition of "model coders" to be Coders that the SDK is
able to understand, which then works naturally for the ModelCoders
located in "core-construction-java", in that they should understand
Java SDK Coders.
b) I'd strongly prefer if we moved the handling of
use_deprecated_read from outside of the Read PTransform
directly into expand method, see [3]. Though this is not
needed for the Read on Flink to work, it seems cleaner.
WDYT?
The default value of use_deprecated_read should depend on the runner
(e.g. some runners don't work well with it, others require it). As
such, it should not be visible to the PTransform's expand.
I think we should know what the expected outcome is. If a runner does
not support primitive Read (and therefore use_deprecated_read), what
should we do if we have such an experiment set? Should the Pipeline
fail, or should the experiment be silently ignored? I think that we
should fail, because the user expects something that cannot be
fulfilled. Therefore, we have two options - handling the experiment
explicitly in runners that do not support it, or handling it
explicitly in all cases (both supported and unsupported). The latter
case is when we force runners to call an explicit conversion method
(convertPrimitiveRead....). Every runner that does not support
primitive Read must handle the experiment either way, because
otherwise the experiment would be simply silently ignored, which is
not exactly user-friendly.
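The "fail instead of silently ignoring" behaviour could look roughly like the sketch below. It is plain Java with hypothetical names (`ExperimentCheck`, `validate`, and the `runnerSupportsPrimitiveRead` flag); in Beam the experiments live in the pipeline options, which this sketch replaces with a plain list of strings.

```java
import java.util.List;

// Illustration only: hypothetical helper, not part of Beam.
final class ExperimentCheck {

  static final String USE_DEPRECATED_READ = "use_deprecated_read";

  /**
   * Rejects the pipeline when the user asked for primitive Read but the
   * runner cannot honour it, instead of dropping the experiment silently.
   */
  static void validate(List<String> experiments, boolean runnerSupportsPrimitiveRead) {
    if (experiments.contains(USE_DEPRECATED_READ) && !runnerSupportsPrimitiveRead) {
      throw new IllegalArgumentException(
          "Experiment " + USE_DEPRECATED_READ + " is set, but this runner does not"
              + " support primitive Read");
    }
  }
}
```

A runner that does support primitive Read would call the same check with `true`, so the experiment is handled explicitly in both the supported and the unsupported case.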
Jan
[1]
https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
[2]
https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
[3]
https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
On 7/18/21 6:29 PM, Jan Lukavský wrote:
Hi,
I was debugging the issue and it relates to pipeline fusion -
it seems that the primitive Read transform gets fused and
then is 'missing' as source. I'm a little lost in the code,
but the strangest parts are that:
a) I tried to reject fusion of primitive Read by adding
GreedyPCollectionFusers::cannotFuse for
PTransformTranslation.READ_TRANSFORM_URN to
GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but that
didn't change the exception
b) I tried adding Reshuffle.viaRandomKey between Read and
PAssert, but that didn't change it either
c) when I run portable Pipeline with use_deprecated_read
on Flink it actually runs (though it fails when it actually
reads any data, but if the input is empty, the job runs), so
it does not hit the same issue, which is a mystery to me
If anyone has any pointers that I can investigate, I'd be
really grateful.
Thanks in advance,
Jan
On 7/16/21 2:00 PM, Jan Lukavský wrote:
Hi,
I hit another issue with the portable Flink runner. Long
story short - reading from Kafka is not working in portable
Flink. I first had to solve issues with the expansion service
configuration (the ability to add the use_deprecated_read option),
because the portable Flink runner has issues with SDF [1], [2].
After being able to inject use_deprecated_read into the
expansion service I was able to get an execution DAG that has
the UnboundedSource, but then more and more issues appeared
(probably related to a missing LengthPrefixCoder somewhere -
maybe at the output from the primitive Read). I wanted to
create a test for it and I found out that there actually is a
ReadSourcePortableTest in FlinkRunner, but _it tests
nothing_. The problem is that Read is transformed to SDF, so
this test tests the SDF, not the Read transform. As a result,
the Read transform does not work.
I tried using convertReadBasedSplittableDoFnsToPrimitiveReads
so that I could make the test fail and debug that, but I ran
into
java.lang.IllegalArgumentException: PCollectionNodes
[PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
PCollection=unique_name:
"PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
coder_id: "IterableCoder"
is_bounded: BOUNDED
windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
}] were consumed but never produced
which gave me the last knock-out. :)
My current impression is that starting from Beam 2.25.0,
portable FlinkRunner is not able to read from Kafka. Could
someone give me a hint about what is wrong with using
convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?
Jan
[1] https://issues.apache.org/jira/browse/BEAM-11991
[2] https://issues.apache.org/jira/browse/BEAM-11998
[3] https://github.com/apache/beam/pull/15181