[ 
https://issues.apache.org/jira/browse/FLINK-5755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-5755.
--------------------------------------
    Resolution: Not A Problem

> Flink with Kafka connection
> ---------------------------
>
>                 Key: FLINK-5755
>                 URL: https://issues.apache.org/jira/browse/FLINK-5755
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Kafka Connector
>    Affects Versions: 1.1.3
>         Environment: Ubuntu 16.04.1 LTS
> Flink 1.1.3
> Kafka 0.10.1.1
>            Reporter: Fábio Dias
>
> I'm trying to connect Flink with Kafka (Flink 1.1.3, Kafka 0.10.1.1).
> I have already tried all the fixes that I could find, but none of them worked.
> pom.xml :
> <project xmlns="http://maven.apache.org/POM/4.0.0" 
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>       <modelVersion>4.0.0</modelVersion>
>       
>       <groupId>ux</groupId>
>       <artifactId>logs</artifactId>
>       <version>1.3-SNAPSHOT</version>
>       <packaging>jar</packaging>
>       <name>Flink Quickstart Job</name>
>       <url>http://www.myorganization.org</url>
>       <properties>
>               
> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>               <flink.version>${project.version}</flink.version>
>               <slf4j.version>1.7.7</slf4j.version>
>               <log4j.version>1.2.17</log4j.version>
>       </properties>
>       <repositories>
>               <repository>
>                       <id>apache.snapshots</id>
>                       <name>Apache Development Snapshot Repository</name>
>                       
> <url>https://repository.apache.org/content/repositories/snapshots/</url>
>                       <releases>
>                               <enabled>false</enabled>
>                       </releases>
>                       <snapshots>
>                               <enabled>true</enabled>
>                       </snapshots>
>               </repository>
>       </repositories>
>       <dependencies>
> <dependency>
>       <groupId>junit</groupId>
>       <artifactId>junit</artifactId>
>       <version>3.8.1</version>
>       <scope>test</scope>
>     </dependency>
>               <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-streaming-java_2.10</artifactId>
>                 <version>${project.version}</version>
>               </dependency>
>               <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-java</artifactId>
>                 <version>${project.version}</version>
>               </dependency>
>               <dependency>
>                 <groupId>org.apache.flink</groupId>
>                 <artifactId>flink-clients_2.10</artifactId>
>                 <version>${project.version}</version>
>               </dependency>
>               <dependency>
>                       <groupId>org.apache.flink</groupId>
>                       <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>                       <version>1.3-SNAPSHOT</version>
>               </dependency>
>               <dependency>
>                       <groupId>org.slf4j</groupId>
>                       <artifactId>slf4j-log4j12</artifactId>
>                       <version>${slf4j.version}</version>
>               </dependency>
>               <dependency>
>                       <groupId>log4j</groupId>
>                       <artifactId>log4j</artifactId>
>                       <version>${log4j.version}</version>
>               </dependency>
>               
>       </dependencies>
>       <profiles>
>               <profile>
>                       <id>build-jar</id>
>                       <activation>
>                               <activeByDefault>false</activeByDefault>
>                       </activation>
>                       <dependencies>
>                               <dependency>
>                                       <groupId>org.apache.flink</groupId>
>                                       <artifactId>flink-java</artifactId>
>                                       <version>${project.version}</version>
>                                       <scope>provided</scope>
>                               </dependency>
>                               <dependency>
>                                       <groupId>org.apache.flink</groupId>
>                                       
> <artifactId>flink-streaming-java_2.10</artifactId>
>                                       <version>${project.version}</version>
>                                       <scope>provided</scope>
>                               </dependency>
>                               <dependency>
>                                       <groupId>org.apache.flink</groupId>
>                                       
> <artifactId>flink-clients_2.10</artifactId>
>                                       <version>1.3-SNAPSHOT</version>
>                                       <scope>provided</scope>
>                               </dependency>
>                               <dependency>
>                                       <groupId>org.slf4j</groupId>
>                                       <artifactId>slf4j-log4j12</artifactId>
>                                       <version>${slf4j.version}</version>
>                                       <scope>provided</scope>
>                               </dependency>
>                               <dependency>
>                                       <groupId>log4j</groupId>
>                                       <artifactId>log4j</artifactId>
>                                       <version>${log4j.version}</version>
>                                       <scope>provided</scope>
>                               </dependency>
>                       </dependencies>
>                       <build>
>                               <plugins>
>                                       <plugin>
>                                               
> <groupId>org.apache.maven.plugins</groupId>
>                                               
> <artifactId>maven-shade-plugin</artifactId>
>                                               <version>2.4.1</version>
>                                               <executions>
>                                                       <execution>
>                                                               
> <phase>package</phase>
>                                                               <goals>
>                                                                       
> <goal>shade</goal>
>                                                               </goals>
>                                                               <configuration>
>                                                                       
> <artifactSet>
>                                                                               
> <excludes combine.self="override"></excludes>
>                                                                       
> </artifactSet>
>                                                               </configuration>
>                                                       </execution>
>                                               </executions>
>                                       </plugin>
>                               </plugins>
>                       </build>
>               </profile>
>       </profiles>
>       <build>
>               <plugins>
>                       <plugin>
>                               <groupId>org.apache.maven.plugins</groupId>
>                               <artifactId>maven-compiler-plugin</artifactId>
>                               <version>3.1</version>
>                               <configuration>
>                                       <source>1.8</source>
>                                       <target>1.8</target>
>                               </configuration>
>                       </plugin>
>               </plugins>
>       </build>
> </project>
> my java code : 
> import java.util.Properties;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> public class App 
> {
>     public static void main(String[] args) throws Exception {
>               
>               System.out.println("Hello World!");
>     
>               StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>               Properties properties = new Properties();
>           properties.setProperty("bootstrap.servers", "localhost:9092");
>           properties.setProperty("zookeeper.connect", "localhost:2181");
>           properties.setProperty("group.id", "flink_consumer");
>               DataStream<String> messageStream = env.addSource(new 
> FlinkKafkaConsumer010<>
>                       ("ux_logs", new SimpleStringSchema(), properties));
>               messageStream.rebalance().map(new MapFunction<String, String>() 
> {
>                       private static final long serialVersionUID = 
> -6867736771747690202L;
>                       public String map(String value) throws Exception {
>                               return "Kafka and Flink says: " + value;
>                       }
>               }).print();
>               
>           env.execute();
>     }
> }
> And when I compile it, I get the following error:
> java.lang.NoClassDefFoundError: 
> org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010
>         at ux.App.main(App.java:28)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>         at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
>         at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>         at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> Do I need to remove my Kafka and run an older version?
> Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to