Hi Jeremy,

+dev <mailto:d...@beam.apache.org>, as this might be interesting for the dev mailing list as well.

Couple of questions:

 a) why do you need specifying default environment to the JobServer? That should be done via the PipelineOptions of the SDK that you use for building your Pipeline, or is there any other reason for that?

 b) regarding the NoClassDefFound - would --flink_submit_uber_jar flag help? See [1]

 c) for [BEAM-12814] - that definitely has value for non-Java runners, but when it comes to Flink, wouldn't EMBEDDED environment be preferred? That way you would not have to configure anything at all. I'm not sure if that works for cluster mode, it works with local JM, I'd _suspect_ that it might work for cluster as well. Did you test it and it did not work?

 Jan

[1] https://github.com/apache/beam/blob/cbb363f2f01d44dd3f7c063c6cd9d529b5fa9104/sdks/python/apache_beam/runners/portability/flink_runner.py#L51

On 8/28/21 5:52 PM, Jeremy Lewi wrote:
I filed https://issues.apache.org/jira/browse/BEAM-12814 <https://issues.apache.org/jira/browse/BEAM-12814> to support external environments for the JAVA SDK harness to better support K8s.

On Sat, Aug 28, 2021 at 8:52 AM Jeremy Lewi <jeremy.l...@primer.ai <mailto:jeremy.l...@primer.ai>> wrote:

    Hi Folks,

    Thank you so much for all your help. I was able to get this
    working although I had to hack the python SDK to work around the
    issue with connecting to a remote expansion service mentioned in
    the other thread
    
<https://lists.apache.org/x/thread.html/r6c02f6c80d35929a46587ac5d6662ca2e5d8997ae6adfb5902314a35@%3Cuser.beam.apache.org%3E>.

    Here's a summary of everything I had to do

      * I built from the 2.33 release branch to pick up the mentioned
        fixes
      * To Deal with NoClassDefFoundErrors I ended up baking the Beam
        job server Jar into the Flink workers
          o I'm still not quite sure why the Jar isn't being staged
            correctly but I'll have to dig into it further
      * I setup my taskmanagers to run docker in docker so they could
        use the docker environment for the Java SDK harness
      * I ran the expansion service as a separate service from the job
        server so that I could set the options to control the default
        environment and pass the use_deprecated_read_flag.


    J

    On Fri, Aug 27, 2021 at 7:16 PM Jeremy Lewi <jeremy.l...@primer.ai
    <mailto:jeremy.l...@primer.ai>> wrote:

        It looks to me like https://github.com/apache/beam/pull/15082
        <https://github.com/apache/beam/pull/15082> added the
        ability configure the default environment to the main
        entrypoint to the expansion service

        but not to the JobServer
        
https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L260
        
