[ https://issues.apache.org/jira/browse/FLINK-5755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai closed FLINK-5755. -------------------------------------- Resolution: Not A Problem > Flink with Kafka connection > --------------------------- > > Key: FLINK-5755 > URL: https://issues.apache.org/jira/browse/FLINK-5755 > Project: Flink > Issue Type: Bug > Components: DataStream API, Kafka Connector > Affects Versions: 1.1.3 > Environment: Ubuntu 16.04.1 LTS > Flink 1.1.3 > Kakfa 0.10.1.1 > Reporter: Fábio Dias > > I'm trying to connect flink with kafka (Flink 1.1.3 Kakfa 0.10.1.1) > I already try all the fixes that i could find, but none of them work. > pom.xml : > <project xmlns="http://maven.apache.org/POM/4.0.0" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 > http://maven.apache.org/xsd/maven-4.0.0.xsd"> > <modelVersion>4.0.0</modelVersion> > > <groupId>ux</groupId> > <artifactId>logs</artifactId> > <version>1.3-SNAPSHOT</version> > <packaging>jar</packaging> > <name>Flink Quickstart Job</name> > <url>http://www.myorganization.org</url> > <properties> > > <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> > <flink.version>${project.version}</flink.version> > <slf4j.version>1.7.7</slf4j.version> > <log4j.version>1.2.17</log4j.version> > </properties> > <repositories> > <repository> > <id>apache.snapshots</id> > <name>Apache Development Snapshot Repository</name> > > <url>https://repository.apache.org/content/repositories/snapshots/</url> > <releases> > <enabled>false</enabled> > </releases> > <snapshots> > <enabled>true</enabled> > </snapshots> > </repository> > </repositories> > <dependencies> > <dependency> > <groupId>junit</groupId> > <artifactId>junit</artifactId> > <version>3.8.1</version> > <scope>test</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-java_2.10</artifactId> > <version>${project.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>${project.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-clients_2.10</artifactId> > <version>${project.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka-0.10_2.10</artifactId> > <version>1.3-SNAPSHOT</version> > </dependency> > <dependency> > <groupId>org.slf4j</groupId> > <artifactId>slf4j-log4j12</artifactId> > <version>${slf4j.version}</version> > </dependency> > <dependency> > <groupId>log4j</groupId> > <artifactId>log4j</artifactId> > <version>${log4j.version}</version> > </dependency> > > </dependencies> > <profiles> > <profile> > <id>build-jar</id> > <activation> > <activeByDefault>false</activeByDefault> > </activation> > <dependencies> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>${project.version}</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-streaming-java_2.10</artifactId> > <version>${project.version}</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-clients_2.10</artifactId> > <version>1.3-SNAPSHOT</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.slf4j</groupId> > <artifactId>slf4j-log4j12</artifactId> > <version>${slf4j.version}</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>log4j</groupId> > <artifactId>log4j</artifactId> > <version>${log4j.version}</version> > <scope>provided</scope> > </dependency> > </dependencies> > <build> > <plugins> > <plugin> > > <groupId>org.apache.maven.plugins</groupId> > > <artifactId>maven-shade-plugin</artifactId> > <version>2.4.1</version> > <executions> > <execution> > > <phase>package</phase> > <goals> > > <goal>shade</goal> > </goals> > <configuration> > > <artifactSet> > > <excludes combine.self="override"></excludes> > > </artifactSet> > </configuration> > </execution> > </executions> > </plugin> > </plugins> > </build> > </profile> > </profiles> > <build> > <plugins> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-compiler-plugin</artifactId> > <version>3.1</version> > <configuration> > <source>1.8</source> > <target>1.8</target> > </configuration> > </plugin> > </plugins> > </build> > </project> > my java code : > import java.util.Properties; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; > import org.apache.flink.streaming.util.serialization.SimpleStringSchema; > public class App > { > public static void main(String[] args) throws Exception { > > System.out.println("Hello World!"); > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > Properties properties = new Properties(); > properties.setProperty("bootstrap.servers", "localhost:9092"); > properties.setProperty("zookeeper.connect", "localhost:2181"); > properties.setProperty("group.id", "flink_consumer"); > DataStream<String> messageStream = env.addSource(new > FlinkKafkaConsumer010<> > ("ux_logs", new SimpleStringSchema(), properties)); > messageStream.rebalance().map(new MapFunction<String, String>() > { > private static final long serialVersionUID = > -6867736771747690202L; > public String map(String value) throws Exception { > return "Kafka and Flink says: " + value; > } > }).print(); > > env.execute(); > } > } > And when i compile it, i get the following error: > java.lang.NoClassDefFoundError: > org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010 > at ux.App.main(App.java:28) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320) > at > org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > Do i need to remove my kafka, and run a older version? > Thanks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)