Hi,

the logs show:

> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Resources available to scheduler: Number of instances=1, total number of
> slots=1, available slots=0

Your bolt runs with parallelism 8, but the cluster offers only a single
slot. You need to increase the number of task slots in
conf/flink-conf.yaml. Look for the parameter "taskmanager.numberOfTaskSlots".

-Matthias

On 01/14/2016 11:53 AM, Shinhyung Yang wrote:
> Dear Matthias,
>
> Thank you for a quick reply. It failed again, however I was able to
> access to its WebFrontend and it gave me some logs. I wanted to show
> logs immediately before digging down into it.
>
> 19:48:18,011 INFO org.apache.flink.runtime.jobmanager.JobManager - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb (topology).
> 19:48:18,014 INFO org.apache.flink.runtime.jobmanager.JobManager - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb (topology).
> 19:48:18,014 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology) changed to RUNNING.
> 19:48:18,014 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from CREATED to SCHEDULED
> 19:48:18,014 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from SCHEDULED to DEPLOYING
> 19:48:18,015 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Source: src (1/1) (attempt #0) to localhost
> 19:48:18,015 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to SCHEDULED
> 19:48:18,015 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to DEPLOYING
> 19:48:18,015 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying bolt (1/8) (attempt #0) to localhost
> 19:48:18,015 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task Source: src (1/1)
> 19:48:18,015 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to SCHEDULED
> 19:48:18,016 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task Source: src (1/1)
> 19:48:18,017 INFO org.apache.flink.runtime.taskmanager.TaskManager - Received task bolt (1/8)
> 19:48:18,017 INFO org.apache.flink.runtime.blob.BlobCache - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from localhost/127.0.0.1:36498
> 19:48:18,017 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from DEPLOYING to CANCELING
> 19:48:18,018 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to CANCELING
> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: src (1/1)
> 19:48:18,018 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to CANCELED
> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task - Source: src (1/1) switched to CANCELING
> 19:48:18,018 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to CANCELED
> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task bolt (1/8)
> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task - bolt (1/8) switched to CANCELING
> 19:48:18,018 INFO org.apache.flink.runtime.taskmanager.Task - Loading JAR files for task bolt (1/8)
> 19:48:18,018 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to CANCELED
> 19:48:18,018 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology) changed to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager
> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
> (unassigned) - [SCHEDULED] > with groupID <
> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4] >.
> Resources available to scheduler: Number of instances=1, total
> number of slots=1, available slots=0
>     at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
>     at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
>     at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
>     at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 19:48:18,019 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (5/8) (4cb013017e278161124c3c6549cd3f80) switched from CREATED to CANCELED
> 19:48:18,020 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (6/8) (a8737f034530b66a44e8c0a2cd60528d) switched from CREATED to CANCELED
> 19:48:18,020 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (7/8) (495b895a7f338647b876640f91d823d6) switched from CREATED to CANCELED
> 19:48:18,020 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (8/8) (0c45cccb3f5b786e4bdacd20d3e164c8) switched from CREATED to CANCELED
> 19:48:18,526 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:50370] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> 19:48:18,527 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:54918] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> 19:48:18,531 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:53543] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task - Source: src (1/1) switched to CANCELED
> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task - bolt (1/8) switched to CANCELED
> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: src (1/1)
> 19:48:18,768 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for bolt (1/8)
> 19:48:18,769 INFO org.apache.flink.runtime.taskmanager.TaskManager - Unregistering task and sending final execution state CANCELED to JobManager for task Source: src (3535644576ae695d2685a65401e16fc4)
> 19:48:18,769 INFO org.apache.flink.runtime.taskmanager.TaskManager - Unregistering task and sending final execution state CANCELED to JobManager for task bolt (391ac2875a2fdc86d8af4f2d51e3e849)
> 19:48:18,770 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - bolt (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CANCELING to CANCELED
> 19:48:18,770 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from CANCELING to CANCELED
> 19:48:18,773 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology) changed to FAILED.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager
> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
> (unassigned) - [SCHEDULED] > with groupID <
> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4] >.
> Resources available to scheduler: Number of instances=1, total
> number of slots=1, available slots=0
>     at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
>     at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
>     at org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
>     at org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:982)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:962)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> It seems like my setting has some problem from a quick look at the log?
>
> Thank you.
> Best regards,
> Shinhyung
>
> On Thu, Jan 14, 2016 at 7:29 PM, Matthias J. Sax <mj...@apache.org> wrote:
>> Hi,
>>
>> I can submit the topology without any problems. Your code is fine.
>>
>> If your program "exits silently" I would actually assume that you
>> submitted the topology successfully. Can you see the topology in
>> JobManager WebFrontend? If not, do you see any errors in the log files?
>>
>> -Matthias
>>
>> On 01/14/2016 07:37 AM, Shinhyung Yang wrote:
>>> Dear Matthias,
>>>
>>> Thank you for the reply! I am so sorry to respond late on the matter.
>>>
>>>> I just double checked the Flink code and during translation from Storm
>>>> to Flink declareOutputFields() is called twice. You are right that it
>>>> does the same job twice, but that is actually not a problem. The Flink
>>>> code is cleaner this way, so I guess we will not change it.
>>>
>>> Thank you for checking. I don't think it contributed any part of my
>>> current problem anyways. For my case though, it is called 3 times if
>>> the number is important at all.
>>>
>>>> About lifecycle:
>>>> If you submit your code, during deployment, Spout.open() and
>>>> Bolt.prepare() should be called for each parallel instance on each
>>>> Spout/Bolt of your topology.
>>>>
>>>> About your submission (I guess this should solve your current problem):
>>>> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
>>>> but FlinkSubmitter. You have to distinguish three cases:
>>>>
>>>> - local/debug/IDE mode: use FlinkLocalCluster
>>>>   => you do not need to start any Flink cluster before --
>>>>      FlinkLocalCluster is started up in your current JVM
>>>>   * the purpose is local debugging in an IDE (this allows you to easily
>>>>     set break points and debug code)
>>>>
>>>> - pseudo-distributed mode: use FlinkSubmitter
>>>>   => you start up a local Flink cluster via bin/start-local.sh
>>>>   * this local Flink cluster runs in its own JVM and looks like a real
>>>>     cluster to the Flink client, i.e., "bin/flink run"
>>>>   * thus, you just use FlinkSubmitter as for a real cluster (with
>>>>     JobManager/Nimbus hostname "localhost")
>>>>   * in contrast to FlinkLocalCluster, no "internal Flink Cluster" is
>>>>     started in your current JVM, but your code is shipped to the local
>>>>     cluster you started up beforehand via bin/start-local.sh and executed in
>>>>     this JVM
>>>>
>>>> - distributed mode: use FlinkSubmitter
>>>>   => you start up Flink in a real cluster using bin/start-cluster.sh
>>>>   * you use "bin/flink run" to submit your code to the real cluster
>>>
>>> Thank you for the explanation, now I have clearer understanding of
>>> clusters and submitters. However my problem is not fixed yet. Here's
>>> my code:
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/App.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample;
>>>
>>> import backtype.storm.Config;
>>> import backtype.storm.LocalCluster;
>>> import myexample.spout.StandaloneSpout;
>>> import backtype.storm.generated.StormTopology;
>>> import backtype.storm.topology.IRichSpout;
>>> import backtype.storm.topology.TopologyBuilder;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>
>>> import myexample.bolt.Node;
>>> import myexample.bolt.StandardBolt;
>>>
>>> import java.util.Arrays;
>>> import java.util.List;
>>>
>>> //
>>> import org.apache.flink.storm.api.FlinkTopology;
>>> //import org.apache.flink.storm.api.FlinkLocalCluster;
>>> import org.apache.flink.storm.api.FlinkSubmitter;
>>> //import org.apache.flink.storm.api.FlinkClient;
>>> import org.apache.flink.storm.api.FlinkTopologyBuilder;
>>>
>>> public class App
>>> {
>>>     public static void main( String[] args ) throws Exception
>>>     {
>>>         int layer = 0;
>>>         StandaloneSpout spout = new StandaloneSpout();
>>>         Config conf = new Config();
>>>         conf.put(Config.TOPOLOGY_DEBUG, false);
>>>         //FlinkLocalCluster cluster = new FlinkLocalCluster();
>>>         //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
>>>         //LocalCluster cluster = new LocalCluster();
>>>
>>>         layer = Integer.parseInt(args[0]);
>>>         //cluster.submitTopology("topology", conf, BinaryTopology(spout, layer));
>>>         FlinkSubmitter.submitTopology("topology", conf,
>>>                 BinaryTopology(spout, layer));
>>>         //Thread.sleep(5 * 1000);
>>>         //FlinkClient.getConfiguredClient(conf).killTopology("topology");
>>>         //cluster.killTopology("topology");
>>>         //cluster.shutdown();
>>>     }
>>>
>>>     public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
>>>     //public static StormTopology BinaryTopology(IRichSpout input, int n) {
>>>         return BinaryTopology(input, n,
>>>                 Arrays.asList((BaseBasicBolt)new StandardBolt()));
>>>     }
>>>
>>>     public static FlinkTopology BinaryTopology(IRichSpout input, int n,
>>>             List<BaseBasicBolt> boltList) {
>>>     //public static StormTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
>>>         FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
>>>         //TopologyBuilder builder = new TopologyBuilder();
>>>         String sourceId = "src";
>>>         builder.setSpout(sourceId, input);
>>>
>>>
>>>         String boltId = "bolt";
>>>         builder.setBolt(boltId, new Node(), Math.pow(2,
>>>                 n)).shuffleGrouping(sourceId);
>>>
>>>         return builder.createTopology();
>>>     }
>>> }
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/spout/StandaloneSpout.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample.spout;
>>>
>>> import backtype.storm.spout.SpoutOutputCollector;
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseRichSpout;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Values;
>>>
>>> import java.io.*;
>>> import java.text.DateFormat;
>>> import java.text.SimpleDateFormat;
>>> import java.util.*;
>>>
>>> public class StandaloneSpout extends BaseRichSpout {
>>>
>>>     private SpoutOutputCollector mCollector;
>>>
>>>     @Override
>>>     public void open(Map conf, TopologyContext context,
>>>             SpoutOutputCollector collector) {
>>>         this.mCollector = collector;
>>>     }
>>>
>>>     @Override
>>>     public void nextTuple() {
>>>         long currentTime = System.currentTimeMillis();
>>>
>>>         // TODO: Currently, do not check bound of list, because of experiment.(Avoid branch)
>>>         mCollector.emit(new Values(new String("aaa"),
>>>                 System.currentTimeMillis(), 0));
>>>
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         System.out.println("declareOutputFields");
>>>         declarer.declare(new Fields("string1", "timestamp", "omitted"));
>>>     }
>>> }
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/bolt/Node.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample.bolt;
>>>
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.BasicOutputCollector;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>> import backtype.storm.tuple.Fields;
>>> import backtype.storm.tuple.Tuple;
>>> import backtype.storm.tuple.Values;
>>> import java.util.Map;
>>>
>>> public class Node extends BaseBasicBolt {
>>>
>>>     public static boolean isTupleEmpty(Tuple tuple) {
>>>         return false;
>>>     }
>>>
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>         super.prepare(stormConf, context);
>>>     }
>>>
>>>     @Override
>>>     public void cleanup() {
>>>         super.cleanup();
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>         collector.emit(new Values("aaa", 1, System.currentTimeMillis(), 0));
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>         declarer.declare(new Fields("string1", "string2", "timestamp",
>>>                 "omitted"));
>>>     }
>>> }
>>>
>>> ////////////////////////////////////////////////////////////////////////////////
>>> // ./src/main/java/myexample/bolt/StandardBolt.java
>>> ////////////////////////////////////////////////////////////////////////////////
>>>
>>> package myexample.bolt;
>>>
>>> import java.util.Map;
>>>
>>> import backtype.storm.task.TopologyContext;
>>> import backtype.storm.topology.BasicOutputCollector;
>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>> import backtype.storm.topology.base.BaseBasicBolt;
>>> import backtype.storm.tuple.Tuple;
>>>
>>> public class StandardBolt extends BaseBasicBolt {
>>>
>>>     @Override
>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>         super.prepare(stormConf, context);
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer ofd) {
>>>     }
>>> }
>>>
>>> Probably it is the source code which has the problem or other things
>>> around the project environment might contain the problem. I would
>>> really appreciate if you could verify whether the code looks ok or
>>> not.
>>>
>>>>
>>>> About further debugging: you can increase the log level to get more
>>>> information:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/logging.html
>>>
>>> I tried to inject `log4j.properties' file that I got from a sample
>>> flink-quickstart-java application created from `mvn
>>> archetype:generate' to a ./target/*.jar but it does not work. I tried
>>> this because placing that `log4j.properties' file under
>>> ./src/main/resources of my project did not work in the first place.
>>>
>>> Thank you again for your help.
>>> With best regards,
>>> Shinhyung
>>>
>>>> Hope this helps!
>>>>
>>>> -Matthias
>>>>
>>>> On 01/09/2016 04:38 PM, Shinhyung Yang wrote:
>>>>> Dear Matthias,
>>>>>
>>>>> Thank you for replying!
>>>>>
>>>>> that sounds weird and should not happen -- Spout.open() should get
>>>>> called exactly once.
>>>>>
>>>>>
>>>>> That's what I thought too. I'm new to both Storm and Flink so it's quite
>>>>> complicated for me to handle both yet; would it be helpful for me if I
>>>>> know Storm's lifecycle and Flink's lifecycle? When submitTopology() is
>>>>> invoked, what should be called other than spout.open()?
>>>>>
>>>>> I am not sure about multiple calls to declareOutputFields though --
>>>>> it might be called multiple times -- would need to double check the
>>>>> code.
>>>>>
>>>>>
>>>>> I'll check my code too.
>>>>>
>>>>>
>>>>> However, the call to declareOutputFields should be idempotent, so it
>>>>> should actually not be a problem if it is called multiple times. Even
>>>>> if Storm might call this method only once, there is no guarantee that
>>>>> it is not called multiple times. If this is a problem for you, please
>>>>> let me know. I think we could fix this and make sure the method is
>>>>> only called once.
>>>>>
>>>>>
>>>>> Actually it doesn't seem to be a problem for now. It just does the same
>>>>> job multiple times.
>>>>>
>>>>>
>>>>> It would be helpful if you could share your code. What do you mean with
>>>>> "exits silently"? No submission happens? Did you check the logs? As
>>>>> you mentioned FlinkLocalCluster, I assume that you run within an IDE?
>>>>>
>>>>>
>>>>> The topology doesn't seem to continue. There's a set of initialization
>>>>> code in the open method of the program's spout and it looks hopeless if
>>>>> it's not invoked. Is there any way to check the logs other than using
>>>>> println() calls? I'm running it on the commandline with having
>>>>> `bin/start_local.sh' running in the background and `bin/flink run'.
>>>>>
>>>>>
>>>>> Btw: lately we fixed a couple of bugs. I would suggest that you use
>>>>> the latest version from Flink master branch. It should work with
>>>>> 0.10.1 without problems.
>>>>>
>>>>>
>>>>> It was very tedious for me to deal with a pom.xml file and .m2
>>>>> repository. So I preferred to use maven central. But I should try with
>>>>> the master branch if I have to.
>>>>>
>>>>> I will quickly check if I could share some of the code.
>>>>>
>>>>> Thank you again for the help!
>>>>> With best regards,
>>>>> Shinhyung Yang
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>> On 01/09/2016 01:27 AM, Shinhyung Yang wrote:
>>>>> > Howdies to everyone,
>>>>> >
>>>>> > I'm trying to use the storm compatibility layer on Flink 0.10.1. The
>>>>> > original storm topology works fine on Storm 0.9.5 and I have
>>>>> > incorporated FlinkLocalCluster, FlinkTopologyBuilder, and
>>>>> > FlinkTopology classes according to the programming guide
>>>>> > (https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/storm_compatibility.html).
>>>>> > I'm running it on Oracle Java 8 (1.8.0_66-b17) on Centos 7 (7.2.1511).
>>>>> > What happens is, it seems to be going all the way to submitTopology
>>>>> > method without any problem, however it doesn't invoke open method of
>>>>> > Spout class but declareOutputFields method is called for multiple
>>>>> > times and the program exits silently. Do you guys have any idea what's
>>>>> > going on here or have any suggestions? If needed, then please ask me
>>>>> > for more information.
>>>>> >
>>>>> > Thank you for reading.
>>>>> > With best regards,
>>>>> > Shinhyung Yang
>>>>> >
>>>>>
>>>>
>>
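
For reference, the slot arithmetic behind the NoResourceAvailableException at the top of this thread can be sketched in a few lines of Java. This is only an illustration, not Flink code: the class `SlotCheck` and its methods are hypothetical names, the value `layer = 3` is inferred from the "bolt (1/8)" entries in the log, and the sketch assumes that all operators share one slot sharing group (as the log suggests), so the job needs as many slots as its highest operator parallelism.

```java
// Hypothetical helper for illustration only; none of these names are Flink API.
final class SlotCheck {

    // Parallelism of the "bolt" operator in the quoted App.java: 2^layer.
    static int boltParallelism(int layer) {
        return 1 << layer;
    }

    // Assumption: all operators are in one slot sharing group, so the job
    // needs as many slots as the highest operator parallelism.
    static int requiredSlots(int... operatorParallelism) {
        int max = 0;
        for (int p : operatorParallelism) {
            max = Math.max(max, p);
        }
        return max;
    }

    // Slots offered by the cluster: TaskManagers times
    // taskmanager.numberOfTaskSlots (from conf/flink-conf.yaml).
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int layer = 3; // assumption: inferred from "bolt (1/8)" in the log
        int needed = requiredSlots(1, boltParallelism(layer)); // src=1, bolt=8
        int total = totalSlots(1, 1); // "Number of instances=1, total number of slots=1"
        System.out.println("needed=" + needed + ", total=" + total
                + ", fits=" + (needed <= total));
    }
}
```

Under these assumptions the job needs 8 slots while the cluster offers 1, which matches the failure at "bolt (2/8)". Raising "taskmanager.numberOfTaskSlots" to at least 8, or passing a smaller layer argument, would make the check pass.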