Dear Matthias,

Thank you for the reply, and I am sorry for responding so late on the matter.
> I just double checked the Flink code and during translation from Storm
> to Flink declareOutputFields() is called twice. You are right that it
> does the same job twice, but that is actually not a problem. The Flink
> code is cleaner this way, so I guess we will not change it.

Thank you for checking. I don't think it contributed to my current
problem anyway. In my case, though, it is called 3 times, if the exact
number matters at all.

> About lifecycle:
> If you submit your code, during deployment, Spout.open() and
> Bolt.prepare() should be called for each parallel instance of each
> Spout/Bolt in your topology.
>
> About your submission (I guess this should solve your current problem):
> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
> but FlinkSubmitter. You have to distinguish three cases:
>
> - local/debug/IDE mode: use FlinkLocalCluster
>   => you do not need to start any Flink cluster beforehand --
>      FlinkLocalCluster is started up in your current JVM
>   * the purpose is local debugging in an IDE (this allows you to easily
>     set break points and debug code)
>
> - pseudo-distributed mode: use FlinkSubmitter
>   => you start up a local Flink cluster via bin/start-local.sh
>   * this local Flink cluster runs in its own JVM and looks like a real
>     cluster to the Flink client, i.e., "bin/flink run"
>   * thus, you just use FlinkSubmitter as for a real cluster (with
>     JobManager/Nimbus hostname "localhost")
>   * in contrast to FlinkLocalCluster, no "internal Flink cluster" is
>     started in your current JVM; instead, your code is shipped to the
>     local cluster you started beforehand via bin/start-local.sh and
>     executed in that JVM
>
> - distributed mode: use FlinkSubmitter
>   => you start up Flink on a real cluster using bin/start-cluster.sh
>   * you use "bin/flink run" to submit your code to the real cluster

Thank you for the explanation; I now have a clearer understanding of the
clusters and submitters. However, my problem is not fixed yet.
Here's my code:

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/App.java
////////////////////////////////////////////////////////////////////////////////

package myexample;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;

import myexample.spout.StandaloneSpout;
import myexample.bolt.Node;
import myexample.bolt.StandardBolt;

import java.util.Arrays;
import java.util.List;

import org.apache.flink.storm.api.FlinkTopology;
//import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkSubmitter;
//import org.apache.flink.storm.api.FlinkClient;
import org.apache.flink.storm.api.FlinkTopologyBuilder;

public class App {
    public static void main(String[] args) throws Exception {
        int layer = 0;
        StandaloneSpout spout = new StandaloneSpout();

        Config conf = new Config();
        conf.put(Config.TOPOLOGY_DEBUG, false);

        //FlinkLocalCluster cluster = new FlinkLocalCluster();
        //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
        //LocalCluster cluster = new LocalCluster();

        layer = Integer.parseInt(args[0]);

        //cluster.submitTopology("topology", conf, BinaryTopology(spout, layer));
        FlinkSubmitter.submitTopology("topology", conf, BinaryTopology(spout, layer));

        //Thread.sleep(5 * 1000);
        //FlinkClient.getConfiguredClient(conf).killTopology("topology");
        //cluster.killTopology("topology");
        //cluster.shutdown();
    }

    //public static StormTopology BinaryTopology(IRichSpout input, int n) {
    public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
        return BinaryTopology(input, n, Arrays.asList((BaseBasicBolt) new StandardBolt()));
    }

    //public static StormTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
    public static FlinkTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
        FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
        //TopologyBuilder builder = new TopologyBuilder();

        String sourceId = "src";
        builder.setSpout(sourceId, input);

        String boltId = "bolt";
        builder.setBolt(boltId, new Node(), Math.pow(2, n)).shuffleGrouping(sourceId);

        return builder.createTopology();
    }
}

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/spout/StandaloneSpout.java
////////////////////////////////////////////////////////////////////////////////

package myexample.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class StandaloneSpout extends BaseRichSpout {
    private SpoutOutputCollector mCollector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.mCollector = collector;
    }

    @Override
    public void nextTuple() {
        long currentTime = System.currentTimeMillis();
        // TODO: Currently, do not check bound of list, because of experiment. (Avoid branch)
        mCollector.emit(new Values(new String("aaa"), System.currentTimeMillis(), 0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("declareOutputFields");
        declarer.declare(new Fields("string1", "timestamp", "omitted"));
    }
}

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/bolt/Node.java
////////////////////////////////////////////////////////////////////////////////

package myexample.bolt;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class Node extends BaseBasicBolt {
    public static boolean isTupleEmpty(Tuple tuple) {
        return false;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }

    @Override
    public void cleanup() {
        super.cleanup();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("string1", "string2", "timestamp", "omitted"));
    }
}

////////////////////////////////////////////////////////////////////////////////
// ./src/main/java/myexample/bolt/StandardBolt.java
////////////////////////////////////////////////////////////////////////////////

package myexample.bolt;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class StandardBolt extends BaseBasicBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }
}

Probably the problem is in this source code, or in something around the
project environment. I would really appreciate it if you could verify
whether the code looks OK or not.
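One thing I noticed while re-reading the code above (just a guess on my part, not a confirmed cause of the problem): setBolt() receives Math.pow(2, n), which is a double. If I read the Storm API correctly the parallelism hint parameter is a Number, so this compiles, but I am considering passing an explicit int instead. A minimal sketch of the hypothetical helper I have in mind (the name boltParallelism is mine):

```java
public class ParallelismSketch {
    // Hypothetical helper: compute the 2^layers parallelism hint as an int
    // instead of handing the double returned by Math.pow(2, n) to setBolt().
    static int boltParallelism(int layers) {
        return 1 << layers; // exact integer power of two for small 'layers'
    }

    public static void main(String[] args) {
        // usage idea: builder.setBolt(boltId, new Node(), boltParallelism(n))
        System.out.println(boltParallelism(3)); // prints 8
    }
}
```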
> About further debugging: you can increase the log level to get more
> information:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html

I tried to inject the `log4j.properties' file that I got from a sample
flink-quickstart-java application (created via `mvn archetype:generate')
into my ./target/*.jar, but it does not work. I tried this because
placing that `log4j.properties' file under ./src/main/resources of my
project did not work in the first place.

Thank you again for your help.
With best regards,
Shinhyung

> Hope this helps!
>
> -Matthias
>
> On 01/09/2016 04:38 PM, Shinhyung Yang wrote:
>> Dear Matthias,
>>
>> Thank you for replying!
>>
>> that sounds weird and should not happen -- Spout.open() should get
>> called exactly once.
>>
>> That's what I thought too. I'm new to both Storm and Flink, so it's
>> quite complicated for me to handle both yet; would it help me to know
>> Storm's lifecycle and Flink's lifecycle? When submitTopology() is
>> invoked, what should be called other than Spout.open()?
>>
>> I am not sure about multiple calls to declareOutputFields though -- it
>> might be called multiple times -- would need to double check the code.
>>
>> I'll check my code too.
>>
>> However, the call to declareOutputFields should be idempotent, so it
>> should actually not be a problem if it is called multiple times. Even
>> if Storm might call this method only once, there is no guarantee that
>> it is not called multiple times. If this is a problem for you, please
>> let me know. I think we could fix this and make sure the method is
>> only called once.
>>
>> Actually it doesn't seem to be a problem for now. It just does the
>> same job multiple times.
>>
>> It would be helpful if you could share your code. What do you mean by
>> "exits silently"? No submission happens? Did you check the logs? As
>> you mentioned FlinkLocalCluster, I assume that you run within an IDE?
>>
>> The topology doesn't seem to continue. There is a set of
>> initialization code in the open method of the program's spout, and it
>> looks hopeless if that is never invoked. Is there any way to check the
>> logs other than using println() calls? I'm running it on the command
>> line with `bin/start-local.sh' running in the background and
>> `bin/flink run'.
>>
>> Btw: lately we fixed a couple of bugs. I would suggest that you use
>> the latest version from the Flink master branch. It should work with
>> 0.10.1 without problems.
>>
>> It was very tedious for me to deal with the pom.xml file and the .m2
>> repository, so I preferred to use Maven Central. But I will try the
>> master branch if I have to.
>>
>> I will quickly check if I can share some of the code.
>>
>> Thank you again for the help!
>> With best regards,
>> Shinhyung Yang
>>
>> -Matthias
>>
>> On 01/09/2016 01:27 AM, Shinhyung Yang wrote:
>> > Howdies to everyone,
>> >
>> > I'm trying to use the storm compatibility layer on Flink 0.10.1. The
>> > original storm topology works fine on Storm 0.9.5, and I have
>> > incorporated the FlinkLocalCluster, FlinkTopologyBuilder, and
>> > FlinkTopology classes according to the programming guide
>> > (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html).
>> > I'm running it on Oracle Java 8 (1.8.0_66-b17) on CentOS 7 (7.2.1511).
>> > What happens is, it seems to get all the way to the submitTopology
>> > method without any problem; however, it doesn't invoke the open
>> > method of the Spout class, the declareOutputFields method is called
>> > multiple times, and the program exits silently. Do you have any idea
>> > what's going on here, or have any suggestions? If needed, please ask
>> > me for more information.
>> >
>> > Thank you for reading.
>> > With best regards,
>> > Shinhyung Yang
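P.S. In case it is relevant to the logging question above: the `log4j.properties' file I tried to place under ./src/main/resources is essentially the following minimal console setup (reproduced from memory, so it may differ slightly from the file the quickstart archetype actually generates):

```properties
# Log everything at INFO and above to the console
log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```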