For the fix, you need to use the current development version of Flink, i.e., change your Maven dependency from <version>1.0</version> to <version>1.1-SNAPSHOT</version>.
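In the pom.xml this would look roughly as follows. It is only a sketch: the artifact ID flink-storm_2.11 is the one mentioned further down in this thread, and the <repositories> entry is an assumption that your build does not already resolve Apache snapshot artifacts (SNAPSHOT versions are not published to Maven Central).

```xml
<!-- Sketch only: adapt the artifact IDs to the Flink modules you actually use. -->
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-storm_2.11</artifactId>
    <version>1.1-SNAPSHOT</version>
  </dependency>
</dependencies>

<!-- Assumption: the Apache snapshot repository is not configured yet. -->
<repositories>
  <repository>
    <id>apache.snapshots</id>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases><enabled>false</enabled></releases>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>
```

All Flink dependencies in the project should use the same version, so any other Flink artifacts would need the same change.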
One question: what is FlinkGitService.class? It only shows up when you
get the ClassLoader:

> ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) },
>     FlinkGitService.class.getClassLoader());

Is it the class that contains the methods deploy() and getFlinkTopology()?

-Matthias

On 04/14/2016 05:20 AM, star jlong wrote:
> What I'm trying to say is that, to submit the Flink topology to Flink, I
> had to invoke the mainMethod (which contains the actual topology) of my
> topology with the class java.lang.reflect.Method. That is, if you take a
> look at the following topology, the mainMethod is buildTopology:
>
> public class WordCountTopology {
>     public static void main(String[] args) throws Exception {
>
>         Config conf = new Config();
>         conf.setDebug(true);
>         if (args != null && args.length > 0) {
>
>             conf.setNumWorkers(1);
>             conf.setMaxTaskParallelism(1);
>             FlinkSubmitter.submitTopology(args[0], conf, buildTopology());
>
>         }
>         // Otherwise, we are running locally
>         else {
>             conf.setMaxTaskParallelism(1);
>             FlinkLocalCluster cluster = new FlinkLocalCluster();
>             cluster.submitTopology("word-count", conf, buildTopology());
>             Thread.sleep(10000);
>         }
>     }
>
>     public static FlinkTopology buildTopology() {
>
>         TopologyBuilder builder = new TopologyBuilder();
>
>         builder.setSpout("spout", new RandomSentenceSpout(), 1);
>         builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
>         builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split",
>             new Fields("word"));
>
>         builder.setBolt("writeIntoFile",
>             new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() {
>                 private static final long serialVersionUID = 1L;
>
>                 @Override
>                 public String format(Tuple tuple) {
>                     return tuple.toString();
>                 }
>             }), 1).shuffleGrouping("count");
>
>         return FlinkTopology.createTopology(builder);
>
>     }
> }
>
> That is the method that I want to invoke from my jar so that I can submit
> the topology without any problem, i.e.
>
> final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
> cluster.submitTopology(topologyId, uploadedJarLocation,
>     getFlinkTopogy(String.format("file://%s", jarPath),
>         properties.getProperty("topologyMainClass"),
>         properties.getProperty("methodName")));
>
> where getFlinkTopology() returns the actual topology.
>
> But while doing that reflection I got an exception.
>
> Another question, please: how do I make use of Till's hotfix?
>
> On Thursday, 14 April 2016, 0:19, Matthias J. Sax <mj...@apache.org> wrote:
>
> I cannot completely follow your last step, the one where you fail. What
> do you mean by "I'm stuck at the level when I want to copy that from the
> jar to submit it to flink"?
>
> Btw: I copied the code from the SO question and it works for me on the
> current master (which includes Till's hotfix).
>
> -Matthias
>
> On 04/13/2016 09:39 PM, star jlong wrote:
>> Thanks Matthias for the reply.
>> Maybe I should explain better what I want to do. My objective is to
>> deploy a Flink topology on Flink using Java, but in production mode. For
>> that, here are the steps that I have taken.
>> 1- Convert a sample wordcount Storm topology to a Flink topology as
>> indicated here:
>> https://flink.apache.org/news/2015/12/11/storm-compatibility.html
>> 2- Run the topology in local mode (with my IDE, Eclipse) and in
>> production mode by assembling everything with a mvn clean install and
>> then submitting the jar to Flink on the command line with
>>
>> ./bin/flink run -c stormWorldCount.WordCountTopology
>>   /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>>   myFlinkTopology
>>
>> At this level everything went well.
>>
>> Then I wanted to submit the same jar to Flink in production mode by
>> using a Java program. So I decided to create a mainMethod in my topology
>> that returns the FlinkTopology which I wanted to submit to Flink using
>> the FlinkClient. But I'm stuck at the level when I want to copy that
>> from the jar to submit it to flink.
>>
>> I know that this is possible because I have used the same procedure
>> with a Storm topology and it works perfectly well.
>> What am I missing, please?
>> jstar
>>
>> On Wednesday, 13 April 2016, 19:23, Matthias J. Sax <mj...@apache.org>
>> wrote:
>>
>> Hi jstar,
>>
>> I need to have a closer look. But I am wondering why you use reflection
>> in the first place? Is there any specific reason for that?
>>
>> Furthermore, the example provided in project maven-example also covers
>> the case of submitting a topology to Flink via Java. Have a look at
>> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter
>>
>> It contains a main() method and you can just run it as a regular Java
>> program in your IDE.
>>
>> The SO question example should also work; it also contains a main()
>> method, so you should be able to run it.
>>
>> Btw: If you use the Storm-Compatibility-API there is no reason to get an
>> ExecutionEnvironment in your code. This happens automatically with
>> FlinkClient/FlinkSubmitter.
>>
>> Furthermore, I would recommend using FlinkSubmitter instead of
>> FlinkClient as it is somewhat simpler to use.
>>
>> About the SO question: I guess the problem is the jar assembling. The
>> user says
>>
>> "Since I'using maven to handle my dependencies, I do a Mvn clean install
>> to obtain the jar."
>>
>> I guess this is not sufficient to bundle a correct jar. Have a look into
>> pom.xml from storm-examples. It uses Maven plug-ins to assemble the jar
>> correctly. (Regular Maven artifacts do not work for job submission...)
>>
>> Will have a closer look and follow up... Hope this helps already.
>>
>> -Matthias
>>
>> On 04/13/2016 06:23 PM, star jlong wrote:
>>> Thanks for the reply.
>>> @Stephan, I tried using RemoteEnvironment to submit my topology to
>>> Flink. Here is what I tried:
>>>
>>> RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath);
>>> remote.execute();
>>>
>>> While running the program, this is the exception that I got:
>>> java.lang.RuntimeException: No data sinks have been created yet. A program
>>> needs at least one sink that consumes data. Examples are writing the data
>>> set or printing it.
>>>
>>> On Wednesday, 13 April 2016, 16:54, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>> I think this is not the problem here since the problem is still happening
>>> on the client side when the FlinkTopology tries to copy the registered
>>> spouts. This happens before the job is submitted to the cluster. Maybe
>>> Matthias could chime in here.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi!
>>>>
>>>> For Flink standalone programs, you would use a "RemoteEnvironment".
>>>>
>>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api".
>>>> That one should deal with jars, classloaders, etc. for you.
>>>>
>>>> Stephan
>>>>
>>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <jlongs...@yahoo.fr.invalid>
>>>> wrote:
>>>>
>>>>> Thanks for the suggestion. Sure, those examples are interesting and I
>>>>> have deployed them successfully on Flink. The deployment is done on
>>>>> the command line, that is, by doing something like
>>>>>
>>>>> bin/flink run example.jar
>>>>>
>>>>> But what I want is to submit the topology to Flink using a Java
>>>>> program.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Wednesday, 13 April 2016, 14:12, Chesnay Schepler
>>>>> <ches...@apache.org> wrote:
>>>>>
>>>>> You can find examples here:
>>>>>
>>>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples
>>>>>
>>>>> We haven't established yet that it is an API issue; it could very well
>>>>> be caused by the reflection magic you're using...
>>>>>
>>>>> On 13.04.2016 14:57, star jlong wrote:
>>>>>> OK, it seems like there is an issue with the API. So please, does
>>>>>> anybody have a working example for deploying a topology using the
>>>>>> Flink dependency flink-storm_2.11? Any other example will be welcome
>>>>>> too.
>>>>>>
>>>>>> Thanks,
>>>>>> jstar
>>>>>>
>>>>>> On Wednesday, 13 April 2016, 13:44, star jlong
>>>>>> <jlongs...@yahoo.fr.INVALID> wrote:
>>>>>>
>>>>>> Hi Schepler,
>>>>>>
>>>>>> Thanks for your concern. Yes, I'm actually having the same issue as
>>>>>> indicated in that post, because I'm the one who posted it.
>>>>>>
>>>>>> On Wednesday, 13 April 2016, 13:35, Chesnay Schepler
>>>>>> <ches...@apache.org> wrote:
>>>>>>
>>>>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api
>>>>>>
>>>>>> On 13.04.2016 14:28, Till Rohrmann wrote:
>>>>>>> Hi jstar,
>>>>>>>
>>>>>>> what exactly is the problem you're observing?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong
>>>>>>> <jlongs...@yahoo.fr.invalid> wrote:
>>>>>>>
>>>>>>>> Hi there,
>>>>>>>>
>>>>>>>> I'm jstar. I have been playing around with Flink. I'm very much
>>>>>>>> interested in submitting a topology to Flink using its API. What is
>>>>>>>> shown on Stack Overflow is the attempt that I have made, but I got
>>>>>>>> stuck with an exception. Any help will be welcome.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>> jstar
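
For reference, the reflection step discussed in this thread (loading a class from a jar and invoking a public static, no-argument method such as buildTopology()) can be sketched as follows. This is only a minimal sketch under assumptions: TopologyLoader and invokeStatic are hypothetical names, not part of Flink or of the code posted in the thread, and the demo in main() uses java.util.Collections.emptyList() as a stand-in target so that it runs without any jar.

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical helper, for illustration only.
class TopologyLoader {

    /**
     * Loads className through a class loader that can see the given jar URLs
     * and invokes its public static, no-argument method methodName.
     * Returns whatever that method returns.
     */
    static Object invokeStatic(URL[] jarUrls, String className, String methodName)
            throws ReflectiveOperationException {
        // Use the caller's class loader as parent so that classes shared by
        // both sides resolve to the same Class objects.
        ClassLoader parent = TopologyLoader.class.getClassLoader();
        // The loader is deliberately left open: the loaded class may still
        // need to load further classes from the jar lazily.
        ClassLoader loader = URLClassLoader.newInstance(jarUrls, parent);

        Class<?> clazz = loader.loadClass(className);
        Method method = clazz.getMethod(methodName);
        // The receiver is null because the method is static.
        return method.invoke(null);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in target from the JDK, so no jar is needed for the demo.
        Object result = invokeStatic(new URL[0], "java.util.Collections", "emptyList");
        System.out.println(result);
    }
}
```

With a real jar, the call would pass the jar's URL together with the class and method names from the thread, for example invokeStatic(new URL[] { new URL("file://" + jarPath) }, "stormWorldCount.WordCountTopology", "buildTopology"), and cast the result to FlinkTopology. The parent class loader matters for that cast: if the jar's FlinkTopology class were loaded by an unrelated loader, it would be a different Class object than the caller's and the cast would fail.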