The bolt is pretty simple: it gets the message and then updates the data, as I
explained in the earlier email.

@Override
public synchronized void execute(Tuple input, BasicOutputCollector collector) {
    MyDomainClass domainClass = (MyDomainClass) input.getValueByField("bytes");
    if (domainClass != null) {
        boolean success = controller.myDataUpdateFunction(domainClass);
        if (success) {
            collector.emit(new Values(domainClass));
        }
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("bytes"));
}
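Note that `synchronized` on `execute` only locks the individual bolt instance. Each Storm task gets its own copy of the bolt object, and with a parallelism hint of 4 those copies may even live in different worker JVMs, so the keyword cannot make the four tasks take turns on the database. The plain-Java sketch below illustrates the point with two hypothetical `FakeBolt` instances standing in for two tasks of the same bolt; it does not use the Storm API, and the barrier is only there to prove that both "synchronized" methods run at the same time.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncScopeDemo {
    // Hypothetical stand-in for one task's bolt instance (not a Storm class).
    static class FakeBolt {
        private final CyclicBarrier bothInside;

        FakeBolt(CyclicBarrier bothInside) {
            this.bothInside = bothInside;
        }

        // Same modifier as execute() above: this locks on *this* instance only.
        public synchronized boolean execute() {
            try {
                // Returns normally only if the other instance is inside its own
                // synchronized execute() at the same moment.
                bothInside.await(2, TimeUnit.SECONDS);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } catch (BrokenBarrierException | TimeoutException e) {
                return false;
            }
        }
    }

    // True if two instances were inside their synchronized execute() simultaneously.
    public static boolean bothRanConcurrently() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        FakeBolt b1 = new FakeBolt(barrier);
        FakeBolt b2 = new FakeBolt(barrier);
        boolean[] results = new boolean[2];
        Thread t1 = new Thread(() -> results[0] = b1.execute());
        Thread t2 = new Thread(() -> results[1] = b2.execute());
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results[0] && results[1];
    }

    public static void main(String[] args) {
        System.out.println("Both instances inside synchronized execute() at once: "
                + bothRanConcurrently());
    }
}
```

It prints `true`: the two locks are independent, so serializing updates per key has to come from routing (for example fieldsGrouping) or from the database, not from `synchronized`.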

Does it look like I am doing this correctly? I am sure there is something I am
not doing correctly. LMK.


--
Kushan Maskey

On Tue, Jan 20, 2015 at 4:33 PM, Nathan Leung <[email protected]> wrote:

> I assume Bolt2 in the snippet is the bolt in question? What do
> declareOutputFields and emit in Bolt1 look like?  Are you able to show the
> logic of Bolt2?
> On Jan 20, 2015 5:08 PM, "Kushan Maskey" <
> [email protected]> wrote:
>
>> B1 and B2 are the same bolt but running on 2 separate tasks.
>>
>>
>> Here is the snippet of the topologyBuilder function I have.
>>
>> spout_parallelism_hint = 4;
>> bolt_parallelism_hint = 4;
>>
>> private static void buildTopology(TopologyBuilder builder) {
>>     KafkaSpout spout = new KafkaSpout(getSpoutConfig(propMap.get(KAFKA_TOPIC), "ID1"));
>>
>>     builder.setSpout(SPOUT_NAME, spout, spout_parallelism_hint);
>>     builder.setBolt("MainBolt", new MainBolt(), bolt_parallelism_hint)
>>             .shuffleGrouping(SPOUT_NAME);
>>     builder.setBolt("B1", new Bolt1(), bolt_parallelism_hint)
>>             .shuffleGrouping("MainBolt");
>>     // go to store sales bolts first
>>     builder.setBolt("B2", new Bolt2(), bolt_parallelism_hint)
>>             .fieldsGrouping("B1", new Fields("X"));
>>     // split on assoc, dept and vendor
>>     builder.setBolt("B3", new Bolt3(), bolt_parallelism_hint)
>>             .shuffleGrouping("B2");
>> }
>> I have a bunch of other bolts doing pretty much the same thing as above.
>>
>> LMK if that is sufficient. Thanks.
>>
>>
>> --
>> Kushan Maskey
>>
>> On Tue, Jan 20, 2015 at 3:45 PM, Nathan Leung <[email protected]> wrote:
>>
>>> Actually I thought about it and you should not have to do fieldsGrouping
>>> on both X and Y; one should be sufficient.  In your original email, are B1
>>> and B2 the same bolt, but different tasks, or are they different bolts
>>> entirely?  As Harsha pointed out, it may help if you give more details of
>>> how your topology is constructed.
>>>
>>> On Tue, Jan 20, 2015 at 4:42 PM, Kushan Maskey <
>>> [email protected]> wrote:
>>>
>>>> I am only doing fieldsGrouping on X, not Y. Is it necessary to group
>>>> by both fields? Is there any sample documentation I can look at? Thanks.
>>>>
>>>> --
>>>> Kushan Maskey
>>>> 817.403.7500
>>>> M. Miller & Associates <http://mmillerassociates.com/>
>>>> [email protected]
>>>>
>>>> On Tue, Jan 20, 2015 at 3:14 PM, Nathan Leung <[email protected]>
>>>> wrote:
>>>>
>>>>> Which fields are you doing fieldsGrouping on? If you do fields
>>>>> grouping on X and Y, why are you having a race condition in a separate
>>>>> bolt task? Each X and Y combo should always go to the same bolt task with
>>>>> fieldsGrouping, and the scenario you describe should work properly whether
>>>>> you have 1 task, 4 tasks, or 100 tasks.
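Nathan's point can be checked with a small simulation. This is a simplified plain-Java model, not Storm itself: each hypothetical "task" is a single-threaded executor, routing picks a task from X alone using `Math.floorMod(x.hashCode(), numTasks)` (an assumption; Storm's fields grouping uses its own hash of the field values, but the idea is the same), and a `ConcurrentHashMap` stands in for the Cassandra table. The update is a deliberately non-atomic read-then-write, yet the per-(X, Y) totals should match a sequential run for 1, 4, and 100 tasks, because every update for a given X is applied by one thread in arrival order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FieldsGroupingSim {
    // Simplified routing on the grouping field X only (an assumption, not Storm's hash).
    static int taskFor(String x, int numTasks) {
        return Math.floorMod(x.hashCode(), numTasks);
    }

    // Applies every update on the task chosen for its X; each task is one thread.
    public static Map<String, Long> run(List<String[]> tuples, int numTasks) {
        Map<String, Long> db = new ConcurrentHashMap<>(); // stand-in for the Cassandra table
        List<ExecutorService> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(Executors.newSingleThreadExecutor());
        }
        for (String[] t : tuples) {
            String key = t[0] + "|" + t[1]; // (X, Y) row key
            long delta = Long.parseLong(t[2]);
            tasks.get(taskFor(t[0], numTasks)).execute(() -> {
                long current = db.getOrDefault(key, 0L); // read Z
                db.put(key, current + delta);            // write Z back (not atomic)
            });
        }
        try {
            for (ExecutorService task : tasks) {
                task.shutdown();
                task.awaitTermination(30, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return db;
    }

    // Reference result: every update applied on a single thread, in order.
    public static Map<String, Long> sequential(List<String[]> tuples) {
        Map<String, Long> db = new HashMap<>();
        for (String[] t : tuples) {
            db.merge(t[0] + "|" + t[1], Long.parseLong(t[2]), Long::sum);
        }
        return db;
    }

    // Hypothetical sample stream: 7 X values, 3 Y values, small deltas.
    public static List<String[]> sampleTuples() {
        List<String[]> tuples = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            tuples.add(new String[] {"x" + (i % 7), "y" + (i % 3), Integer.toString(i % 50)});
        }
        return tuples;
    }

    public static boolean matchesSequential(int numTasks) {
        List<String[]> tuples = sampleTuples();
        return sequential(tuples).equals(run(tuples, numTasks));
    }

    public static void main(String[] args) {
        for (int numTasks : new int[] {1, 4, 100}) {
            System.out.println(numTasks + " task(s): totals match = " + matchesSequential(numTasks));
        }
    }
}
```

Grouping on X alone is enough here because the task is a function of X, so every (X, Y) combination lands on exactly one task. If the real updates still diverge, the model suggests looking for another writer of the same rows (for example, a second bolt or stream that updates Z without going through this grouping).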
>>>>>
>>>>> On Tue, Jan 20, 2015 at 4:11 PM, Kushan Maskey <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Not at the moment. We have been using KafkaSpout for all the other
>>>>>> projects but have not looked into using Trident. How would it help
>>>>>> resolve the issue we are facing at the moment? We also need to keep in
>>>>>> mind the development time it would take to implement Trident, while
>>>>>> KafkaSpout has been working fine in all the other projects.
>>>>>>
>>>>>> --
>>>>>> Kushan Maskey
>>>>>>
>>>>>> On Tue, Jan 20, 2015 at 3:05 PM, Rajiv Onat <[email protected]> wrote:
>>>>>>
>>>>>>> This seems like stateful processing; have you looked at using Trident?
>>>>>>>
>>>>>>> -Rajiv
>>>>>>>
>>>>>>> On Jan 20, 2015, at 12:26 PM, Kushan Maskey <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>> Thanks Keith and Itai,
>>>>>>>
>>>>>>> We are using fieldsGrouping. Initially we were using shuffleGrouping;
>>>>>>> we saw this problem and then moved to fieldsGrouping, with better
>>>>>>> results, until now. I think the bolt parallelism, which we have set to
>>>>>>> 4, is the culprit here. My understanding is that parallelism means
>>>>>>> threading; correct me if I am wrong.
>>>>>>>
>>>>>>> --
>>>>>>> Kushan Maskey
>>>>>>>
>>>>>>> On Tue, Jan 20, 2015 at 1:03 PM, Itai Frenkel <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>  Hello,
>>>>>>>>
>>>>>>>>
>>>>>>>>  Are you familiar with fields grouping? The idea is that the same
>>>>>>>> bolt instance always updates the value of a specific key (similar to
>>>>>>>> cookie stickiness in a web load balancer).
>>>>>>>>
>>>>>>>> https://storm.apache.org/documentation/Concepts.html
>>>>>>>>
>>>>>>>> "Fields grouping: The stream is partitioned by the fields
>>>>>>>> specified in the grouping. For example, if the stream is grouped by the
>>>>>>>> "user-id" field, tuples with the same "user-id" will always go to the
>>>>>>>> same task, but tuples with different "user-id"'s may go to different
>>>>>>>> tasks."
>>>>>>>>
>>>>>>>>
>>>>>>>>  Itai
>>>>>>>>
>>>>>>>>  ------------------------------
>>>>>>>>
>>>>>>>> *From:* Kushan Maskey <[email protected]>
>>>>>>>> *Sent:* Tuesday, January 20, 2015 8:55 PM
>>>>>>>> *To:* [email protected]
>>>>>>>> *Subject:* URGENT!! Race condition
>>>>>>>>
>>>>>>>>  We are having a major issue trying to update a Cassandra database:
>>>>>>>> we are seeing a race condition in a bolt.
>>>>>>>>
>>>>>>>>  Here is an example.
>>>>>>>>
>>>>>>>>  I have a column family with 2 partitioning columns, say X and Y.
>>>>>>>> There is another column, Z, which is basically an aggregated number.
>>>>>>>> We are supposed to update Z based on X and Y. Storm is reading a huge
>>>>>>>> volume of data from Kafka. When the spout receives a message, the first
>>>>>>>> bolt reads the database for that combination of X and Y and gets the
>>>>>>>> value of Z. Then it updates Z and stores it back into the database.
>>>>>>>> Bolt parallelism is set to 4, which means 4 instances of the bolt are
>>>>>>>> trying to update the database. So when the first bolt (B1) reads Z as,
>>>>>>>> say, 100, the second bolt (B2) may read it as 100 at the same time;
>>>>>>>> once B1 completes execution, Z is now 150, but B2 still holds 100, so
>>>>>>>> the value of Z gets out of sync.
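The interleaving described above is the classic lost-update problem. The sketch below replays it step by step with a plain `HashMap` standing in for the Cassandra row (a hypothetical stand-in, with made-up deltas of +50 for B1 and +20 for B2), and compares the result with the same two updates applied one after the other.

```java
import java.util.HashMap;
import java.util.Map;

public class LostUpdateTrace {
    private static final String KEY = "x1|y1"; // hypothetical (X, Y) row key

    // Replays the interleaving from the email: both tasks read before either writes.
    public static long interleaved(long initialZ, long b1Delta, long b2Delta) {
        Map<String, Long> table = new HashMap<>(); // stand-in for the Cassandra row
        table.put(KEY, initialZ);
        long b1Read = table.get(KEY);     // B1 reads Z (e.g. 100)
        long b2Read = table.get(KEY);     // B2 also reads 100 before B1 writes
        table.put(KEY, b1Read + b1Delta); // B1 writes its result (e.g. 150)
        table.put(KEY, b2Read + b2Delta); // B2 overwrites it using the stale read
        return table.get(KEY);
    }

    // The same two updates applied one after the other, as happens when all
    // updates for one key are routed to a single task.
    public static long serialized(long initialZ, long b1Delta, long b2Delta) {
        Map<String, Long> table = new HashMap<>();
        table.put(KEY, initialZ);
        table.put(KEY, table.get(KEY) + b1Delta); // B1 read-modify-write
        table.put(KEY, table.get(KEY) + b2Delta); // then B2 read-modify-write
        return table.get(KEY);
    }

    public static void main(String[] args) {
        System.out.println("interleaved: " + interleaved(100, 50, 20)); // 120, B1's +50 is lost
        System.out.println("serialized:  " + serialized(100, 50, 20));  // 170
    }
}
```

The interleaved run ends at 120 instead of 170: B2's write is based on a value that was already stale, so B1's contribution disappears.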
>>>>>>>>
>>>>>>>>  How can we prevent a race condition like this? It is causing
>>>>>>>> a major nuisance for us.
>>>>>>>>
>>>>>>>>  Any help is highly appreciated. Thanks.
>>>>>>>>
>>>>>>>>    --
>>>>>>>> Kushan Maskey
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
