Hi Michael, in flink-test-0.1.jar the class DaoJoin$1.class is located at com/davengo/rfidcloud/flink, but Flink tries to load com.otter.ist.flink.DaoJoin$1. This might be the problem. It is somewhat odd, because the source code you posted declares the correct package com.otter.ist.flink.
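If it helps, here is a minimal sketch of how the mismatch could be checked without reading the `jar tf` output by eye. It only uses java.util.jar from the JDK; the class JarClassCheck and its method names are hypothetical helpers I made up for illustration, not part of Flink. It maps the class name from the exception to the entry path a class loader would look up, and lists entries that carry the same simple name in another package.

```java
import java.io.IOException;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: checks whether a jar holds a class
// at the path implied by its fully qualified name.
public class JarClassCheck {

    /** Maps a binary class name (a.b.C$1) to its jar entry path (a/b/C$1.class). */
    static String entryPathFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** True if the entry list contains the class at exactly the expected path. */
    static boolean containsClass(List<String> entries, String className) {
        return entries.contains(entryPathFor(className));
    }

    /** Entries holding a class with the same simple name, in whatever package. */
    static List<String> sameSimpleName(List<String> entries, String className) {
        String simple = className.substring(className.lastIndexOf('.') + 1) + ".class";
        return entries.stream()
                .filter(e -> e.equals(simple) || e.endsWith("/" + simple))
                .collect(Collectors.toList());
    }

    /** Reads all entry names from a jar file on disk. */
    static List<String> listEntries(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.stream().map(JarEntry::getName).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: JarClassCheck <jar path> <class name>");
            return;
        }
        List<String> entries = listEntries(args[0]);
        String className = args[1];
        if (containsClass(entries, className)) {
            System.out.println("found " + entryPathFor(className));
        } else {
            System.out.println("missing " + entryPathFor(className));
            // Same class name under a different package hints at a stale or mismatched build.
            sameSimpleName(entries, className)
                    .forEach(e -> System.out.println("  same name at " + e));
        }
    }
}
```

It could be run as `java JarClassCheck target/flink-test-0.1.jar 'com.otter.ist.flink.DaoJoin$1'` (single quotes so the shell leaves the `$1` alone). If the class only shows up under a different package, the jar was probably built from other sources than the ones you posted.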
Cheers,
Till

On Thu, Aug 6, 2015 at 11:27 AM, Michael Huelfenhaus <m.huelfenh...@davengo.com> wrote:

> hi,
>
> how did you build the jar file?
>
>
> mvn clean install -Pbuild-jar
>
> Have you checked whether your classes are in the jar file?
>
>
> yes, this seems alright for me
>
>
> jar tf target/flink-test-0.1.jar
> META-INF/MANIFEST.MF
> META-INF/
> com/
> com/davengo/
> com/davengo/rfidcloud/
> com/davengo/rfidcloud/flink/
> com/davengo/rfidcloud/flink/DaoJoin$1.class
> com/davengo/rfidcloud/flink/DaoJoin.class
> com/davengo/rfidcloud/flink/streampojos/
> com/davengo/rfidcloud/flink/streampojos/EpcTuple.class
> log4j.properties
> META-INF/maven/
> META-INF/maven/com.davengo.rfidcloud.flink/
> META-INF/maven/com.davengo.rfidcloud.flink/flink-test/
> META-INF/maven/com.davengo.rfidcloud.flink/flink-test/pom.xml
> META-INF/maven/com.davengo.rfidcloud.flink/flink-test/pom.properties
>
> On 06.08.2015 at 11:21, Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi,
>
> how did you build the jar file?
> Have you checked whether your classes are in the jar file?
>
> On Thu, Aug 6, 2015 at 11:08 AM, Michael Huelfenhaus <m.huelfenh...@davengo.com> wrote:
>
>> Hello everybody
>>
>> I am trying to build a very simple streaming application with the nightly
>> build of flink 0.10, my code runs fine in eclipse.
>>
>> But when I build and deploy the jar locally I always get
>> java.lang.ClassNotFoundException: com.otter.ist.flink.DaoJoin$1
>>
>> There is also no plan visible in the web interface.
>>
>> I start the local flink 0.10 with start-local-streaming.sh after
>> building it from the git code
>>
>> Below you find the complete error, my code and the pom.xml any help is
>> appreciated.
>>
>> Cheers Michael
>>
>>
>> error log from web interface:
>> An error occurred while invoking the program:
>>
>> The main method caused an error.
>>
>>
>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:364)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>     at org.apache.flink.runtime.LeaderSessionMessages$$anonfun$receive$1.applyOrElse(LeaderSessionMessages.scala:40)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:101)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207)
>>     at org.apache.flink.streaming.runtime.tasks.OutputHandler.createChainedCollector(OutputHandler.java:173)
>>     at org.apache.flink.streaming.runtime.tasks.OutputHandler.createChainedCollector(OutputHandler.java:159)
>>     at org.apache.flink.streaming.runtime.tasks.OutputHandler.<init>(OutputHandler.java:107)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:99)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:523)
>>     ... 1 more
>> Caused by: java.lang.ClassNotFoundException: com.otter.ist.flink.DaoJoin$1
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     at java.lang.Class.forName0(Native Method)
>>     at java.lang.Class.forName(Class.java:344)
>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
>>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:205)
>>     ... 6 more
>>
>> my code:
>>
>> package com.otter.ist.flink;
>>
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.api.java.tuple.Tuple3;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> public class DaoJoin {
>>
>>     public static void main(String[] args) throws Exception {
>>         // *************************************************************************
>>         // PROGRAM
>>         // *************************************************************************
>>
>>         if (!parseParameters(args)) {
>>             return;
>>         }
>>
>>         // set up the execution environment
>>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
>>         // final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>
>>         // get input data
>>         DataStream<String> text = getTextDataStream(env);
>>
>>         DataStream<Tuple3<String, String, Integer>> epcs = text.map(new MapFunction<String, Tuple3<String, String, Integer>>(){
>>
>>             private static final long serialVersionUID = -7889264579632622427L;
>>
>>             @Override
>>             public Tuple3<String, String, Integer> map(String line) throws Exception {
>>                 String[] fields = line.split(" ");
>>
>>                 return new Tuple3<String, String, Integer>(fields[0], fields[1], Integer.parseInt(fields[2]));
>>             }
>>
>>         });
>>
>>         // emit result
>>         if (fileOutput) {
epcs.writeAsText(outputPath); >> } else { >> epcs.print(); >> } >> System.out.println(env.getExecutionPlan()); >> >> // execute program >> env.execute("DaoJoin"); >> } >> >> // >> ************************************************************************* >> // UTIL METHODS >> // >> ************************************************************************* >> >> private static boolean fileOutput = false; >> private static String textPath; >> private static String outputPath; >> >> private static boolean parseParameters(String[] args) { >> >> if (args.length > 0) { >> // parse input arguments >> fileOutput = true; >> if (args.length == 2) { >> textPath = args[0]; >> outputPath = args[1]; >> } else { >> System.err.println("Usage: DaoJoin <text >> path> <result path>"); >> return false; >> } >> System.out.println("fileout: " + fileOutput); >> } else { >> System.out.println("Executing WordCount example >> with built-in default data."); >> System.out.println(" Provide parameters to read >> input data from a file."); >> System.out.println(" Usage: WordCount <text >> path> <result path>"); >> } >> return true; >> } >> >> private static DataStream<String> >> getTextDataStream(StreamExecutionEnvironment env) { >> // read the text file from given input path >> return env.readTextFile(textPath); >> } >> } >> >> >> the pom.xml >> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=" >> http://xwww.w3.org/2001/XMLSchema-instance" >> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 >> http://maven.apache.org/xsd/maven-4.0.0.xsd"> >> <modelVersion>4.0.0</modelVersion> >> >> <groupId>com.otter.ist.flink</groupId> >> <artifactId>flink-test</artifactId> >> <version>0.1</version> >> <packaging>jar</packaging> >> >> <name>DaoJoin</name> >> <url>http://www.otter.com</url> >> >> <properties> >> >> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> >> </properties> >> >> <repositories> >> <repository> >> <id>apache.snapshots</id> >> <name>Apache Development 
Snapshot >> Repository</name> >> <url> >> https://repository.apache.org/content/repositories/snapshots/</url> >> <releases> >> <enabled>false</enabled> >> </releases> >> <snapshots> >> <enabled>true</enabled> >> </snapshots> >> </repository> >> </repositories> >> >> <!-- >> >> Execute "mvn clean package -Pbuild-jar" >> to build a jar file out of this project! >> >> How to use the Flink Quickstart pom: >> >> a) Adding new dependencies: >> You can add dependencies to the list below. >> Please check if the maven-shade-plugin below is >> filtering out your dependency >> and remove the exclude from there. >> >> b) Build a jar for running on the cluster: >> There are two options for creating a jar from >> this project >> >> b.1) "mvn clean package" -> this will create a >> fat jar which contains all >> dependencies necessary for >> running the jar created by this pom in a cluster. >> The "maven-shade-plugin" excludes >> everything that is provided on a running Flink cluster. >> >> b.2) "mvn clean package -Pbuild-jar" -> This will >> also create a fat-jar, but with much >> nicer dependency exclusion >> handling. This approach is preferred and leads to >> much cleaner jar files. >> --> >> >> <dependencies> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-java</artifactId> >> <version>0.10-SNAPSHOT</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-streaming-core</artifactId> >> <version>0.10-SNAPSHOT</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-clients</artifactId> >> <version>0.10-SNAPSHOT</version> >> </dependency> >> </dependencies> >> >> <build> >> <plugins> >> <!-- We use the maven-shade plugin to create a >> fat jar that contains all dependencies >> except flink and it's transitive dependencies. >> The resulting fat-jar can be executed >> on a cluster. Change the value of Program-Class >> if your program entry point changes. 
--> >> <plugin> >> >> <groupId>org.apache.maven.plugins</groupId> >> >> <artifactId>maven-shade-plugin</artifactId> >> <version>2.3</version> >> <executions> >> <!-- Run shade goal on package >> phase --> >> <execution> >> <phase>package</phase> >> <goals> >> <goal>shade</goal> >> </goals> >> <configuration> >> <artifactSet> >> <excludes> >> >> <!-- This list contains all dependencies of flink-dist >> >> Everything else will be packaged into the fat-jar >> >> --> >> >> <exclude>org.apache.flink:flink-shaded-*</exclude> >> >> <exclude>org.apache.flink:flink-core</exclude> >> >> <exclude>org.apache.flink:flink-java</exclude> >> >> <exclude>org.apache.flink:flink-scala</exclude> >> >> <exclude>org.apache.flink:flink-runtime</exclude> >> >> <exclude>org.apache.flink:flink-optimizer</exclude> >> >> <exclude>org.apache.flink:flink-clients</exclude> >> >> <exclude>org.apache.flink:flink-spargel</exclude> >> >> <exclude>org.apache.flink:flink-avro</exclude> >> >> <exclude>org.apache.flink:flink-java-examples</exclude> >> >> <exclude>org.apache.flink:flink-scala-examples</exclude> >> >> <exclude>org.apache.flink:flink-streaming-examples</exclude> >> >> <exclude>org.apache.flink:flink-streaming-core</exclude> >> >> >> <!-- Also exclude very big transitive dependencies of Flink >> >> >> WARNING: You have to remove these excludes if your code relies on other >> >> versions of these dependencies. 
>> >> >> --> >> >> <exclude>org.scala-lang:scala-library</exclude> >> >> <exclude>org.scala-lang:scala-compiler</exclude> >> >> <exclude>org.scala-lang:scala-reflect</exclude> >> >> <exclude>com.amazonaws:aws-java-sdk</exclude> >> >> <exclude>com.typesafe.akka:akka-actor_*</exclude> >> >> <exclude>com.typesafe.akka:akka-remote_*</exclude> >> >> <exclude>com.typesafe.akka:akka-slf4j_*</exclude> >> >> <exclude>io.netty:netty-all</exclude> >> >> <exclude>io.netty:netty</exclude> >> >> <exclude>org.eclipse.jetty:jetty-server</exclude> >> >> <exclude>org.eclipse.jetty:jetty-continuation</exclude> >> >> <exclude>org.eclipse.jetty:jetty-http</exclude> >> >> <exclude>org.eclipse.jetty:jetty-io</exclude> >> >> <exclude>org.eclipse.jetty:jetty-util</exclude> >> >> <exclude>org.eclipse.jetty:jetty-security</exclude> >> >> <exclude>org.eclipse.jetty:jetty-servlet</exclude> >> >> <exclude>commons-fileupload:commons-fileupload</exclude> >> >> <exclude>org.apache.avro:avro</exclude> >> >> <exclude>commons-collections:commons-collections</exclude> >> >> <exclude>org.codehaus.jackson:jackson-core-asl</exclude> >> >> <exclude>org.codehaus.jackson:jackson-mapper-asl</exclude> >> >> <exclude>com.thoughtworks.paranamer:paranamer</exclude> >> >> <exclude>org.xerial.snappy:snappy-java</exclude> >> >> <exclude>org.apache.commons:commons-compress</exclude> >> >> <exclude>org.tukaani:xz</exclude> >> >> <exclude>com.esotericsoftware.kryo:kryo</exclude> >> >> <exclude>com.esotericsoftware.minlog:minlog</exclude> >> >> <exclude>org.objenesis:objenesis</exclude> >> >> <exclude>com.twitter:chill_*</exclude> >> >> <exclude>com.twitter:chill-java</exclude> >> >> <exclude>com.twitter:chill-avro_*</exclude> >> >> <exclude>com.twitter:chill-bijection_*</exclude> >> >> <exclude>com.twitter:bijection-core_*</exclude> >> >> <exclude>com.twitter:bijection-avro_*</exclude> >> >> <exclude>commons-lang:commons-lang</exclude> >> >> <exclude>junit:junit</exclude> >> >> 
<exclude>de.javakaffee:kryo-serializers</exclude> >> >> <exclude>joda-time:joda-time</exclude> >> >> <exclude>org.apache.commons:commons-lang3</exclude> >> >> <exclude>org.slf4j:slf4j-api</exclude> >> >> <exclude>org.slf4j:slf4j-log4j12</exclude> >> >> <exclude>log4j:log4j</exclude> >> >> <exclude>org.apache.commons:commons-math</exclude> >> >> <exclude>org.apache.sling:org.apache.sling.commons.json</exclude> >> >> <exclude>commons-logging:commons-logging</exclude> >> >> <exclude>org.apache.httpcomponents:httpclient</exclude> >> >> <exclude>org.apache.httpcomponents:httpcore</exclude> >> >> <exclude>commons-codec:commons-codec</exclude> >> >> <exclude>com.fasterxml.jackson.core:jackson-core</exclude> >> >> <exclude>com.fasterxml.jackson.core:jackson-databind</exclude> >> >> <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude> >> >> <exclude>org.codehaus.jettison:jettison</exclude> >> >> <exclude>stax:stax-api</exclude> >> >> <exclude>com.typesafe:config</exclude> >> >> <exclude>org.uncommons.maths:uncommons-maths</exclude> >> >> <exclude>com.github.scopt:scopt_*</exclude> >> >> <exclude>org.mortbay.jetty:servlet-api</exclude> >> >> <exclude>commons-io:commons-io</exclude> >> >> <exclude>commons-cli:commons-cli</exclude> >> >> </excludes> >> </artifactSet> >> <filters> >> <filter> >> >> <artifact>org.apache.flink:*</artifact> >> >> <excludes> >> >> <exclude>org/apache/flink/shaded/**</exclude> >> >> <exclude>web-docs/**</exclude> >> >> </excludes> >> </filter> >> </filters> >> <transformers> >> <!-- add >> Main-Class to manifest file --> >> >> <transformer >> implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> >> >> <mainClass>com.otter.ist.flink.DaoJoin</mainClass> >> >> </transformer> >> </transformers> >> >> <createDependencyReducedPom>false</createDependencyReducedPom> >> </configuration> >> </execution> >> </executions> >> </plugin> >> >> <!-- Configure the jar plugin to add the main >> class as a manifest entry 
--> >> <plugin> >> >> <groupId>org.apache.maven.plugins</groupId> >> <artifactId>maven-jar-plugin</artifactId> >> <version>2.5</version> >> <configuration> >> <archive> >> <manifestEntries> >> >> <Main-Class>com.otter.ist.flink.DaoJoin</Main-Class> >> </manifestEntries> >> </archive> >> </configuration> >> </plugin> >> >> <plugin> >> >> <groupId>org.apache.maven.plugins</groupId> >> >> <artifactId>maven-compiler-plugin</artifactId> >> <version>3.1</version> >> <configuration> >> <source>1.8</source> <!-- If you >> want to use Java 8, change this to "1.8" --> >> <target>1.8</target> <!-- If you >> want to use Java 8, change this to "1.8" --> >> </configuration> >> </plugin> >> </plugins> >> >> >> <!-- If you want to use Java 8 Lambda Expressions >> uncomment the following lines --> >> <!-- >> <pluginManagement> >> <plugins> >> <plugin> >> >> <artifactId>maven-compiler-plugin</artifactId> >> <configuration> >> <source>1.8</source> >> <target>1.8</target> >> >> <compilerId>jdt</compilerId> >> </configuration> >> <dependencies> >> <dependency> >> >> <groupId>org.eclipse.tycho</groupId> >> >> <artifactId>tycho-compiler-jdt</artifactId> >> >> <version>0.21.0</version> >> </dependency> >> </dependencies> >> </plugin> >> >> <plugin> >> <groupId>org.eclipse.m2e</groupId> >> >> <artifactId>lifecycle-mapping</artifactId> >> <version>1.0.0</version> >> <configuration> >> <lifecycleMappingMetadata> >> <pluginExecutions> >> >> <pluginExecution> >> >> <pluginExecutionFilter> >> >> <groupId>org.apache.maven.plugins</groupId> >> >> <artifactId>maven-assembly-plugin</artifactId> >> >> <versionRange>[2.4,)</versionRange> >> >> <goals> >> >> <goal>single</goal> >> >> </goals> >> >> </pluginExecutionFilter> >> >> <action> >> >> <ignore/> >> >> </action> >> >> </pluginExecution> >> >> <pluginExecution> >> >> <pluginExecutionFilter> >> >> <groupId>org.apache.maven.plugins</groupId> >> >> <artifactId>maven-compiler-plugin</artifactId> >> >> <versionRange>[3.1,)</versionRange> >> >> 
<goals> >> >> <goal>testCompile</goal> >> >> <goal>compile</goal> >> >> </goals> >> >> </pluginExecutionFilter> >> >> <action> >> >> <ignore/> >> >> </action> >> >> </pluginExecution> >> >> </pluginExecutions> >> >> </lifecycleMappingMetadata> >> </configuration> >> </plugin> >> </plugins> >> </pluginManagement> >> --> >> >> </build> >> <profiles> >> <profile> >> <!-- A profile that does everyting correctly: >> We set the Flink dependencies to provided --> >> <id>build-jar</id> >> <activation> >> <activeByDefault>false</activeByDefault> >> </activation> >> <dependencies> >> <dependency> >> >> <groupId>org.apache.flink</groupId> >> >> <artifactId>flink-java</artifactId> >> <version>0.10-SNAPSHOT</version> >> <scope>provided</scope> >> </dependency> >> <dependency> >> >> <groupId>org.apache.flink</groupId> >> >> <artifactId>flink-streaming-core</artifactId> >> <version>0.10-SNAPSHOT</version> >> <scope>provided</scope> >> </dependency> >> <dependency> >> >> <groupId>org.apache.flink</groupId> >> >> <artifactId>flink-clients</artifactId> >> <version>0.10-SNAPSHOT</version> >> <scope>provided</scope> >> </dependency> >> </dependencies> >> </profile> >> </profiles> >> </project> > > > >