Hello,

We are working on a Flink ES connector, sourcing from a kafka stream, and
sinking data into elasticsearch. The code works fine in intellij, but while
running the code on emr(version 5.9, which uses flink 1.3.2) using
flink-yarn-session, we are seeing this exception

Using the parallelism provided by the remote cluster (1). To use
another parallelism, set it at the ./bin/flink client.

Starting execution of program

2018-01-02 23:19:16,217 INFO  org.apache.flink.yarn.YarnClusterClient
                     - Starting program in interactive mode

------------------------------------------------------------

 The program finished with the following exception:

java.lang.NoSuchMethodError:
io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;

        at 
org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)

        at 
org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:449)

        at 
org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:91)

        at 
org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:976)

        at 
org.elasticsearch.transport.TcpTransport.sendRequest(TcpTransport.java:958)

        at 
org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:520)

        at 
org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:465)

        at 
org.elasticsearch.transport.TransportService.submitRequest(TransportService.java:451)

        at 
org.elasticsearch.client.transport.TransportClientNodesService$SimpleNodeSampler.doSample(TransportClientNodesService.java:403)

        at 
org.elasticsearch.client.transport.TransportClientNodesService$NodeSampler.sample(TransportClientNodesService.java:338)

        at 
org.elasticsearch.client.transport.TransportClientNodesService.addTransportAddresses(TransportClientNodesService.java:179)

        at 
org.elasticsearch.client.transport.TransportClient.addTransportAddress(TransportClient.java:301)

On searching online, it seems like this maybe due to netty version
conflicts.
However when we ran a dependency tree on our pom, and we dont see netty
coming from anywhere else but flink:
https://gist.github.com/neoeahit/b42b435e3c4519e632be87782de1cc06

Could you please suggest how can we resolve this error,

Thanks,
Vipul

Reply via email to