Hi,

I prefer all answers to be posted via e-mail, as I'm working at a customer's
office... and it would not be nice to them if I were on the phone.

On Wed, Jan 21, 2015 at 1:25 PM, Nathan Leung <[email protected]> wrote:

> The number of fields you declare must match the number of values emitted in
> each tuple. Also, if you are doing a fields grouping by a field name, then
> yes, you must declare a field with that name. So in this case you should
> declare one field named X. Alternatively, you can change your topology so
> that the fields grouping is done on "bytes" and not "X".
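To make the rule above concrete without pulling in Storm, here is a minimal sketch in Java. `DeclaredFields`, `checkEmit` and `groupingIndex` are hypothetical names made up for this illustration, not Storm classes. They only model the two constraints described: an emit must carry exactly as many values as there are declared fields, and a fields grouping can only name a field that was declared.

```java
import java.util.List;

/**
 * Hypothetical stand-in for a bolt's declared output schema (NOT Storm's API).
 * It models two rules: every emitted tuple must carry exactly as many values
 * as there are declared fields, and a fields grouping may only name a field
 * that was actually declared.
 */
final class DeclaredFields {
    private final List<String> names;

    DeclaredFields(String... names) {
        this.names = List.of(names);
    }

    /** Rejects an emit whose value count differs from the declared field count. */
    void checkEmit(List<?> values) {
        if (values.size() != names.size()) {
            throw new IllegalArgumentException("declared " + names.size()
                    + " fields but emitted " + values.size() + " values");
        }
    }

    /** Returns the position of the grouping field, or fails if it was never declared. */
    int groupingIndex(String field) {
        int i = names.indexOf(field);
        if (i < 0) {
            throw new IllegalArgumentException("grouping on undeclared field: " + field);
        }
        return i;
    }

    public static void main(String[] args) {
        // Declaring both "X" and "bytes" means each emit must supply two values.
        DeclaredFields out = new DeclaredFields("X", "bytes");
        out.checkEmit(List.of("x-42", "payload"));          // OK: 2 fields, 2 values
        System.out.println(out.groupingIndex("X"));          // prints 0
        try {
            new DeclaredFields("bytes").groupingIndex("X");  // only "bytes" declared
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());              // grouping on undeclared field: X
        }
    }
}
```

In the actual bolt, the corresponding change would presumably be `declarer.declare(new Fields("X", "bytes"))` paired with `collector.emit(new Values(x, domainClass))`, where `x` stands for however the bolt obtains the value of X from `domainClass` (an assumption; the thread does not show it).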
> On Jan 21, 2015 8:22 AM, "Kushan Maskey" <
> [email protected]> wrote:
>
>> So are you saying that instead of declaring the output field as "bytes" in
>> declareOutputFields, we should declare it as "X"? I tried declaring both
>> fields there, but Storm complained about the number of declared fields.
>> On Jan 21, 2015 6:53 AM, Nathan Leung <[email protected]> wrote:
>>
>> If this is bolt 1, you are doing fieldsGrouping on field X but none of
>> your output fields are named X.
>> On Jan 20, 2015 8:37 PM, "Kushan Maskey" <
>> [email protected]> wrote:
>>
>>> The bolt is pretty simple: it gets the message and then updates the data,
>>> as I explained in the earlier email.
>>>
>>> @Override
>>> public synchronized void execute(Tuple input, BasicOutputCollector collector) {
>>>     MyDomainClass domainClass = (MyDomainClass) input.getValueByField("bytes");
>>>     if (domainClass != null) {
>>>         boolean success = controller.myDataUpdateFunction(domainClass);
>>>         if (success) {
>>>             collector.emit(new Values(domainClass));
>>>         }
>>>     }
>>> }
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>     declarer.declare(new Fields("bytes"));
>>> }
>>>
>>> Does it look like I am doing this correctly? I am sure there is something
>>> I am not doing right. LMK.
>>>
>>>
>>> --
>>> Kushan Maskey
>>>
>>> On Tue, Jan 20, 2015 at 4:33 PM, Nathan Leung <[email protected]> wrote:
>>>
>>>> I assume Bolt2 in the snippet is the bolt in question? What do
>>>> declareOutputFields and emit in Bolt1 look like?  Are you able to show the
>>>> logic of Bolt2?
>>>> On Jan 20, 2015 5:08 PM, "Kushan Maskey" <
>>>> [email protected]> wrote:
>>>>
>>>>> B1 and B2 are the same bolt but running on 2 separate tasks.
>>>>>
>>>>>
>>>>> Here is a snippet of the topology-building function I have.
>>>>>
>>>>> private static int spout_parallelism_hint = 4;
>>>>> private static int bolt_parallelism_hint = 4;
>>>>>
>>>>> private static void buildTopology(TopologyBuilder builder) {
>>>>>     KafkaSpout spout = new KafkaSpout(getSpoutConfig(propMap.get(KAFKA_TOPIC), "ID1"));
>>>>>
>>>>>     builder.setSpout(SPOUT_NAME, spout, spout_parallelism_hint);
>>>>>     builder.setBolt("MainBolt", new MainBolt(), bolt_parallelism_hint)
>>>>>            .shuffleGrouping(SPOUT_NAME);
>>>>>     builder.setBolt("B1", new Bolt1(), bolt_parallelism_hint)
>>>>>            .shuffleGrouping("MainBolt");
>>>>>     // go to store sales bolts first
>>>>>     builder.setBolt("B2", new Bolt2(), bolt_parallelism_hint)
>>>>>            .fieldsGrouping("B1", new Fields("X"));
>>>>>     // split on assoc, dept and vendor
>>>>>     builder.setBolt("B3", new Bolt3(), bolt_parallelism_hint)
>>>>>            .shuffleGrouping("B2");
>>>>> }
>>>>> I have a bunch of other bolts doing pretty much the same thing as above.
>>>>>
>>>>> LMK if that is sufficient. Thanks.
>>>>>
>>>>>
>>>>> --
>>>>> Kushan Maskey
>>>>>
>>>>> On Tue, Jan 20, 2015 at 3:45 PM, Nathan Leung <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Actually, I thought about it, and you should not have to do
>>>>>> fieldsGrouping on both X and Y; one should be sufficient. In your
>>>>>> original email, are B1 and B2 the same bolt but different tasks, or are
>>>>>> they different bolts entirely? As Harsha pointed out, it may help if you
>>>>>> give more details of how your topology is constructed.
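A minimal sketch of why grouping on X alone is enough, assuming a fields grouping conceptually hashes the grouping value and takes it modulo the number of target tasks (Storm's actual routing code differs in detail). `FieldsRouting`, `taskFor` and `taskForXY` are hypothetical names for this illustration:

```java
import java.util.Objects;

/**
 * Hypothetical sketch of how a fields grouping picks a target task:
 * hash the grouping field's value and reduce it modulo the task count.
 * This shows the idea only; it is not Storm's implementation.
 */
final class FieldsRouting {
    /** Chooses a task in [0, numTasks) from the grouping value alone. */
    static int taskFor(Object groupingValue, int numTasks) {
        // Math.floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(Objects.hashCode(groupingValue), numTasks);
    }

    /** Grouping on X only: y is accepted but deliberately ignored when routing. */
    static int taskForXY(String x, String y, int numTasks) {
        return taskFor(x, numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        // Every (X, Y) row with X = "store-7" lands on one task, whatever Y is,
        // so each individual (X, Y) combination also always lands on that task.
        System.out.println(taskForXY("store-7", "dept-1", numTasks)
                == taskForXY("store-7", "dept-2", numTasks)); // prints true
    }
}
```

Because Y never enters the calculation, rows that share X (and therefore every individual (X, Y) pair) are always handled by one task, and that holds whether there is 1 task, 4 or 100.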
>>>>>>
>>>>>> On Tue, Jan 20, 2015 at 4:42 PM, Kushan Maskey <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> I am only doing fieldsGrouping on X and not Y. Is it necessary to use
>>>>>>> fieldsGrouping on both fields? Is there any sample documentation I can
>>>>>>> look at?
>>>>>>> Thanks.
>>>>>>>
>>>>>>> --
>>>>>>> Kushan Maskey
>>>>>>> 817.403.7500
>>>>>>> M. Miller & Associates <http://mmillerassociates.com/>
>>>>>>> [email protected]
>>>>>>>
>>>>>>> On Tue, Jan 20, 2015 at 3:14 PM, Nathan Leung <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Which fields are you doing fieldsGrouping on? If you do fields grouping
>>>>>>>> on X and Y, why would you have a race condition in a separate bolt task?
>>>>>>>> Each X and Y combo should always go to the same bolt task with
>>>>>>>> fieldsGrouping, and the scenario you describe should work properly
>>>>>>>> whether you have 1 task, 4 tasks, or 100 tasks.
>>>>>>>>
>>>>>>>> On Tue, Jan 20, 2015 at 4:11 PM, Kushan Maskey <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Not at the moment. We have been using KafkaSpout for all the other
>>>>>>>>> projects but have not looked into using Trident. How would it help
>>>>>>>>> resolve the issue we are facing at the moment? We also need to keep in
>>>>>>>>> mind the development time it would take to implement Trident, whereas
>>>>>>>>> KafkaSpout has been working fine in all the other projects.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Kushan Maskey
>>>>>>>>>
>>>>>>>>> On Tue, Jan 20, 2015 at 3:05 PM, Rajiv Onat <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> This seems like stateful processing; have you looked at using Trident?
>>>>>>>>>>
>>>>>>>>>> -Rajiv
>>>>>>>>>>
>>>>>>>>>> On Jan 20, 2015, at 12:26 PM, Kushan Maskey <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks Keith and Itai,
>>>>>>>>>>
>>>>>>>>>> We are using fieldsGrouping. Initially we were using shuffleGrouping;
>>>>>>>>>> we saw this problem and then moved to fieldsGrouping, with better
>>>>>>>>>> results, until now. I think the bolt parallelism, which we have set to
>>>>>>>>>> 4, is the culprit here. My understanding is that parallelism means
>>>>>>>>>> threading; correct me if I am wrong.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Kushan Maskey
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 20, 2015 at 1:03 PM, Itai Frenkel <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>>  Hello,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  Are you familiar with fields grouping? The idea is that the same
>>>>>>>>>>> bolt instance would always update the value of a specific key
>>>>>>>>>>> (similar to web load balancer cookie stickiness).
>>>>>>>>>>>
>>>>>>>>>>> https://storm.apache.org/documentation/Concepts.html
>>>>>>>>>>>
>>>>>>>>>>> "Fields grouping: The stream is partitioned by the fields specified
>>>>>>>>>>> in the grouping. For example, if the stream is grouped by the
>>>>>>>>>>> "user-id" field, tuples with the same "user-id" will always go to the
>>>>>>>>>>> same task, but tuples with different "user-id"'s may go to different
>>>>>>>>>>> tasks."
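The consequence of that guarantee for the Cassandra problem can be sketched as follows. This is a hypothetical, Storm-free simulation: `KeyOwnership`, `Update` and `run` are made-up names, a `ConcurrentHashMap` stands in for the column family, and one Java thread stands in for each bolt task. Each task still does a plain read-modify-write, but because every key is routed to exactly one task, no two threads ever touch the same key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical simulation (no Storm involved) of why fields grouping avoids
 * lost updates: each key is owned by exactly one task, and each task handles
 * its own tuples one after another on its own thread.
 */
final class KeyOwnership {
    static final class Update {
        final String key;   // e.g. "X|Y"
        final long delta;   // amount to add to Z
        Update(String key, long delta) { this.key = key; this.delta = delta; }
    }

    static Map<String, Long> run(List<Update> stream, int numTasks) {
        // Stand-in for the Cassandra table: key -> aggregated Z.
        Map<String, Long> table = new ConcurrentHashMap<>();

        // Partition the stream by key, as a fields grouping would.
        List<List<Update>> queues = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) queues.add(new ArrayList<>());
        for (Update u : stream) {
            queues.get(Math.floorMod(u.key.hashCode(), numTasks)).add(u);
        }

        // One thread per task; each does an unlocked read-modify-write per tuple.
        List<Thread> tasks = new ArrayList<>();
        for (List<Update> queue : queues) {
            Thread t = new Thread(() -> {
                for (Update u : queue) {
                    long z = table.getOrDefault(u.key, 0L); // read Z
                    table.put(u.key, z + u.delta);          // write Z back
                }
            });
            tasks.add(t);
            t.start();
        }
        for (Thread t : tasks) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for tasks", e);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Update> stream = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            stream.add(new Update("x" + (i % 10) + "|y" + (i % 3), 1));
        }
        // No key is shared between threads, so no increment is lost.
        long total = run(stream, 4).values().stream().mapToLong(Long::longValue).sum();
        System.out.println(total); // prints 10000
    }
}
```

The design choice is a single writer per key: correctness comes from the routing rather than from locking, so the count stays exact however many tasks there are.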
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>  Itai
>>>>>>>>>>>
>>>>>>>>>>>  ------------------------------
>>>>>>>>>>>
>>>>>>>>>>> *From:* Kushan Maskey <[email protected]>
>>>>>>>>>>> *Sent:* Tuesday, January 20, 2015 8:55 PM
>>>>>>>>>>> *To:* [email protected]
>>>>>>>>>>> *Subject:* URGENT!! Race condition
>>>>>>>>>>>
>>>>>>>>>>>  We are having a major issue trying to update a Cassandra
>>>>>>>>>>> database: we are seeing a race condition in a bolt.
>>>>>>>>>>>
>>>>>>>>>>>  Here is an example:
>>>>>>>>>>>
>>>>>>>>>>>  I have a column family in which there are 2 partitioning columns,
>>>>>>>>>>> say X and Y. There is another column, Z, which is basically an
>>>>>>>>>>> aggregated number. We are supposed to update Z based on X and Y. Storm
>>>>>>>>>>> is reading a huge volume of data from Kafka. When the spout receives a
>>>>>>>>>>> message, the first bolt reads the database for that combination of X
>>>>>>>>>>> and Y and gets the value of Z. Then it updates the value of Z and
>>>>>>>>>>> stores it back into the database. Bolt parallelism is set to 4, which
>>>>>>>>>>> means 4 instances of the bolt are trying to update the database. So
>>>>>>>>>>> when the first bolt (B1) reads the value of Z as, say, 100, the second
>>>>>>>>>>> bolt (B2) also reads it as 100 at the same time. Once B1 completes
>>>>>>>>>>> execution, the value of Z is now 150, but B2 still has 100, so the
>>>>>>>>>>> value of Z is out of sync.
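The interleaving described above is a classic lost update. Here is a minimal replay in Java, with a `HashMap` standing in for the row keyed by (X, Y). `LostUpdate` and `replay` are hypothetical names, and B2's increment of 20 is an arbitrary example value, since the message does not give one:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical replay of the B1/B2 interleaving, with a HashMap standing in
 * for the Cassandra row keyed by (X, Y). The steps run in a fixed order, so
 * the lost update happens every time rather than by chance.
 */
final class LostUpdate {
    static long replay(long start, long deltaB1, long deltaB2) {
        Map<String, Long> table = new HashMap<>();
        String key = "X=1|Y=1";
        table.put(key, start);

        long seenByB1 = table.get(key);        // B1 reads Z (e.g. 100)
        long seenByB2 = table.get(key);        // B2 reads the same Z (100)
        table.put(key, seenByB1 + deltaB1);    // B1 writes Z = 150
        table.put(key, seenByB2 + deltaB2);    // B2 overwrites using its stale 100

        return table.get(key);
    }

    public static void main(String[] args) {
        // Run one after the other, the result would be 100 + 50 + 20 = 170.
        System.out.println(replay(100, 50, 20)); // prints 120: B1's +50 is lost
    }
}
```

The replay returns 120 because B2 writes back a value computed from the stale read. Any fix has to stop two tasks from reading the same Z concurrently, for example by routing each (X, Y) to a single task, as the fieldsGrouping replies in this thread suggest.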
>>>>>>>>>>>
>>>>>>>>>>>  How can we prevent a race condition like this? This is
>>>>>>>>>>> causing a major nuisance for us.
>>>>>>>>>>>
>>>>>>>>>>>  Any help is highly appreciated. Thanks.
>>>>>>>>>>>
>>>>>>>>>>>    --
>>>>>>>>>>> Kushan Maskey
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>


-- 
Regards - Ernesto Reinaldo Barreiro
