Change the version to 1.1-SNAPSHOT.

On 04/14/2016 11:52 AM, star jlong wrote:
> One question: which Flink dependency are you using? I'm using
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-storm-examples_2.11</artifactId>
>   <version>1.0.0</version>
> </dependency>
> Once I change the version to the SNAPSHOT version, the pom.xml complains that
> it cannot satisfy the given dependency.
>
> On Thursday, 14 April 2016 at 10:45, star jlong <jlongs...@yahoo.fr.INVALID>
> wrote:
>
>
> Yes it is.
>
> On Thursday, 14 April 2016 at 10:39, Matthias J. Sax <mj...@apache.org> wrote:
>
>
> For the fix, you need to use the current development version of Flink,
> i.e., change your Maven dependency from <version>1.0</version> to
> <version>1.1-SNAPSHOT</version>
>
> One question: what is FlinkGitService.class? It only shows up when
> you get the ClassLoader:
>
>> ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) },
>> FlinkGitService.class.getClassLoader());
>
> Is it the class that contains the methods deploy() and getFlinkTopology()?
>
> -Matthias
>
> On 04/14/2016 05:20 AM, star jlong wrote:
>> What I'm trying to say is that, to submit the Flink topology to Flink, I
>> had to invoke the mainMethod (which contains the actual topology) of my
>> topology using the class java.lang.reflect.Method. That is, if you take a
>> look at the following topology, the mainMethod is buildTopology:
>>
>> public class WordCountTopology {
>>     public static void main(String[] args) throws Exception {
>>
>>         Config conf = new Config();
>>         conf.setDebug(true);
>>         if (args != null && args.length > 0) {
>>             conf.setNumWorkers(1);
>>             conf.setMaxTaskParallelism(1);
>>             FlinkSubmitter.submitTopology(args[0], conf, buildTopology());
>>         }
>>         // Otherwise, we are running locally
>>         else {
>>             conf.setMaxTaskParallelism(1);
>>             FlinkLocalCluster cluster = new FlinkLocalCluster();
>>             cluster.submitTopology("word-count", conf, buildTopology());
>>             Thread.sleep(10000);
>>         }
>>     }
>>
>>     public static FlinkTopology buildTopology() {
>>
>>         TopologyBuilder builder = new TopologyBuilder();
>>
>>         builder.setSpout("spout", new RandomSentenceSpout(), 1);
>>         builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
>>         builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word"));
>>
>>         builder.setBolt("writeIntoFile", new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() {
>>             private static final long serialVersionUID = 1L;
>>
>>             @Override
>>             public String format(Tuple tuple) {
>>                 return tuple.toString();
>>             }
>>         }), 1).shuffleGrouping("count");
>>
>>         return FlinkTopology.createTopology(builder);
>>     }
>> }
>>
>> That is the method that I want to invoke from my jar so that I can submit
>> the topology without any problem, i.e.:
>>
>> final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
>> cluster.submitTopology(topologyId, uploadedJarLocation,
>>     getFlinkTopology(String.format("file://%s", jarPath),
>>         properties.getProperty("topologyMainClass"),
>>         properties.getProperty("methodName")));
>>
>> where getFlinkTopology() returns the actual topology.
>>
>> But while doing that reflection I got an exception.
>>
>> Another question, please: how do I make use of Till's hotfix?
>>
>> On Thursday, 14 April 2016 at 0:19, Matthias J. Sax <mj...@apache.org> wrote:
>>
>>
>> I cannot completely follow the last step, where you fail. What do you
>> mean by "I'm stuck at the level when I want to copy that from the jar to
>> submit it to flink"?
>>
>> Btw: I copied the code from the SO question and it works for me on the
>> current master (which includes Till's hotfix).
>>
>> -Matthias
>>
>>
>> On 04/13/2016 09:39 PM, star jlong wrote:
>>> Thanks Matthias for the reply.
>>> Maybe I should explain better what I want to do. My objective is to deploy a
>>> Flink topology on Flink using Java, but in production mode. For that,
>>> here are the steps that I have taken.
>>> 1- Convert a sample wordcount Storm topology to a Flink topology as
>>> indicated here:
>>> https://flink.apache.org/news/2015/12/11/storm-compatibility.html
>>> 2- Run the topology in local mode (with my IDE, Eclipse) and in production
>>> mode by assembling everything with a mvn clean install, then submitting the
>>> jar to Flink on the command line with
>>> ./bin/flink run -c stormWorldCount.WordCountTopology /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar myFlinkTopology
>>> At this level everything went well.
>>>
>>> Then I wanted to submit the same jar to Flink in production mode by
>>> using a Java program. So I decided to create a mainMethod in my topology
>>> that returns the FlinkTopology which I wanted to submit to Flink using the
>>> FlinkClient. But I'm stuck at the level when I want to copy that from the
>>> jar to submit it to flink.
>>>
>>> I know that is possible because I have used the same procedure with a Storm
>>> topology and it works perfectly well.
>>> What am I missing, please?
>>> jstar
>>>
>>> On Wednesday, 13 April 2016 at 19:23, Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>>
>>>
>>> Hi jstar,
>>>
>>> I need to have a close look. But I am wondering why you use reflection
>>> in the first place? Is there any specific reason for that?
>>>
>>> Furthermore, the example provided in project maven-example also covers
>>> the case of submitting a topology to Flink via Java. Have a look at
>>> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter
>>>
>>> It contains a main() method and you can just run it as a regular Java
>>> program in your IDE.
>>>
>>> The SO question example should also work; it also contains a main()
>>> method, so you should be able to run it.
>>>
>>> Btw: If you use the Storm-Compatibility-API there is no reason to get an
>>> ExecutionEnvironment in your code. This happens automatically with
>>> FlinkClient/FlinkSubmitter.
>>>
>>> Furthermore, I would recommend using FlinkSubmitter instead of
>>> FlinkClient as it is somewhat simpler to use.
>>>
>>> About the SO question: I guess the problem is the jar assembling. The user
>>> says
>>>
>>> "Since I'using maven to handle my dependencies, I do a Mvn clean install
>>> to obtain the jar."
>>>
>>> I guess this is not sufficient to bundle a correct jar. Have a look into
>>> the pom.xml from storm-examples. It uses Maven plug-ins to assemble the jar
>>> correctly. (Regular Maven artifacts do not work for job submission...)
>>>
>>> Will have a close look and follow up... Hope this helps already.
>>>
>>> -Matthias
>>>
>>> On 04/13/2016 06:23 PM, star jlong wrote:
>>>> Thanks for the reply.
>>>> @Stephan, I tried using RemoteEnvironment to submit my topology to Flink.
>>>> Here is what I tried:
>>>> RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath);
>>>> remote.execute();
>>>> While running the program, this is the exception that I got:
>>>> java.lang.RuntimeException: No data sinks have been created yet. A program
>>>> needs at least one sink that consumes data. Examples are writing the data
>>>> set or printing it.
>>>>
>>>>
>>>> On Wednesday, 13 April 2016 at 16:54, Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>
>>>> I think this is not the problem here since the problem is still happening
>>>> on the client side when the FlinkTopology tries to copy the registered
>>>> spouts. This happens before the job is submitted to the cluster. Maybe
>>>> Matthias could chime in here.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> For Flink standalone programs, you would use a "RemoteEnvironment".
>>>>>
>>>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api".
>>>>> That one should deal with jars, classloaders, etc. for you.
>>>>>
>>>>> Stephan
>>>>>
>>>>>
>>>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <jlongs...@yahoo.fr.invalid>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the suggestion. Sure, those examples are interesting and I have
>>>>>> deployed them successfully on Flink. The deployment is done from the
>>>>>> command line, that is, by doing something like
>>>>>> bin/flink run example.jar
>>>>>> But what I want is to submit the topology to Flink using a Java program.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Wednesday, 13 April 2016 at 14:12, Chesnay Schepler
>>>>>> <ches...@apache.org> wrote:
>>>>>>
>>>>>>
>>>>>> you can find examples here:
>>>>>>
>>>>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples
>>>>>>
>>>>>> we haven't established yet that it is an API issue; it could very well
>>>>>> be caused by the reflection magic you're using...
>>>>>>
>>>>>> On 13.04.2016 14:57, star jlong wrote:
>>>>>>> Ok, it seems like there is an issue with the API. So please, does anybody
>>>>>>> have a working example for deploying a topology using the Flink dependency
>>>>>>> flink-storm_2.11? Any other example would be welcome too.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> jstar
>>>>>>>
>>>>>>> On Wednesday, 13 April 2016 at 13:44, star jlong
>>>>>>> <jlongs...@yahoo.fr.INVALID> wrote:
>>>>>>>
>>>>>>>
>>>>>>> Hi Schepler,
>>>>>>>
>>>>>>> Thanks for your concern. Yes, I'm actually having the same issue as
>>>>>>> indicated in that post, because I'm the one who posted it.
>>>>>>>
>>>>>>> On Wednesday, 13 April 2016 at 13:35, Chesnay Schepler
>>>>>>> <ches...@apache.org> wrote:
>>>>>>>
>>>>>>>
>>>>>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api
>>>>>>>
>>>>>>> On 13.04.2016 14:28, Till Rohrmann wrote:
>>>>>>>> Hi jstar,
>>>>>>>>
>>>>>>>> what exactly is the problem you're observing?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong <jlongs...@yahoo.fr.invalid>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi there,
>>>>>>>>>
>>>>>>>>> I'm jstar. I have been playing around with Flink. I'm very much
>>>>>>>>> interested in submitting a topology to Flink using its API. What I
>>>>>>>>> have tried is described on Stack Overflow, but I got stuck with
>>>>>>>>> an exception. Any help would be welcome.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>> jstar
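
The reflective step discussed in the thread (loading the topology class from a jar with a URLClassLoader and invoking its static buildTopology() method through java.lang.reflect.Method) might look roughly like the sketch below. It uses only JDK classes; JarMethodInvoker and invokeStatic are hypothetical names chosen for illustration and are not part of Flink or Storm. In the thread's setting, className would be the topology class and methodName would be "buildTopology", with the result cast to FlinkTopology. As Matthias points out, reflection should not be needed at all if the topology is submitted with FlinkSubmitter from a regular main() method.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical helper (not a Flink or Storm class): calls a public static no-arg method of a class found in a jar. */
final class JarMethodInvoker {

    private JarMethodInvoker() {
    }

    /**
     * Loads className from the given jar URLs and invokes its public static
     * no-argument method methodName, returning whatever that method returns.
     */
    static Object invokeStatic(URL[] jarUrls, String className, String methodName) {
        // Use this class's loader as parent so that types shared with the caller
        // resolve to the same Class objects on both sides.
        ClassLoader parent = JarMethodInvoker.class.getClassLoader();
        // The loader is deliberately left open: the returned object may still
        // need to load further classes from the jar later on.
        ClassLoader loader = URLClassLoader.newInstance(jarUrls, parent);
        try {
            Class<?> clazz = Class.forName(className, true, loader);
            Method method = clazz.getMethod(methodName);
            if (!Modifier.isStatic(method.getModifiers())) {
                throw new IllegalArgumentException(className + "." + methodName + "() is not static");
            }
            // A static method is invoked with a null receiver.
            return method.invoke(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke " + className + "." + methodName + "()", e);
        }
    }

    public static void main(String[] args) {
        // Demo against a JDK class so the sketch runs without any jar on disk.
        Object result = invokeStatic(new URL[0], "java.lang.System", "lineSeparator");
        System.out.println(System.lineSeparator().equals(result));
    }
}
```

Passing the caller's class loader as parent mirrors the URLClassLoader.newInstance(..., FlinkGitService.class.getClassLoader()) call quoted earlier in the thread; whether that is enough for the spout and bolt classes to be copied correctly on the client side is exactly the open question in this discussion.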