If this is bolt 1, you are doing fieldsGrouping on field X but none of your
output fields are named X.
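
To illustrate the point, here is a rough sketch in plain Java (no Storm dependency) of how fields grouping can be thought of: look up the grouping field by name among the fields the upstream bolt declared, then hash its value modulo the number of downstream tasks. This is a simplified model and not Storm's actual code; the class name, method names, and sample values are all made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified model of fields grouping. Not Storm's real implementation.
class FieldsGroupingSketch {

    // Find the position of a field among the upstream bolt's declared output fields.
    // Fails if the field was never declared, which mirrors the problem in this thread.
    static int fieldIndex(List<String> declaredFields, String name) {
        int idx = declaredFields.indexOf(name);
        if (idx < 0) {
            throw new IllegalArgumentException(
                "Field '" + name + "' is not among the declared output fields " + declaredFields);
        }
        return idx;
    }

    // Pick a downstream task by hashing the grouping field's value.
    // Equal values always map to the same task index.
    static int chooseTask(List<String> declaredFields, List<Object> values,
                          String groupingField, int numTasks) {
        Object key = values.get(fieldIndex(declaredFields, groupingField));
        return Math.floorMod(Objects.hashCode(key), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;

        // Upstream bolt declares "X" alongside the payload field.
        List<String> withX = Arrays.asList("X", "bytes");
        List<Object> first = Arrays.<Object>asList("store-42", "payload-a");
        List<Object> second = Arrays.<Object>asList("store-42", "payload-b");
        int t1 = chooseTask(withX, first, "X", numTasks);
        int t2 = chooseTask(withX, second, "X", numTasks);
        System.out.println("same task for same X: " + (t1 == t2));

        // Upstream bolt declares only "bytes", as in the posted bolt.
        List<String> asPosted = Arrays.asList("bytes");
        try {
            chooseTask(asPosted, Arrays.<Object>asList("payload-a"), "X", numTasks);
        } catch (IllegalArgumentException e) {
            System.out.println("grouping rejected: " + e.getMessage());
        }
    }
}
```

In the real topology, the analogous change would presumably be for Bolt1 to call declarer.declare(new Fields("X", "bytes")) and emit the X value together with the domain object, for example collector.emit(new Values(domainClass.getX(), domainClass)), where getX() is a hypothetical accessor on your domain class.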
On Jan 20, 2015 8:37 PM, "Kushan Maskey" <
[email protected]> wrote:

> The bolt is pretty simple: it gets the message and then updates the data as I
> have explained in the earlier email.
>
> @Override
> public synchronized void execute(Tuple input, BasicOutputCollector collector) {
>     MyDomainClass domainClass = (MyDomainClass) input.getValueByField("bytes");
>     if (domainClass != null) {
>         boolean success = controller.myDataUpdateFunction(domainClass);
>         if (success) {
>             collector.emit(new Values(domainClass));
>         }
>     }
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     declarer.declare(new Fields("bytes"));
> }
>
> Does it look like I am doing this correctly? I am sure there is something I am
> not doing correctly. LMK.
>
>
> --
> Kushan Maskey
>
> On Tue, Jan 20, 2015 at 4:33 PM, Nathan Leung <[email protected]> wrote:
>
>> I assume Bolt2 in the snippet is the bolt in question? What do
>> declareOutputFields and emit in Bolt1 look like?  Are you able to show the
>> logic of Bolt2?
>> On Jan 20, 2015 5:08 PM, "Kushan Maskey" <
>> [email protected]> wrote:
>>
>>> B1 and B2 are the same bolt but running on 2 separate tasks.
>>>
>>>
>>> Here is the snippet of the topologyBuilder function I have.
>>>
>>> spout_parallelism_hint = 4;
>>> bolt_parallelism_hint = 4;
>>>
>>> private static void buildTopology(TopologyBuilder builder) {
>>>     KafkaSpout spout =
>>>         new KafkaSpout(getSpoutConfig(propMap.get(KAFKA_TOPIC), "ID1"));
>>>
>>>     builder.setSpout(SPOUT_NAME, spout, spout_parallelism_hint);
>>>     builder.setBolt("MainBolt", new MainBolt(), bolt_parallelism_hint)
>>>         .shuffleGrouping(SPOUT_NAME);
>>>     builder.setBolt("B1", new Bolt1(), bolt_parallelism_hint)
>>>         .shuffleGrouping("MainBolt");
>>>     // go to store sales bolts first
>>>     builder.setBolt("B2", new Bolt2(), bolt_parallelism_hint)
>>>         .fieldsGrouping("B1", new Fields("X"));
>>>     // split on assoc, dept and vendor
>>>     builder.setBolt("B3", new Bolt3(), bolt_parallelism_hint)
>>>         .shuffleGrouping("B2");
>>> }
>>> I have a bunch of other bolts doing pretty much the same thing as above.
>>>
>>> LMK if that is sufficient. Thanks.
>>>
>>>
>>> --
>>> Kushan Maskey
>>>
>>> On Tue, Jan 20, 2015 at 3:45 PM, Nathan Leung <[email protected]> wrote:
>>>
>>>> Actually I thought about it and you should not have to do
>>>> fieldsGrouping on both X and Y; one should be sufficient.  In your original
>>>> email, are B1 and B2 the same bolt, but different tasks, or are they
>>>> different bolts entirely?  As Harsha pointed out, it may help if you give
>>>> more details of how your topology is constructed.
>>>>
>>>> On Tue, Jan 20, 2015 at 4:42 PM, Kushan Maskey <
>>>> [email protected]> wrote:
>>>>
>>>>> I am only doing fieldsGrouping on X and not Y. Is it necessary to group
>>>>> by both fields? Is there any sample document I can look at? Thanks.
>>>>>
>>>>> --
>>>>> Kushan Maskey
>>>>> 817.403.7500
>>>>> M. Miller & Associates <http://mmillerassociates.com/>
>>>>> [email protected]
>>>>>
>>>>> On Tue, Jan 20, 2015 at 3:14 PM, Nathan Leung <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Which fields are you doing fieldsGrouping on?  If you do fields
>>>>>> grouping on X and Y, why are you having a race condition in a separate
>>>>>> bolt task?  Each X and Y combo should always go to the same bolt task
>>>>>> with fieldsGrouping, and the scenario you describe should work properly
>>>>>> whether you have 1 task, 4 tasks, or 100 tasks.
>>>>>>
>>>>>> On Tue, Jan 20, 2015 at 4:11 PM, Kushan Maskey <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Not at the moment. We have been using KafkaSpout for all the other
>>>>>>> projects but have not looked into using Trident. How would it help
>>>>>>> resolve the issue we are facing at the moment? We also need to keep in
>>>>>>> mind the development time it would take to implement Trident, while
>>>>>>> KafkaSpout has been working fine with all the other projects.
>>>>>>>
>>>>>>> --
>>>>>>> Kushan Maskey
>>>>>>>
>>>>>>> On Tue, Jan 20, 2015 at 3:05 PM, Rajiv Onat <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Seems like stateful processing; have you looked at using Trident?
>>>>>>>>
>>>>>>>> -Rajiv
>>>>>>>>
>>>>>>>> On Jan 20, 2015, at 12:26 PM, Kushan Maskey <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>> Thanks Keith and Itai,
>>>>>>>>
>>>>>>>> We are using fieldsGrouping. Initially we were using shuffleGrouping;
>>>>>>>> we saw this problem and then moved to fieldsGrouping, with better
>>>>>>>> results, until now. I think the bolt parallelism, which we have set to
>>>>>>>> 4, is the culprit here. My understanding is that parallelism means
>>>>>>>> threading; correct me if I am wrong.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Kushan Maskey
>>>>>>>>
>>>>>>>> On Tue, Jan 20, 2015 at 1:03 PM, Itai Frenkel <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>  Hello,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  Are you familiar with fields grouping? The idea is that the same
>>>>>>>>> bolt instance would always update the value of a specific key
>>>>>>>>> (similar to web load balancer cookie stickiness).
>>>>>>>>>
>>>>>>>>> https://storm.apache.org/documentation/Concepts.html
>>>>>>>>>
>>>>>>>>> "Fields grouping: The stream is partitioned by the fields
>>>>>>>>> specified in the grouping. For example, if the stream is grouped by
>>>>>>>>> the "user-id" field, tuples with the same "user-id" will always go to
>>>>>>>>> the same task, but tuples with different "user-id"'s may go to
>>>>>>>>> different tasks."
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  Itai
>>>>>>>>>
>>>>>>>>>  ------------------------------
>>>>>>>>>
>>>>>>>>> *From:* Kushan Maskey <[email protected]>
>>>>>>>>> *Sent:* Tuesday, January 20, 2015 8:55 PM
>>>>>>>>> *To:* [email protected]
>>>>>>>>> *Subject:* URGENT!! Race condition
>>>>>>>>>
>>>>>>>>>  We are having a major issue trying to update a Cassandra database
>>>>>>>>> where we see a race condition in a bolt.
>>>>>>>>>
>>>>>>>>>  Here is an example,
>>>>>>>>>
>>>>>>>>>  I have a column family with 2 partitioning columns, say X
>>>>>>>>> and Y. There is another column, Z, which is basically an aggregated
>>>>>>>>> number. We are supposed to update Z based on X and Y. Storm is reading
>>>>>>>>> a huge volume of data from Kafka. When the spout receives a message,
>>>>>>>>> the first bolt reads the database for that combination of X and Y and
>>>>>>>>> gets the value of Z. Then it updates the value of Z and stores it back
>>>>>>>>> into the database. Bolt parallelism is set to 4, which means 4
>>>>>>>>> instances of the bolt are trying to update the database. So when the
>>>>>>>>> first bolt (B1) reads the value of Z to be, say, 100, the second bolt
>>>>>>>>> (B2) reads it as 100 at the same time. Once B1 completes execution,
>>>>>>>>> the value of Z is 150, but B2 still has 100, so the value of Z is out
>>>>>>>>> of sync.
>>>>>>>>>
>>>>>>>>>  How can we prevent a race condition like this? This is causing
>>>>>>>>> a major nuisance to us.
>>>>>>>>>
>>>>>>>>>  Any help is highly appreciated. Thanks.
>>>>>>>>>
>>>>>>>>>    --
>>>>>>>>> Kushan Maskey
>>>>>>>>>
>>>>>>>>>
