I created a new project and added only the kafka-clients, flink-connector-kafka, and Flink streaming libs, and it works.
Thanks.

> On Apr 2, 2016, at 12:54 AM, Stephan Ewen <se...@apache.org> wrote:
>
> The issue may be that you include Kafka twice:
>
> 1) You explicitly add "org.apache.kafka:kafka-clients:0.9.0.0"
> 2) You add "org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0", which
> internally adds "org.apache.kafka:kafka-clients:0.9.0.1"
>
> These two Kafka versions may conflict. I would drop the dependency (1) and
> simply let the FlinkKafkaConsumer pull whatever dependency it needs by itself.
> The 0.9.0.1 client the Flink internally uses should read fine from Kafka
> 0.9.0.0 brokers.
>
> Greetings,
> Stephan
>
>
> On Fri, Apr 1, 2016 at 5:19 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:
> Yeah, I mean I read the demo with FlinkKafkaConsumer08
> (http://data-artisans.com/kafka-flink-a-practical-how-to/) then I wrote the
> program based on Kafka 0.9.0.0 and Flink 1.0.0.
>
>> On Apr 1, 2016, at 7:27 PM, Balaji Rajagopalan
>> <balaji.rajagopa...@olacabs.com> wrote:
>>
>> Did you make sure the flinkconnector version and flink version is the same ?
>> Also for 0.8.0.0 you will have to use FlinkKafkaConsumer08
>>
>> On Fri, Apr 1, 2016 at 3:21 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:
>> I follow the example of kafka 0.8.0.0 on Flink doc.
>>
>> public static void main(String[] args) throws Exception {
>>     StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     Properties properties = new Properties();
>>     properties.setProperty("bootstrap.servers", "localhost:9092");
>>     properties.setProperty("zookeeper.connect", "localhost:2181");
>>     properties.setProperty("group.id", "test");
>>     properties.setProperty("key.deserializer",
>>         "org.apache.kafka.common.serialization.StringDeserializer");
>>     properties.setProperty("value.deserializer",
>>         "org.apache.kafka.common.serialization.StringDeserializer");
>>     properties.setProperty("partition.assignment.strategy", "range");
>>
>>     DataStream<String> messageStream = env
>>         .addSource(new FlinkKafkaConsumer09<String>("nginx-logs",
>>             new SimpleStringSchema(), properties));
>>
>>     messageStream
>>         .rebalance()
>>         .map(new MapFunction<String, String>() {
>>
>>             @Override
>>             public String map(String value) throws Exception {
>>                 return "Kafka and Flink says: " + value;
>>             }
>>         }).print();
>>
>>     env.execute();
>> }
>>
>>
>> Always got the error below:
>>
>> java.lang.NoSuchMethodError:
>> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>>
>>
>>> On Apr 1, 2016, at 1:40 PM, Ashutosh Kumar <kmr.ashutos...@gmail.com> wrote:
>>>
>>> I am using flink 1.0.0 with kafka 0.9. It works fine for me. I use
>>> following dependency.
>>>
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>     <version>1.0.0</version>
>>>     <scope>provided</scope>
>>> </dependency>
>>>
>>> Thanks
>>> Ashutosh
>>>
>>> On Fri, Apr 1, 2016 at 10:46 AM, Zhun Shen <shenzhunal...@gmail.com> wrote:
>>> Hi there,
>>>
>>> I check my build.gradle file, I use
>>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0', but I found that
>>> this lib is based on kafka-clients 0.9.0.1.
>>>
>>> I want to use Flink streaming to consume Kafka's events in realtime, but
>>> I'm confused by Flink's libs with different versions. Which
>>> flink-connector-kafka is compatible with kafka 0.9.0.0 ?
>>> My environment is Kafka: 0.9.0.0, Flink: 1.0.0, Language: java
>>>
>>> part of my build.gradle:
>>> 'org.apache.kafka:kafka_2.10:0.9.0.0',
>>> 'org.apache.kafka:kafka-clients:0.9.0.0',
>>> 'org.apache.flink:flink-java:1.0.0',
>>> 'org.apache.flink:flink-streaming-java_2.10:1.0.0',
>>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0',
>>> 'org.apache.flink:flink-connector-kafka-base_2.10:1.0.0'
>>>
>>> Any advice ?
>>>
>>> Thanks.
>>>
>>>
>>>> On Mar 30, 2016, at 10:35 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>> Hi!
>>>>
>>>> A "NoSuchMethodError" usually means that you compile and run against
>>>> different versions.
>>>>
>>>> Make sure the version you reference in the IDE and the version on the
>>>> cluster are the same.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Mar 30, 2016 at 9:42 AM, Balaji Rajagopalan
>>>> <balaji.rajagopa...@olacabs.com> wrote:
>>>> I have tested kafka 0.8.0.2 with flink 1.0.0 and it works for me. Can't
>>>> talk about kafka 0.9.0.1.
>>>>
>>>> On Wed, Mar 30, 2016 at 12:51 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:
>>>> Hi there,
>>>>
>>>> flink version: 1.0.0
>>>> kafka version: 0.9.0.0
>>>> env: local
>>>>
>>>> I run the script below:
>>>> ./bin/flink run -c com.test.flink.FlinkTest test.jar --topic nginx-logs
>>>> --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181
>>>> --group.id myGroup --partition.assignment.strategy
>>>> round robin
>>>>
>>>> But I got the error:
>>>> java.lang.NoSuchMethodError:
>>>> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>>>>
>>>>
>>>> The code as below:
>>>> DataStream<String> messageStream = env.addSource(new
>>>>     FlinkKafkaConsumer09<>("nginx-logs", new
>>>>     SimpleStringSchema(), parameterTool.getProperties()));
>>>> messageStream.rebalance().map(new MapFunction<String, String>() {
>>>>
>>>>     @Override
>>>>     public String map(String value) throws Exception {
>>>>         return "Kafka and Flink says: " + value;
>>>>     }
>>>> }).print();
>>>>
>>>>
>>>> I check the error with google, but it shows that it is a method of kafka
>>>> 0.9.0.1. Any idea? Thanks.
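
For anyone hitting the same NoSuchMethodError: one way to check the "two Kafka versions on the classpath" diagnosis from this thread is to ask the JVM which jar a class was actually loaded from and which signatures it exposes. The sketch below is only an illustration; the class name ClasspathCheck and its helper methods are made up for this example and are not part of Flink or Kafka. It uses plain Java reflection and falls back to a message if the class is not on the classpath.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical diagnostic helper (not part of Flink or Kafka).
public class ClasspathCheck {

    /** Returns the jar or directory a class was loaded from, if the JVM reports one. */
    public static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // JDK classes loaded by the bootstrap loader report no code source.
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    /** Lists public methods with the given name as "returnType name(paramTypes)". */
    public static List<String> signaturesOf(Class<?> cls, String methodName) {
        List<String> result = new ArrayList<>();
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(methodName)) {
                StringJoiner params = new StringJoiner(", ", "(", ")");
                for (Class<?> p : m.getParameterTypes()) {
                    params.add(p.getName());
                }
                result.add(m.getReturnType().getName() + " " + methodName + params);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Defaults are the class and method named in the stack trace above.
        String className = args.length > 0
                ? args[0] : "org.apache.kafka.clients.consumer.KafkaConsumer";
        String methodName = args.length > 1 ? args[1] : "partitionsFor";
        try {
            Class<?> cls = Class.forName(className);
            System.out.println("Loaded from: " + locationOf(cls));
            for (String signature : signaturesOf(cls, methodName)) {
                System.out.println("  " + signature);
            }
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on the classpath");
        }
    }
}
```

Run it with the same classpath as the job. If the printed jar is not the kafka-clients version the connector expects, or the printed return type differs from the one in the error message, that would point to the conflict Stephan describes. On the Gradle side, the built-in `gradle dependencies` task should show which kafka-clients versions are being resolved.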