Navin, my Zookeeper solution requires you to initiate your own connection to zookeeper. Storm includes zookeeper, but it doesn't expose it in any pretty way to the topologies. In my case, I just used the zookeeper client library to initiate my own connection and store the information under my own folder in zookeeper.
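Roughly like this, with the plain ZooKeeper Java client (an untested sketch -- the connect string, session timeout, and paths are just placeholders, and exception/cleanup handling is elided):

    import org.apache.zookeeper.*;

    // Open our own session to ZooKeeper (Storm's ensemble or any other).
    ZooKeeper zk = new ZooKeeper("zkhost1:2181", 30000, event -> { });

    // Create our own folder once; it's fine if it already exists.
    try {
        zk.create("/my-topology", new byte[0],
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } catch (KeeperException.NodeExistsException alreadyThere) {
        // another task created it first - that's okay
    }

    // Store whatever coordination data we need under that folder.
    zk.create("/my-topology/state", "offset=0".getBytes(),
              ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);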
http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#_introduction

On Wed, Apr 20, 2016 at 7:33 AM Navin Ipe <[email protected]> wrote:

> In this case the values coming from the bolts can all be put() or updated into a single large HashMap in the Bolt in Worker 11, so there's no need for aggregation.
> If there is no standard way in Storm for spouts and bolts to notify each other that there is nothing more for them to process, then I guess I'll just send tuples with "null" in them so that the Bolt in Worker 11 will know the processing is over.
>
> I just hope what y'all said about tuples being automatically passed between workers actually works without any problems :-)
> Thanks a lot for all your help!
>
> On Wed, Apr 20, 2016 at 12:06 PM, Jungtaek Lim <[email protected]> wrote:
>
>> Navin,
>>
>> I think these two lines weren't clear to me, so I may have misunderstood:
>>
>> 5. When all the Bolts in the workers are finished, they send their outputs to a single Bolt in Worker 11.
>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>>
>> If you don't need to aggregate (I mean join) the results from the Bolts in Workers 1~10 in the Bolt in Worker 11, there's no need to use Trident or windowing.
>>
>> On Wed, Apr 20, 2016 at 3:28 PM, Navin Ipe <[email protected]> wrote:
>>
>>> @Jungtaek: This person (http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology) claims that Storm automatically manages the flow of data between spouts and bolts on different workers. Can anyone confirm this? If that's the case, I won't have to bother using Trident.
>>>
>>> On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <[email protected]> wrote:
>>>
>>>> @Jason: Thanks. I tried searching for the Storm code which creates ephemeral nodes, but couldn't find it. (I'm new to Hadoop and Storm, so perhaps I was searching for the wrong thing.)
>>>>
>>>> @Jungtaek: Will explore component tasks. Meanwhile, I had considered Trident, but didn't go ahead because it wasn't clear how I could implement multiple spouts in Trident, where each spout would iterate over a certain number of rows of a database. Any idea how that could be done?
>>>>
>>>> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <[email protected]> wrote:
>>>>
>>>>> There's another idea that doesn't rely on Zookeeper: use the ordinal of the task id among tasks of the same component (the spout).
>>>>>
>>>>> Task ids are issued across all tasks, including system tasks, so you can't assume the spout tasks have sequential task ids. But you can still do the trick: check the "ordinal" of this spout task's id among the tasks of the same spout. Please refer to GeneralTopologyContext.getComponentTasks(String componentId).
>>>>>
>>>>> Btw, Spout1 -> Bolt2 can be done in various ways, but it would not be easy to aggregate the results of Bolt2 in Bolt3. You should consider windowing by processing time, or Trident, or maintaining your own buffers.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Thanks,
>>>>> Jungtaek Lim (HeartSaVioR)
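For what it's worth, Jungtaek's ordinal trick comes down to a few lines in the spout's open() method. A rough sketch -- ROWS_PER_SPOUT is made up for the example, and it assumes getComponentTasks() returns the ids in the same order for every task, which I believe it does since the list is sorted:

    // context is the TopologyContext that Storm passes to open().
    // getComponentTasks() lists the task ids of every instance of this
    // spout, so our position in that list is a stable ordinal 0..N-1
    // even though the raw task ids may start at 3 or anywhere else.
    List<Integer> taskIds = context.getComponentTasks(context.getThisComponentId());
    int ordinal = taskIds.indexOf(context.getThisTaskId());
    long startRow = (long) ordinal * ROWS_PER_SPOUT; // e.g. 1,000 rows each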
>>>>> On Tue, Apr 19, 2016 at 10:02 PM, Jason Kusar <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I've done a similar thing before, with the exception that I was reading from Cassandra. The concept is the same though. Assuming you know that you have 10,000 records and you want each spout to read 1,000 of them, you would launch 10 instances of the spout. The first thing they do during init is connect to zookeeper and create an ephemeral node (http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes), starting with one called '0'. If '0' already exists, you'll get an exception, which means you try to create '1', and so on until you successfully create a node. That tells you which batch of records that instance of the spout is responsible for. I.e., if you successfully created '3', then this spout needs to set its offset to 3,000.
>>>>>>
>>>>>> The reason for using ephemeral nodes is that they are automatically deleted if the zookeeper client disconnects. That way, if a spout crashes, once Storm relaunches it, it will be able to re-claim that token and resume work on that batch. You'll obviously need some way to keep track of which records you've already processed, but that's going to be specific to your implementation.
>>>>>>
>>>>>> Hope that helps!
>>>>>> Jason
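In code, that claim loop is roughly the following (untested sketch; it assumes the persistent parent node /spout-tokens was created beforehand, and zk is the spout's own ZooKeeper session):

    // Try to claim token "0", "1", ... until a create succeeds. Ephemeral
    // nodes vanish when the session dies, so a crashed spout's token
    // becomes claimable again once Storm relaunches the spout.
    int batch = 0;
    while (true) {
        try {
            zk.create("/spout-tokens/" + batch, new byte[0],
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            break; // claimed - this instance owns this batch
        } catch (KeeperException.NodeExistsException taken) {
            batch++; // already taken; try the next token
        }
    }
    long offset = batch * 1000L; // e.g. token '3' -> start at row 3,000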
>>>>>> On Tue, Apr 19, 2016 at 4:43 AM, Navin Ipe <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks guys.
>>>>>>> I didn't understand "*...spout instances by utilizing Zookeeper*". How does one utilize Zookeeper? Is it the same as ".setNumTasks(10)" for a Spout?
>>>>>>>
>>>>>>> As of now I've set
>>>>>>>     config.setNumWorkers(2);
>>>>>>> and
>>>>>>>     builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>>>>
>>>>>>> I'm able to get the spout id in open() using this.spoutId = context.getThisTaskId();
>>>>>>> Strangely, my spout id always begins with 3 instead of 0.
>>>>>>>
>>>>>>> By partition id, I understand you mean the fieldsGrouping's id.
>>>>>>>
>>>>>>> Even if I do all this, will the spout's tasks actually be distributed across multiple workers? Won't I have to create separate spouts?
>>>>>>>     builder.setSpout("mongoSpout1", new MongoSpout());
>>>>>>>     builder.setSpout("mongoSpout2", new MongoSpout());
>>>>>>>     builder.setSpout("mongoSpout3", new MongoSpout());
>>>>>>> and so on?
>>>>>>>
>>>>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <[email protected]> wrote:
>>>>>>>
>>>>>>>> Correction - group on partition id
>>>>>>>>
>>>>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I've seen this: http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html but it doesn't explain how workers coordinate with each other, so I'm requesting a bit of clarity.
>>>>>>>>>
>>>>>>>>> I'm considering a situation where I have 2 million rows in MySQL or MongoDB.
>>>>>>>>>
>>>>>>>>> 1. I want to use a Spout to read the first 1000 rows and send the processed output to a Bolt. This happens in Worker 1.
>>>>>>>>> 2. I want a different instance of the same Spout class to read the next 1000 rows in parallel with the Spout in 1, then send the processed output to an instance of the same Bolt used in 1. This happens in Worker 2.
>>>>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>>>>> 4. I might set up 10 workers like this.
>>>>>>>>> 5. When all the Bolts in the workers are finished, they send their outputs to a single Bolt in Worker 11.
>>>>>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>>>>>>>>>
>>>>>>>>> *My confusion here is how to make the database iterations happen batch by batch, in parallel.* Obviously the database connection would have to be made in some static class outside the workers, but if the workers are started with just "conf.setNumWorkers(2);", how do I tell them to iterate over different rows of the database, assuming the workers are running on different machines?
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Regards,
>>>>>>>>> Navin
>>>>>>>
>>>>>>> --
>>>>>>> Regards,
>>>>>>> Navin
>>>>
>>>> --
>>>> Regards,
>>>> Navin
>>>
>>> --
>>> Regards,
>>> Navin
>
> --
> Regards,
> Navin
