B1 and B2 are the same bolt but running on 2 separate tasks.

Here is the snippet of the topologyBuilder function I have.

static int spout_parallelism_hint = 4;
static int bolt_parallelism_hint = 4;

private static void buildTopology(TopologyBuilder builder) {
    KafkaSpout spout = new KafkaSpout(
            getSpoutConfig(propMap.get(KAFKA_TOPIC), "ID1"));

    builder.setSpout(SPOUT_NAME, spout, spout_parallelism_hint);
    builder.setBolt("MainBolt", new MainBolt(), bolt_parallelism_hint)
            .shuffleGrouping(SPOUT_NAME);
    builder.setBolt("B1", new Bolt1(), bolt_parallelism_hint)
            .shuffleGrouping("MainBolt");
    // go to store sales bolts first
    builder.setBolt("B2", new Bolt2(), bolt_parallelism_hint)
            .fieldsGrouping("B1", new Fields("X"));
    // split on assoc, dept and vendor
    builder.setBolt("B3", new Bolt3(), bolt_parallelism_hint)
            .shuffleGrouping("B2");
}
I have a bunch of other bolts doing pretty much the same thing as above.

LMK if that is sufficient. Thanks.


--
Kushan Maskey

On Tue, Jan 20, 2015 at 3:45 PM, Nathan Leung <[email protected]> wrote:

> Actually I thought about it and you should not have to do fieldsGrouping
> on both X and Y; one should be sufficient.  In your original email, are B1
> and B2 the same bolt, but different tasks, or are they different bolts
> entirely?  As Harsha pointed out, it may help if you give more details of
> how your topology is constructed.
>
> On Tue, Jan 20, 2015 at 4:42 PM, Kushan Maskey <
> [email protected]> wrote:
>
>> I am only doing fieldsGrouping on X and not Y. Is it necessary to group by
>> both fields? Is there any sample document I can look at? Thanks.
>>
>> --
>> Kushan Maskey
>> 817.403.7500
>> M. Miller & Associates <http://mmillerassociates.com/>
>> [email protected]
>>
>> On Tue, Jan 20, 2015 at 3:14 PM, Nathan Leung <[email protected]> wrote:
>>
>>> Which fields are you doing fieldsGrouping on?  If you do fields grouping
>>> on X and Y, why are you having a race condition in a separate bolt task?
>>> Each X and Y combo should always go to the same bolt task with
>>> fieldsGrouping, and the scenario you describe should work properly whether
>>> you have 1 task, 4 tasks, or 100 tasks.
>>>
>>> On Tue, Jan 20, 2015 at 4:11 PM, Kushan Maskey <
>>> [email protected]> wrote:
>>>
>>>> Not at the moment. We have been using KafkaSpout for all the other
>>>> projects but have not looked into using Trident. How would it help resolve
>>>> the issue we are facing at the moment? We also need to keep in mind the
>>>> development time it would take to implement Trident, while KafkaSpout has
>>>> been working fine with all the other projects.
>>>>
>>>> --
>>>> Kushan Maskey
>>>>
>>>> On Tue, Jan 20, 2015 at 3:05 PM, Rajiv Onat <[email protected]> wrote:
>>>>
>>>>> Seems like stateful processing. Have you looked at using Trident?
>>>>>
>>>>> -Rajiv
>>>>>
>>>>> On Jan 20, 2015, at 12:26 PM, Kushan Maskey <
>>>>> [email protected]> wrote:
>>>>>
>>>>> Thanks Keith and Itai,
>>>>>
>>>>> We are using fieldsGrouping. Initially we were using shuffleGrouping; we
>>>>> saw this problem and then moved to fieldsGrouping, with better results
>>>>> until now. I think the bolt parallelism, which we have set to 4, is the
>>>>> culprit here. My understanding is that parallelism means threading;
>>>>> correct me if I am wrong.
>>>>>
>>>>> --
>>>>> Kushan Maskey
>>>>>
>>>>> On Tue, Jan 20, 2015 at 1:03 PM, Itai Frenkel <[email protected]> wrote:
>>>>>
>>>>>>  Hello,
>>>>>>
>>>>>>
>>>>>>  Are you familiar with fields grouping? The idea is that the same
>>>>>> bolt instance would always update the value of a specific key (similar to
>>>>>> web load balancer cookie stickiness).
>>>>>>
>>>>>> https://storm.apache.org/documentation/Concepts.html
>>>>>>
>>>>>> "Fields grouping: The stream is partitioned by the fields
>>>>>> specified in the grouping. For example, if the stream is grouped by the
>>>>>> "user-id" field, tuples with the same "user-id" will always go to the
>>>>>> same task, but tuples with different "user-id"'s may go to different
>>>>>> tasks."
>>>>>>
>>>>>>
>>>>>>  Itai
>>>>>>
>>>>>>  ------------------------------
>>>>>>
>>>>>> *From:* Kushan Maskey <[email protected]>
>>>>>> *Sent:* Tuesday, January 20, 2015 8:55 PM
>>>>>> *To:* [email protected]
>>>>>> *Subject:* URGENT!! Race condition
>>>>>>
>>>>>>  We are having a major issue updating a Cassandra database: we see a
>>>>>> race condition in a bolt.
>>>>>>
>>>>>>  Here is an example.
>>>>>>
>>>>>>  I have a column family with two partitioning columns, say X and Y.
>>>>>> There is another column, Z, which is basically an aggregated number. We
>>>>>> are supposed to update Z based on X and Y. Storm is reading a huge volume
>>>>>> of data from Kafka. When the spout receives a message, the first bolt
>>>>>> reads the database for that combination of X and Y and gets the value of
>>>>>> Z. It then updates Z and stores it back into the database. Bolt
>>>>>> parallelism is set to 4, which means 4 instances of the bolt are trying to
>>>>>> update the database. So when the first bolt (B1) reads the value of Z as,
>>>>>> say, 100, the second bolt (B2) reads it as 100 at the same time. Once B1
>>>>>> has completed execution and the value of Z is 150, B2 still holds 100, so
>>>>>> the value of Z ends up out of sync.
>>>>>>
>>>>>>  How can we prevent a race condition like this? It is causing a
>>>>>> major nuisance to us.
>>>>>>
>>>>>>  Any help is highly appreciated. Thanks.
>>>>>>
>>>>>>    --
>>>>>> Kushan Maskey
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
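
The two ideas discussed in this thread, the lost update from the original message and the claim that grouping on X alone is sufficient, can be illustrated with a small standalone sketch. It does not use Storm or Cassandra. The class name, the `taskFor` hash-modulo routing, the sample events and the in-memory map are all assumptions made for illustration; Storm's actual routing code may differ, although the documented guarantee (same field value, same task) is the same. The sketch also assumes that no other bolt or process writes to the same rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FieldsGroupingSketch {

    // Simplified stand-in for fields grouping: hash the grouping value and
    // take it modulo the task count. This is an assumption for illustration,
    // not Storm's actual routing code.
    static int taskFor(String x, int numTasks) {
        return Math.floorMod(x.hashCode(), numTasks);
    }

    // Replays the interleaving from the original message: both tasks read Z
    // before either writes, so the second write discards the first.
    static int lostUpdate(int initialZ, int delta1, int delta2) {
        int readByB1 = initialZ;
        int readByB2 = initialZ;         // becomes stale once B1 writes
        int stored = readByB1 + delta1;  // B1 writes its result
        stored = readByB2 + delta2;      // B2 overwrites it using the stale read
        return stored;
    }

    // Routes each {X, Y, delta} event to a task by X alone, then lets every
    // task apply its own events one at a time to an in-memory stand-in for
    // the Cassandra table.
    static Map<String, Integer> aggregate(String[][] events, int numTasks) {
        List<List<String[]>> queues = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            queues.add(new ArrayList<>());
        }
        for (String[] e : events) {
            queues.get(taskFor(e[0], numTasks)).add(e);
        }
        Map<String, Integer> z = new HashMap<>();
        for (List<String[]> queue : queues) {
            for (String[] e : queue) {
                String key = e[0] + "|" + e[1];
                int current = z.getOrDefault(key, 0);          // read
                z.put(key, current + Integer.parseInt(e[2]));  // modify and write
            }
        }
        return z;
    }

    public static void main(String[] args) {
        // Hypothetical sample events: {X, Y, delta}.
        String[][] events = {
            {"x1", "y1", "50"}, {"x2", "y1", "10"},
            {"x1", "y1", "25"}, {"x1", "y2", "5"},
        };
        // Two tasks both adding 50 to Z = 100 leave 150 instead of 200.
        System.out.println(lostUpdate(100, 50, 50));
        // With 4 tasks, matching bolt_parallelism_hint, both updates to
        // (x1, y1) land on the same task and are applied in order: 75.
        System.out.println(aggregate(events, 4).get("x1|y1"));
    }
}
```

Because every tuple with the same X goes to the same task, every (X, Y) combination with that X does too, which is why grouping on X alone is enough. The guarantee covers only the bolt that is declared with fieldsGrouping, so the read-modify-write has to happen in that bolt rather than in one fed by shuffleGrouping.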