<https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L260>

        Should I file a JIRA for this?

        Thanks.
        J


        On Fri, Aug 27, 2021 at 12:36 PM Jeremy Lewi
        <jeremy.l...@primer.ai <mailto:jeremy.l...@primer.ai>> wrote:

            Thank you very much for the pointers. I'm working on
            getting the code built from 2.33 branch and trying that out.

            J

            On Thu, Aug 26, 2021 at 6:35 AM Jan Lukavský
            <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                Hi Jeremy,

                the fix for expansion service enables specifying
                ExperimentalOptions [1] and PortablePipelineOptions
                [2], so you can specify default environment for the
                expansion. However ... Java SDK harness does not have
                the "work_pool" implementation that python SDK harness
                has. So I think that the correct environment for the
                expansion would be either DOCKER (which is a pain in
                kubernetes) or PROCESS - that requires building custom
                flink docker image for TaskManager that includes the
                binaries from beam Java SDK image (/opt/apache/beam).

                I didn't test if EMBEDDED environment would work as
                well, you might try it. That would mean that the
                expansion will be completely inlined inside the
                TaskManager process.

                 Jan

                [1]
                
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
                
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java>

                [2]
                
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
                
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java>

                On 8/26/21 3:14 PM, Jeremy Lewi wrote:
                HI Jan,

                That's very helpful. Do you have a timeline for 2.33?
                Until then I will try building from source.

                So if I understand correctly. Using 2.33, the
                solution would be to set the use_deprecated_read flag
                until the issues with SDFs[1]  are fixed?

                Does the fix for 3 allow specifying a different
                environment for different languages?  When running in
                Kubernetes, I think the preferred solution is to use
                two side car containers one running the python SDK
                harness and the other running the java SDK harness.
                So the environment config would need to be different
                for the two languages.

                Thanks
                J




                J

                On Thu, Aug 26, 2021 at 3:14 AM Jan Lukavský
                <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                    Hi Jeremy,

                    unfortunately, there are several bugs affecting
                    KafkaIO with Python on FlinkRunner in current
                    releases.

                     a) there are some limitations to portable SDF
                    support on Flink [1]

                     b) the use_deprecated_read flag cannot be passed
                    to ExpansionService, that is fixed for upcoming
                    2.32.0 in [2]

                     c) primitive Read transform needed for the
                    use_deprecated_read flag to work is not working
                    properly until 2.33.0, fix was merged just
                    yesterday, see [3]

                    Unfortunately, there are no known workarounds, if
                    you can build beam from sources, you can try
                    building it from the currently cut release branch
                    'release-2.33.0'. It would require to build both
                    java and python SDKs. The alternative would be to
                    wait for the release 2.33.0 to come out.

                    Hope this helps, if you had any more questions,
                    I'd be glad to help.

                     Jan

                    [1]
                    https://issues.apache.org/jira/browse/BEAM-11998
                    <https://issues.apache.org/jira/browse/BEAM-11998>

                    [2]
                    https://issues.apache.org/jira/browse/BEAM-12538
                    <https://issues.apache.org/jira/browse/BEAM-12538>

                    [3]
                    https://issues.apache.org/jira/browse/BEAM-12704
                    <https://issues.apache.org/jira/browse/BEAM-12704>

                    On 8/26/21 2:36 AM, Jeremy Lewi wrote:
                    Is this the same issue as in this thread
                    
https://lists.apache.org/list.html?d...@beam.apache.org:2021-5
                    
<https://lists.apache.org/list.html?d...@beam.apache.org:2021-5>
                    about specifying the environment to be used in
                    cross-language transforms.

                    Is the problem in the taskmanager or expansion
                    service? Are there environment variables I can
                    override to force it to use an external
                    environment so I can use a sidecar for the Java
                    SDK harness?

                    Thanks
                    J






                    On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik
                    <lc...@google.com <mailto:lc...@google.com>> wrote:

                        It is likely that the expansion service is
                        returning a graph segment saying you execute
                        KafkaIO within a docker environment which is
                        what Flink is trying to do.

                        On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi
                        <jeremy.l...@primer.ai
                        <mailto:jeremy.l...@primer.ai>> wrote:

                            Hi Folks,

                            So I tried putting the beam job server
                            on the Flink JobManager and Taskmanager
                            containers and setting
                            classloader.resolve-order
                            
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order>to
                            parent-first.
                            My taskmanager's are now crashing
                            because it looks like the Kafka IO
                            transform is trying to run docker and it
                            can't because its running in a K8s pod.
                            Logs attached.

                            Why would KafkaIO try to launch docker?
                            Does this have something to do with the
                            expansion service. The docs
                            
