Navin, my ZooKeeper solution requires you to initiate your own connection
to ZooKeeper. Storm includes ZooKeeper, but it doesn't expose it to the
topologies in any convenient way. In my case, I just used the ZooKeeper
client library to open my own connection and stored the information under
my own folder in ZooKeeper.

http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#_introduction


On Wed, Apr 20, 2016 at 7:33 AM Navin Ipe <[email protected]>
wrote:

> In this case the values coming from the bolts can all be put() or updated
> in a single large HashMap in the Bolt in Worker11, so there's no need for
> aggregation.
> If there is no standard way in Storm for spouts and bolts to notify each
> other that there is nothing more for them to process, then I guess I'll
> just send tuples with "null" in them so that the Bolt in Worker11 will know
> the processing is over.
>
> I just hope what y'all said about tuples being automatically passed
> between workers, actually works without any problems :-)
> Thanks a lot for all your help!
>
> On Wed, Apr 20, 2016 at 12:06 PM, Jungtaek Lim <[email protected]> wrote:
>
>> Navin,
>>
>> I think these two lines are not clear, so I may have misunderstood.
>>
>> 5. When all the Bolts in the workers are finished, they send their
>> outputs to a single Bolt in Worker 11.
>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>>
>> If you don't need to aggregate (I mean join) the results from the Bolts in
>> Workers 1~10 at the Bolt in Worker 11, there's no need to use Trident or
>> windowing.
>>
>> On Wed, Apr 20, 2016 at 3:28 PM, Navin Ipe <[email protected]>
>> wrote:
>>
>>> @Jungtaek: This person (
>>> http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology)
>>> claims that Storm would automatically manage the flow of data between
>>> spouts and bolts on different workers. Can anyone confirm this? If this is
>>> the case, I won't have to bother using Trident.
>>>
>>> On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <
>>> [email protected]> wrote:
>>>
>>>> @Jason: Thanks. Tried searching for Storm code which starts Ephemeral
>>>> nodes, but couldn't find it. (am new to Hadoop and Storm, so perhaps I was
>>>> searching for the wrong thing)
>>>>
>>>> @Jungtaek: Will explore component tasks. Meanwhile, I had considered
>>>> Trident, but didn't go ahead because it was not clear how I could implement
>>>> multiple spouts in Trident, where each spout would iterate over a certain
>>>> number of rows of a database. Any idea how that could be done?
>>>>
>>>> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <[email protected]>
>>>> wrote:
>>>>
>>>>> There's another idea that doesn't rely on ZooKeeper: use the ordinal of
>>>>> the task id among the tasks of the same component (spout).
>>>>>
>>>>> Task ids are issued across all tasks, including system tasks, so you can't
>>>>> assume spout tasks have sequential task ids. But you can still do the
>>>>> trick: check the "ordinal" of this spout task's id among the tasks of the
>>>>> same spout. Please refer to GeneralTopologyContext.getComponentTasks(String
>>>>> componentId).
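
A minimal sketch of that ordinal trick in plain Java. The helper name and the
hard-coded task ids below are illustrative only; in a real spout the id list
would come from GeneralTopologyContext.getComponentTasks(componentId) inside
open():

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SpoutOrdinal {
    // Hypothetical helper (not Storm API): given all task ids of this spout
    // component and this task's own id, return its 0-based ordinal.
    static int ordinalOf(List<Integer> componentTaskIds, int thisTaskId) {
        List<Integer> sorted = new ArrayList<>(componentTaskIds);
        Collections.sort(sorted);  // ids need not be contiguous, only ordered
        return sorted.indexOf(thisTaskId);
    }

    public static void main(String[] args) {
        // Task ids are global, so a 2-task spout may see ids like 3 and 4
        // (system tasks such as the acker occupy other ids).
        int ordinal = ordinalOf(java.util.Arrays.asList(4, 3), 4);
        long offset = ordinal * 1000L;  // ordinal 1 -> read rows [1000, 2000)
        System.out.println(ordinal + " " + offset);
    }
}
```

This also explains why a task id of 3 for the "first" spout instance is
normal: the ordinal, not the raw id, is what identifies the batch.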
>>>>>
>>>>> Btw, Spout1 -> Bolt2 can be done with various ways but it would not be
>>>>> easy to aggregate the results of Bolt2 from Bolt3.
>>>>> You should consider windowing by processed time or Trident or maintain
>>>>> your own buffers.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Thanks,
>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>
>>>>> On Tue, Apr 19, 2016 at 10:02 PM, Jason Kusar <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I've done a similar thing before, with the exception that I was
>>>>>> reading from Cassandra. The concept is the same though. Assuming you
>>>>>> know that you have 10,000 records and you want each spout to read
>>>>>> 1,000 of them, you would launch 10 instances of the spout. The first
>>>>>> thing each instance does during init is connect to ZooKeeper and
>>>>>> create an ephemeral node (
>>>>>> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes),
>>>>>> starting with one called '0'. If '0' already exists, you'll get an
>>>>>> exception, which means you try to create '1', and so on until you
>>>>>> successfully create a node. That tells you which batch of records
>>>>>> that instance of the spout is responsible for. I.e., if you
>>>>>> successfully created '3', then this spout needs to set its offset to
>>>>>> 3,000.
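
The claiming loop described here can be sketched as below. A Set stands in
for the znodes so the logic is runnable on its own; real code would instead
call the ZooKeeper client's create() with CreateMode.EPHEMERAL and treat a
NodeExistsException as "this batch is taken":

```java
import java.util.HashSet;
import java.util.Set;

public class BatchClaim {
    // Sketch of the claiming loop. "Creating" node i successfully means
    // this spout instance owns batch i and starts at offset i * batchSize.
    static int claimBatch(Set<String> existingNodes, int batchSize) {
        for (int i = 0; ; i++) {
            String node = String.valueOf(i);
            if (existingNodes.add(node)) {  // "create" succeeded
                return i * batchSize;       // offset this spout starts at
            }
            // Node existed: another instance owns batch i, so try i + 1.
        }
    }

    public static void main(String[] args) {
        // Three spouts already claimed '0', '1' and '2'; the next one
        // creates '3' and therefore sets its offset to 3,000.
        Set<String> znodes = new HashSet<>();
        znodes.add("0");
        znodes.add("1");
        znodes.add("2");
        System.out.println(claimBatch(znodes, 1000));
    }
}
```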
>>>>>>
>>>>>> The reason for using ephemeral nodes is that they are automatically
>>>>>> deleted if the ZooKeeper client disconnects. That way, if a spout
>>>>>> crashes, once Storm relaunches it, it will be able to re-claim that
>>>>>> token and resume work on that batch. You'll obviously need some way
>>>>>> to keep track of which records you've already processed, but that's
>>>>>> going to be specific to your implementation.
>>>>>>
>>>>>> Hope that helps!
>>>>>> Jason
>>>>>>
>>>>>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Thanks guys.
>>>>>>> I didn't understand "*...spout instances by utilizing Zookeeper.*".
>>>>>>> How does one utilize Zookeeper? Is it the same as ".setNumTasks(10)"
>>>>>>> for a Spout?
>>>>>>>
>>>>>>> As of now I've set
>>>>>>> config.setNumWorkers(2);
>>>>>>> and
>>>>>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>>>>
>>>>>>> I'm able to get spoutID in open() using this.spoutId =
>>>>>>> context.getThisTaskId();
>>>>>>> Strangely, my spoutID always begins with 3 instead of 0.
>>>>>>>
>>>>>>> By partitionID I understand that's the fieldGrouping's id.
>>>>>>>
>>>>>>> Even if I do all this, will the spout's tasks actually be
>>>>>>> distributed across multiple workers? Won't I have to create separate 
>>>>>>> spouts?
>>>>>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>>>>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>>>>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>>>>>> and so on?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Correction - group on partition id
>>>>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> I've seen this:
>>>>>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>>>>>> requesting a bit of clarity.
>>>>>>>>>
>>>>>>>>> I'm considering a situation where I have 2 million rows in MySQL
>>>>>>>>> or MongoDB.
>>>>>>>>>
>>>>>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>>>>>> processed output to a Bolt. This happens in Worker1.
>>>>>>>>> 2. I want a different instance of the same Spout class to read the
>>>>>>>>> next 1000 rows in parallel with the working of the Spout of 1, then 
>>>>>>>>> send
>>>>>>>>> the processed output to an instance of the same Bolt used in 1. This
>>>>>>>>> happens in Worker2.
>>>>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>>>>> 4. I might setup 10 workers like this.
>>>>>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>>>>>> outputs to a single Bolt in Worker 11.
>>>>>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>>>>>> table.
>>>>>>>>>
>>>>>>>>> *My confusion here is how to make the database iterations happen
>>>>>>>>> batch by batch, in parallel*. Obviously the database connection
>>>>>>>>> would have to be made in some static class outside the workers,
>>>>>>>>> but if workers are started with just "conf.setNumWorkers(2);",
>>>>>>>>> then how do I tell the workers to iterate over different rows of
>>>>>>>>> the database? Assume the workers are running on different
>>>>>>>>> machines.
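
The batch arithmetic implied by steps 1-4 comes down to a simple slice
computation once each spout instance knows its index among the spout tasks
(how it learns that index is exactly the open question; the names below are
illustrative, not Storm API):

```java
public class RowSlice {
    // Illustrative sketch: given a spout's index among N spout tasks and a
    // batch size, compute which rows of the table it should iterate.
    // A real spout would obtain its index from the topology context or a
    // coordination service; here it is passed in directly.
    static long[] sliceFor(int spoutIndex, long batchSize) {
        long start = spoutIndex * batchSize;  // first row (inclusive)
        long end = start + batchSize;         // last row (exclusive)
        return new long[] { start, end };
    }

    public static void main(String[] args) {
        // Spout 2 of 10 with batches of 1,000 reads rows [2000, 3000).
        long[] slice = sliceFor(2, 1000L);
        System.out.println(slice[0] + " " + slice[1]);
    }
}
```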
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Regards,
>>>>>>>>> Navin
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Regards,
>>>>>>> Navin
>>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Navin
>>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>
>
>
> --
> Regards,
> Navin
>
