What I'm trying to say is that, to submit the Flink topology to Flink, I
had to invoke the method of my topology class that contains the actual
topology, using java.lang.reflect.Method. If you take a look at the
following topology, that method is buildTopology:

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            conf.setMaxTaskParallelism(1);
            FlinkSubmitter.submitTopology(args[0], conf, buildTopology());
        }
        // Otherwise, we are running locally
        else {
            conf.setMaxTaskParallelism(1);
            FlinkLocalCluster cluster = new FlinkLocalCluster();
            cluster.submitTopology("word-count", conf, buildTopology());
            Thread.sleep(10000);
        }
    }

    public static FlinkTopology buildTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);
        builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("writeIntoFile",
                new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public String format(Tuple tuple) {
                        return tuple.toString();
                    }
                }), 1).shuffleGrouping("count");
        return FlinkTopology.createTopology(builder);
    }
}

That is the method I want to invoke from my jar, so that I can submit the
topology from a Java program, i.e.

final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
cluster.submitTopology(topologyId, uploadedJarLocation,
        getFlinkTopology(String.format("file://%s", jarPath),
                properties.getProperty("topologyMainClass"),
                properties.getProperty("methodName")));

where getFlinkTopology() returns the actual topology.
But while doing that reflection I got an exception.
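
To make the reflection step concrete, here is a minimal sketch of invoking a public static no-argument method by name with java.lang.reflect.Method. It is not the code from my project: the class ReflectiveInvoke, the helper invokeStatic and the stand-in DemoTopology are hypothetical names used only for illustration, and the stand-in returns a String instead of a FlinkTopology so that the sketch runs without Flink on the classpath.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class ReflectiveInvoke {

    /** Hypothetical stand-in for a topology class packaged in the jar. */
    public static class DemoTopology {
        public static String buildTopology() {
            return "demo-topology";
        }
    }

    /**
     * Loads className through the given class loader and calls its public
     * static no-argument method methodName, returning that method's result.
     */
    public static Object invokeStatic(ClassLoader loader, String className, String methodName)
            throws Exception {
        Class<?> clazz = Class.forName(className, true, loader);
        Method method = clazz.getMethod(methodName);
        if (!Modifier.isStatic(method.getModifiers())) {
            throw new IllegalArgumentException(methodName + " must be static");
        }
        try {
            // A static method is invoked with a null receiver.
            return method.invoke(null);
        } catch (InvocationTargetException e) {
            // Reflection wraps whatever the target method threw; rethrow the
            // original exception so the real failure is visible.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: for a class that lives in a separate jar, the loader
        // would be a java.net.URLClassLoader built from the jar's URL. Here
        // the class is already on the classpath, so the current loader is used.
        ClassLoader loader = ReflectiveInvoke.class.getClassLoader();
        Object result = invokeStatic(loader, DemoTopology.class.getName(), "buildTopology");
        System.out.println(result);
    }
}
```

Unwrapping InvocationTargetException matters when debugging, because otherwise the stack trace only shows the reflection wrapper and not the exception thrown inside the invoked method.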
Another question, please: how do I make use of Till's hotfix?
On Thursday, 14 April 2016 at 00:19, Matthias J. Sax <[email protected]> wrote:
I cannot follow completely in your last step when you fail. What do you
mean by "I'm stuck at the level when I want to copy that from the jar to
submit it to flink"?
Btw: I copied the code from the SO question and it works for me on the
current master (which includes Till's hotfix).
-Matthias
On 04/13/2016 09:39 PM, star jlong wrote:
> Thanks Matthias for the reply.
> Maybe I should explain better what I want to do. My objective is to deploy a
> Flink topology on Flink using Java, but in production mode. Here are the
> steps that I have taken:
> 1- Convert a sample word-count Storm topology to a Flink topology as indicated
> here: https://flink.apache.org/news/2015/12/11/storm-compatibility.html
> 2- Run the topology in local mode (with my IDE, Eclipse) and in production
> mode by assembling everything with mvn clean install, then submitting the
> jar to Flink on the command line with
> ./bin/flink run -c stormWorldCount.WordCountTopology
> /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar
> myFlinkTopology
> At this level everything went well.
>
> Then I wanted to submit the same jar to Flink in production mode by using
> a Java program. So I decided to create a method in my topology that
> returns the FlinkTopology, which I wanted to submit to Flink using the
> FlinkClient. But I'm stuck at the level when I want to copy that from the jar
> to submit it to flink.
>
> I know that it is possible because I have used the same procedure with a
> Storm topology and it works perfectly well.
> What am I missing, please?
> jstar
>
> On Wednesday, 13 April 2016 at 19:23, Matthias J. Sax <[email protected]> wrote:
>
>
> Hi jstar,
>
> I need to have a close look. But I am wondering why you use reflection
> in the first place? Is there any specific reason for that?
>
> Furthermore, the example provided in project maven-example also covers
> the case to submit a topology to Flink via Java. Have a look at
> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter
>
> It contains a main() method and you can just run it as a regular Java
> program in your IDE.
>
> The SO question example should also work; it also contains a main()
> method, so you should be able to run it.
>
> Btw: If you use the Storm compatibility API, there is no reason to get an
> ExecutionEnvironment in your code. This happens automatically with
> FlinkClient/FlinkSubmitter.
>
> Furthermore, I would recommend to use FlinkSubmitter instead of
> FlinkClient as it is somewhat simpler to use.
>
> About SO question: I guess the problem is the jar assembling. The user says
>
> "Since I'using maven to handle my dependencies, I do a Mvn clean install
> to obtain the jar."
>
> I guess this is not sufficient to bundle a correct jar. Have a look into
> pom.xml from storm-examples. It uses Maven plug-ins to assemble the jar
> correctly. (Regular Maven artifacts do not work for job submission...)
>
> Will have a close look and follow up... Hope this helps already.
>
> -Matthias
>
> On 04/13/2016 06:23 PM, star jlong wrote:
>> Thanks for the reply.
>> @Stephan, I tried using RemoteEnvironment to submit my topology to Flink.
>> Here is what I tried:
>> RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath);
>> remote.execute();
>> While running the program, this is the exception that I got:
>> java.lang.RuntimeException: No data sinks have been created yet. A program
>> needs at least one sink that consumes data. Examples are writing the data
>> set or printing it.
>>
>>
>> On Wednesday, 13 April 2016 at 16:54, Till Rohrmann <[email protected]> wrote:
>>
>>
>> I think this is not the problem here since the problem is still happening
>> on the client side when the FlinkTopology tries to copy the registered
>> spouts. This happens before the job is submitted to the cluster. Maybe
>> Matthias could chime in here.
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <[email protected]> wrote:
>>
>>> Hi!
>>>
>>> For flink standalone programs, you would use a "RemoteEnvironment"
>>>
>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api".
>>> That one should deal with jars, classloaders, etc for you.
>>>
>>> Stephan
>>>
>>>
>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <[email protected]>
>>> wrote:
>>>
>>>> Thanks for the suggestion. Sure, those examples are interesting and I have
>>>> deployed them successfully on Flink. The deployment is done from the
>>>> command line, that is, by doing something like
>>>> bin/flink run example.jar
>>>> But what I want is to submit the topology to Flink using a Java program.
>>>>
>>>> Thanks.
>>>>
>>>> On Wednesday, 13 April 2016 at 14:12, Chesnay Schepler <[email protected]> wrote:
>>>>
>>>>
>>>> you can find examples here:
>>>>
>>>>
>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples
>>>>
>>>> we haven't established yet that it is an API issue; it could very well
>>>> be caused by the reflection magic you're using...
>>>>
>>>> On 13.04.2016 14:57, star jlong wrote:
>>>>> OK, it seems like there is an issue with the API. So does anybody
>>>>> have a working example for deploying a topology using the Flink
>>>>> dependency flink-storm_2.11? Any other example will be welcome too.
>>>>>
>>>>> Thanks,
>>>>> jstar
>>>>>
>>>>> On Wednesday, 13 April 2016 at 13:44, star jlong <[email protected]> wrote:
>>>>>
>>>>>
>>>>> Hi Schepler,
>>>>>
>>>>> Thanks for your concern. Yes, I'm actually having the same issue as
>>>>> indicated in that post, because I'm the one who posted it.
>>>>>
>>>>> On Wednesday, 13 April 2016 at 13:35, Chesnay Schepler <[email protected]> wrote:
>>>>>
>>>>>
>>>>>
>>>>
>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api
>>>>>
>>>>> On 13.04.2016 14:28, Till Rohrmann wrote:
>>>>>> Hi jstar,
>>>>>>
>>>>>> what's exactly the problem you're observing?
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong <[email protected]> wrote:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> I'm jstar. I have been playing around with Flink. I'm very much
>>>>>>> interested in submitting a topology to Flink using its API. As
>>>>>>> indicated on Stack Overflow, that is what I have tried. But I got
>>>>>>> stuck with some exception. Any help will be welcome.
>>>>>>>
>>>>>>>
>>>>>>> Thanks.
>>>>>>> jstar