Nilesh,
That is what I am saying: if you use the same consumer group for both topologies, the second one will pick up the offset where the T1 topology left off. If you want T1 and T2 to read the same topic independently, use different consumer groups.
-Harsha
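A minimal sketch of that suggestion, assuming the storm-kafka module (storm-kafka 0.9.x) is on the classpath; the ZooKeeper address, topic name, zkRoot and ids below are placeholders. The last `SpoutConfig` constructor argument is the client id under which the spout stores its offsets in ZooKeeper, so giving each topology its own id keeps their read positions independent:

```java
// Placeholder broker/ZooKeeper address.
BrokerHosts hosts = new ZkHosts("zkhost:2181");

// Topology T1: offsets stored in ZooKeeper under /kafka-spout/t1-reader.
SpoutConfig t1Config = new SpoutConfig(hosts, "KT1", "/kafka-spout", "t1-reader");

// Topology T2: a different id, so T2 resumes from its own last offset,
// unaffected by how far T1 has read.
SpoutConfig t2Config = new SpoutConfig(hosts, "KT1", "/kafka-spout", "t2-reader");

KafkaSpout t1Spout = new KafkaSpout(t1Config);
KafkaSpout t2Spout = new KafkaSpout(t2Config);
```

With the same id in both configs, the two spouts would share one offset path and each would see only the messages the other had not yet consumed.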
On Thu, Nov 6, 2014, at 10:28 PM, Nilesh Chhapru wrote:
> Harsha,
>
> I wanted the following scenario, which isn't working if I use the simple consumer.
>
> 1) Topology-1 (T1) reads a Kafka topic (KT1).
> 2) Topology-2 (T2) reads the same topic (KT1).
>
> The two topologies are not deployed at the same time. When T1 is deployed it starts reading KT1, but if I bring up T2 later, it reads neither from the start nor from where it last left off, because the offset has already moved on while T1 was reading the topic. I know we can set the offset for the T2 spout to read from the beginning, but I can't do that: T2 must not re-read from the start every time, it has to resume from wherever it was when it was last un-deployed.
>
> I read through many blogs which say that ZooKeeper saves the offset against a consumer group id, hence I wanted to include one.
>
> *Regards*,
> *Nilesh Chhapru.*
>
> *From:* Harsha [mailto:[email protected]]
> *Sent:* 07 November 2014 12:13 AM
> *To:* [email protected]
> *Subject:* Re: Issues Storm - Kafka
>
> Nilesh,
> You can safely run two topologies reading from the same topic. You can use the Kafka spout from Storm to achieve this.
> If you use a single consumer group across the two topologies, you are distributing the data between them, so neither reads the whole topic. If you want to use consumer groups, you need to give the two topologies unique group names.
>
> Please read the following doc:
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> "The Consumer Group name is global across a Kafka cluster, so you should be careful that any 'old' logic Consumers be shutdown before starting new code. When a new process is started with the same Consumer Group name, Kafka will add that processes' threads to the set of threads available to consume the Topic and trigger a 're-balance'. During this re-balance Kafka will assign available partitions to available threads, possibly moving a partition to another process.
> If you have a mixture of old and new business logic, it is possible that some messages go to the old logic."
>
> From your use case I don't see why you can't use the KafkaSpout from Storm. You can have multiple topologies reading from the same topic, or use multiple bolts in a single topology to do different operations on a tuple.
> -Harsha
>
> On Thu, Nov 6, 2014, at 10:14 AM, Nilesh Chhapru wrote:
>> Hi Harsha,
>>
>> I wanted to broadcast one message to two consumers, i.e. the spouts in two topologies, for which I read about consumer groups in the Kafka docs, but these aren't supported by the simple consumer provided by storm-kafka.
>>
>> Hence I had to move to the high-level consumer API, but I am a bit doubtful, as some blogs say that it commits offsets in batches. Do you have more details on this, or are you using the high-level API in any of your applications?
>>
>> Also, is there a way to broadcast a message from Kafka using the simple consumer provided by the storm-kafka integration?
>>
>> *Regards*,
>> *Nilesh Chhapru.*
>>
>> *From:* Harsha [mailto:[email protected]]
>> *Sent:* 06 November 2014 09:57 PM
>> *To:* [email protected]
>> *Subject:* Re: Issues Storm - Kafka
>>
>> Nilesh,
>> I thought you were using https://github.com/apache/storm/tree/master/external/storm-kafka. Is there any reason for you to use a KafkaSpout with consumer-group support? The linked KafkaSpout handles replays based on ack or fail: it uses the SimpleConsumer API, which lets it move back and forth in the Kafka queue, something that is not possible with the high-level consumer API (the API where consumer groups are supported).
>> If you have two topologies doing different operations and you are using consumer groups, then you should use different consumer groups. If you use a single consumer group, the data from the Kafka queue will be distributed between the two topologies, so each topology gets only part of the data.
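The consumer-group semantics Harsha describes (one group splits the topic between its members; separate groups each read everything) can be illustrated with a toy model. This is not real Kafka, just per-group offset bookkeeping; class and group names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupDemo {
    // The "topic": an append-only log of messages.
    static final List<String> topic = List.of("m0", "m1", "m2", "m3");
    // One shared position per consumer group, as ZooKeeper would store it.
    static final Map<String, Integer> offsets = new HashMap<>();

    // Read up to 'max' messages on behalf of the given group,
    // advancing that group's shared offset.
    static List<String> poll(String group, int max) {
        int pos = offsets.getOrDefault(group, 0);
        List<String> out = new ArrayList<>();
        while (pos < topic.size() && out.size() < max) {
            out.add(topic.get(pos++));
        }
        offsets.put(group, pos);
        return out;
    }

    public static void main(String[] args) {
        // Same group: the second reader resumes where the first stopped,
        // so the messages are split between them.
        System.out.println(poll("shared", 2)); // [m0, m1]
        System.out.println(poll("shared", 2)); // [m2, m3]
        // Different groups: each reads the full topic from the beginning.
        System.out.println(poll("t1", 4));     // [m0, m1, m2, m3]
        System.out.println(poll("t2", 4));     // [m0, m1, m2, m3]
    }
}
```

This is exactly Nilesh's symptom: with one group, T2 "starts" at whatever offset T1 had reached; with two groups, each topology keeps its own position.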
>> My suggestion would be to use the above KafkaSpout if the only reason you are using https://github.com/HolmesNL/kafka-spout is consumer groups.
>>
>> Here is a link to the Kafka high-level API:
>> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
>> "Why use the High Level Consumer
>> Sometimes the logic to read messages from Kafka doesn't care about handling the message offsets, it just wants the data. So the High Level Consumer is provided to abstract most of the details of consuming events from Kafka."
>> With Storm you want control over handling the message offsets. If a message fails in a downstream bolt, you want to roll back the offset and replay the tuple from Kafka. With the high-level API you won't be able to do that.
>>
>> -Harsha
>>
>> On Thu, Nov 6, 2014, at 07:26 AM, Nilesh Chhapru wrote:
>>> Hi Harsha / Shamsul,
>>>
>>> Thanks for your inputs.
>>> I am using BaseBasicBolt, so it calls the ack method automatically; hence I am not calling it explicitly.
>>>
>>> Moreover, for consumer groups I have now moved the KafkaSpout to https://github.com/HolmesNL/kafka-spout to get the consumer group id. Let me know if you have used this at any point.
>>>
>>> I don't need the two consumers to coordinate, but we have two topologies listening to one Kafka topic and doing different operations on the same message: one saves it to a database, the other passes it to a validator.
>>>
>>> Do email in case you need any other information.
>>>
>>> *Regards*,
>>> *Nilesh Chhapru.*
>>>
>>> *From:* Harsha [mailto:[email protected]]
>>> *Sent:* 06 November 2014 08:36 PM
>>> *To:* [email protected]
>>> *Subject:* Re: Issues Storm - Kafka
>>>
>>> Nilesh and Shamsul,
>>> 2) You don't need another database to keep track of processed tuples. Are you sure you are acking and failing tuples in the downstream bolts so that the KafkaSpout knows each tuple was processed?
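The "roll back the offset and replay" behaviour mentioned above can be sketched as a toy offset rewind. The class is hypothetical (the real storm-kafka spout tracks pending offsets per partition and commits only below the lowest pending one), but it shows why simple-consumer-style offset control enables replay while a batch-committing high-level consumer does not:

```java
import java.util.List;

public class ReplayDemo {
    final List<String> log;   // a Kafka partition, modeled as a list
    int position = 0;         // next offset to emit

    ReplayDemo(List<String> log) {
        this.log = log;
    }

    // Emit the message at the current offset, or null when exhausted.
    String nextTuple() {
        return position < log.size() ? log.get(position++) : null;
    }

    // A downstream bolt failed the tuple read at 'offset':
    // rewind so it is re-read and replayed.
    void fail(int offset) {
        position = offset;
    }

    public static void main(String[] args) {
        ReplayDemo spout = new ReplayDemo(List.of("a", "b", "c"));
        System.out.println(spout.nextTuple()); // a  (offset 0)
        System.out.println(spout.nextTuple()); // b  (offset 1)
        spout.fail(1);                         // a bolt failed "b": rewind
        System.out.println(spout.nextTuple()); // b, replayed
        System.out.println(spout.nextTuple()); // c
    }
}
```

A high-level consumer that has already committed past offset 1 has no way to perform the `fail(1)` step, which is Harsha's argument for the simple-consumer-based KafkaSpout.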
>>> Tuple replays occur if timeouts are happening, or in case of exceptions where you call fail on a tuple.
>>>
>>>>> 3) The consumer group isn't working properly for the Storm-Kafka integration.
>>>>> a. When we give the same group id to the Kafka consumers of different topologies, both still read the same messages.
>>>>> b. If we have two consumers with different consumer group ids in different topologies, it works fine if both topologies are deployed at the same time, but not if one of them is deployed after some of the messages have already been loaded into the topic and read by the first topology.
>>> a. The Kafka spout uses the simple consumer API; it doesn't need a consumer group. Can you give us more details on why your two topologies need to coordinate (i.e. use the same consumer group)?
>>> Thanks,
>>> Harsha
>>>
>>> On Thu, Nov 6, 2014, at 04:27 AM, Shamsul Haque wrote:
>>>> Hi Nilesh,
>>>>
>>>> For point 1, try increasing 'topology.message.timeout.secs' to 10-15 minutes or more, then slowly decrease it to a value that suits your topology. That worked for me in the same situation.
>>>> For point 2, we have used a database to keep track of what we have already processed, so we don't process the same tuple again.
>>>>
>>>> regards
>>>> Shams
>>>>
>>>> On Thursday 06 November 2014 12:16 PM, Nilesh Chhapru wrote:
>>>>> Hi All,
>>>>>
>>>>> We are using the Storm-Kafka integration, where a spout reads from a Kafka topic.
>>>>>
>>>>> These are the versions of Storm, Kafka and ZooKeeper we are using:
>>>>> *Storm: apache-storm-0.9.2-incubating*
>>>>> *Kafka: kafka_2.8.0-0.8.1.1*
>>>>> *ZooKeeper: zookeeper-3.4.6*
>>>>>
>>>>> I am facing the following issues at the spout:
>>>>> 1) Messages get failed even though the average processing time is below the topology.message.timeout.secs value, and we aren't getting any exceptions at any of the bolts.
>>>>> 2) One topology finally emits to a Kafka producer, i.e. some other topic, but the messages are getting duplicated due to replay issues.
>>>>> 3) The consumer group isn't working properly for the Storm-Kafka integration.
>>>>> a. When we give the same group id to the Kafka consumers of different topologies, both still read the same messages.
>>>>> b. If we have two consumers with different consumer group ids in different topologies, it works fine if both topologies are deployed at the same time, but not if one of them is deployed after some of the messages have already been loaded into the topic and read by the first topology.
>>>>>
>>>>> Kindly help me with the above points, as this is affecting the overall scope of the project as well as its timelines.
>>>>>
>>>>> Do call or email in case you need any other information.
>>>>>
>>>>> *Nilesh Chhapru,*
>>>>> Phone: +91 9619030491
>>>>>
>>>>> ----- Disclaimer -----
>>>>> Opinions expressed in this e-mail are those of the author and do not necessarily represent those of Ugam. Ugam does not accept any responsibility or liability for it. This e-mail message may contain proprietary, confidential or legally privileged information for the sole use of the person or entity to whom this message was originally addressed. Any review, re-transmission, dissemination or other use of or taking of any action in reliance upon this information by persons or entities other than the intended recipient is prohibited. If you have received this e-mail in error, please delete it and all attachments from any servers, hard drives or any other media.
>>>>>
>>>>> Warning: Sufficient measures have been taken to scan any presence of viruses however the recipient should check this email and any attachments for the presence of viruses. Ugam accepts no liability for any damage caused by any virus transmitted by this email.
