If you make sure that these extra jars are in your path when you execute your pipeline, they should get picked up when invoking the expansion service (though this may not be the case long term).
The cleanest way would be to provide your own expansion service. If you build a jar that consists of Beam's IO expansion service plus any necessary dependencies, you should be able to do

    ReadFromKafka(
        [ordinary params],
        expansion_service=BeamJarExpansionService('path/to/your/jar'))

to use this "custom" expansion service. See https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py

An alternative is to pass the pipeline option

    --beam_services={"sdks:java:io:expansion-service:shadowJar": "path/to/your/jar"}

which will override the default. (You can also pass "host:port" rather than a path if you start the expansion service manually.)

Exactly how to specify, at the top level, a set of extra dependencies to be applied to a particular subset of other-language transforms is still an open problem. Alternatively, we could try to make expansion services themselves trivially easy to build, customize, and use.

Hopefully that helps.

- Robert

On Fri, Oct 2, 2020 at 5:57 PM Kobe Feng <flllbls...@gmail.com> wrote:
>
> Thanks Robert. Yes, our Kafka requires a JAAS configuration (sasl.jaas.config)
> on the client side for the security check, and the corresponding LoginModule
> requires additional classes:
> ==================================================================================================================
> Caused by: javax.security.auth.login.LoginException: unable to find
> LoginModule class: io.${XXXX}.kafka.security.iaf.IAFLoginModule
>     at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
>     at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
>     at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
>     at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
>     at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
>     at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
>     at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
>     at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:76)
>     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
>     ... 42 more
>
>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
>
> On Fri, Oct 2, 2020 at 5:14 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>> Could you clarify a bit what exactly you're trying to do? When using
>> KafkaIO, the provided jar should have all the necessary dependencies to
>> construct and execute the Kafka read/write. Is there some reason you need
>> to inject additional dependencies into the environment provided by Kafka?
>>
>> On Fri, Oct 2, 2020 at 3:20 PM Kobe Feng <flllbls...@gmail.com> wrote:
>>>
>>> Just a follow-up, since no one has replied yet.
>>> My understanding is that for any expanded transform, Beam wants the
>>> environment to be self-described.
>>> So I updated the boot code and the Dockerfile for the Java harness
>>> environment and used --sdk_harness_container_image_overrides with the
>>> portable runner, but the updated image was not loaded (the default one
>>> still was). From glancing at the code, I guess only the Dataflow runner
>>> supports it, but I believe it's the correct approach; I just need to dig
>>> into the code when I come back to this, and then I will update this
>>> thread too.
>>>
>>> Kobe
>>> On Wed, Sep 30, 2020 at 1:26 PM Kobe Feng <flllbls...@gmail.com> wrote:
>>>>
>>>> Hi everyone,
>>>> Is there a recommended way to upload a third-party jar (runtime scope)
>>>> for an expanded transform like KafkaIO.Read when using the Python
>>>> portable runner? Thank you!
>>>>
>>>> I tried --experiments=jar_packages=abc.jar,d.jar, but I only found those
>>>> artifacts in the Python harness's provision info; the Java harness just
>>>> uses the default environment for dependencies after the transform is
>>>> expanded by the gRPC server started from the expansion jar for reading
>>>> Kafka messages.
>>>>
>>>> I also noticed that the above option will be removed in the future, so I
>>>> then tried --files_to_stage, but this option only exists in the Java SDK's
>>>> pipeline options.
>>>>
>>>> --
>>>> Yours Sincerely
>>>> Kobe Feng
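For reference, here is a minimal Python sketch of the --beam_services alternative suggested in the first reply. It only builds the argument list that would be handed to the pipeline options. The override key is the gradle target quoted in the thread; the jar path, the runner flags, and the localhost:8099 job endpoint are placeholder assumptions, not values from the thread. Serializing the mapping with json.dumps avoids hand-quoting the JSON.

```python
import json

# Gradle target that identifies the default IO expansion service jar,
# as quoted in the reply above.
EXPANSION_SERVICE_TARGET = "sdks:java:io:expansion-service:shadowJar"


def beam_services_args(jar_path_or_address: str) -> list:
    """Build a --beam_services argument that overrides the default IO
    expansion service with either a custom jar path or the "host:port"
    of an expansion service started manually."""
    override = {EXPANSION_SERVICE_TARGET: jar_path_or_address}
    # The option value is a JSON mapping, so serialize it rather than
    # writing the braces and quotes by hand.
    return ["--beam_services=" + json.dumps(override)]


# Placeholder values (assumptions): a jar built from Beam's IO expansion
# service plus the extra dependencies (e.g. the custom LoginModule classes),
# and a portable job server on its usual local endpoint.
args = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
] + beam_services_args("/path/to/custom-expansion-service.jar")

# In a real pipeline these args would be passed on, e.g.:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(args)
print(args)
```

When passing the same flag on a shell command line instead, quote the whole argument so the shell does not strip the double quotes inside the JSON.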