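Hi, how did you build the jar file? Have you checked whether your classes (in particular com/otter/ist/flink/DaoJoin$1.class, the anonymous MapFunction named in the exception) are in the jar file, e.g. by listing its contents with "jar tf <your-jar-file>"?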
On Thu, Aug 6, 2015 at 11:08 AM, Michael Huelfenhaus < m.huelfenh...@davengo.com> wrote: > Hello everybody > > I am trying to build a very simple streaming application with the nightly > build of flink 0.10, my code runs fine in eclipse. > > But when I build and deploy the jar locally I always get > java.lang.ClassNotFoundException: com.otter.ist.flink.DaoJoin$1 > > There is also no plan visible in the web interface. > > I start the local flink 0.10 with start-local-streaming.sh after building > it from the git code > > Below you find the complete error, my code and the pom.xml; any help is > appreciated. > > Cheers Michael > > > error log from web interface: > An error occurred while invoking the program: > > The main method caused an error. > > > org.apache.flink.runtime.client.JobExecutionException: Job execution > failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:364) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessages$$anonfun$receive$1.applyOrElse(LeaderSessionMessages.scala:40) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at 
akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:101) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch.Mailbox.run(Mailbox.scala:221) > at akka.dispatch.Mailbox.exec(Mailbox.scala:231) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.Exception: Call to registerInputOutput() of invokable > failed > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: > Cannot instantiate user function. > at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:207) > at > org.apache.flink.streaming.runtime.tasks.OutputHandler.createChainedCollector(OutputHandler.java:173) > at > org.apache.flink.streaming.runtime.tasks.OutputHandler.createChainedCollector(OutputHandler.java:159) > at > org.apache.flink.streaming.runtime.tasks.OutputHandler.<init>(OutputHandler.java:107) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:99) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:523) > ... 
1 more > Caused by: java.lang.ClassNotFoundException: com.otter.ist.flink.DaoJoin$1 > at java.net.URLClassLoader$1.run(URLClassLoader.java:372) > at java.net.URLClassLoader$1.run(URLClassLoader.java:361) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:360) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:344) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71) > at > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) > at > java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) > at > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) > at > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) > at > java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) > at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302) > at > org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264) > at > org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:205) > ... 
6 more > > my code: > > package com.otter.ist.flink; > > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.java.tuple.Tuple3; > import org.apache.flink.streaming.api.datastream.DataStream; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > > public class DaoJoin { > > public static void main(String[] args) throws Exception { > // > ************************************************************************* > // PROGRAM > // > ************************************************************************* > > if (!parseParameters(args)) { > return; > } > > // set up the execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.createLocalEnvironment(); > // final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > > // get input data > DataStream<String> text = getTextDataStream(env); > > DataStream<Tuple3<String, String, Integer>> epcs = > text.map(new MapFunction<String, Tuple3<String, String, Integer>>(){ > > private static final long serialVersionUID = > -7889264579632622427L; > > @Override > public Tuple3<String, String, Integer> map(String > line) throws Exception { > String[] fields = line.split(" "); > > return new Tuple3<String, String, > Integer>(fields[0], fields[1], Integer.parseInt(fields[2])); > } > > }); > > // emit result > if (fileOutput) { > epcs.writeAsText(outputPath); > } else { > epcs.print(); > } > System.out.println(env.getExecutionPlan()); > > // execute program > env.execute("DaoJoin"); > } > > // > ************************************************************************* > // UTIL METHODS > // > ************************************************************************* > > private static boolean fileOutput = false; > private static String textPath; > private static String outputPath; > > private static boolean parseParameters(String[] args) { > > if (args.length > 0) { > // parse input arguments > 
fileOutput = true; > if (args.length == 2) { > textPath = args[0]; > outputPath = args[1]; > } else { > System.err.println("Usage: DaoJoin <text > path> <result path>"); > return false; > } > System.out.println("fileout: " + fileOutput); > } else { > System.out.println("Executing WordCount example > with built-in default data."); > System.out.println(" Provide parameters to read > input data from a file."); > System.out.println(" Usage: WordCount <text path> > <result path>"); > } > return true; > } > > private static DataStream<String> > getTextDataStream(StreamExecutionEnvironment env) { > // read the text file from given input path > return env.readTextFile(textPath); > } > } > > > the pom.xml > <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi=" > http://xwww.w3.org/2001/XMLSchema-instance" > xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 > http://maven.apache.org/xsd/maven-4.0.0.xsd"> > <modelVersion>4.0.0</modelVersion> > > <groupId>com.otter.ist.flink</groupId> > <artifactId>flink-test</artifactId> > <version>0.1</version> > <packaging>jar</packaging> > > <name>DaoJoin</name> > <url>http://www.otter.com</url> > > <properties> > > <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> > </properties> > > <repositories> > <repository> > <id>apache.snapshots</id> > <name>Apache Development Snapshot Repository</name> > <url> > https://repository.apache.org/content/repositories/snapshots/</url> > <releases> > <enabled>false</enabled> > </releases> > <snapshots> > <enabled>true</enabled> > </snapshots> > </repository> > </repositories> > > <!-- > > Execute "mvn clean package -Pbuild-jar" > to build a jar file out of this project! > > How to use the Flink Quickstart pom: > > a) Adding new dependencies: > You can add dependencies to the list below. > Please check if the maven-shade-plugin below is > filtering out your dependency > and remove the exclude from there. 
> > b) Build a jar for running on the cluster: > There are two options for creating a jar from this > project > > b.1) "mvn clean package" -> this will create a fat > jar which contains all > dependencies necessary for running > the jar created by this pom in a cluster. > The "maven-shade-plugin" excludes > everything that is provided on a running Flink cluster. > > b.2) "mvn clean package -Pbuild-jar" -> This will > also create a fat-jar, but with much > nicer dependency exclusion > handling. This approach is preferred and leads to > much cleaner jar files. > --> > > <dependencies> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>0.10-SNAPSHOT</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-streaming-core</artifactId> > <version>0.10-SNAPSHOT</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-clients</artifactId> > <version>0.10-SNAPSHOT</version> > </dependency> > </dependencies> > > <build> > <plugins> > <!-- We use the maven-shade plugin to create a fat > jar that contains all dependencies > except flink and its transitive dependencies. The > resulting fat-jar can be executed > on a cluster. Change the value of Main-Class if > your program entry point changes. 
--> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-shade-plugin</artifactId> > <version>2.3</version> > <executions> > <!-- Run shade goal on package > phase --> > <execution> > <phase>package</phase> > <goals> > <goal>shade</goal> > </goals> > <configuration> > <artifactSet> > <excludes> > > <!-- This list contains all dependencies of flink-dist > > Everything else will be packaged into the fat-jar > --> > > <exclude>org.apache.flink:flink-shaded-*</exclude> > > <exclude>org.apache.flink:flink-core</exclude> > > <exclude>org.apache.flink:flink-java</exclude> > > <exclude>org.apache.flink:flink-scala</exclude> > > <exclude>org.apache.flink:flink-runtime</exclude> > > <exclude>org.apache.flink:flink-optimizer</exclude> > > <exclude>org.apache.flink:flink-clients</exclude> > > <exclude>org.apache.flink:flink-spargel</exclude> > > <exclude>org.apache.flink:flink-avro</exclude> > > <exclude>org.apache.flink:flink-java-examples</exclude> > > <exclude>org.apache.flink:flink-scala-examples</exclude> > > <exclude>org.apache.flink:flink-streaming-examples</exclude> > > <exclude>org.apache.flink:flink-streaming-core</exclude> > > > <!-- Also exclude very big transitive dependencies of Flink > > > WARNING: You have to remove these excludes if your code relies on other > > versions of these dependencies. 
> > --> > > <exclude>org.scala-lang:scala-library</exclude> > > <exclude>org.scala-lang:scala-compiler</exclude> > > <exclude>org.scala-lang:scala-reflect</exclude> > > <exclude>com.amazonaws:aws-java-sdk</exclude> > > <exclude>com.typesafe.akka:akka-actor_*</exclude> > > <exclude>com.typesafe.akka:akka-remote_*</exclude> > > <exclude>com.typesafe.akka:akka-slf4j_*</exclude> > > <exclude>io.netty:netty-all</exclude> > > <exclude>io.netty:netty</exclude> > > <exclude>org.eclipse.jetty:jetty-server</exclude> > > <exclude>org.eclipse.jetty:jetty-continuation</exclude> > > <exclude>org.eclipse.jetty:jetty-http</exclude> > > <exclude>org.eclipse.jetty:jetty-io</exclude> > > <exclude>org.eclipse.jetty:jetty-util</exclude> > > <exclude>org.eclipse.jetty:jetty-security</exclude> > > <exclude>org.eclipse.jetty:jetty-servlet</exclude> > > <exclude>commons-fileupload:commons-fileupload</exclude> > > <exclude>org.apache.avro:avro</exclude> > > <exclude>commons-collections:commons-collections</exclude> > > <exclude>org.codehaus.jackson:jackson-core-asl</exclude> > > <exclude>org.codehaus.jackson:jackson-mapper-asl</exclude> > > <exclude>com.thoughtworks.paranamer:paranamer</exclude> > > <exclude>org.xerial.snappy:snappy-java</exclude> > > <exclude>org.apache.commons:commons-compress</exclude> > > <exclude>org.tukaani:xz</exclude> > > <exclude>com.esotericsoftware.kryo:kryo</exclude> > > <exclude>com.esotericsoftware.minlog:minlog</exclude> > > <exclude>org.objenesis:objenesis</exclude> > > <exclude>com.twitter:chill_*</exclude> > > <exclude>com.twitter:chill-java</exclude> > > <exclude>com.twitter:chill-avro_*</exclude> > > <exclude>com.twitter:chill-bijection_*</exclude> > > <exclude>com.twitter:bijection-core_*</exclude> > > <exclude>com.twitter:bijection-avro_*</exclude> > > <exclude>commons-lang:commons-lang</exclude> > > <exclude>junit:junit</exclude> > > <exclude>de.javakaffee:kryo-serializers</exclude> > > <exclude>joda-time:joda-time</exclude> > > 
<exclude>org.apache.commons:commons-lang3</exclude> > > <exclude>org.slf4j:slf4j-api</exclude> > > <exclude>org.slf4j:slf4j-log4j12</exclude> > > <exclude>log4j:log4j</exclude> > > <exclude>org.apache.commons:commons-math</exclude> > > <exclude>org.apache.sling:org.apache.sling.commons.json</exclude> > > <exclude>commons-logging:commons-logging</exclude> > > <exclude>org.apache.httpcomponents:httpclient</exclude> > > <exclude>org.apache.httpcomponents:httpcore</exclude> > > <exclude>commons-codec:commons-codec</exclude> > > <exclude>com.fasterxml.jackson.core:jackson-core</exclude> > > <exclude>com.fasterxml.jackson.core:jackson-databind</exclude> > > <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude> > > <exclude>org.codehaus.jettison:jettison</exclude> > > <exclude>stax:stax-api</exclude> > > <exclude>com.typesafe:config</exclude> > > <exclude>org.uncommons.maths:uncommons-maths</exclude> > > <exclude>com.github.scopt:scopt_*</exclude> > > <exclude>org.mortbay.jetty:servlet-api</exclude> > > <exclude>commons-io:commons-io</exclude> > > <exclude>commons-cli:commons-cli</exclude> > </excludes> > </artifactSet> > <filters> > <filter> > > <artifact>org.apache.flink:*</artifact> > > <excludes> > > <exclude>org/apache/flink/shaded/**</exclude> > > <exclude>web-docs/**</exclude> > > </excludes> > </filter> > </filters> > <transformers> > <!-- add > Main-Class to manifest file --> > > <transformer > implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> > > <mainClass>com.otter.ist.flink.DaoJoin</mainClass> > > </transformer> > </transformers> > > <createDependencyReducedPom>false</createDependencyReducedPom> > </configuration> > </execution> > </executions> > </plugin> > > <!-- Configure the jar plugin to add the main > class as a manifest entry --> > <plugin> > <groupId>org.apache.maven.plugins</groupId> > <artifactId>maven-jar-plugin</artifactId> > <version>2.5</version> > <configuration> > <archive> > <manifestEntries> > 
> <Main-Class>com.otter.ist.flink.DaoJoin</Main-Class> > </manifestEntries> > </archive> > </configuration> > </plugin> > > <plugin> > <groupId>org.apache.maven.plugins</groupId> > > <artifactId>maven-compiler-plugin</artifactId> > <version>3.1</version> > <configuration> > <source>1.8</source> <!-- If you > want to use Java 8, change this to "1.8" --> > <target>1.8</target> <!-- If you > want to use Java 8, change this to "1.8" --> > </configuration> > </plugin> > </plugins> > > > <!-- If you want to use Java 8 Lambda Expressions > uncomment the following lines --> > <!-- > <pluginManagement> > <plugins> > <plugin> > > <artifactId>maven-compiler-plugin</artifactId> > <configuration> > <source>1.8</source> > <target>1.8</target> > > <compilerId>jdt</compilerId> > </configuration> > <dependencies> > <dependency> > > <groupId>org.eclipse.tycho</groupId> > > <artifactId>tycho-compiler-jdt</artifactId> > > <version>0.21.0</version> > </dependency> > </dependencies> > </plugin> > > <plugin> > <groupId>org.eclipse.m2e</groupId> > > <artifactId>lifecycle-mapping</artifactId> > <version>1.0.0</version> > <configuration> > <lifecycleMappingMetadata> > <pluginExecutions> > > <pluginExecution> > > <pluginExecutionFilter> > > <groupId>org.apache.maven.plugins</groupId> > > <artifactId>maven-assembly-plugin</artifactId> > > <versionRange>[2.4,)</versionRange> > > <goals> > > <goal>single</goal> > > </goals> > > </pluginExecutionFilter> > > <action> > > <ignore/> > > </action> > > </pluginExecution> > > <pluginExecution> > > <pluginExecutionFilter> > > <groupId>org.apache.maven.plugins</groupId> > > <artifactId>maven-compiler-plugin</artifactId> > > <versionRange>[3.1,)</versionRange> > > <goals> > > <goal>testCompile</goal> > > <goal>compile</goal> > > </goals> > > </pluginExecutionFilter> > > <action> > > <ignore/> > > </action> > > </pluginExecution> > </pluginExecutions> > </lifecycleMappingMetadata> > </configuration> > </plugin> > </plugins> > </pluginManagement> > --> > > 
</build> > <profiles> > <profile> > <!-- A profile that does everything correctly: > We set the Flink dependencies to provided --> > <id>build-jar</id> > <activation> > <activeByDefault>false</activeByDefault> > </activation> > <dependencies> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>0.10-SNAPSHOT</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-streaming-core</artifactId> > <version>0.10-SNAPSHOT</version> > <scope>provided</scope> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > > <artifactId>flink-clients</artifactId> > <version>0.10-SNAPSHOT</version> > <scope>provided</scope> > </dependency> > </dependencies> > </profile> > </profiles> > </project>