It would also help if you could paste the entire POM file of your project. Maybe something is amiss with the dependencies or the scopes?
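For comparison, this is roughly the dependency block I would expect to see; the artifactId and version here are my assumptions based on the 0.10-SNAPSHOT build mentioned below, so adjust them to whatever `mvn dependency:tree` reports for your project:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-storm-compatibility-core</artifactId>
        <version>0.10-SNAPSHOT</version>
    </dependency>

Note that a `provided` scope on this dependency would also explain the ClassNotFoundException below, since the compatibility classes would then be left out of the application jar.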
On Sat, Aug 8, 2015 at 7:27 PM, Matthias J. Sax <mj...@informatik.hu-berlin.de> wrote:
> Hi Huang,
>
> about Storm compatibility: did you double-check that the file that is
> missing (StormSpoutWrapper) is contained in your jar? Looking at pom.xml
> does not help here, because if you specify to include a file but Maven
> cannot find it, it will just not add it to the jar, and the build will
> still succeed. Thus, you need to check the jar file itself via the
> command line:
>
>   unzip -l myjarfile.jar
> or
>   unzip -l myjarfile.jar | grep file-I-am-looking-for
>
> I guess your jar is not built correctly, i.e., the file is not there...
>
> Did you have a look into pom.xml for flink-storm-compatibility-example
> and the corresponding word-count-storm.xml? This shows how to build a
> jar correctly (it was recently fixed, so make sure you update to the
> latest master).
>
> You can also have a look here at how to package jars correctly (even if
> this example is about Flink ML):
>
> https://stackoverflow.com/questions/31661900/maven-build-has-missing-package-linkages/31662642#31662642
>
> -Matthias
>
> On 08/08/2015 11:15 AM, huangwei (G) wrote:
> > Hi,
> > I get some trouble in developing Flink applications.
> >
> > 1. I want to test the performance between Storm and
> > flink-storm-compatibility using the test program:
> > https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java
> > And here is a bit of my changes to this Throughput.java:
> >
> > public static void main(String[] args) throws Exception {
> >     ParameterTool pt = ParameterTool.fromArgs(args);
> >
> >     int par = pt.getInt("para");
> >
> >     final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> >
> >     builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));
> >
> >     int i = 0;
> >     for (; i < pt.getInt("repartitions", 1) - 1; i++) {
> >         System.out.println("adding source" + i + " --> source" + (i + 1));
> >         builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
> >                 .fieldsGrouping("source" + i, new Fields("id"));
> >     }
> >
> >     System.out.println("adding final source" + i + " --> sink");
> >
> >     builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
> >             .fieldsGrouping("source" + i, new Fields("id"));
> >
> >     Config conf = new Config();
> >     conf.setDebug(false);
> >     //System.exit(1);
> >
> >     // execute program locally
> >     final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
> >     cluster.submitTopology("throughput", null, builder.createTopology());
> >
> >     Utils.sleep(10 * 1000);
> >
> >     // TODO kill does not do anything so far
> >     cluster.killTopology("throughput");
> >     cluster.shutdown();
> > }
> >
> > This program runs well in IDEA with flink-storm-compatibility.
> > However, when I packaged it into a jar file and ran it on
> > flink-0.10-SNAPSHOT, there is a problem in the flink-client log file:
> >
> > java.lang.Exception: Call to registerInputOutput() of invokable failed
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> >     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
> >     ... 1 more
> > Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
> >     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >     at java.lang.Class.forName0(Native Method)
> >     at java.lang.Class.forName(Class.java:348)
> >     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
> >     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> >     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
> >     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
> >     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
> >     ... 3 more
> >
> > And this class (StormSpoutWrapper) does exist in my packaged jar file,
> > as you can see from this part of my pom.xml:
> >
> > <!-- Throughput -->
> > <execution>
> >     <id>Throughput</id>
> >     <phase>package</phase>
> >     <goals>
> >         <goal>jar</goal>
> >     </goals>
> >     <configuration>
> >         <classifier>Throughput</classifier>
> >
> >         <archive>
> >             <manifestEntries>
> >                 <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
> >             </manifestEntries>
> >         </archive>
> >
> >         <includes>
> >             <!-- from storm-core -->
> >             <include>defaults.yaml</include>
> >             <include>backtype/storm/*.class</include>
> >             <include>backtype/storm/serialization/*.class</include>
> >             <include>backtype/storm/topology/*.class</include>
> >             <include>backtype/storm/topology/base/*.class</include>
> >             <include>backtype/storm/utils/*.class</include>
> >             <include>backtype/storm/spout/*.class</include>
> >             <include>backtype/storm/task/*.class</include>
> >             <include>backtype/storm/tuple/*.class</include>
> >             <include>backtype/storm/generated/*.class</include>
> >             <include>backtype/storm/metric/**/*.class</include>
> >             <include>org/apache/storm/curator/*.class</include>
> >             <include>org/apache/thrift7/**/*.class</include>
> >             <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >             <include>org/yaml/snakeyaml/**/*.class</include>
> >             <!-- Storm's recursive dependencies -->
> >             <include>org/json/simple/**/*.class</include>
> >             <!-- compatibility layer -->
> >             <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >             <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
> >             <!-- Word Count -->
> >             <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
> >             <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
> >         </includes>
> >     </configuration>
> > </execution>
> >
> > So how can I fix it?
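Regarding question 1: an explicit include list like the one above fails silently when a pattern matches nothing, which is exactly Matthias's point. One alternative worth trying is to build the fat jar with the maven-shade-plugin and include whole artifacts instead of individual class patterns. A minimal sketch, not the official packaging of the examples module; the artifact coordinates are my assumptions about how the compatibility layer and storm-core are published:

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <artifactSet>
                        <includes>
                            <!-- whole artifacts, so no individual class pattern
                                 can silently match nothing -->
                            <include>org.apache.flink:flink-storm-compatibility-core</include>
                            <include>org.apache.storm:storm-core</include>
                        </includes>
                    </artifactSet>
                    <transformers>
                        <!-- keep the program-class manifest entry that the
                             Flink client uses to find the entry point -->
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <manifestEntries>
                                <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
                            </manifestEntries>
                        </transformer>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>

Either way, verify the result with the `unzip -l ... | grep` check Matthias suggested before submitting the jar again.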
> > 2. There is the following case using the join operator:
> >
> > DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
> > DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
> >
> > DataStream<Tuple2<String, Integer>> sink = user
> >         .join(area)
> >         .onWindow(15, TimeUnit.MINUTES)
> >         .where(0)
> >         .equalTo(0)
> >         .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
> >             @Override
> >             public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first, Tuple2<String, Integer> second) throws Exception {
> >                 if (first.f1 + second.f1 > 10) {
> >                     return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
> >                 }
> >                 return null;
> >             }
> >         });
> >
> > As you see, I don't want to return null when the condition is not satisfied.
> > But there is not any JoinFunction with a Collector.
> > I found a FlatJoinFunction which allows the Collector.
> > However, FlatJoinFunction seems to be usable only with DataSet, not DataStream.
> > Is there any other way to improve this case?
> >
> > PS. I'm sorry about this email. You may ignore me during the weekend.
> >
> > Greetings,
> > Huang Wei
> > 华为技术有限公司 Huawei Technologies Co., Ltd.
> >
> > Tel: +86 18106512602
> > Email: huangwei...@huawei.com
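Regarding question 2: until the streaming API offers a FlatJoinFunction equivalent, one workaround is to emit a sentinel record instead of null and drop it with a filter right after the join. A minimal sketch reusing the `user` and `area` streams from the question; the sentinel value -1 assumes the summed counts are never negative:

    // additional import needed: org.apache.flink.api.common.functions.FilterFunction
    DataStream<Tuple2<String, Integer>> sink = user
            .join(area)
            .onWindow(15, TimeUnit.MINUTES)
            .where(0)
            .equalTo(0)
            .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first,
                                                    Tuple2<String, Integer> second) throws Exception {
                    int sum = first.f1 + second.f1;
                    // emit a sentinel instead of null so no null record enters the stream
                    return new Tuple2<String, Integer>(first.f0, sum > 10 ? sum : -1);
                }
            })
            // drop the sentinel records; this filter plays the role of the missing Collector
            .filter(new FilterFunction<Tuple2<String, Integer>>() {
                @Override
                public boolean filter(Tuple2<String, Integer> value) throws Exception {
                    return value.f1 >= 0;
                }
            });

This costs one extra operator, but it keeps the JoinFunction total (it always returns a record) and works with the DataStream API as shown in the question.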