I filed https://issues.apache.org/jira/browse/BEAM-12814 to support external environments for the Java SDK harness to better support K8s.
On Sat, Aug 28, 2021 at 8:52 AM Jeremy Lewi <jeremy.l...@primer.ai> wrote:

> Hi Folks,
>
> Thank you so much for all your help. I was able to get this working,
> although I had to hack the Python SDK to work around the issue with
> connecting to a remote expansion service mentioned in the other thread
> <https://lists.apache.org/x/thread.html/r6c02f6c80d35929a46587ac5d6662ca2e5d8997ae6adfb5902314a35@%3Cuser.beam.apache.org%3E>.
>
> Here's a summary of everything I had to do:
>
> - I built from the 2.33 release branch to pick up the mentioned fixes.
> - To deal with NoClassDefFoundErrors I ended up baking the Beam job
>   server jar into the Flink workers.
>   - I'm still not quite sure why the jar isn't being staged correctly,
>     but I'll have to dig into it further.
> - I set up my taskmanagers to run Docker-in-Docker so they could use
>   the docker environment for the Java SDK harness.
> - I ran the expansion service as a separate service from the job server
>   so that I could set the options to control the default environment
>   and pass the use_deprecated_read flag.
>
> J
>
> On Fri, Aug 27, 2021 at 7:16 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
>
>> It looks to me like https://github.com/apache/beam/pull/15082 added the
>> ability to configure the default environment to the main entrypoint of
>> the expansion service, but not to the JobServer:
>>
>> https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L260
>>
>> Should I file a JIRA for this?
>>
>> Thanks,
>> J
>>
>> On Fri, Aug 27, 2021 at 12:36 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
>>
>>> Thank you very much for the pointers. I'm working on getting the code
>>> built from the 2.33 branch and trying that out.
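The working setup Jeremy describes corresponds roughly to a particular set of Python-side pipeline options. A minimal sketch follows; the flag names are standard Beam pipeline options, but every endpoint, port, and address below is a placeholder assumption, not a value confirmed in this thread:

```python
# Sketch of submission-side options for a Python pipeline using a
# cross-language KafkaIO read on the Flink portable runner.
# All addresses/ports below are placeholders.
pipeline_args = [
    "--runner=PortableRunner",
    "--job_endpoint=flink-jobserver:8099",    # Beam job server fronting Flink
    "--environment_type=EXTERNAL",            # Python SDK harness runs as a sidecar
    "--environment_config=localhost:50000",   # address of the sidecar's worker pool
    "--experiments=use_deprecated_read",      # work around portable SDF issues (BEAM-11998)
    "--streaming",
]

# These args would then be handed to PipelineOptions, e.g.:
#   options = PipelineOptions(pipeline_args)
#   with beam.Pipeline(options=options) as p: ...
```

Note that `--environment_type` here controls only the Python harness; the environment used for the expanded Java transforms is decided by the expansion service, which is why Jeremy ran it separately with its own options.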
>>>
>>> J
>>>
>>> On Thu, Aug 26, 2021 at 6:35 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Jeremy,
>>>>
>>>> the fix for the expansion service enables specifying ExperimentalOptions
>>>> [1] and PortablePipelineOptions [2], so you can specify the default
>>>> environment for the expansion. However ... the Java SDK harness does not
>>>> have the "worker_pool" implementation that the Python SDK harness has.
>>>> So I think that the correct environment for the expansion would be
>>>> either DOCKER (which is a pain in Kubernetes) or PROCESS - that requires
>>>> building a custom Flink docker image for the TaskManager that includes
>>>> the binaries from the Beam Java SDK image (/opt/apache/beam).
>>>>
>>>> I didn't test whether the EMBEDDED environment would work as well; you
>>>> might try it. That would mean that the expansion will be completely
>>>> inlined inside the TaskManager process.
>>>>
>>>> Jan
>>>>
>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
>>>>
>>>> [2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
>>>>
>>>> On 8/26/21 3:14 PM, Jeremy Lewi wrote:
>>>>
>>>> Hi Jan,
>>>>
>>>> That's very helpful. Do you have a timeline for 2.33? Until then I will
>>>> try building from source.
>>>>
>>>> So if I understand correctly, using 2.33, the solution would be to set
>>>> the use_deprecated_read flag until the issues with SDFs [1] are fixed?
>>>>
>>>> Does the fix for [3] allow specifying a different environment for
>>>> different languages? When running in Kubernetes, I think the preferred
>>>> solution is to use two sidecar containers, one running the Python SDK
>>>> harness and the other running the Java SDK harness. So the environment
>>>> config would need to be different for the two languages.
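Jan's suggestion of a PROCESS default environment for the expansion service could be sketched as the following launch command. The flag names come from PortablePipelineOptions; the jar name, port, and harness boot path are assumptions for illustration, not values confirmed in this thread:

```python
# Hypothetical command line for running the Java expansion service so that
# the expanded KafkaIO transform executes in a PROCESS environment on the
# TaskManagers (which therefore must have the Beam Java SDK harness
# binaries, e.g. /opt/apache/beam/boot, baked into their image).
expansion_service_cmd = [
    "java", "-jar", "beam-sdks-java-io-expansion-service-2.33.0.jar",
    "8097",                                   # port the expansion service listens on
    "--defaultEnvironmentType=PROCESS",
    # PROCESS environments take a JSON config pointing at the SDK harness
    # boot binary inside the TaskManager image.
    '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
    "--experiments=use_deprecated_read",
]
# e.g. subprocess.Popen(expansion_service_cmd) on the host that should
# serve expansion requests.
```

Running the expansion service as its own deployment, as Jeremy ended up doing, also sidesteps the JobServerDriver limitation he raised earlier in the thread.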
>>>>
>>>> Thanks,
>>>> J
>>>>
>>>> On Thu, Aug 26, 2021 at 3:14 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Jeremy,
>>>>>
>>>>> unfortunately, there are several bugs affecting KafkaIO with Python on
>>>>> the FlinkRunner in current releases:
>>>>>
>>>>> a) there are some limitations to portable SDF support on Flink [1]
>>>>>
>>>>> b) the use_deprecated_read flag cannot be passed to the
>>>>> ExpansionService; that is fixed for the upcoming 2.32.0 in [2]
>>>>>
>>>>> c) the primitive Read transform needed for the use_deprecated_read
>>>>> flag to work is not working properly until 2.33.0; the fix was merged
>>>>> just yesterday, see [3]
>>>>>
>>>>> Unfortunately, there are no known workarounds. If you can build Beam
>>>>> from sources, you can try building it from the currently cut release
>>>>> branch 'release-2.33.0'. It would require building both the Java and
>>>>> Python SDKs. The alternative would be to wait for the release 2.33.0
>>>>> to come out.
>>>>>
>>>>> Hope this helps; if you have any more questions, I'd be glad to help.
>>>>>
>>>>> Jan
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/BEAM-11998
>>>>>
>>>>> [2] https://issues.apache.org/jira/browse/BEAM-12538
>>>>>
>>>>> [3] https://issues.apache.org/jira/browse/BEAM-12704
>>>>>
>>>>> On 8/26/21 2:36 AM, Jeremy Lewi wrote:
>>>>>
>>>>> Is this the same issue as in this thread
>>>>> https://lists.apache.org/list.html?d...@beam.apache.org:2021-5
>>>>> about specifying the environment to be used in cross-language
>>>>> transforms?
>>>>>
>>>>> Is the problem in the taskmanager or the expansion service? Are there
>>>>> environment variables I can override to force it to use an external
>>>>> environment so I can use a sidecar for the Java SDK harness?
>>>>>
>>>>> Thanks,
>>>>> J
>>>>>
>>>>> On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> It is likely that the expansion service is returning a graph segment
>>>>>> saying you execute KafkaIO within a docker environment, which is what
>>>>>> Flink is trying to do.
>>>>>>
>>>>>> On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
>>>>>>
>>>>>>> Hi Folks,
>>>>>>>
>>>>>>> So I tried putting the Beam job server on the Flink JobManager and
>>>>>>> TaskManager containers and setting classloader.resolve-order
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order>
>>>>>>> to parent-first. My taskmanagers are now crashing because it looks
>>>>>>> like the KafkaIO transform is trying to run docker, and it can't
>>>>>>> because it's running in a K8s pod. Logs attached.
>>>>>>>
>>>>>>> Why would KafkaIO try to launch docker? Does this have something to
>>>>>>> do with the expansion service? The docs
>>>>>>> <https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines>
>>>>>>> make it seem like the expansion service only runs at job submission
>>>>>>> time and only needs to be accessible from the machine where you are
>>>>>>> running your Python program to submit the job.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> J
>>>>>>>
>>>>>>> On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
>>>>>>>
>>>>>>>> Hi Luke,
>>>>>>>>
>>>>>>>> Thanks. I've attached the full stack trace. When I reran it, it gave
>>>>>>>> me an error about a different class.
>>>>>>>>
>>>>>>>> I checked the Beam job server jar, and as far as I can tell the
>>>>>>>> classes are present. So it seems like a potential issue with the
>>>>>>>> classpath or the staging of jars on the task managers.
>>>>>>>>
>>>>>>>> Does anyone happen to know how jars get staged onto Flink
>>>>>>>> taskmanagers?
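The Flink setting mentioned above is an entry in flink-conf.yaml. A minimal fragment, assuming the Beam job server jar is baked into the Flink image under /opt/flink/lib, might look like:

```yaml
# flink-conf.yaml fragment: resolve classes parent-first so that jars baked
# into the image (e.g. the Beam job server under /opt/flink/lib) win over
# dynamically staged user jars. Flink's default is child-first.
classloader.resolve-order: parent-first
```

As the Flink classloading docs note, parent-first resolution can surface dependency conflicts (such as the IllegalAccessError seen later in this thread) that child-first loading would otherwise mask.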
>>>>>>>> On the jobmanager I was able to locate the jar in a /tmp
>>>>>>>> directory, but I couldn't figure out how it was getting staged on
>>>>>>>> taskmanagers.
>>>>>>>>
>>>>>>>> I tried baking the job server jar into the Flink containers. That
>>>>>>>> gave me an IllegalAccessError. I assume, per the Flink docs
>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order>,
>>>>>>>> this is indicating a dependency conflict between the system jars
>>>>>>>> and the application jars.
>>>>>>>>
>>>>>>>> With the portable runner, is there any way to disable uploading of
>>>>>>>> the jar and instead rely on the jars being baked into the docker
>>>>>>>> container?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> J
>>>>>>>>
>>>>>>>> On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Both those classes exist in beam-vendor-grpc-1_36_0-0.1.jar:
>>>>>>>>>
>>>>>>>>> lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
>>>>>>>>> lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep DnsNameResolver
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class
>>>>>>>>>
>>>>>>>>> Java has a tendency to only report the full cause on the first
>>>>>>>>> failure of this kind, with all subsequent failures only reporting
>>>>>>>>> the ClassNotFoundException. This happens because the ClassLoader
>>>>>>>>> remembers which classes failed and doesn't try loading them again.
>>>>>>>>>
>>>>>>>>> Is there more of the stack trace pointing out the actual cause
>>>>>>>>> associated with the first time this exception occurred?
>>>>>>>>>
>>>>>>>>> On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <jeremy.l...@primer.ai> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Folks,
>>>>>>>>>>
>>>>>>>>>> I'm trying to run Beam Python 2.31 on Flink 1.13.
>>>>>>>>>>
>>>>>>>>>> I've created a simple streaming program to count Kafka messages.
>>>>>>>>>> Running on the DirectRunner this works fine, but when I try to
>>>>>>>>>> submit to my Flink cluster, I get the exception below in my
>>>>>>>>>> taskmanager.
>>>>>>>>>>
>>>>>>>>>> I'm using the PortableRunner. Any suggestions on how to fix or
>>>>>>>>>> debug this?
>>>>>>>>>>
>>>>>>>>>> Running programs that don't use Kafka works.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> J
>>>>>>>>>>
>>>>>>>>>> WARNING: An illegal reflective access operation has occurred
>>>>>>>>>> WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method java.nio.DirectByteBuffer.cleaner()
>>>>>>>>>> WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
>>>>>>>>>> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
>>>>>>>>>> WARNING: All illegal access operations will be denied in a future release
>>>>>>>>>>
>>>>>>>>>> Aug 24, 2021 11:17:12 PM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2 uncaughtException
>>>>>>>>>> SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the SynchronizationContext. Panic!
>>>>>>>>>> java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
>>>>>>>>>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>>>>>>>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>>>>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
>>>>>>>>>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>>>>>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>>> ... 17 more
>>>>>>>>>>
>>>>>>>>>> Exception in thread "grpc-default-executor-0" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
>>>>>>>>>> at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
>>>>>>>>>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>>>>>>>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>>>>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
>>>>>>>>>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>>>>>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>>> ... 4 more