@Spico: Will share.
The streams implementation is working beautifully.
Only the topology killing is failing.
*Tried:*
Map conf = Utils.readStormConfig();
NimbusClient cc = NimbusClient.getConfiguredClient(conf);
Nimbus.Client client = cc.getClient();
client.killTopology("myStorm");
*I get these errors:*
29442 [Thread-32-topologyKillerBolt-executor[16 16]] WARN
o.a.s.u.NimbusClient - Ignoring exception while trying to get leader nimbus
info from localhost. will retry with a different seed host.
java.lang.RuntimeException:
org.apache.storm.thrift.transport.TTransportException:
java.net.ConnectException: Connection refused
Caused by: org.apache.storm.thrift.transport.TTransportException:
java.net.ConnectException: Connection refused
Caused by: java.net.ConnectException: Connection refused
29445 [Thread-32-topologyKillerBolt-executor[16 16]] ERROR o.a.s.util -
Async loop died!
java.lang.RuntimeException:
org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader
nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus
hosts for config nimbus.seeds?
Caused by: org.apache.storm.utils.NimbusLeaderNotFoundException: Could not
find leader nimbus from seed hosts [localhost]. Did you specify a valid
list of nimbus hosts for config nimbus.seeds?
29462 [Thread-32-topologyKillerBolt-executor[16 16]] ERROR o.a.s.util -
Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
The error is apparently on this line:
NimbusClient cc = NimbusClient.getConfiguredClient(conf);
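
(The stack trace asks about nimbus.seeds, so I assume the config has to name the machine where Nimbus actually runs. A sketch of what I understand that would look like, with a placeholder hostname, using org.apache.storm.Config and java.util.Arrays:)

Map conf = Utils.readStormConfig();
conf.put(Config.NIMBUS_SEEDS, Arrays.asList("nimbus-host")); // placeholder host
NimbusClient cc = NimbusClient.getConfiguredClient(conf);
cc.getClient().killTopology("myStorm");
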
On Mon, May 9, 2016 at 3:15 PM, Spico Florin <[email protected]> wrote:
> Hi!
> You're welcome, Navin. I'm also interested in the solution. Can you please
> share your remarks and (some code :)) after the implementation?
> Thanks.
> Regards,
> Florin
>
> On Mon, May 9, 2016 at 7:20 AM, Navin Ipe <[email protected]> wrote:
>
>> @Matthias: That's genius! I didn't know streams and allGroupings could be
>> used like that.
>> Just as Storm introduced tick tuples, it would have been nice if Storm had
>> a native technique for doing all this, but the ideas you've come up with
>> are extremely good. I am going to try implementing them right away.
>> Thank you too, Florin!
>>
>> On Mon, May 9, 2016 at 12:48 AM, Matthias J. Sax <[email protected]>
>> wrote:
>>
>>> To synchronize this, use an additional "shut down bolt" that uses a
>>> parallelism of one. The "shut down bolt" must be notified by all parallel
>>> DbBolts after they have performed the flush. Once all notifications are
>>> received, there are no in-flight messages, and thus the "shut down bolt"
>>> can kill the topology safely.
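>>>
>>> A rough sketch of such a shut-down bolt (the component name "DbBolt" and
>>> topology name "myStorm" are placeholders; error handling and acking
>>> omitted):
>>>
>>> import java.util.Map;
>>> import org.apache.storm.task.OutputCollector;
>>> import org.apache.storm.task.TopologyContext;
>>> import org.apache.storm.topology.OutputFieldsDeclarer;
>>> import org.apache.storm.topology.base.BaseRichBolt;
>>> import org.apache.storm.tuple.Tuple;
>>> import org.apache.storm.utils.NimbusClient;
>>> import org.apache.storm.utils.Utils;
>>>
>>> public class ShutdownBolt extends BaseRichBolt {
>>>     private int expected;      // number of parallel DbBolt tasks to wait for
>>>     private int received = 0;  // flush notifications seen so far
>>>
>>>     @Override
>>>     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
>>>         expected = context.getComponentTasks("DbBolt").size();
>>>     }
>>>
>>>     @Override
>>>     public void execute(Tuple tuple) {
>>>         // every DbBolt sends exactly one notification after its flush
>>>         if (++received == expected) {
>>>             try {
>>>                 Map stormConf = Utils.readStormConfig();
>>>                 NimbusClient.getConfiguredClient(stormConf)
>>>                             .getClient().killTopology("myStorm");
>>>             } catch (Exception e) {
>>>                 throw new RuntimeException(e);
>>>             }
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {}
>>> }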
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 05/08/2016 07:27 PM, Spico Florin wrote:
>>> > Hi!
>>> > There is the solution of sending a poison-pill message from the spout.
>>> > One bolt will receive your poison pill and will kill the topology via
>>> > the Storm Nimbus API. One potential issue with this approach is that,
>>> > depending on your topology structure, the parallelism of your bolts,
>>> > and the time they need to execute their business logic, the poison pill
>>> > may be swallowed by the one bolt responsible for killing the topology
>>> > before all the other in-flight messages have been processed. The
>>> > consequence is that you cannot be sure that all the messages sent by
>>> > the spout were processed. Also, sharing the total number of sent
>>> > messages between the executors in order to shut down when all messages
>>> > were processed could be error prone, since tuples can be processed many
>>> > times (depending on your message-processing guarantee) or they could
>>> > fail.
>>> > I could not find a solution for this. Storm is intended to run on
>>> > unbounded data.
>>> > I hope this helps.
>>> > Regards,
>>> > Florin
>>> >
>>> >
>>> > On Sunday, May 8, 2016, Matthias J. Sax <[email protected]> wrote:
>>> >
>>> > You can get the number of bolt instances from the TopologyContext that
>>> > is provided in Bolt.prepare().
>>> >
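>>> > For example (assuming the DB bolt was registered as "DbBolt"):
>>> >
>>> >     int numDbBolts = context.getComponentTasks("DbBolt").size();
>>> >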
>>> > Furthermore, you could put a loop into your topology, i.e., a bolt
>>> > reads its own output; if you broadcast (i.e., allGrouping) this
>>> > feedback-loop-stream, you can let bolt instances talk to each other.
>>> >
>>> > builder.setBolt("DbBolt", new MyDBBolt())
>>> >        .shuffleGrouping("spout")
>>> >        .allGrouping("DbBolt", "flush-stream"); // componentId first, then streamId
>>> >
>>> > where "flush-stream" is a second output stream of MyDBBolt()
>>> sending a
>>> > notification tuple after it received the end-of-stream from spout;
>>> > furthermore, if a bolt received the signal via "flush-stream" from
>>> > **all** parallel bolt instances, it can flush to DB.
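>>> >
>>> > A sketch of what MyDBBolt would add for that second stream (stream id
>>> > and field names are arbitrary):
>>> >
>>> > public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>> >     declarer.declare(new Fields("value"));                        // regular output
>>> >     declarer.declareStream("flush-stream", new Fields("signal")); // flush signal
>>> > }
>>> >
>>> > // in execute(), after the end-of-stream marker arrived:
>>> > collector.emit("flush-stream", new Values("flushed"));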
>>> >
>>> > Or something like this... Be creative! :)
>>> >
>>> >
>>> > -Matthias
>>> >
>>> >
>>> > On 05/08/2016 02:26 PM, Navin Ipe wrote:
>>> > > @Matthias: I agree about the batch processor, but my superior took
>>> > > the decision to use Storm, and he visualizes more complexity later
>>> > > for which he needs Storm.
>>> > > I had considered the "end of stream" tuple earlier (my idea was to
>>> > > emit 10 consecutive nulls), but then the question was how do I know
>>> > > how many bolt instances have been created, and how do I notify all
>>> > > the bolts? Because it's only after the last bolt finishes writing to
>>> > > DB that I have to shut down the topology.
>>> > >
>>> > > @Jason: Thanks. I had seen storm-signals earlier (I think from one of
>>> > > your replies to someone else) and I had a look at the code too, but
>>> > > am a bit wary because it's no longer being maintained and because of
>>> > > the issues: https://github.com/ptgoetz/storm-signals/issues
>>> > >
>>> > > On Sun, May 8, 2016 at 5:40 AM, Jason Kusar <[email protected]> wrote:
>>> > >
>>> > > You might want to check out Storm Signals.
>>> > > https://github.com/ptgoetz/storm-signals
>>> > >
>>> > > It might give you what you're looking for.
>>> > >
>>> > >
>>> > > On Sat, May 7, 2016, 11:59 AM Matthias J. Sax <[email protected]> wrote:
>>> > >
>>> > > As you mentioned already: Storm is designed to run topologies
>>> > > forever ;)
>>> > > If you have finite data, why do you not use a batch processor???
>>> > >
>>> > > As a workaround, you can embed "control messages" in your stream
>>> > > (or use an additional stream for them).
>>> > >
>>> > > If you want a topology to shut down itself, you could use
>>> > > `NimbusClient.getConfiguredClient(conf).getClient().killTopology(name);`
>>> > > in your spout/bolt code.
>>> > >
>>> > > Something like:
>>> > > - Spout emits all tuples
>>> > > - Spout emits a special "end of stream" control tuple
>>> > > - Bolt1 processes everything
>>> > > - Bolt1 forwards the "end of stream" control tuple (when it
>>> > >   receives it)
>>> > > - Bolt2 processes everything
>>> > > - Bolt2 receives the "end of stream" control tuple => flush to DB
>>> > >   => kill topology
>>> > >
>>> > > But I guess this is kind of a weird pattern.
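>>> > >
>>> > > A sketch of the spout side (hasMoreData(), nextRecord(), and the
>>> > > "control" stream are placeholders for your own logic):
>>> > >
>>> > > public void nextTuple() {
>>> > >     if (hasMoreData()) {
>>> > >         collector.emit(new Values(nextRecord()));
>>> > >     } else if (!eosSent) {
>>> > >         // "control" must be declared via declarer.declareStream(...)
>>> > >         collector.emit("control", new Values("end-of-stream"));
>>> > >         eosSent = true;
>>> > >     }
>>> > > }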
>>> > >
>>> > > -Matthias
>>> > >
>>> > > On 05/05/2016 06:13 AM, Navin Ipe wrote:
>>> > > > Hi,
>>> > > >
>>> > > > I know Storm is designed to run forever. I also know about
>>> > > > Trident's technique of aggregation. But shouldn't Storm have a way
>>> > > > to let bolts know that a certain bunch of processing has been
>>> > > > completed?
>>> > > >
>>> > > > Consider this topology:
>>> > > > Spout------>Bolt-A------>Bolt-B
>>> > > >   |            |--->Bolt-B
>>> > > >   |            \--->Bolt-B
>>> > > >   |--->Bolt-A------>Bolt-B
>>> > > >   |        |--->Bolt-B
>>> > > >   |        \--->Bolt-B
>>> > > >   \--->Bolt-A------>Bolt-B
>>> > > >            |--->Bolt-B
>>> > > >            \--->Bolt-B
>>> > > >
>>> > > > * From Bolt-A to Bolt-B, it is a FieldsGrouping.
>>> > > > * Spout emits only a few tuples and then stops emitting.
>>> > > > * Bolt-A takes those tuples and generates millions of tuples.
>>> > > >
>>> > > > *Bolt-B accumulates tuples that Bolt-A sends and needs to know
>>> > > > when the Spout finished emitting. Only then can Bolt-B start
>>> > > > writing to SQL.*
>>> > > >
>>> > > > *Questions:*
>>> > > > 1. How can all Bolts B be notified that it is time to write to SQL?
>>> > > > 2. After all Bolts B have written to SQL, how to know that all
>>> > > > Bolts B have completed writing?
>>> > > > 3. How to stop the topology? I know of
>>> > > > localCluster.killTopology("HelloStorm"), but shouldn't there be a
>>> > > > way to do it from the Bolt?
>>> > > >
>>> > > > --
>>> > > > Regards,
>>> > > > Navin
>>> > >
>>> > >
>>> > >
>>> > >
>>> > > --
>>> > > Regards,
>>> > > Navin
>>> >
>>>
>>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>
>
--
Regards,
Navin