Hi Vipul,

Yes, this looks like a problem with a different netty version being
picked up.

First of all, let me advertise Flink 1.4 for this: there we properly
shade away our netty dependency (on version 4.0.27 atm), so you (or in
this case Elasticsearch) can rely on the version you require.

Since you are executing your job inside a Flink cluster, there will be
more things on the classpath than what your job itself requires, e.g.
core/runtime Flink dependencies and also things from the Hadoop
classpath, if present. Locally in the IDE, only a limited set of
libraries is included and the classpath is set up a bit differently, I
suppose.

Maybe you are also affected by the Maven shading problem for Maven >=
3.3 [1][2].

As a workaround, can you try to shade elasticsearch's netty away? See
[3] for details; a rough sketch follows below.
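Untested, and the plugin version and the "org.myorg.shaded" prefix are
only placeholders you should adapt to your build, but the relocation in
your job's pom.xml would look roughly like this:

  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
      <execution>
        <phase>package</phase>
        <goals>
          <goal>shade</goal>
        </goals>
        <configuration>
          <relocations>
            <!-- rewrite io.netty.* references in all classes that end
                 up in the job jar, including the ones inside
                 elasticsearch's netty4 transport -->
            <relocation>
              <pattern>io.netty</pattern>
              <shadedPattern>org.myorg.shaded.io.netty</shadedPattern>
            </relocation>
          </relocations>
        </configuration>
      </execution>
    </executions>
  </plugin>

This way, the netty 4.1.x classes the Elasticsearch connector needs
live under a different package inside your fat jar and cannot clash
with the 4.0.x classes already on the cluster classpath.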
Regards,
Nico

[1] https://issues.apache.org/jira/browse/FLINK-5013
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/building.html
[3] https://stackoverflow.com/questions/42962892/flink-with-elasticsearch-5-sink-conflicts-of-io-netty-library

On 04/01/18 07:09, vipul singh wrote:
> Hello,
>
> We are working on a Flink ES connector, sourcing from a Kafka stream
> and sinking data into Elasticsearch. The code works fine in IntelliJ,
> but while running it on EMR (version 5.9, which uses Flink 1.3.2)
> using flink-yarn-session, we are seeing this exception:
>
> Using the parallelism provided by the remote cluster (1). To use
> another parallelism, set it at the ./bin/flink client.
>
> Starting execution of program
>
> 2018-01-02 23:19:16,217 INFO org.apache.flink.yarn.YarnClusterClient
> - Starting program in interactive mode
>
> ------------------------------------------------------------
>
> The program finished with the following exception:
>
> java.lang.NoSuchMethodError:
> io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
>     at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
>     at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:449)
>     at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:91)
>     at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:976)
>     at org.elasticsearch.transport.TcpTransport.sendRequest(TcpTransport.java:958)
>     at org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:520)
>     at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:465)
>     at org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:451)
>     at org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:403)
>     at org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:338)
>     at org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:179)
>     at org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:301)
>
> On searching online, it seems like this may be due to netty version
> conflicts. However, when we ran a dependency tree on our pom, we
> don't see netty coming from anywhere else but Flink:
> https://gist.github.com/neoeahit/b42b435e3c4519e632be87782de1cc06
>
> Could you please suggest how we can resolve this error?
>
> Thanks,
> Vipul