It would also help if you could paste the entire POM file of your project.
Maybe something is amiss with the dependencies, or the scopes?

On Sat, Aug 8, 2015 at 7:27 PM, Matthias J. Sax <
mj...@informatik.hu-berlin.de> wrote:

> Hi Huang,
>
> About Storm compatibility: did you double-check that the missing file
> (StormSpoutWrapper) is contained in your jar? Looking at the pom.xml
> does not help here, because if you tell Maven to include a file that it
> cannot find, it will simply not add it to the jar and the build will
> still succeed. Thus, you need to check the jar file itself via the
> command line:
>     unzip -l myjarfile.jar
> or
>     unzip -l myjarfile.jar | grep file-I-am-looking-for
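> For the missing StormSpoutWrapper class, for instance:
>     unzip -l myjarfile.jar | grep StormSpoutWrapper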
>
> I guess your jar is not built correctly, i.e., the file is not there...
>
> Did you have a look at the pom.xml of flink-storm-compatibility-example
> and the corresponding word-count-storm.xml? It shows how to build such a
> jar correctly (this was recently fixed, so make sure you update to the
> latest master).
>
> You can also have a look here at how to package jars correctly (even
> though that example is about Flink ML):
>
> https://stackoverflow.com/questions/31661900/maven-build-has-missing-package-linkages/31662642#31662642
>
> -Matthias
>
> On 08/08/2015 11:15 AM, huangwei (G) wrote:
> > Hi,
> > I have run into some trouble developing Flink applications.
> >
> > 1.
> > I want to compare the performance of Storm and flink-storm-compatibility
> > using the test program:
> > https://github.com/project-flink/flink-perf/blob/master/storm-jobs/src/jvm/experiments/Throughput.java
> > My changes to this Throughput.java are shown below:
> >
> >
> >
> > public static void main(String[] args) throws Exception {
> >     ParameterTool pt = ParameterTool.fromArgs(args);
> >
> >     int par = pt.getInt("para");
> >
> >     final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> >
> >     builder.setSpout("source0", new Generator(pt), pt.getInt("sourceParallelism"));
> >
> >     int i = 0;
> >     for (; i < pt.getInt("repartitions", 1) - 1; i++) {
> >         System.out.println("adding source" + i + " --> source" + (i + 1));
> >         builder.setBolt("source" + (i + 1), new RepartPassThroughBolt(pt), pt.getInt("sinkParallelism"))
> >                 .fieldsGrouping("source" + i, new Fields("id"));
> >     }
> >
> >     System.out.println("adding final source" + i + " --> sink");
> >
> >     builder.setBolt("sink", new Sink(pt), pt.getInt("sinkParallelism"))
> >             .fieldsGrouping("source" + i, new Fields("id"));
> >
> >     Config conf = new Config();
> >     conf.setDebug(false);
> >     // System.exit(1);
> >
> >     // execute program locally
> >     final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
> >     cluster.submitTopology("throughput", null, builder.createTopology());
> >
> >     Utils.sleep(10 * 1000);
> >
> >     // TODO kill does not do anything so far
> >     cluster.killTopology("throughput");
> >     cluster.shutdown();
> > }
> >
> >
> > This program runs well in IDEA with flink-storm-compatibility.
> > However, when I packaged it into a jar file and ran it on Flink
> > 0.10-SNAPSHOT, there was a problem in the flink-client log file:
> >
> > java.lang.Exception: Call to registerInputOutput() of invokable failed
> >          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:521)
> >          at java.lang.Thread.run(Thread.java:745)
> > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> >          at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:187)
> >          at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:90)
> >          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
> >          ... 1 more
> > Caused by: java.lang.ClassNotFoundException: org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper
> >          at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >          at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >          at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >          at java.lang.Class.forName0(Native Method)
> >          at java.lang.Class.forName(Class.java:348)
> >          at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71)
> >          at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> >          at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> >          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> >          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >          at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> >          at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> >          at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> >          at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> >          at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> >          at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302)
> >          at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264)
> >          at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:185)
> >          ... 3 more
> >
> >
> > And this class (StormSpoutWrapper) does exist in my packaged jar file,
> > as you can see in this part of my pom.xml:
> >
> > <!-- Throughput -->
> > <execution>
> >     <id>Throughput</id>
> >     <phase>package</phase>
> >     <goals>
> >         <goal>jar</goal>
> >     </goals>
> >     <configuration>
> >         <classifier>Throughput</classifier>
> >
> >         <archive>
> >             <manifestEntries>
> >                 <program-class>org.apache.flink.stormcompatibility.experiments.Throughput</program-class>
> >             </manifestEntries>
> >         </archive>
> >
> >         <includes>
> >             <!-- from storm-core -->
> >             <include>defaults.yaml</include>
> >             <include>backtype/storm/*.class</include>
> >             <include>backtype/storm/serialization/*.class</include>
> >             <include>backtype/storm/topology/*.class</include>
> >             <include>backtype/storm/topology/base/*.class</include>
> >             <include>backtype/storm/utils/*.class</include>
> >             <include>backtype/storm/spout/*.class</include>
> >             <include>backtype/storm/task/*.class</include>
> >             <include>backtype/storm/tuple/*.class</include>
> >             <include>backtype/storm/generated/*.class</include>
> >             <include>backtype/storm/metric/**/*.class</include>
> >             <include>org/apache/storm/curator/*.class</include>
> >             <include>org/apache/thrift7/**/*.class</include>
> >             <!--<include>org/yaml/snakeyaml/constructor/*.class</include>-->
> >             <include>org/yaml/snakeyaml/**/*.class</include>
> >             <!-- Storm's recursive dependencies -->
> >             <include>org/json/simple/**/*.class</include>
> >             <!-- compatibility layer -->
> >             <include>org/apache/flink/stormcompatibility/api/*.class</include>
> >             <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
> >             <!-- Word Count -->
> >             <include>org/apache/flink/stormcompatibility/experiments/Throughput.class</include>
> >             <include>org/apache/flink/stormcompatibility/experiments/Throughput$*.class</include>
> >         </includes>
> >     </configuration>
> > </execution>
> >
> > So how can I fix it?
> >
> >
> > 2.
> > There is the following case using the join operator:
> >
> > DataStream<Tuple3<String, Integer, Long>> user = env.addSource(new sourceUserFunction());
> > DataStream<Tuple2<String, Integer>> area = env.addSource(new sourceAreaFunction());
> >
> > DataStream<Tuple2<String, Integer>> sink = user
> >       .join(area)
> >       .onWindow(15, TimeUnit.MINUTES)
> >       .where(0)
> >       .equalTo(0)
> >       .with(new JoinFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
> >          @Override
> >          public Tuple2<String, Integer> join(Tuple3<String, Integer, Long> first, Tuple2<String, Integer> second) throws Exception {
> >             if (first.f1 + second.f1 > 10) {
> >                return new Tuple2<String, Integer>(first.f0, first.f1 + second.f1);
> >             }
> >             return null;
> >          }
> >       });
> >
> > As you can see, I don't want to return null when the condition is not
> > satisfied, but there is no JoinFunction variant that takes a Collector.
> > I found FlatJoinFunction, which does provide a Collector; however,
> > FlatJoinFunction only seems to be usable with DataSet, not DataStream.
> > The only workaround I can think of is to filter out the nulls right
> > after the join (see the sketch below). Is there any other way to
> > improve this case?
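> >
> > A rough sketch of that workaround, building on the `sink` stream above
> > (unverified; in particular I am not sure whether null records are
> > allowed between the join and the filter, so a sentinel tuple might be
> > needed instead of null):
> >
> > // needs org.apache.flink.api.common.functions.FilterFunction
> > DataStream<Tuple2<String, Integer>> filtered = sink
> >       .filter(new FilterFunction<Tuple2<String, Integer>>() {
> >          @Override
> >          public boolean filter(Tuple2<String, Integer> value) throws Exception {
> >             // keep only the pairs that satisfied the join condition
> >             return value != null;
> >          }
> >       });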
> >
> > PS: I'm sorry about this email. Feel free to ignore it over the weekend.
> >
> > Greetings,
> > Huang Wei
> > 华为技术有限公司 Huawei Technologies Co., Ltd.
> >
> >
> > Tel:+86 18106512602
> > Email:huangwei...@huawei.com
> >
>
>
