Just saw your email after my answer... Have a look here about task slots. https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
Also have a look here (starting from 18:30): https://www.youtube.com/watch?v=UEkjRN8jRx4 -Matthias On 01/14/2016 12:05 PM, Shinhyung Yang wrote: > Dear Matthias, > > Thank you very much again. I changed the value of > taskmanager.numberOfTaskSlots from 1 to 128 in > flink-0.10.1/conf/flink-conf.yaml, stopped the local cluster and > started local cluster again. And it works fine and well. (It is still > running and I can check it clear on the webfrontend) Although I'm not > sure whether it would be safe to keep the value like this or not. > > Thank you. > Best regards, > Shinhyung > > On Thu, Jan 14, 2016 at 7:53 PM, Shinhyung Yang > <shinhyung.y...@gmail.com> wrote: >> Dear Matthias, >> >> Thank you for a quick reply. It failed again, however I was able to >> access to its WebFrontend and it gave me some logs. I wanted to show >> logs immediately before digging down into it. >> >> 19:48:18,011 INFO org.apache.flink.runtime.jobmanager.JobManager >> - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb >> (topology). >> 19:48:18,014 INFO org.apache.flink.runtime.jobmanager.JobManager >> - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb >> (topology). >> 19:48:18,014 INFO org.apache.flink.runtime.jobmanager.JobManager >> - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology) >> changed to RUNNING. >> 19:48:18,014 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - >> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from >> CREATED to SCHEDULED >> 19:48:18,014 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - >> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from >> SCHEDULED to DEPLOYING >> 19:48:18,015 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - >> Deploying Source: src (1/1) (attempt #0) to localhost >> 19:48:18,015 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to >> SCHEDULED >> 19:48:18,015 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to >> DEPLOYING >> 19:48:18,015 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - >> Deploying bolt (1/8) (attempt #0) to localhost >> 19:48:18,015 INFO org.apache.flink.runtime.taskmanager.TaskManager >> - Received task Source: src (1/1) >> 19:48:18,015 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to >> SCHEDULED >> 19:48:18,016 INFO org.apache.flink.runtime.taskmanager.Task >> - Loading JAR files for task Source: src (1/1) >> 19:48:18,017 INFO org.apache.flink.runtime.taskmanager.TaskManager >> - Received task bolt (1/8) >> 19:48:18,017 INFO org.apache.flink.runtime.blob.BlobCache >> - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from >> localhost/127.0.0.1:36498 >> 19:48:18,017 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - >> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from >> DEPLOYING to CANCELING >> 19:48:18,018 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to >> CANCELING >> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task >> - Attempting to cancel task Source: src (1/1) >> 19:48:18,018 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to >> CANCELED >> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task >> - Source: src (1/1) switched to CANCELING >> 19:48:18,018 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to >> CANCELED >> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task >> - Attempting to cancel task bolt (1/8) >> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task >> - bolt (1/8) switched to CANCELING >> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task >> - Loading JAR files for task bolt (1/8) >> 19:48:18,018 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to >> CANCELED >> 19:48:18,018 INFO org.apache.flink.runtime.jobmanager.JobManager >> - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology) >> changed to FAILING. >> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: >> Not enough free slots available to run the job. You can decrease the >> operator parallelism or increase the number of slots per TaskManager >> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @ >> (unassigned) - [SCHEDULED] > with groupID < >> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup >> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4] >>> . Resources available to scheduler: Number of instances=1, total >> number of slots=1, available slots=0 >> at >> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256) >> at >> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131) >> at >> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298) >> at >> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458) >> at >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322) >> at >> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686) >> at >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982) >> at >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962) >> at >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962) >> at >> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) >> at >> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) >> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) >> at >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> at >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) >> at >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> at >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> 19:48:18,019 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (5/8) (4cb013017e278161124c3c6549cd3f80) switched from CREATED to >> CANCELED >> 19:48:18,020 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (6/8) (a8737f034530b66a44e8c0a2cd60528d) switched from CREATED to >> CANCELED >> 19:48:18,020 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (7/8) (495b895a7f338647b876640f91d823d6) switched from CREATED to >> CANCELED >> 19:48:18,020 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (8/8) (0c45cccb3f5b786e4bdacd20d3e164c8) switched from CREATED to >> CANCELED >> 19:48:18,526 WARN akka.remote.ReliableDeliverySupervisor >> - Association with remote system >> [akka.tcp://flink@127.0.0.1:50370] has failed, address is now gated >> for [5000] ms. Reason is: [Disassociated]. >> 19:48:18,527 WARN akka.remote.ReliableDeliverySupervisor >> - Association with remote system >> [akka.tcp://flink@127.0.0.1:54918] has failed, address is now gated >> for [5000] ms. Reason is: [Disassociated]. >> 19:48:18,531 WARN akka.remote.ReliableDeliverySupervisor >> - Association with remote system >> [akka.tcp://flink@127.0.0.1:53543] has failed, address is now gated >> for [5000] ms. Reason is: [Disassociated]. >> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task >> - Source: src (1/1) switched to CANCELED >> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task >> - bolt (1/8) switched to CANCELED >> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task >> - Freeing task resources for Source: src (1/1) >> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task >> - Freeing task resources for bolt (1/8) >> 19:48:18,769 INFO org.apache.flink.runtime.taskmanager.TaskManager >> - Unregistering task and sending final execution state >> CANCELED to JobManager for task Source: src >> (3535644576ae695d2685a65401e16fc4) >> 19:48:18,769 INFO org.apache.flink.runtime.taskmanager.TaskManager >> - Unregistering task and sending final execution state >> CANCELED to JobManager for task bolt >> (391ac2875a2fdc86d8af4f2d51e3e849) >> 19:48:18,770 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt >> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CANCELING to >> CANCELED >> 19:48:18,770 INFO >> org.apache.flink.runtime.executiongraph.ExecutionGraph - >> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from >> CANCELING to CANCELED >> 19:48:18,773 INFO org.apache.flink.runtime.jobmanager.JobManager >> - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology) >> changed to FAILED. >> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: >> Not enough free slots available to run the job. You can decrease the >> operator parallelism or increase the number of slots per TaskManager >> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @ >> (unassigned) - [SCHEDULED] > with groupID < >> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup >> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4] >>> . Resources available to scheduler: Number of instances=1, total >> number of slots=1, available slots=0 >> at >> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256) >> at >> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131) >> at >> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298) >> at >> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458) >> at >> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322) >> at >> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686) >> at >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982) >> at >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962) >> at >> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962) >> at >> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) >> at >> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) >> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) >> at >> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> at >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) >> at >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) >> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> at >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> >> It seems like my setting has some problem from a quick look at the log? >> >> Thank you. >> Best regards, >> Shinhyung >> >> On Thu, Jan 14, 2016 at 7:29 PM, Matthias J. Sax <mj...@apache.org> wrote: >>> Hi, >>> >>> I can submit the topology without any problems. Your code is fine. >>> >>> If your program "exits silently" I would actually assume, that you >>> submitted the topology successfully. Can you see the topology in >>> JobManager WebFrontend? If not, do you see any errors in the log files? >>> >>> -Matthias >>> >>> On 01/14/2016 07:37 AM, Shinhyung Yang wrote: >>>> Dear Matthias, >>>> >>>> Thank you for the reply! I am so sorry to respond late on the matter. >>>> >>>>> I just double checked the Flink code and during translation from Storm >>>>> to Flink declareOuputFields() is called twice. You are right that is >>>>> does the same job twice, but that is actually not a problem. The Flink >>>>> code is cleaner this way to I guess we will not change it. >>>> >>>> Thank you for checking. I don't think it contributed any part of my >>>> current problem anyways. For my case though, it is called 3 times if >>>> the number is important at all. >>>> >>>>> About lifecyle: >>>>> If you submit your code, during deployment, Spout.open() and >>>>> Bolt.prepare() should be called for each parallel instance on each >>>>> Spout/Bolt of your topology. >>>>> >>>>> About your submission (I guess this should solve your current problem): >>>>> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster, >>>>> but FlinkSubmitter. You have to distinguish three cases: >>>>> >>>>> - local/debug/IDE mode: use FlinkLocalCluster >>>>> => you do not need to start any Flink cluster before -- >>>>> FlinkLocalCluster is started up in you current JVM >>>>> * the purpose is local debugging in an IDE (this allows to easily >>>>> set break points and debug code) >>>>> >>>>> - pseudo-distributed mode: use FlinkSubmitter >>>>> => you start up a local Flink cluster via bin/start-local.sh >>>>> * this local Flink cluster run in an own JVM and looks like a real >>>>> cluster to the Flink client, ie, "bin/flink run" >>>>> * thus, you just use FlinkSubmitter as for a real cluster (with >>>>> JobManager/Nimbus hostname "localhost") >>>>> * in contrast to FlinkLocalCluster, no "internal Flink Cluster" is >>>>> started in your current JVM, but your code is shipped to the local >>>>> cluster you started up beforehand via bin/start-local.sh and executed in >>>>> this JVM >>>>> >>>>> - distributed mode: use FlinkSubmitter >>>>> => you start up Flink in a real cluster using bin/start-cluster.sh >>>>> * you use "bin/flink run" to submit your code to the real cluster >>>> >>>> Thank you for the explanation, now I have clearer understanding of >>>> clusters and submitters. However my problem is not fixed yet. Here's >>>> my code: >>>> >>>> //////////////////////////////////////////////////////////////////////////////// >>>> // ./src/main/java/myexample/App.java >>>> //////////////////////////////////////////////////////////////////////////////// >>>> >>>> package myexample; >>>> >>>> import backtype.storm.Config; >>>> import backtype.storm.LocalCluster; >>>> import myexample.spout.StandaloneSpout; >>>> import backtype.storm.generated.StormTopology; >>>> import backtype.storm.topology.IRichSpout; >>>> import backtype.storm.topology.TopologyBuilder; >>>> import backtype.storm.topology.base.BaseBasicBolt; >>>> >>>> import myexample.bolt.Node; >>>> import myexample.bolt.StandardBolt; >>>> >>>> import java.util.Arrays; >>>> import java.util.List; >>>> >>>> // >>>> import org.apache.flink.storm.api.FlinkTopology; >>>> //import org.apache.flink.storm.api.FlinkLocalCluster; >>>> import org.apache.flink.storm.api.FlinkSubmitter; >>>> //import org.apache.flink.storm.api.FlinkClient; >>>> import org.apache.flink.storm.api.FlinkTopologyBuilder; >>>> >>>> public class App >>>> { >>>> public static void main( String[] args ) throws Exception >>>> { >>>> int layer = 0; >>>> StandaloneSpout spout = new StandaloneSpout(); >>>> Config conf = new Config(); >>>> conf.put(Config.TOPOLOGY_DEBUG, false); >>>> //FlinkLocalCluster cluster = new FlinkLocalCluster(); >>>> //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster(); >>>> //LocalCluster cluster = new LocalCluster(); >>>> >>>> layer = Integer.parseInt(args[0]); >>>> //cluster.submitTopology("topology", conf, >>>> BinaryTopology(spout, layer)); >>>> FlinkSubmitter.submitTopology("topology", conf, >>>> BinaryTopology(spout, layer)); >>>> //Thread.sleep(5 * 1000); >>>> //FlinkClient.getConfiguredClient(conf).killTopology("topology"); >>>> //cluster.killTopology("topology"); >>>> //cluster.shutdown(); >>>> } >>>> >>>> public static FlinkTopology BinaryTopology(IRichSpout input, int n) { >>>> //public static StormTopology BinaryTopology(IRichSpout input, int n) { >>>> return BinaryTopology(input, n, >>>> Arrays.asList((BaseBasicBolt)new StandardBolt())); >>>> } >>>> >>>> public static FlinkTopology BinaryTopology(IRichSpout input, int >>>> n, List<BaseBasicBolt> boltList) { >>>> //public static StormTopology BinaryTopology(IRichSpout input, int >>>> n, List<BaseBasicBolt> boltList) { >>>> FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); >>>> //TopologyBuilder builder = new TopologyBuilder(); >>>> String sourceId = "src"; >>>> builder.setSpout(sourceId, input); >>>> >>>> >>>> String boltId = "bolt"; >>>> builder.setBolt(boltId, new Node(), Math.pow(2, >>>> n)).shuffleGrouping(sourceId); >>>> >>>> return builder.createTopology(); >>>> } >>>> } >>>> >>>> //////////////////////////////////////////////////////////////////////////////// >>>> // ./src/main/java/myexample/spout/StandaloneSpout.java >>>> //////////////////////////////////////////////////////////////////////////////// >>>> >>>> package myexample.spout; >>>> >>>> import backtype.storm.spout.SpoutOutputCollector; >>>> import backtype.storm.task.TopologyContext; >>>> import backtype.storm.topology.OutputFieldsDeclarer; >>>> import backtype.storm.topology.base.BaseRichSpout; >>>> import backtype.storm.tuple.Fields; >>>> import backtype.storm.tuple.Values; >>>> >>>> import java.io.*; >>>> import java.text.DateFormat; >>>> import java.text.SimpleDateFormat; >>>> import java.util.*; >>>> >>>> public class StandaloneSpout extends BaseRichSpout { >>>> >>>> private SpoutOutputCollector mCollector; >>>> >>>> @Override >>>> public void open(Map conf, TopologyContext context, >>>> SpoutOutputCollector collector) { >>>> this.mCollector = collector; >>>> } >>>> >>>> @Override >>>> public void nextTuple() { >>>> long currentTime = System.currentTimeMillis(); >>>> >>>> // TODO: Currently, do not check bound of list, because of >>>> experiment.(Avoid branch) >>>> mCollector.emit(new Values(new String("aaa"), >>>> System.currentTimeMillis(), 0)); >>>> >>>> } >>>> >>>> @Override >>>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>>> System.out.println("declareOutputFields"); >>>> declarer.declare(new Fields("string1", "timestamp", "omitted")); >>>> } >>>> } >>>> >>>> //////////////////////////////////////////////////////////////////////////////// >>>> // ./src/main/java/myexample/bolt/Node.java >>>> //////////////////////////////////////////////////////////////////////////////// >>>> >>>> package myexample.bolt; >>>> >>>> import backtype.storm.task.TopologyContext; >>>> import backtype.storm.topology.BasicOutputCollector; >>>> import backtype.storm.topology.OutputFieldsDeclarer; >>>> import backtype.storm.topology.base.BaseBasicBolt; >>>> import backtype.storm.tuple.Fields; >>>> import backtype.storm.tuple.Tuple; >>>> import backtype.storm.tuple.Values; >>>> import java.util.Map; >>>> >>>> public class Node extends BaseBasicBolt { >>>> >>>> public static boolean isTupleEmpty(Tuple tuple) { >>>> return false; >>>> } >>>> >>>> @Override >>>> public void prepare(Map stormConf, TopologyContext context) { >>>> super.prepare(stormConf, context); >>>> } >>>> >>>> @Override >>>> public void cleanup() { >>>> super.cleanup(); >>>> } >>>> >>>> @Override >>>> public void execute(Tuple tuple, BasicOutputCollector collector) { >>>> collector.emit(new Values("aaa", 1, System.currentTimeMillis(), >>>> 0)); >>>> } >>>> >>>> @Override >>>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>>> declarer.declare(new Fields("string1", "string2", "timestamp", >>>> "omitted")); >>>> } >>>> } >>>> >>>> //////////////////////////////////////////////////////////////////////////////// >>>> // ./src/main/java/myexample/bolt/StandardBolt.java >>>> //////////////////////////////////////////////////////////////////////////////// >>>> >>>> package myexample.bolt; >>>> >>>> import java.util.Map; >>>> >>>> import backtype.storm.task.TopologyContext; >>>> import backtype.storm.topology.BasicOutputCollector; >>>> import backtype.storm.topology.OutputFieldsDeclarer; >>>> import backtype.storm.topology.base.BaseBasicBolt; >>>> import backtype.storm.tuple.Tuple; >>>> >>>> public class StandardBolt extends BaseBasicBolt { >>>> >>>> @Override >>>> public void prepare(Map stormConf, TopologyContext context) { >>>> super.prepare(stormConf, context); >>>> } >>>> >>>> @Override >>>> public void execute(Tuple tuple, BasicOutputCollector collector) { >>>> } >>>> >>>> @Override >>>> public void declareOutputFields(OutputFieldsDeclarer ofd) { >>>> } >>>> } >>>> >>>> Probably it is the source code which has the problem or other things >>>> around the project environment might contain the problem. I would >>>> really appreciate if you could verify whether the code looks ok or >>>> not. >>>> >>>>> >>>>> About further debugging: you can increase the log level to get more >>>>> information: >>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html >>>> >>>> I tried to inject `log4j.properties' file that I got from a sample >>>> flink-quickstart-java application created from `mvn >>>> archetype:generate' to a ./target/*.jar but it does not work. I tried >>>> this because placing that `log4j.properties' file under >>>> ./src/main/resources of my project did not work in the first place. >>>> >>>> Thank you again for your help. >>>> With best regards, >>>> Shinhyung >>>> >>>>> Hope this helps! >>>>> >>>>> -Matthias >>>>> >>>>> On 01/09/2016 04:38 PM, Shinhyung Yang wrote: >>>>>> Dear Matthias, >>>>>> >>>>>> Thank you for replying! >>>>>> >>>>>> that sounds weird and should not happen -- Spout.open() should get >>>>>> called exactly once. >>>>>> >>>>>> >>>>>> That's what I thought too. I'm new to both Storm and Flink so it's quite >>>>>> complicated for me to handle both yet; would it be helpful for me if I >>>>>> know storm's lifecyle and flink 's lifecycle? When submitTopology() >>>>>> invoked, what should be called other than spout.open()? >>>>>> >>>>>> I am not sure about multiple calls to >>>>>> >>>>>> declareOuputFields though -- if might be called multiple times -- >>>>>> would >>>>>> need to double check the code. >>>>>> >>>>>> >>>>>> I'll check my code too. >>>>>> >>>>>> >>>>>> However, the call to declareOuputFields should be idempotent, so it >>>>>> should actually not be a problem if it is called multiple times. >>>>>> Even if >>>>>> Storm might call this method only once, there is no guarantee that >>>>>> it is >>>>>> not called multiple time. If this is a problem for you, please let me >>>>>> know. I think we could fix this and make sure the method is only >>>>>> called >>>>>> once. >>>>>> >>>>>> >>>>>> Actually it doesn't seem to be a problem for now. It just does the same >>>>>> job multiple times. >>>>>> >>>>>> >>>>>> It would be helpful if you could share you code. What do you mean >>>>>> with >>>>>> "exits silently"? No submission happens? Did you check the logs? As >>>>>> you >>>>>> mentioned FlinkLocalCluster, I assume that you run within an IDE? >>>>>> >>>>>> >>>>>> The topology doesn't seem to continue. There's a set of initialization >>>>>> code in the open method of the program's spout and it looks hopeless if >>>>>> it's not invoked. Is there any way to check the logs other than using >>>>>> println() calls? I'm running it on the commandline with having >>>>>> `bin/start_local.sh' running in the background and `bin/flink run'. >>>>>> >>>>>> >>>>>> Btw: lately we fixed a couple of bugs. I would suggest that you use >>>>>> the >>>>>> latest version from Flink master branch. I should work with 0.10.1 >>>>>> without problems. >>>>>> >>>>>> >>>>>> It was vey tedious for me to deal with a pom.xml file and .m2 >>>>>> repository. So I preferred to use maven central. But I should try with >>>>>> the master branch if I have to. >>>>>> >>>>>> I will quickly check if I could share some of the code. >>>>>> >>>>>> Thank you again for the help! >>>>>> With best regards, >>>>>> Shinhyung Yang >>>>>> >>>>>> >>>>>> >>>>>> -Matthias >>>>>> >>>>>> >>>>>> >>>>>> On 01/09/2016 01:27 AM, Shinhyung Yang wrote: >>>>>> > Howdies to everyone, >>>>>> > >>>>>> > I'm trying to use the storm compatibility layer on Flink 0.10.1. >>>>>> The >>>>>> > original storm topology works fine on Storm 0.9.5 and I have >>>>>> > incorporated FlinkLocalCluster, FlinkTopologyBuilder, and >>>>>> > FlinkTopology classes according to the programming guide >>>>>> > >>>>>> >>>>>> (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html). >>>>>> > I'm running it on Oracle Java 8 (1.8.0_66-b17) on Centos 7 >>>>>> (7.2.1511). >>>>>> > What happens is, it seems to be going all the way to submitTopology >>>>>> > method without any problem, however it doesn't invoke open method >>>>>> of >>>>>> > Spout class but declareOutputFields method is called for multiple >>>>>> > times and the program exits silently. Do you guys have any idea >>>>>> what's >>>>>> > going on here or have any suggestions? If needed, then please ask >>>>>> me >>>>>> > for more information. >>>>>> > >>>>>> > Thank you for reading. >>>>>> > With best regards, >>>>>> > Shinhyung Yang >>>>>> > >>>>>> >>>>> >>>
signature.asc
Description: OpenPGP digital signature