Hi Jeremy,
+dev (d...@beam.apache.org), as this might be interesting for the dev mailing list as well.
Couple of questions:
a) why do you need to specify the default environment on the JobServer?
That should be done via the PipelineOptions of the SDK that you use for
building your Pipeline (see the sketch below), or is there any other reason for that?
b) regarding the NoClassDefFound - would the --flink_submit_uber_jar flag
(also in the sketch below) help? See [1]
c) for [BEAM-12814] - that definitely has value for non-Java runners,
but when it comes to Flink, wouldn't the EMBEDDED environment be preferred?
That way you would not have to configure anything at all. I'm not sure
if that works in cluster mode - it works with a local JM, and I'd _suspect_
that it might work on a cluster as well. Did you test it and it did not work?
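For (a) and (b), a minimal sketch of what I mean - the Flink master address
and the worker pool endpoint are placeholders for your actual setup:

# Minimal sketch; flink-jobmanager:8081 and localhost:50000 are placeholders.
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",
    # (b) build a single uber jar and submit it through the Flink REST API,
    # so the job server jar does not have to be staged separately:
    "--flink_submit_uber_jar",
    # (a) the default environment comes from the SDK's PipelineOptions,
    # not from the JobServer:
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
])

with Pipeline(options=options) as p:
    ...  # pipeline definition goes here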
Jan
[1]
https://github.com/apache/beam/blob/cbb363f2f01d44dd3f7c063c6cd9d529b5fa9104/sdks/python/apache_beam/runners/portability/flink_runner.py#L51
On 8/28/21 5:52 PM, Jeremy Lewi wrote:
I filed https://issues.apache.org/jira/browse/BEAM-12814 to support external
environments for the Java SDK harness to better support K8s.
On Sat, Aug 28, 2021 at 8:52 AM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
Hi Folks,
Thank you so much for all your help. I was able to get this
working, although I had to hack the Python SDK to work around the
issue with connecting to a remote expansion service mentioned in
the other thread
(https://lists.apache.org/x/thread.html/r6c02f6c80d35929a46587ac5d6662ca2e5d8997ae6adfb5902314a35@%3Cuser.beam.apache.org%3E).
Here's a summary of everything I had to do:

* I built from the 2.33 release branch to pick up the mentioned fixes.
* To deal with the NoClassDefFoundErrors I ended up baking the Beam
  job server jar into the Flink workers.
  o I'm still not quite sure why the jar isn't being staged
    correctly, but I'll have to dig into it further.
* I set up my taskmanagers to run docker-in-docker so they could
  use the DOCKER environment for the Java SDK harness.
* I ran the expansion service as a separate service from the job
  server so that I could set the options to control the default
  environment and pass the use_deprecated_read flag (sketched below).
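For concreteness, a rough sketch of the resulting configuration - the K8s
service names and ports are placeholders:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=beam-jobserver:8099",
    "--streaming",
    # Python workers run in a sidecar worker pool:
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
])

with beam.Pipeline(options=options) as p:
    _ = p | ReadFromKafka(
        consumer_config={"bootstrap.servers": "kafka:9092"},
        topics=["events"],
        # the expansion service I run separately from the job server,
        # configured with the default environment and use_deprecated_read:
        expansion_service="beam-expansion-service:8097",
    )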
J
On Fri, Aug 27, 2021 at 7:16 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
It looks to me like https://github.com/apache/beam/pull/15082
added the ability to configure the default environment via the main
entrypoint of the expansion service, but not via the JobServer:
https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L260
Should I file a JIRA for this?
Thanks.
J
On Fri, Aug 27, 2021 at 12:36 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
Thank you very much for the pointers. I'm working on
getting the code built from the 2.33 branch and trying that out.
J
On Thu, Aug 26, 2021 at 6:35 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Jeremy,
the fix for the expansion service enables specifying
ExperimentalOptions [1] and PortablePipelineOptions
[2], so you can specify the default environment for the
expansion. However ... the Java SDK harness does not have
the "worker_pool" implementation that the Python SDK harness
has. So I think that the correct environment for the
expansion would be either DOCKER (which is a pain in
Kubernetes) or PROCESS - that requires building a custom
Flink docker image for the TaskManager that includes the
binaries from the Beam Java SDK image (/opt/apache/beam).
I didn't test if the EMBEDDED environment would work as
well, you might try it. That would mean that the
expansion will be completely inlined inside the
TaskManager process.
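For illustration, an untested sketch of passing those options from Python
when starting the expansion service from a jar - the jar path is a
placeholder, and I'm assuming the boot binary sits at /opt/apache/beam/boot:

# Untested sketch; assumes the expansion service jar accepts
# PortablePipelineOptions on its command line (the fix above) and that the
# Java SDK boot binary is at /opt/apache/beam/boot in the TaskManager image.
from apache_beam.transforms.external import JavaJarExpansionService

expansion_service = JavaJarExpansionService(
    "beam-sdks-java-io-expansion-service-2.33.0.jar",  # placeholder path
    extra_args=[
        "{{PORT}}",  # substituted with the service port at startup
        "--defaultEnvironmentType=PROCESS",
        '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
        "--experiments=use_deprecated_read",
    ],
)
# Passing expansion_service=expansion_service to ReadFromKafka would then
# expand against it.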
Jan
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
On 8/26/21 3:14 PM, Jeremy Lewi wrote:
Hi Jan,
That's very helpful. Do you have a timeline for 2.33?
Until then I will try building from source.
So if I understand correctly, using 2.33, the
solution would be to set the use_deprecated_read flag
until the issues with SDFs [1] are fixed?
Does the fix for [3] allow specifying a different
environment for different languages? When running in
Kubernetes, I think the preferred solution is to use
two sidecar containers, one running the Python SDK
harness and the other running the Java SDK harness.
So the environment config would need to be different
for the two languages.
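Hypothetically, something like the following - the ports are made up, and it
assumes the Java harness could run as an external worker pool the way the
Python one does (which is what BEAM-12814 asks for):

# Hypothetical sketch of the two-sidecar idea; all ports are made up.
# Python SDK harness: worker-pool sidecar listening on localhost:50000.
python_env_args = [
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
]
# Java SDK harness: a second sidecar on localhost:50001, configured as the
# expansion service's default environment rather than the pipeline's:
java_env_args = [
    "--defaultEnvironmentType=EXTERNAL",
    "--defaultEnvironmentConfig=localhost:50001",
]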
Thanks
J
On Thu, Aug 26, 2021 at 3:14 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi Jeremy,
unfortunately, there are several bugs affecting
KafkaIO with Python on FlinkRunner in the current
releases:
a) there are some limitations to portable SDF
support on Flink [1]
b) the use_deprecated_read flag cannot be passed
to the ExpansionService; that is fixed for the upcoming
2.32.0 in [2]
c) the primitive Read transform needed for the
use_deprecated_read flag to work is not working
properly until 2.33.0; the fix was merged just
yesterday, see [3]
Unfortunately, there are no known workarounds. If
you can build Beam from source, you can try
building it from the currently cut release branch
'release-2.33.0'; that would require building both
the Java and Python SDKs. The alternative would be
to wait for the 2.33.0 release to come out.
Hope this helps; if you have any more questions,
I'd be glad to help.
Jan
[1]
https://issues.apache.org/jira/browse/BEAM-11998
[2]
https://issues.apache.org/jira/browse/BEAM-12538
[3]
https://issues.apache.org/jira/browse/BEAM-12704
On 8/26/21 2:36 AM, Jeremy Lewi wrote:
Is this the same issue as in this thread
https://lists.apache.org/list.html?d...@beam.apache.org:2021-5
about specifying the environment to be used in
cross-language transforms?
Is the problem in the taskmanager or expansion
service? Are there environment variables I can
override to force it to use an external
environment so I can use a sidecar for the Java
SDK harness?
Thanks
J
On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <lc...@google.com> wrote:
It is likely that the expansion service is
returning a graph segment saying to execute
KafkaIO within a Docker environment, which is
what Flink is trying to do.
On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
Hi Folks,
So I tried putting the Beam job server
on the Flink JobManager and TaskManager
containers and setting classloader.resolve-order
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order)
to parent-first.
My taskmanagers are now crashing
because it looks like the KafkaIO
transform is trying to run docker, and it
can't because it's running in a K8s pod.
Logs attached.
Why would KafkaIO try to launch docker?
Does this have something to do with the
expansion service? The docs
(https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines)
make it seem like the expansion service only
runs at job submission time and only
needs to be accessible from the machine
where you are running your Python
program to submit the job.
Thanks
J
On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
Hi Luke,
Thanks. I've attached the full stack
trace. When I reran it, it gave me an
error about a different class.
I checked the Beam job server jar,
and as far as I can tell the classes
are present. So it seems like a
potential issue with the classpath
or the staging of JARs on the taskmanagers.
Does anyone happen to know how jars
get staged onto Flink taskmanagers?
On the jobmanager I was able to
locate the jar in a /tmp directory,
but I couldn't figure out how it was
getting staged on the taskmanagers.
I tried baking the job server jar
into the Flink containers. That gave
me an IllegalAccessError. I assume,
per the Flink docs
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order),
this indicates a dependency
conflict between the system JARs and
the application JARs.
With the portable runner, is there
any way to disable uploading of the
JAR and instead rely on the JARs
being baked into the docker container?
Thanks
J
On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <lc...@google.com> wrote:
Both those classes exist in
beam-vendor-grpc-1_36_0-0.1.jar:
lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep DnsNameResolver
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class
Java has a tendency to only report the full cause on the
first failure of this kind, with all subsequent failures only
reporting the ClassNotFoundException. This
happens because the ClassLoader
remembers which classes failed
and doesn't try loading them again.
Is there more of the stack trace
pointing out the actual cause
associated with the first time
this exception occurred?
On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
Hi Folks,
I'm trying to run Beam Python 2.31 on Flink 1.13.
I've created a simple streaming program to count
Kafka messages. Running on the DirectRunner this
works fine, but when I try to submit to my Flink
cluster, I get the exception below in my taskmanager.
I'm using the PortableRunner. Any
suggestions on how to fix or debug this?
Running programs that don't use Kafka works.
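For reference, the gist of the program - the job endpoint, bootstrap
servers, and topic are placeholders:

# Rough sketch of my repro; endpoints and topic are placeholders.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window
from apache_beam.transforms.combiners import CountCombineFn

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--streaming",
])

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | ReadFromKafka(
            consumer_config={"bootstrap.servers": "kafka:9092"},
            topics=["test-topic"],
        )
        # count messages per one-minute window:
        | beam.WindowInto(window.FixedWindows(60))
        | beam.CombineGlobally(CountCombineFn()).without_defaults()
        | beam.Map(print)
    )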
Thanks
J
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Aug 24, 2021 11:17:12 PM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2 uncaughtException
SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the SynchronizationContext. Panic!
java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 17 more
Exception in thread "grpc-default-executor-0" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 4 more