One more note - the PR has a bunch of failing tests, which I plan to
fix once there is consensus that the overall approach is the correct
one.
Thanks,
Jan
On 7/22/21 7:20 PM, Jan Lukavský wrote:
Hi,
this was a ride, but I managed to get it working. I'd like to
discuss two points, though:
a) I had to push the Java coders to ModelCoders for Java (which makes
sense to me, but is that correct?). See [1]. This is needed so that the
Read transform (executed directly in the TaskManager) can correctly
communicate with the Java SDK harness using custom coders (which is
tested here [2]). See the first sketch after point b).
b) I'd strongly prefer moving the handling of use_deprecated_read
from outside of the Read PTransform directly into its expand method,
see [3]. Though this is not needed for Read on Flink to work, it seems
cleaner; the second sketch below shows roughly what I mean.
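To illustrate a) - when the runner does not recognize a coder's URN as
a model coder, the element bytes crossing the data channel have to be
length-prefixed, and both sides must agree on that wrapping. A minimal,
self-contained sketch of just the wrapping (StringUtf8Coder stands in
for a custom coder here; CoderUtils is only used for the round-trip):

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class LengthPrefixSketch {
      public static void main(String[] args) throws Exception {
        // A coder the runner cannot interpret must be wrapped so the
        // runner can treat elements as opaque, length-delimited bytes.
        // If the runner side and the SDK harness side disagree on this
        // wrapping, decoding fails - the class of problem a) is about.
        Coder<String> wireCoder = LengthPrefixCoder.of(StringUtf8Coder.of());
        byte[] wire = CoderUtils.encodeToByteArray(wireCoder, "custom payload");
        System.out.println(CoderUtils.decodeFromByteArray(wireCoder, wire));
      }
    }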
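And for b), roughly the shape I have in mind - just a sketch, not the
actual code in [3]; ExperimentalOptions.hasExperiment is the existing
Beam API, everything else (ReadLike and the two expand helpers) is a
hypothetical stand-in for the Read internals:

    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    // The transform itself decides between the SDF-based and the
    // primitive expansion, instead of a pipeline-surgery step outside it.
    abstract class ReadLike<T> extends PTransform<PBegin, PCollection<T>> {
      @Override
      public PCollection<T> expand(PBegin input) {
        boolean deprecatedRead =
            ExperimentalOptions.hasExperiment(
                input.getPipeline().getOptions(), "use_deprecated_read");
        return deprecatedRead
            ? expandAsPrimitiveRead(input)
            : expandAsSplittableDoFn(input);
      }

      abstract PCollection<T> expandAsPrimitiveRead(PBegin input);
      abstract PCollection<T> expandAsSplittableDoFn(PBegin input);
    }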
WDYT?
Jan
[1]
https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
[2]
https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
[3]
https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
On 7/18/21 6:29 PM, Jan Lukavský wrote:
Hi,
I was debugging the issue, and it relates to pipeline fusion - it
seems that the primitive Read transform gets fused and is then
'missing' as a source. I'm a little lost in the code, but the
strangest parts are these:
a) I tried to reject fusion of the primitive Read by adding
GreedyPCollectionFusers::cannotFuse for
PTransformTranslation.READ_TRANSFORM_URN to
GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS (sketched after point
c) below), but that didn't change the exception
b) I tried adding Reshuffle.viaRandomKey between Read and PAssert,
but that didn't change it either
c) when I run the portable Pipeline with use_deprecated_read on
Flink, it actually runs (it fails as soon as it reads any data, but
with empty input the job runs), so it does not hit the same issue,
which is a mystery to me
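For reference, the change from a) was essentially one more entry in
the URN -> checker map inside GreedyPCollectionFusers. Since that map
and its value type are internal to the class, this standalone sketch
uses a local stand-in for the checker interface:

    import org.apache.beam.runners.core.construction.PTransformTranslation;
    import com.google.common.collect.ImmutableMap;

    public class FusionRejectSketch {
      // Stand-in for the checker type inside GreedyPCollectionFusers;
      // the real edit happens in that file.
      interface FusibilityChecker {
        boolean canFuse();
      }

      public static void main(String[] args) {
        ImmutableMap<String, FusibilityChecker> checkers =
            ImmutableMap.<String, FusibilityChecker>builder()
                // ... existing entries elided ...
                // reject fusion of the primitive Read ("cannotFuse");
                // this edit did not change the exception for me
                .put(PTransformTranslation.READ_TRANSFORM_URN, () -> false)
                .build();
        System.out.println(checkers.keySet());
      }
    }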
If anyone has any pointers that I can investigate, I'd be really
grateful.
Thanks in advance,
Jan
On 7/16/21 2:00 PM, Jan Lukavský wrote:
Hi,
I hit another issue with the portable Flink runner. Long story short
- reading from Kafka is not working in portable Flink. The
use_deprecated_read option is needed because the Flink portable
runner has issues with SDF [1], [2]. After solving issues with the
expansion service configuration (the ability to add the
use_deprecated_read option) and injecting it into the expansion
service, I was able to get an execution DAG that has the
UnboundedSource, but then more and more issues appeared (probably
related to a missing LengthPrefixCoder somewhere - maybe at the
output of the primitive Read). I wanted to create a test for it and
found out that there actually is a ReadSourcePortableTest in
FlinkRunner, but _it tests nothing_. The problem is that Read is
transformed to SDF, so this test tests the SDF, not the Read
transform. As a result, the broken Read transform goes undetected.
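As an aside, the flag itself is just an experiment on the pipeline
options; setting it locally is trivial (sketched here) - the hard part
was getting it through to the expansion service, which builds its own
options:

    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class DeprecatedReadFlag {
      public static void main(String[] args) {
        // use_deprecated_read is a plain experiment flag on the options.
        // The expansion service constructs its own PipelineOptions, so
        // the experiment has to be injected there as well - that was
        // the part that needed solving above.
        PipelineOptions opts =
            PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read")
                .create();
        System.out.println(opts.as(ExperimentalOptions.class).getExperiments());
      }
    }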
I tried using convertReadBasedSplittableDoFnsToPrimitiveReads so
that I could make the test fail and debug it, but I ran into

    java.lang.IllegalArgumentException: PCollectionNodes
    [PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
    PCollection=unique_name:
    "PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
    coder_id: "IterableCoder"
    is_bounded: BOUNDED
    windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
    }] were consumed but never produced

which gave me the last knock-out. :)
My current impression is that starting from Beam 2.25.0, the portable
FlinkRunner is not able to read from Kafka. Could someone give me a
hint about what is wrong with using
convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?
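For anyone willing to take a look, the shape of what the test attempts
is roughly this - a sketch only, the real code is in [3]; the rewrite
call is the method named above:

    import org.apache.beam.runners.core.construction.SplittableParDo;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class PrimitiveReadTestSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // ... apply the Read under test plus the PAssert checks here ...

        // Rewrite SDF-based reads back to the primitive Read before the
        // runner translates the pipeline. This rewrite is what leads to
        // the "consumed but never produced" error quoted above.
        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(p);
        p.run().waitUntilFinish();
      }
    }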
Jan
[1] https://issues.apache.org/jira/browse/BEAM-11991
[2] https://issues.apache.org/jira/browse/BEAM-11998
[3] https://github.com/apache/beam/pull/15181