@Jason: Thanks. I tried searching for the Storm code that creates ephemeral nodes, but couldn't find it. (I'm new to Hadoop and Storm, so perhaps I was searching for the wrong thing.)
@Jungtaek: Will explore component tasks. Meanwhile, I had considered Trident, but didn't go ahead because it was not clear how I could implement multiple spouts in Trident, where each spout would iterate over a certain number of rows of a database. Any idea how that could be done?

On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <[email protected]> wrote:

> There's another idea that doesn't rely on Zookeeper: use the ordinal of the
> task id among tasks of the same component (spout).
>
> Task ids are issued across all tasks, including system tasks, so you can't
> assume spout tasks have sequential task ids, but you can still do the
> trick - check the "ordinal" of this spout task's id among the same spouts.
> Please refer to GeneralTopologyContext.getComponentTasks(String componentId).
>
> Btw, Spout1 -> Bolt2 can be done in various ways, but it would not be
> easy to aggregate the results of Bolt2 from Bolt3.
> You should consider windowing by processing time, or Trident, or maintaining
> your own buffers.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Tue, Apr 19, 2016 at 10:02 PM, Jason Kusar <[email protected]> wrote:
>
>> Hi,
>>
>> I've done a similar thing before, with the exception that I was reading
>> from Cassandra. The concept is the same, though. Assuming you know that
>> you have 10,000 records and you want each spout to read 1,000 of them,
>> you would launch 10 instances of the spout. The first thing they do
>> during init is connect to Zookeeper and create an ephemeral node
>> (http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes),
>> starting with one called '0'. If '0' already exists, you'll get an
>> exception, which means you try to create '1', and so on until you
>> successfully create a node. That tells you which batch of records that
>> instance of the spout is responsible for. I.e., if you successfully
>> created '3', then this spout needs to set its offset to 3,000.
>>
>> The reason for using ephemeral nodes is that they are automatically
>> deleted if the Zookeeper client disconnects. That way, if a spout crashes,
>> once Storm relaunches the spout, it will be able to reclaim that token and
>> resume work on that batch. You'll obviously need to have some way to keep
>> track of which records you've already processed, but that's going to be
>> specific to your implementation.
>>
>> Hope that helps!
>> Jason
>>
>> On Tue, Apr 19, 2016 at 4:43 AM, Navin Ipe <[email protected]> wrote:
>>
>>> Thanks, guys.
>>> I didn't understand "*...spout instances by utilizing Zookeeper.*" How
>>> does one utilize Zookeeper? Is it the same as ".setNumTasks(10)" for a Spout?
>>>
>>> As of now I've set
>>> config.setNumWorkers(2);
>>> and
>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>
>>> I'm able to get the spoutId in open() using this.spoutId = context.getThisTaskId();
>>> Strangely, my spoutId always begins with 3 instead of 0.
>>>
>>> By partition id, I understand that's the fieldsGrouping's id.
>>>
>>> Even if I do all this, will the spout's tasks actually be distributed
>>> across multiple workers? Won't I have to create separate spouts?
>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>> and so on?
>>>
>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <[email protected]> wrote:
>>>
>>>> Correction - group on partition id.
>>>>
>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <[email protected]> wrote:
>>>>
>>>>> I've seen this:
>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>> I'm requesting a bit of clarity.
>>>>>
>>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>>> MongoDB.
>>>>>
>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>> processed output to a Bolt. This happens in Worker1.
>>>>> 2. I want a different instance of the same Spout class to read the
>>>>> next 1000 rows in parallel with the Spout in 1, and then send the
>>>>> processed output to an instance of the same Bolt used in 1. This
>>>>> happens in Worker2.
>>>>> 3. Same as 1 and 2, but it happens in Worker3.
>>>>> 4. I might set up 10 workers like this.
>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>> outputs to a single Bolt in Worker11.
>>>>> 6. The Bolt in Worker11 writes the processed value to a new MySQL
>>>>> table.
>>>>>
>>>>> *My confusion here is how to make the database iterations happen
>>>>> batch by batch, in parallel.* Obviously the database connection would
>>>>> have to be made in some static class outside the workers, but if the
>>>>> workers are started with just "conf.setNumWorkers(2);", how do I tell
>>>>> the workers to iterate over different rows of the database? Assume that
>>>>> the workers are running on different machines.
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Navin
>>>>
>>>
>>> --
>>> Regards,
>>> Navin
>>

--
Regards,
Navin
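A minimal sketch of the task-ordinal idea Jungtaek describes above, assuming a spout along the lines of the MongoSpout mentioned in the thread. The 1,000-row batch size is only an illustration, and the imports are for Storm 1.x (on 0.10.x the same classes live under backtype.storm):

import java.util.List;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;

public class MongoSpout extends BaseRichSpout {
    private static final long BATCH_SIZE = 1000;   // rows per spout instance (assumed)
    private long startOffset;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        // Task ids of all instances of *this* component, in a stable order.
        // Task ids are issued across all tasks, including system tasks, so they
        // are neither 0-based nor guaranteed contiguous for one component.
        List<Integer> taskIds = context.getComponentTasks(context.getThisComponentId());
        // This task's position within that list is its ordinal: 0, 1, 2, ...
        int ordinal = taskIds.indexOf(context.getThisTaskId());
        // Each instance starts reading at a different offset.
        this.startOffset = ordinal * BATCH_SIZE;
    }

    @Override
    public void nextTuple() {
        // Query rows [startOffset, startOffset + BATCH_SIZE) from the database
        // and emit them; the actual query is implementation-specific.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("row"));
    }
}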
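And a sketch of the ephemeral-node approach Jason describes above, using the plain ZooKeeper client. The connect string, the /spout-batches parent path (assumed to already exist), and the session timeout are illustrative values, not anything prescribed by Storm:

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class BatchTokenClaimer {
    // The handle must stay open for the spout's lifetime; closing it (or losing
    // the session) deletes the ephemeral node and frees the token.
    private final ZooKeeper zk;

    public BatchTokenClaimer(String connectString) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        zk = new ZooKeeper(connectString, 30000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();
    }

    // Try /spout-batches/0, /spout-batches/1, ... until a create succeeds.
    public int claimToken(int maxSpouts) throws Exception {
        for (int i = 0; i < maxSpouts; i++) {
            try {
                // EPHEMERAL: the node vanishes when this session dies, so a
                // relaunched spout can reclaim the same token and resume its batch.
                zk.create("/spout-batches/" + i, new byte[0],
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                return i;
            } catch (KeeperException.NodeExistsException e) {
                // Already claimed by another spout instance; try the next token.
            }
        }
        throw new IllegalStateException("No free batch token available");
    }
}

In open(), something like int ordinal = new BatchTokenClaimer("zk-host:2181").claimToken(10); would then translate to a read offset of ordinal * 1000, matching Jason's example of token '3' mapping to offset 3,000.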
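On the topology-wiring side of the original question: one spout declaration with a parallelism hint is enough; there is no need for mongoSpout1, mongoSpout2, and so on, because Storm spreads the tasks of a single component across the configured workers. A rough sketch, where ProcessBolt and MySqlWriterBolt are hypothetical placeholders for the intermediate and final bolts:

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class BatchTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // One spout declaration; the parallelism hint of 10 creates 10 tasks of
        // the same MongoSpout class, each reading its own batch of rows.
        builder.setSpout("mongoSpout", new MongoSpout(), 10);

        // Each spout task's output is handled by the intermediate bolt.
        builder.setBolt("processBolt", new ProcessBolt(), 10)
               .shuffleGrouping("mongoSpout");

        // globalGrouping routes every tuple to the single task of the final
        // bolt, which writes the combined result to MySQL.
        builder.setBolt("writerBolt", new MySqlWriterBolt(), 1)
               .globalGrouping("processBolt");

        Config conf = new Config();
        conf.setNumWorkers(10);   // tasks are distributed over these worker JVMs
        StormSubmitter.submitTopology("batch-topology", conf, builder.createTopology());
    }
}

As the replies above note, knowing when "all the Bolts are finished" still needs windowing, Trident, or your own buffering in the final bolt; the grouping only guarantees that all tuples arrive at one place.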