<https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines>
 make
                            it seem like the expansion service only
                            runs at job submission time and only
                            needs to be accessible from the machine
                            where you are running your python
                            program to submit the job.

                            Thanks
                            J

                            On Wed, Aug 25, 2021 at 12:09 PM Jeremy
                            Lewi <jeremy.l...@primer.ai
                            <mailto:jeremy.l...@primer.ai>> wrote:

                                Hi Luke,

                                Thanks. I've attached the full stack
                                trace. When I reran it gave me an
                                error about a different class.

                                I checked the beam job server jar
                                and as far as I can tell the classes
                                are present. So seems like a
                                potential issue with the classpath
                                or staging of JARs on the task managers.

                                Does anyone happen to know how jars
                                get staged onto Flink taskmanagers?
                                On the jobmanager I was able to
                                locate the jar in a /tmp directory
                                but I couldn't figure out how it was
                                getting staged on taskmanagers.

                                I tried baking the job server jar
                                into the flink containers. That gave
                                me an IllegalAccessError. I assume
                                per the Flink Docs
                                
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order>
 this
                                is indicating a dependency
                                conflict between the system JARs and
                                the application JARs.

                                With the portable runner is there
                                anyway to disable uploading of the
                                JAR and instead rely on the JARs
                                being baked into the docker container?

                                Thanks
                                J

                                On Wed, Aug 25, 2021 at 9:20 AM Luke
                                Cwik <lc...@google.com
                                <mailto:lc...@google.com>> wrote:

                                    Both those classes exist in
                                    beam-vendor-grpc-1_36_0-0.1.jar:

                                    lcwik@lcwik:~/Downloads$ jar tf
                                    beam-vendor-grpc-1_36_0-0.1.jar
                                    | grep Hpack
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
                                    lcwik@lcwik:~/Downloads$ jar tf
                                    beam-vendor-grpc-1_36_0-0.1.jar
                                    | grep DnsNameResolver
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
                                    
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class

                                    Java has a tendency to only
                                    report the full cause on the
                                    first failure of this kind with
                                    all subsequent failures only
                                    reporting the
                                    ClassNotFoundException. This
                                    happens because the ClassLoader
                                    remembers which classes failed
                                    and doesn't try loading them again.

                                    Is there more of the stack trace
                                    pointing out the actual cause
                                    associated with the first time
                                    this exception occurred?


                                    On Tue, Aug 24, 2021 at 4:32 PM
                                    Jeremy Lewi
                                    <jeremy.l...@primer.ai
                                    <mailto:jeremy.l...@primer.ai>>
                                    wrote:

                                        Hi Folks,

                                        I'm trying to run Beam
                                        Python 2.31 on Flink 1.13.

                                        I've created a simple
                                        streaming program to count
                                        Kafka messages. Running on
                                        the DirectRunner this works
                                        fine. But when I try to
                                        submit to my Flink cluster.
                                        I get the exception below in
                                        my taskmanager.

                                        I'm using the
                                        PortableRunner. Any
                                        suggestions on how to fix or
                                        debug this?

                                        Running programs that don't
                                        use Kafka works.

                                        Thanks
                                        J

                                        WARNING: An illegal
                                        reflective access operation
                                        has occurred

                                        WARNING: Illegal reflective
                                        access by
                                        
org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
                                        
(file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar)
                                        to method
                                        java.nio.DirectByteBuffer.cleaner()

                                        WARNING: Please consider
                                        reporting this to the
                                        maintainers of
                                        
org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil

                                        WARNING: Use
                                        --illegal-access=warn to
                                        enable warnings of further
                                        illegal reflective access
                                        operations

                                        WARNING: All illegal access
                                        operations will be denied in
                                        a future release

                                        Aug 24, 2021 11:17:12 PM
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2
                                        uncaughtException

                                        SEVERE: [Channel<55>:
                                        (localhost:50000)] Uncaught
                                        exception in the
                                        SynchronizationContext. Panic!

                                        java.lang.NoClassDefFoundError:
                                        
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)

                                        at
                                        
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
                                        Source)

                                        at
                                        
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
                                        Source)

                                        at
                                        java.base/java.lang.Thread.run(Unknown
                                        Source)

                                        Caused by:
                                        java.lang.ClassNotFoundException:
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder

                                        at
                                        
java.base/java.net.URLClassLoader.findClass(Unknown
                                        Source)

                                        at
                                        
java.base/java.lang.ClassLoader.loadClass(Unknown
                                        Source)

                                        at
                                        
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)

                                        at
                                        
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)

                                        at
                                        
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)

                                        at
                                        
java.base/java.lang.ClassLoader.loadClass(Unknown
                                        Source)

                                        ... 17 more


                                        Exception in thread
                                        "grpc-default-executor-0"
                                        java.lang.NoClassDefFoundError:
                                        
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1

                                        at
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)

                                        at
                                        
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
                                        Source)

                                        at
                                        
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
                                        Source)

                                        at
                                        java.base/java.lang.Thread.run(Unknown
                                        Source)

                                        Caused by:
                                        java.lang.ClassNotFoundException:
                                        
org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1

                                        at
                                        
java.base/java.net.URLClassLoader.findClass(Unknown
                                        Source)

                                        at
                                        
java.base/java.lang.ClassLoader.loadClass(Unknown
                                        Source)

                                        at
                                        
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)

                                        at
                                        
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)

                                        at
                                        
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)

                                        at
                                        
java.base/java.lang.ClassLoader.loadClass(Unknown
                                        Source)

                                        ... 4 more

Reply via email to