I created a new project and added only the kafka-clients, flink-connector-kafka,
and Flink streaming libs, and it works.

Thanks.
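
For anyone who hits the same conflict, here is a minimal sketch (not part of the original project) that lists every classpath location providing a given class, so that a duplicated kafka-clients jar shows up. The class name `ClasspathCheck` and its helper methods are made up for illustration. The default class name is taken from the stack trace in this thread. It only reports what is visible to the class loader that loaded `ClasspathCheck`, so it is assumed to run with the same classpath as the job.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class ClasspathCheck {

    /** Converts a fully qualified class name to its classpath resource name. */
    public static String resourceName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns every location visible to this class's loader that provides the class. */
    public static List<URL> locationsOf(String className) {
        ClassLoader loader = ClasspathCheck.class.getClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(loader.getResources(resourceName(className)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumption: the class to look up; pass another name as the first argument.
        String className = args.length > 0
                ? args[0]
                : "org.apache.kafka.clients.consumer.KafkaConsumer";

        List<URL> hits = locationsOf(className);
        if (hits.isEmpty()) {
            System.out.println(className + " is not on the classpath");
        }
        for (URL hit : hits) {
            // Each URL names the jar (or directory) that contains the class file.
            System.out.println(hit);
        }
        if (hits.size() > 1) {
            System.out.println("WARNING: " + hits.size()
                    + " copies found; the versions may conflict");
        }
    }
}
```

If more than one jar is printed, the first one on the classpath usually wins, which may not be the version the connector was compiled against.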


> On Apr 2, 2016, at 12:54 AM, Stephan Ewen <se...@apache.org> wrote:
> 
> The issue may be that you include Kafka twice:
> 
> 1) You explicitly add "org.apache.kafka:kafka-clients:0.9.0.0"
> 2) You add "org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0", which 
> internally adds "org.apache.kafka:kafka-clients:0.9.0.1"
> 
> These two Kafka versions may conflict. I would drop the dependency (1) and 
> simply let the FlinkKafkaConsumer pull whatever dependency it needs by itself.
> The 0.9.0.1 client that Flink uses internally should read fine from Kafka 
> 0.9.0.0 brokers.
> 
> Greetings,
> Stephan
> 
> 
> On Fri, Apr 1, 2016 at 5:19 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:
> Yeah, I mean I read the demo with FlinkKafkaConsumer08
> (http://data-artisans.com/kafka-flink-a-practical-how-to/), then I wrote the
> program based on Kafka 0.9.0.0 and Flink 1.0.0.
> 
>> On Apr 1, 2016, at 7:27 PM, Balaji Rajagopalan
>> <balaji.rajagopa...@olacabs.com> wrote:
>> 
>> Did you make sure the Flink connector version and the Flink version are the
>> same? Also, for Kafka 0.8.0.0 you will have to use FlinkKafkaConsumer08.
>> 
>> On Fri, Apr 1, 2016 at 3:21 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:
>> I followed the Kafka 0.8.0.0 example in the Flink docs.
>> 
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         Properties properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "test");
>>         properties.setProperty("key.deserializer",
>>                 "org.apache.kafka.common.serialization.StringDeserializer");
>>         properties.setProperty("value.deserializer",
>>                 "org.apache.kafka.common.serialization.StringDeserializer");
>>         properties.setProperty("partition.assignment.strategy", "range");
>> 
>>         DataStream<String> messageStream = env
>>                 .addSource(new FlinkKafkaConsumer09<String>("nginx-logs",
>>                         new SimpleStringSchema(), properties));
>> 
>>         messageStream
>>                 .rebalance()
>>                 .map(new MapFunction<String, String>() {
>> 
>>                     @Override
>>                     public String map(String value) throws Exception {
>>                         return "Kafka and Flink says: " + value;
>>                     }
>>                 }).print();
>> 
>>         env.execute();
>>     }
>> 
>> 
>> I always get the error below:
>> 
>> java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>>      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>>      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>>      at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>> 
>> 
>> 
>> 
>>> On Apr 1, 2016, at 1:40 PM, Ashutosh Kumar <kmr.ashutos...@gmail.com> wrote:
>>> 
>>> I am using Flink 1.0.0 with Kafka 0.9. It works fine for me. I use the
>>> following dependency.
>>> 
>>> <dependency>
>>>     <groupId>org.apache.flink</groupId>
>>>     <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
>>>     <version>1.0.0</version>
>>>     <scope>provided</scope>
>>> </dependency>
>>> 
>>> Thanks
>>> Ashutosh
>>> 
>>> On Fri, Apr 1, 2016 at 10:46 AM, Zhun Shen <shenzhunal...@gmail.com> wrote:
>>> Hi there,
>>> 
>>> I checked my build.gradle file. I use
>>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0', but I found that
>>> this lib is based on kafka-clients 0.9.0.1.
>>> 
>>> I want to use Flink streaming to consume Kafka’s events in real time, but
>>> I’m confused by Flink’s libs with different versions. Which
>>> flink-connector-kafka is compatible with Kafka 0.9.0.0?
>>> My environment is Kafka: 0.9.0.0, Flink: 1.0.0, Language: Java
>>> 
>>> Part of my build.gradle:
>>> 'org.apache.kafka:kafka_2.10:0.9.0.0',
>>> 'org.apache.kafka:kafka-clients:0.9.0.0',
>>> 'org.apache.flink:flink-java:1.0.0',
>>> 'org.apache.flink:flink-streaming-java_2.10:1.0.0',
>>> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0',
>>> 'org.apache.flink:flink-connector-kafka-base_2.10:1.0.0'
>>> 
>>> Any advice ? 
>>> 
>>> Thanks.
>>> 
>>> 
>>>> On Mar 30, 2016, at 10:35 PM, Stephan Ewen <se...@apache.org> wrote:
>>>> 
>>>> Hi!
>>>> 
>>>> A "NoSuchMethodError" usually means that you compile and run against 
>>>> different versions.
>>>> 
>>>> Make sure the version you reference in the IDE and the version on the 
>>>> cluster are the same.
>>>> 
>>>> Greetings,
>>>> Stephan
>>>> 
>>>> 
>>>> 
>>>> On Wed, Mar 30, 2016 at 9:42 AM, Balaji Rajagopalan
>>>> <balaji.rajagopa...@olacabs.com> wrote:
>>>> I have tested Kafka 0.8.0.2 with Flink 1.0.0 and it works for me. I can't
>>>> speak for Kafka 0.9.0.1.
>>>> 
>>>> On Wed, Mar 30, 2016 at 12:51 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:
>>>> Hi there,
>>>> 
>>>> flink version: 1.0.0
>>>> kafka version: 0.9.0.0
>>>> env: local
>>>> 
>>>> I ran the command below:
>>>> ./bin/flink run -c com.test.flink.FlinkTest test.jar --topic nginx-logs 
>>>> --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 
>>>> --group.id myGroup --partition.assignment.strategy round robin
>>>> 
>>>> But I got the error:
>>>> java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>>>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>>>> 
>>>> 
>>>> The code is below:
>>>>         DataStream<String> messageStream = env.addSource(
>>>>                 new FlinkKafkaConsumer09<>("nginx-logs", new SimpleStringSchema(),
>>>>                         parameterTool.getProperties()));
>>>>         messageStream.rebalance().map(new MapFunction<String, String>() {
>>>> 
>>>>             @Override
>>>>             public String map(String value) throws Exception {
>>>>                 return "Kafka and Flink says: " + value;
>>>>             }
>>>>         }).print();
>>>> 
>>>> 
>>>> I searched for the error on Google, and it seems to be a method of Kafka
>>>> 0.9.0.1. Any idea? Thanks.
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 
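
The descriptor in the NoSuchMethodError quoted above, `partitionsFor(Ljava/lang/String;)Ljava/util/List;`, reads as "a method named partitionsFor that takes one `java.lang.String` and returns `java.util.List`". A minimal sketch for checking whether the class that is actually loaded at runtime has such a method could look like the following. The class name `MethodProbe` and the helper `hasMethod` are made up for illustration, and the sketch makes no claim about which Kafka release has which signature.

```java
import java.lang.reflect.Method;
import java.util.List;

public class MethodProbe {

    /**
     * Returns true if the class, as loaded at runtime, has a public method with
     * this name, these parameter types, and exactly this return type.
     */
    public static boolean hasMethod(String className, String methodName,
                                    Class<?> returnType, Class<?>... paramTypes) {
        try {
            // Load without running static initializers; only the signature matters here.
            Class<?> cls = Class.forName(className, false, MethodProbe.class.getClassLoader());
            Method method = cls.getMethod(methodName, paramTypes);
            return method.getReturnType().equals(returnType);
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and method names are taken from the stack trace in this thread.
        boolean present = hasMethod(
                "org.apache.kafka.clients.consumer.KafkaConsumer",
                "partitionsFor", List.class, String.class);
        System.out.println("partitionsFor(String) returning List present: " + present);
    }
}
```

A result of false means either that the class is missing from the classpath or that the loaded version declares the method with a different signature, which is the situation a NoSuchMethodError points to.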
