We have tried several thing trying to play around with the field grouping
but in vain. Is trident method going to help us resolve this issue? We are
trying to deal with the real time sales data and we see a lot of
discrepancies in our aggregation and this seem to be a major issue that
needs to be resolved.

We have not used trident before so need to research on that as well.

--
Kushan Maskey


On Wed, Jan 21, 2015 at 7:25 AM, Nathan Leung <[email protected]> wrote:

> Your number of fields declared must match the number of fields emitted in
> each tuple. Also if you are doing a grouping by a field name, then yes you
> must declare a field with that name. So in this case you should declare one
> field with name X.  Alternatively, you can change your topology so the
> fields grouping is done on "bytes" and not "X".
> On Jan 21, 2015 8:22 AM, "Kushan Maskey" <
> [email protected]> wrote:
>
>> So are you saying instead of declareOutputFields as bytes we declare it
>> as X? I tried to have both the fields there but storm complained about
>> number of declared fields.
>> On Jan 21, 2015 6:53 AM, Nathan Leung <[email protected]> wrote:
>>
>> If this is bolt 1, you are doing fieldsGrouping on field X but none of
>> your output fields are named X.
>> On Jan 20, 2015 8:37 PM, "Kushan Maskey" <
>> [email protected]> wrote:
>>
>>> Bolts is pretty simple, it get the message and hten updates the data as
>>> I have explained in the earlier email.
>>>
>>> @Override
>>> public synchronized void execute(Tuple input, BasicOutputCollector
>>> collector) {
>>> MyDomainClass domainClass = (MyDomainClass)
>>> input.getValueByField("bytes");
>>>  if(domainClass != null) {
>>>  boolean success = controller.myDataUpdateFunction(domainClass);
>>> if(success) {
>>>  collector.emit(new Values(domainClass));
>>> }
>>> }
>>> }
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>> declarer.declare(new Fields("bytes"));
>>> }
>>>
>>> Does it sound like I am doing correctly? I am sure there is something I
>>> am not doing it correctly. LMK.
>>>
>>>
>>> --
>>> Kushan Maskey
>>>
>>> On Tue, Jan 20, 2015 at 4:33 PM, Nathan Leung <[email protected]> wrote:
>>>
>>>> I assume Bolt2 in the snippet is the bolt in question? What do
>>>> declareOutputFields and emit in Bolt1 look like?  Are you able to show the
>>>> logic of Bolt2?
>>>> On Jan 20, 2015 5:08 PM, "Kushan Maskey" <
>>>> [email protected]> wrote:
>>>>
>>>>> B1 and B2 are the same bolt but running on 2 separate tasks.
>>>>>
>>>>>
>>>>> Here is the snippet of the topologyBuilder function I have.
>>>>>
>>>>> spout_parallelism_hint = 4;
>>>>> bolt_parallelism_hint = 4;
>>>>>
>>>>> private static void buildTopology(TopologyBuilder builder) {
>>>>> KafkaSpout spout = new
>>>>> KafkaSpout(getSpoutConfig(propMap.get(KAFKA_TOPIC), "ID1"));
>>>>>
>>>>> builder.setSpout(SPOUT_NAME, spout, spout_parallelism_hint);
>>>>>  builder.setBolt("MainBolt", new MainBolt(),
>>>>> bolt_parallelism_hint).shuffleGrouping(SPOUT_NAME);
>>>>> builder.setBolt("B1", new Bolt1(),
>>>>> bolt_parallelism_hint).shuffleGrouping(MainBolt);
>>>>>  // go to store sales bolts first
>>>>> builder.setBolt("B2", new Bolt2(),
>>>>> bolt_parallelism_hint).fieldsGrouping(B1, new Fields("X"));
>>>>>  // split on assoc, dept and vendor
>>>>> builder.setBolt("B3", new Bolt3(),
>>>>> bolt_parallelism_hint).shuffleGrouping(B2);
>>>>> }
>>>>> I got bunch of other bolts pretty much doing the same thing as above.
>>>>>
>>>>> LMK if that is sufficient. Thanks.
>>>>>
>>>>>
>>>>> --
>>>>> Kushan Maskey
>>>>>
>>>>> On Tue, Jan 20, 2015 at 3:45 PM, Nathan Leung <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Actually I thought about it and you should not have to do
>>>>>> fieldsGrouping on both X and Y; one should be sufficient.  In your 
>>>>>> original
>>>>>> email, are B1 and B2 the same bolt, but different tasks, or are they
>>>>>> different bolts entirely?  As Harsha pointed out, it may help if you give
>>>>>> more details of how your topology is constructed.
>>>>>>
>>>>>> On Tue, Jan 20, 2015 at 4:42 PM, Kushan Maskey <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> I am only fieldGrouping on X and not Y. Is it necessary to
>>>>>>> fieldGroup by both the fields? Is there any sample document I can look 
>>>>>>> at?
>>>>>>> Thanks.
>>>>>>>
>>>>>>> --
>>>>>>> Kushan Maskey
>>>>>>> 817.403.7500
>>>>>>> M. Miller & Associates <http://mmillerassociates.com/>
>>>>>>> [email protected]
>>>>>>>
>>>>>>> On Tue, Jan 20, 2015 at 3:14 PM, Nathan Leung <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> which fields are you doing fieldsGrouping on?  If you do fields
>>>>>>>> grouping on X and Y, why are you having a race condition in a separate 
>>>>>>>> bolt
>>>>>>>> task?  Each X and Y combo should always go to the same bolt task with
>>>>>>>> fieldsGrouping, and the scenario you describe should work properly 
>>>>>>>> whether
>>>>>>>> you have 1 task, 4 tasks, or 100 tasks.
>>>>>>>>
>>>>>>>> On Tue, Jan 20, 2015 at 4:11 PM, Kushan Maskey <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Not at the moment. We have been using KafkaSpout for all the other
>>>>>>>>> projects but have not looked into using trident. How would it help 
>>>>>>>>> resolve
>>>>>>>>> the issue we are facing at the moment. We also need to keep in mind 
>>>>>>>>> the
>>>>>>>>> development time it would take to implement triedent. While 
>>>>>>>>> KafkaSpout has
>>>>>>>>> been working fine with all the other projects.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Kushan Maskey
>>>>>>>>>
>>>>>>>>> On Tue, Jan 20, 2015 at 3:05 PM, Rajiv Onat <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Seems like stateful processing, have you looked at using trident ?
>>>>>>>>>>
>>>>>>>>>> -Rajiv
>>>>>>>>>>
>>>>>>>>>> On Jan 20, 2015, at 12:26 PM, Kushan Maskey <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks Keith and Itai,
>>>>>>>>>>
>>>>>>>>>> We are using fieldGrouping. Initially we were using
>>>>>>>>>> suffleGrouping, we saw this problem and then moved to fieldGrouping, 
>>>>>>>>>> with
>>>>>>>>>> better result, until now. I am thinking due to bolts parallelism 
>>>>>>>>>> which we
>>>>>>>>>> have set it to 4, is the culprit here. My understanding of 
>>>>>>>>>> parallelism is
>>>>>>>>>> threading, correct me if I am not incorrect.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Kushan Maskey
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 20, 2015 at 1:03 PM, Itai Frenkel <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>  Hello,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  Are you familiar with field grouping ? The idea is that the
>>>>>>>>>>> same bolt instance would always update the value of a specific key 
>>>>>>>>>>> (similar
>>>>>>>>>>> to web load balancer cookie stickiness).
>>>>>>>>>>>
>>>>>>>>>>> https://storm.apache.org/documentation/Concepts.html
>>>>>>>>>>>
>>>>>>>>>>> *"Fields grouping**: The stream is partitioned by the fields
>>>>>>>>>>> specified in the grouping. For example, if the stream is grouped by 
>>>>>>>>>>> the
>>>>>>>>>>> "user-id" field, tuples with the same "user-id" will always go to 
>>>>>>>>>>> the same
>>>>>>>>>>> task, but tuples with different "user-id"'s may go to different 
>>>>>>>>>>> tasks."*
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  ​Itai
>>>>>>>>>>>
>>>>>>>>>>>  ------------------------------
>>>>>>>>>>>
>>>>>>>>>>> *From:* Kushan Maskey <[email protected]>
>>>>>>>>>>> *Sent:* Tuesday, January 20, 2015 8:55 PM
>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>> *Subject:* URGENT!! Race condition
>>>>>>>>>>>
>>>>>>>>>>>  We are having a major issue trying to update Cassandra
>>>>>>>>>>> database where we see race condition in a bolt.
>>>>>>>>>>>
>>>>>>>>>>>  Here is an example,
>>>>>>>>>>>
>>>>>>>>>>>  I have a columnfamily, where i have 2 partitioning columns say
>>>>>>>>>>> X and Y. There is another columns Z which basically aggregated 
>>>>>>>>>>> number. We
>>>>>>>>>>> are suppose to update Z based on X and Y. Storm is reading a huge 
>>>>>>>>>>> volume of
>>>>>>>>>>> data from Kafka. When sport receives a message, first bolt reads the
>>>>>>>>>>> database for that combination of X and Y and get the value of Z. 
>>>>>>>>>>> Then it
>>>>>>>>>>> updates the value Z and store it back into the database. Bolt 
>>>>>>>>>>> parallelism
>>>>>>>>>>> is set to be 4 which mean 4 instances of bolt are trying to update 
>>>>>>>>>>> the
>>>>>>>>>>> database. So when first bolt (B1) read the value of Z to be say 
>>>>>>>>>>> 100, same
>>>>>>>>>>> time the second bolt (B2) also read it to be 100, but once B1 
>>>>>>>>>>> completed
>>>>>>>>>>> execution and the value of Z is now 150, B2 still has 100 so the 
>>>>>>>>>>> value of Z
>>>>>>>>>>> is out of sync.
>>>>>>>>>>>
>>>>>>>>>>>  How can we prevent the race condition like this? This is
>>>>>>>>>>> causing a major nuisance to us.
>>>>>>>>>>>
>>>>>>>>>>>  Any help is highly appreciated. Thanks.
>>>>>>>>>>>
>>>>>>>>>>>    --
>>>>>>>>>>> Kushan Maskey
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>

Reply via email to