Hmm...yes that's a better idea.
On Tue, May 10, 2016 at 3:12 PM, Matthias J. Sax <[email protected]> wrote:
> I am not sure if NimbusClient works well with LocalCluster. My
> suggestion was based on the assumption that you run in a real cluster.
>
> There is LocalCluster.killTopology(); maybe you should use this
> method instead of NimbusClient.kill().
>
> Using LocalCluster, I usually use the following pattern (that returns
> nicely):
>
> > LocalCluster lc = new LocalCluster();
> > lc.submitTopology(TOPOLOGY_NAME, config, topology);
> >
> > Utils.sleep(runtime);
> > lc.deactivate(TOPOLOGY_NAME);
> >
> > Utils.sleep(10000);
> > lc.shutdown();
>
> I use deactivate() to send a flush-signal through my topology, too.
> You could of course replace deactivate() with kill(). Furthermore,
> instead of "Utils.sleep(runtime)", you could do a wait-loop checking for
> a global boolean flag "finished" that gets set by your bolt -- ie, instead
> of calling NimbusClient.kill(...) in your "shut-down bolt", just set this
> global flag to tell the driver to resume.
>
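> For illustration, here is a minimal sketch of that wait-loop variant
> (assuming a static AtomicBoolean shared between the driver and the
> shut-down bolt; the names "FINISHED", "MyStorm" and buildTopology() are
> only placeholders for your own code). It works with LocalCluster because
> driver and bolts run in the same JVM:
>
> > import java.util.concurrent.atomic.AtomicBoolean;
> > import org.apache.storm.Config;
> > import org.apache.storm.LocalCluster;
> > import org.apache.storm.utils.Utils;
> >
> > public class Driver {
> >     // set to true by the shut-down bolt once all DbBolts have flushed
> >     public static final AtomicBoolean FINISHED = new AtomicBoolean(false);
> >
> >     public static void main(String[] args) throws Exception {
> >         LocalCluster lc = new LocalCluster();
> >         lc.submitTopology("MyStorm", new Config(), buildTopology());
> >
> >         // wait for the bolt's signal instead of sleeping a fixed time
> >         while (!FINISHED.get()) {
> >             Utils.sleep(100);
> >         }
> >
> >         lc.killTopology("MyStorm");
> >         Utils.sleep(10000);   // give in-flight tuples time to drain
> >         lc.shutdown();
> >     }
> > }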
>
>
> -Matthias
>
> On 05/10/2016 11:29 AM, Navin Ipe wrote:
> > Turns out, using nimbus.seeds was sufficient.
> >
> > import org.apache.storm.utils.NimbusClient;
> > import org.apache.storm.utils.Utils;
> >
> > Map conf = Utils.readStormConfig();
> > conf.put("nimbus.seeds", "localhost");
> >
> > NimbusClient cc = NimbusClient.getConfiguredClient(conf);
> >
> > Nimbus.Client client = cc.getClient();
> > client.killTopology("MyStorm");
> >
> > Was able to kill the topology. Am a bit surprised though. I thought
> > doing this would kill the submitted topology and take me to the next
> > line of code in main() (ie: the line of code after I submitted the
> > topology to Storm).
> > But killing the topology stops the entire program in this manner:
> >
> > :run FAILED
> >
> > FAILURE: Build failed with an exception.
> >
> > * What went wrong:
> > Execution failed for task ':run'.
> >> Process 'command
> > '/Library/Java/JavaVirtualMachines/jdk1.8.0_73.jdk/Contents/Home/bin/java''
> > finished with non-zero exit value 1
> >
> > I guess this is why a Storm topology is meant to run forever. Would've
> > been nice though, if Storm provided a clean way to exit a topology.
> > Thanks for all the help, Matthias!
> >
> >
> >
> > On Tue, May 10, 2016 at 1:57 PM, Matthias J. Sax <[email protected]> wrote:
> >
> > My bad.
> >
> > The parameter is called "nimbus.seeds" (former "nimbus.host") and not
> > "nimbus.leader".
> >
> > And I guess "build/libs" is not your working directory. (See the run
> > configuration settings of your IDE.)
> >
> > If in doubt, include a
> > "System.out.println(new File(".").getAbsolutePath());"
> > (or similar) in your bolt code, to get the working directory.
> >
> > And check
> > https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/utils/Utils.java#L349
> > and
> > https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/utils/Utils.java#L452
> >
> > You can also specify the location for storm.yaml via
> >
> > System.setProperty("storm.conf.file", <your-path>);
> > (or -Dstorm.conf.file=<your-path>)
> >
> >
> > -Matthias
> >
> > On 05/10/2016 06:24 AM, Navin Ipe wrote:
> > > *@Spico:* The code as promised:
> > > http://nrecursions.blogspot.in/2016/05/more-concepts-of-apache-storm-you-need.html#morecreativetopologystructures
> > >
> > > *@Matthias:* Still no luck. I tried this in the bolt code:
> > > Map conf = Utils.readStormConfig();
> > > conf.put("nimbus.leader", "localhost");
> > >
> > > Also tried altering the storm.yaml file to have this:
> > > ########### These MUST be filled in for a storm configuration
> > >
> > > storm.zookeeper.servers:
> > > - "localhost"
> > > # - "server2"
> > > nimbus.seeds: ["localhost"]
> > >
> > > Am running this on LocalCluster, and strangely, the storm.yaml file is
> > > in my ~/eclipseworkspace/apache-storm-1.0.0_release/conf/ folder,
> > > although my project is in the ~/eclipseworkspace/MyStorm folder.
> > >
> > > Placed a copy of storm.yaml in my project folder and in the build/libs
> > > folder. Still no luck.
> > > For this person
> > > <http://stackoverflow.com/questions/36742451/apache-storm-could-not-find-leader-nimbus-from-seed-hosts>,
> > > it was a port issue. I don't think that's the case for me.
> > > Is there anything else that could be tried out?
> > >
> > >
> > >
> > > On Mon, May 9, 2016 at 6:18 PM, Matthias J. Sax <[email protected]> wrote:
> > >
> > >     Utils.readStormConfig() tries to read "./storm.yaml" from local
> > >     disk (ie, the supervisor machine that executes the bolt) -- as it
> > >     is using the working directory, I guess it does not find the file,
> > >     and thus the value "nimbus.host" is not set.
> > >
> > >     Make sure that storm.yaml is found by the worker, or set
> > >     nimbus.host manually in your bolt code:
> > >
> > >     conf.put("nimbus.host", "<your-nimbus-host-name>");
> > >
> > >     (or "nimbus.leader", which replaces "nimbus.host" in Storm 1.0.0)
> > >
> > >
> > > -Matthias
> > >
> > > On 05/09/2016 12:31 PM, Navin Ipe wrote:
> > > > @Spico: Will share.
> > > >
> > > > The streams implementation is working beautifully.
> > > > Only the topology killing is failing.
> > > >
> > > > *Tried:*
> > > > Map conf = Utils.readStormConfig();
> > > > NimbusClient cc = NimbusClient.getConfiguredClient(conf);
> > > > Nimbus.Client client = cc.getClient();
> > > > client.killTopology("myStorm");
> > > >
> > > > *I get these errors:*
> > > > 29442 [Thread-32-topologyKillerBolt-executor[16 16]] WARN
> > > > o.a.s.u.NimbusClient - Ignoring exception while trying to get leader
> > > > nimbus info from localhost. will retry with a different seed host.
> > > > java.lang.RuntimeException:
> > > > org.apache.storm.thrift.transport.TTransportException:
> > > > java.net.ConnectException: Connection refused
> > > > Caused by: org.apache.storm.thrift.transport.TTransportException:
> > > > java.net.ConnectException: Connection refused
> > > > Caused by: java.net.ConnectException: Connection refused
> > > > 29445 [Thread-32-topologyKillerBolt-executor[16 16]] ERROR o.a.s.util -
> > > > Async loop died!
> > > > java.lang.RuntimeException:
> > > > org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find
> > > > leader nimbus from seed hosts [localhost]. Did you specify a valid list
> > > > of nimbus hosts for config nimbus.seeds?
> > > > Caused by: org.apache.storm.utils.NimbusLeaderNotFoundException: Could
> > > > not find leader nimbus from seed hosts [localhost]. Did you specify a
> > > > valid list of nimbus hosts for config nimbus.seeds?
> > > > 29462 [Thread-32-topologyKillerBolt-executor[16 16]] ERROR o.a.s.util -
> > > > Halting process: ("Worker died")
> > > > java.lang.RuntimeException: ("Worker died")
> > > >
> > > > The error is apparently on this line: NimbusClient cc =
> > > > NimbusClient.getConfiguredClient(conf);
> > > >
> > > >
> > > >
> > > > On Mon, May 9, 2016 at 3:15 PM, Spico Florin <[email protected]> wrote:
> > > >
> > > >     Hi!
> > > >     You're welcome, Navin. I'm also interested in the solution. Can
> > > >     you please share your remarks (and some code :)) after the
> > > >     implementation?
> > > >     Thanks.
> > > >     Regards,
> > > >     Florin
> > > >
> > > >     On Mon, May 9, 2016 at 7:20 AM, Navin Ipe <[email protected]> wrote:
> > > >
> > > >         @Matthias: That's genius! I didn't know streams and
> > > >         allGroupings could be used like that.
> > > >         Just as Storm introduced tick tuples, it'd have been nice if
> > > >         Storm had a native technique for doing all this, but the ideas
> > > >         you've come up with are extremely good. Am going to try
> > > >         implementing them right away.
> > > >         Thank you too, Florin!
> > > >
> > > >         On Mon, May 9, 2016 at 12:48 AM, Matthias J. Sax <[email protected]> wrote:
> > > >
> > > >                     To synchronize this, use an additional "shut-down
> > > >                     bolt" that uses a parallelism of one. The "shut-down
> > > >                     bolt" must be notified by all parallel DbBolts after
> > > >                     they performed the flush. If all notifications are
> > > >                     received, there are no in-flight messages and thus
> > > >                     the "shut-down bolt" can kill the topology safely.
> > > >
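> > > >                     A rough sketch of such a shut-down bolt (the
> > > >                     component name "DbBolt", the topology name "MyStorm"
> > > >                     and the idea that each DbBolt sends exactly one
> > > >                     notification tuple after flushing are assumptions to
> > > >                     illustrate the pattern, not a fixed API; with
> > > >                     LocalCluster you would rather signal the driver,
> > > >                     e.g. via a shared flag, than go through Nimbus):
> > > >
> > > >   import java.util.Map;
> > > >   import org.apache.storm.task.OutputCollector;
> > > >   import org.apache.storm.task.TopologyContext;
> > > >   import org.apache.storm.topology.OutputFieldsDeclarer;
> > > >   import org.apache.storm.topology.base.BaseRichBolt;
> > > >   import org.apache.storm.tuple.Tuple;
> > > >   import org.apache.storm.utils.NimbusClient;
> > > >   import org.apache.storm.utils.Utils;
> > > >
> > > >   public class ShutdownBolt extends BaseRichBolt {
> > > >       private OutputCollector collector;
> > > >       private int expected;   // number of parallel DbBolt tasks
> > > >       private int received;   // flush notifications seen so far
> > > >
> > > >       @Override
> > > >       public void prepare(Map conf, TopologyContext context,
> > > >                           OutputCollector collector) {
> > > >           this.collector = collector;
> > > >           this.expected = context.getComponentTasks("DbBolt").size();
> > > >       }
> > > >
> > > >       @Override
> > > >       public void execute(Tuple tuple) {
> > > >           // each DbBolt sends one notification tuple after its flush
> > > >           if (++received == expected) {
> > > >               // no in-flight tuples left -> safe to kill the topology
> > > >               try {
> > > >                   Map conf = Utils.readStormConfig();
> > > >                   NimbusClient.getConfiguredClient(conf).getClient()
> > > >                               .killTopology("MyStorm");
> > > >               } catch (Exception e) {
> > > >                   throw new RuntimeException(e);
> > > >               }
> > > >           }
> > > >           collector.ack(tuple);
> > > >       }
> > > >
> > > >       @Override
> > > >       public void declareOutputFields(OutputFieldsDeclarer declarer) { }
> > > >   }
> > > >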
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 05/08/2016 07:27 PM, Spico Florin wrote:
> > > > > Hi!
> > > > > There is the solution of sending a "poison pill" message from the
> > > > > spout. One bolt will receive your poison pill and will kill the
> > > > > topology via the Storm Nimbus API. One potential issue with this
> > > > > approach is that, depending on your topology structure (the
> > > > > parallelism of your bolts and the time they need to execute their
> > > > > business logic), the poison pill may be swallowed by the one bolt
> > > > > responsible for killing the topology before all the other in-flight
> > > > > messages have been processed. The consequence is that you cannot be
> > > > > sure that all the messages sent by the spout were processed. Also,
> > > > > sharing the total number of sent messages between the executors in
> > > > > order to shut down when all messages were processed could be error
> > > > > prone, since tuples can be processed many times (depending on your
> > > > > message processing guarantee) or could fail.
> > > > > I could not find a solution for this. Storm is intended to run on
> > > > > unbounded data.
> > > > > I hope this helps,
> > > > > Regards,
> > > > > Florin
> > > > >
> > > > >
> > > > > On Sunday, May 8, 2016, Matthias J. Sax <[email protected]> wrote:
> > > > >
> > > > >             You can get the number of bolt instances from the
> > > > >             TopologyContext that is provided in Bolt.prepare().
> > > > >
> > > > >             Furthermore, you could put a loop into your topology, ie,
> > > > >             a bolt reads its own output; if you broadcast (ie,
> > > > >             allGrouping) this feedback-loop-stream you can let bolt
> > > > >             instances talk to each other.
> > > > >
> > > > >             builder.setBolt("DbBolt", new MyDBBolt())
> > > > >                    .shuffleGrouping("spout")
> > > > >                    .allGrouping("DbBolt", "flush-stream");
> > > > >
> > > > >             where "flush-stream" is a second output stream of
> > > > >             MyDBBolt() sending a notification tuple after it received
> > > > >             the end-of-stream from the spout; furthermore, once a
> > > > >             bolt has received the signal via "flush-stream" from
> > > > >             **all** parallel bolt instances, it can flush to DB.
> > > > >
> > > > >             Or something like this... Be creative! :)
> > > > >
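> > > > >             A rough sketch of what MyDBBolt's side of this could
> > > > >             look like (the stream name "flush-stream" and the helpers
> > > > >             isEndOfStream(), accumulate() and flushToDb() are only
> > > > >             placeholders; it also assumes the spout's end-of-stream
> > > > >             marker reaches every DbBolt task, e.g. via its own
> > > > >             allGrouping-ed stream):
> > > > >
> > > > >   import java.util.Map;
> > > > >   import org.apache.storm.task.OutputCollector;
> > > > >   import org.apache.storm.task.TopologyContext;
> > > > >   import org.apache.storm.topology.OutputFieldsDeclarer;
> > > > >   import org.apache.storm.topology.base.BaseRichBolt;
> > > > >   import org.apache.storm.tuple.Fields;
> > > > >   import org.apache.storm.tuple.Tuple;
> > > > >   import org.apache.storm.tuple.Values;
> > > > >
> > > > >   public class MyDBBolt extends BaseRichBolt {
> > > > >       private OutputCollector collector;
> > > > >       private int numTasks;        // parallel MyDBBolt tasks
> > > > >       private int signalsSeen = 0;
> > > > >
> > > > >       @Override
> > > > >       public void prepare(Map conf, TopologyContext context,
> > > > >                           OutputCollector collector) {
> > > > >           this.collector = collector;
> > > > >           this.numTasks = context.getComponentTasks(
> > > > >                   context.getThisComponentId()).size();
> > > > >       }
> > > > >
> > > > >       @Override
> > > > >       public void execute(Tuple tuple) {
> > > > >           if ("flush-stream".equals(tuple.getSourceStreamId())) {
> > > > >               // a peer task announced it saw the end-of-stream
> > > > >               if (++signalsSeen == numTasks) {
> > > > >                   flushToDb();   // all tasks are done accumulating
> > > > >               }
> > > > >           } else if (isEndOfStream(tuple)) {
> > > > >               // broadcast to all tasks (including this one)
> > > > >               collector.emit("flush-stream", new Values("done"));
> > > > >           } else {
> > > > >               accumulate(tuple);   // normal data tuple
> > > > >           }
> > > > >           collector.ack(tuple);
> > > > >       }
> > > > >
> > > > >       @Override
> > > > >       public void declareOutputFields(OutputFieldsDeclarer declarer) {
> > > > >           declarer.declareStream("flush-stream", new Fields("signal"));
> > > > >       }
> > > > >
> > > > >       private boolean isEndOfStream(Tuple t) { /* placeholder */ return false; }
> > > > >       private void accumulate(Tuple t) { /* placeholder */ }
> > > > >       private void flushToDb() { /* placeholder */ }
> > > > >   }
> > > > >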
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 05/08/2016 02:26 PM, Navin Ipe wrote:
> > > > > > @Matthias: I agree about the batch processor, but my superior took
> > > > > > the decision to use Storm, and he visualizes more complexity later
> > > > > > for which he needs Storm.
> > > > > > I had considered the "end of stream" tuple earlier (my idea was to
> > > > > > emit 10 consecutive nulls), but then the question was how do I know
> > > > > > how many bolt instances have been created, and how do I notify all
> > > > > > the bolts? Because it's only after the last bolt finishes writing
> > > > > > to DB that I have to shut down the topology.
> > > > > >
> > > > > > @Jason: Thanks. I had seen storm-signals earlier (I think from one
> > > > > > of your replies to someone else) and I had a look at the code too,
> > > > > > but am a bit wary because it's no longer being maintained and
> > > > > > because of the issues:
> > > > > > https://github.com/ptgoetz/storm-signals/issues
> > > > > >
> > > > > > On Sun, May 8, 2016 at 5:40 AM, Jason Kusar <[email protected]> wrote:
> > > > > >
> > > > > >     You might want to check out Storm Signals.
> > > > > >     https://github.com/ptgoetz/storm-signals
> > > > > >
> > > > > >     It might give you what you're looking for.
> > > > > >
> > > > > >
> > > > > >     On Sat, May 7, 2016, 11:59 AM Matthias J. Sax <[email protected]> wrote:
> > > > > >
> > > > > >             As you mentioned already: Storm is designed to run
> > > > > >             topologies forever ;)
> > > > > >             If you have finite data, why do you not use a batch
> > > > > >             processor???
> > > > > >
> > > > > >             As a workaround, you can embed "control messages" in
> > > > > >             your stream (or use an additional stream for them).
> > > > > >
> > > > > >             If you want a topology to shut itself down, you could use
> > > > > >             `NimbusClient.getConfiguredClient(conf).getClient().killTopology(name);`
> > > > > >             in your spout/bolt code.
> > > > > >
> > > > > >             Something like:
> > > > > >             - Spout emits all tuples
> > > > > >             - Spout emits special "end of stream" control tuple
> > > > > >             - Bolt1 processes everything
> > > > > >             - Bolt1 forwards "end of stream" control tuple (when it
> > > > > >               received it)
> > > > > >             - Bolt2 processes everything
> > > > > >             - Bolt2 receives "end of stream" control tuple => flush
> > > > > >               to DB => kill topology
> > > > > >
> > > > > >             But I guess this is a kinda weird pattern.
> > > > > >
> > > > > >             -Matthias
> > > > > >
> > > > > >             On 05/05/2016 06:13 AM, Navin Ipe wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > I know Storm is designed to run forever. I also know about
> > > > > > > Trident's technique of aggregation. But shouldn't Storm have a
> > > > > > > way to let bolts know that a certain bunch of processing has
> > > > > > > been completed?
> > > > > > >
> > > > > > > Consider this topology:
> > > > > > > Spout------>Bolt-A------>Bolt-B
> > > > > > >        |            |--->Bolt-B
> > > > > > >        |            \--->Bolt-B
> > > > > > >        |--->Bolt-A------>Bolt-B
> > > > > > >        |            |--->Bolt-B
> > > > > > >        |            \--->Bolt-B
> > > > > > >        \--->Bolt-A------>Bolt-B
> > > > > > >                     |--->Bolt-B
> > > > > > >                     \--->Bolt-B
> > > > > > >
> > > > > > > * From Bolt-A to Bolt-B, it is a FieldsGrouping.
> > > > > > > * Spout emits only a few tuples and then stops emitting.
> > > > > > > * Bolt A takes those tuples and generates millions of tuples.
> > > > > > >
> > > > > > > *Bolt-B accumulates tuples that Bolt A sends and needs to know
> > > > > > > when Spout finished emitting. Only then can Bolt-B start writing
> > > > > > > to SQL.*
> > > > > > >
> > > > > > > *Questions:*
> > > > > > > 1. How can all Bolts B be notified that it is time to write to SQL?
> > > > > > > 2. After all Bolts B have written to SQL, how to know that all
> > > > > > > Bolts B have completed writing?
> > > > > > > 3. How to stop the topology? I know of
> > > > > > > localCluster.killTopology("HelloStorm"), but shouldn't there be
> > > > > > > a way to do it from the Bolt?
> > > > > > >
> > > > > > > --
> > > > > > > Regards,
> > > > > > > Navin
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Regards,
> > > > > > Navin
> > > > >
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Regards,
> > > > Navin
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Regards,
> > > > Navin
> > >
> > >
> > >
> > >
> > > --
> > > Regards,
> > > Navin
> >
> >
> >
> >
> > --
> > Regards,
> > Navin
>
>
--
Regards,
Navin