Hi Bhavesh,

Please see inline comments.
Jiangjie (Becket) Qin

On 1/29/15, 7:00 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:

>Hi Jiangjie,
>
>Thanks for the input.
>
>a) Will the MM producer ack be attached to the producer instance or set per topic? The use case is that one MM instance needs to handle both strong acks and acks=0, depending on the topic. Or would it be better to set up another instance of MM?

The acks setting is a producer-level setting, not a topic-level setting. In this case you probably need to set up another instance (a config sketch follows below).

>b) Regarding TCP connections: why is each producer instance attached to its own TCP connections? Is it possible to use a broker TCP connection pool, where a producer just checks out a connection to a broker, so that the number of producer instances does not correlate with the number of broker connections? Is this possible?

In the new producer, each producer instance maintains a connection to each broker. Making producer instances share TCP connections would be a very big change to the current design, so I suppose we won't be able to do that.

>
>Thanks,
>
>Bhavesh
>
>On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>
>> Hi Bhavesh,
>>
>> I think it is the right discussion to have when we are talking about the new design for MM.
>> Please see the inline comments.
>>
>> Jiangjie (Becket) Qin
>>
>> On 1/28/15, 10:48 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:
>>
>> >Hi Jiangjie,
>> >
>> >I just wanted to let you know about our use case and stress the point that the local data center broker cluster has fewer partitions than the destination offline broker cluster. That is because we do batch pulls from Camus and need to drain data faster than the injection rate (from four DCs for the same topic).
>> Keeping the same partition number in source and target clusters will be an option but will not be enforced by default.
>> >
>> >We are facing the following issues (probably due to configuration):
>> >
>> >1) We occasionally lose data because the message batch size is too large (2MB) on the target side (we are using the old producer, but I think the new producer will solve this problem to some extent).
>> We see this issue at LinkedIn as well, and the new producer might also have it. There are some proposed solutions, but no real work has started yet. For now, as a workaround, setting a more aggressive batch size on the producer side should work.
>> >2) Since only one instance is set up to MM the data, we are not able to set up acks per topic; instead, acks are attached to the producer instance.
>> I don't quite get the question here.
>> >3) How are you going to address the two-phase commit problem if acks is set to strongest but auto commit is on for the consumer (meaning the producer does not get the ack, but the consumer has auto-committed the offset for that message)? Is there a transaction-based ack and offset commit (Kafka transactions are in progress)?
>> Auto offset commit should be turned off in this case. The offset will only be committed once by the offset commit thread, so there is no two-phase commit.
>> >4) How are you planning to avoid duplicated messages? (Is the broker going to keep a moving window of collected messages and de-dupe? Possibly we get this from retries set to 5…?)
>> We are not trying to completely avoid duplicates. The duplicates will still be there if:
>> 1. The producer retries on failure.
>> 2. The mirror maker is hard killed.
>> Currently, dedup is expected to be done by the user if necessary.
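To make the producer-level acks point above concrete, a minimal sketch of the configs such a two-instance setup would use (the property names are the standard new-producer and old high-level consumer ones; the hostnames and values are only illustrative, not a recommendation):

  import java.util.Properties

  object MirrorMakerConfigSketch {
    // MM instance A: strong delivery guarantees (new producer settings).
    val strongAckProducer = new Properties()
    strongAckProducer.put("bootstrap.servers", "target-broker:9092")     // hypothetical host
    strongAckProducer.put("acks", "-1")                                  // wait for all in-sync replicas
    strongAckProducer.put("retries", Int.MaxValue.toString)
    strongAckProducer.put("max.in.flight.requests.per.connection", "1")  // preserve ordering across retries

    // MM instance B: acks=0 for topics that can tolerate loss.
    val noAckProducer = new Properties()
    noAckProducer.put("bootstrap.servers", "target-broker:9092")
    noAckProducer.put("acks", "0")

    // Consumer side (old high-level consumer): turn auto commit off, so only
    // the mirror maker offset commit thread commits, after messages are acked.
    // This avoids the two-phase commit problem raised in question 3.
    val consumer = new Properties()
    consumer.put("zookeeper.connect", "source-zk:2181")                  // hypothetical host
    consumer.put("group.id", "mm-strong-ack")
    consumer.put("auto.commit.enable", "false")
  }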
>> >5) Last, is there any warning or insight you can provide from the MM component when the data injection rate into destination partitions is NOT evenly distributed, regardless of keyed or non-keyed messages? (There is a ripple effect: data arrives late or early, or out of order in terms of timestamp, and Camus creates a huge number of files on HDFS due to the uneven injection rate. The Camus job is configured to run every 3 minutes.)
>> I think uneven data distribution is typically caused by server-side imbalance, rather than by something mirror maker can control. In the new mirror maker, however, there is a customizable message handler that might be able to help a little bit. In the message handler, you can explicitly set the partition that you want to produce the message to. So if you know the uneven data distribution in the target cluster, you may offset it there (a sketch follows below). But that probably only works for non-keyed messages.
>> >
>> >I am not sure if this is the right discussion forum to bring these to your/the Kafka dev team's attention. This might be off track.
>> >
>> >
>> >Thanks,
>> >
>> >Bhavesh
>> >
>> >On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>> >
>> >> I've updated the KIP page. Feedback is welcome.
>> >>
>> >> Regarding the simple mirror maker design, I thought it over and have some worries. There are two things that might be worth thinking about:
>> >> 1. One of the enhancements to mirror maker is adding a message handler to do things like reformatting. I think we might potentially want to have more threads processing the messages than the number of consumers. If we follow the simple mirror maker solution, we lose this flexibility.
>> >> 2. This might not matter too much, but creating more consumers means a bigger footprint in TCP connections / memory.
>> >>
>> >> Any thoughts on this?
>> >>
>> >> Thanks.
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On 1/26/15, 10:35 AM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>> >>
>> >> >Hi Jay and Neha,
>> >> >
>> >> >Thanks a lot for the reply and explanation. I do agree it makes more sense to avoid duplicate effort and plan based on the new consumer. I'll modify the KIP.
>> >> >
>> >> >To Jay's question on message ordering: the data channel selection makes sure that messages from the same source partition are sent by the same producer, so the order of the messages is guaranteed with proper producer settings (MaxInFlightRequests=1, retries=Integer.MaxValue, etc.).
>> >> >For keyed messages, because they come from the same source partition and will end up in the same target partition, as long as they are sent by the same producer, the order is guaranteed.
>> >> >For non-keyed messages, the messages coming from the same source partition might go to different target partitions. The order is only guaranteed within each partition.
>> >> >
>> >> >Anyway, I'll modify the KIP and the data channel will go away.
>> >> >
>> >> >Thanks.
>> >> >
>> >> >Jiangjie (Becket) Qin
>> >> >
>> >> >
>> >> >On 1/25/15, 4:34 PM, "Neha Narkhede" <n...@confluent.io> wrote:
>> >> >
>> >> >>I think there is some value in investigating whether we can go back to the simple mirror maker design, as Jay points out. Here you have N threads, each of which has a consumer and a producer.
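As a rough illustration of the message handler idea mentioned above: the sketch below loosely follows Jay's suggestion later in this thread that a handler could return a ProducerRecord with an explicit partition. The class name and signature here are assumptions for illustration, not the final KIP interface.

  import java.util.concurrent.atomic.AtomicLong
  import org.apache.kafka.clients.producer.ProducerRecord

  // Hypothetical handler: keep key-based partitioning for keyed messages, but
  // spread non-keyed messages round-robin over the target partition count to
  // counter an uneven injection rate in the target cluster.
  class EvenSpreadHandler(targetPartitionCount: Int) {
    private val counter = new AtomicLong()

    def handle(topic: String, key: Array[Byte], value: Array[Byte]): ProducerRecord[Array[Byte], Array[Byte]] = {
      if (key != null) {
        // Keyed messages: let the producer's default key hashing decide.
        new ProducerRecord[Array[Byte], Array[Byte]](topic, key, value)
      } else {
        val partition = (counter.getAndIncrement() % targetPartitionCount).toInt
        new ProducerRecord[Array[Byte], Array[Byte]](topic, Integer.valueOf(partition), key, value)
      }
    }
  }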
>> >> >>
>> >> >>The reason why we had to move away from that was a combination of the difference in throughput between the consumer and the old producer and the deficiency of the consumer rebalancing that limits the total number of mirror maker threads. So the only option available was to increase the throughput of the limited number of mirror maker threads that could be deployed. That queuing design may not make sense now, if the new producer's throughput is almost similar to the consumer's AND the new round-robin based consumer rebalancing can allow a very high number of mirror maker instances to exist.
>> >> >>
>> >> >>This is the end state that the mirror maker should be in once the new consumer is complete, so it wouldn't hurt to see if we can just move to that right now.
>> >> >>
>> >> >>On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> >> >>
>> >> >>> QQ: If we ever use a different technique for the data channel selection than for the producer partitioning, won't that break ordering? How can we ensure these things stay in sync?
>> >> >>>
>> >> >>> With respect to the new consumer--I really do want to encourage people to think through how MM will work with the new consumer. I mean this isn't very far off, maybe a few months if we hustle? I could imagine us getting this MM fix done maybe sooner, maybe in a month? So I guess this buys us an extra month before we rip it out and throw it away? Maybe two? This bug has been there for a while, though, right? Is it worth it? Probably it is, but it still kind of sucks to have the duplicate effort.
>> >> >>>
>> >> >>> So anyhow let's definitely think about how things will work with the new consumer. I think we can probably just have N threads, where each thread has a producer and a consumer and is internally single threaded (a sketch of one such thread follows below). Any reason this wouldn't work?
>> >> >>>
>> >> >>> -Jay
>> >> >>>
>> >> >>>
>> >> >>> On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>> >> >>>
>> >> >>> > Hi Jay,
>> >> >>> >
>> >> >>> > Thanks for the comments. Please see inline responses.
>> >> >>> >
>> >> >>> > Jiangjie (Becket) Qin
>> >> >>> >
>> >> >>> > On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>> >> >>> >
>> >> >>> > >Hey guys,
>> >> >>> > >
>> >> >>> > >A couple questions/comments:
>> >> >>> > >
>> >> >>> > >1. The callback and user-controlled commit offset functionality is already in the new consumer, which we are working on in parallel. If we accelerated that work it might help concentrate efforts. I admit this might take slightly longer in calendar time but could still probably get done this quarter. Have you guys considered that approach?
>> >> >>> > Yes, I totally agree that ideally we should put efforts on the new consumer. The main reason for still working on the old consumer is that we expect it would still be used in LinkedIn for quite a while before the new consumer could be fully rolled out.
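For the N-threads design Jay sketches above, one mirror thread might look roughly like the following. This is only a strawman written against an assumed new-consumer API (poll/commitSync on the consumer, flush on the producer), since that API is not finalized:

  import java.util.{Collections, Properties}
  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

  // One mirror maker thread: its own consumer and producer, internally single threaded.
  class MirrorThread(consumerProps: Properties, producerProps: Properties, topic: String) extends Runnable {
    private val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
    private val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)

    override def run(): Unit = {
      consumer.subscribe(Collections.singletonList(topic))
      while (!Thread.currentThread().isInterrupted) {
        val records = consumer.poll(1000)
        for (record <- records.asScala) {
          // Keep the key so keyed messages land in a deterministic target partition.
          producer.send(new ProducerRecord[Array[Byte], Array[Byte]](record.topic, record.key, record.value))
        }
        producer.flush()      // wait until everything above is acked by the target cluster...
        consumer.commitSync() // ...and only then commit consumer offsets (no data loss)
      }
    }
  }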
>> >> >>> > And we have recently been suffering a lot from mirror maker data loss issues. So our current plan is to make the necessary changes to stabilize the current mirror maker in production. Then we can test and roll out the new consumer gradually without getting burnt.
>> >> >>> > >
>> >> >>> > >2. I think partitioning on the hash of the topic partition is not a very good idea, because that will make the case of going from a cluster with fewer partitions to one with more partitions not work. I think an intuitive way to do this would be the following:
>> >> >>> > >a. Default behavior: just do what the producer does. I.e. if you specify a key, use it for partitioning; if not, just partition in a round-robin fashion.
>> >> >>> > >b. Add a --preserve-partition option that will explicitly inherit the partition from the source, irrespective of whether there is a key or which partition that key would hash to.
>> >> >>> > Sorry that I did not explain this clearly enough. The hash of the topic partition is only used to decide which mirror maker data channel queue the consumer thread should put the message into. It only tries to make sure that messages from the same partition are sent by the same producer thread, to guarantee the sending order (see the sketch in the P.S. at the end of this mail). This is not at all related to which partition in the target cluster the messages end up in. That is still decided by the producer.
>> >> >>> > >
>> >> >>> > >3. You don't actually give the ConsumerRebalanceListener interface. What is that going to look like?
>> >> >>> > Good point! I should have put it in the wiki. I just added it.
>> >> >>> > >
>> >> >>> > >4. What is MirrorMakerRecord? I think ideally the MirrorMakerMessageHandler interface would take a ConsumerRecord as input and return a ProducerRecord, right? That would allow you to transform the key, value, partition, or destination topic...
>> >> >>> > MirrorMakerRecord is introduced in KAFKA-1650 and is exactly the same as ConsumerRecord in KAFKA-1760:
>> >> >>> >
>> >> >>> > private[kafka] class MirrorMakerRecord(val sourceTopic: String,
>> >> >>> >                                        val sourcePartition: Int,
>> >> >>> >                                        val sourceOffset: Long,
>> >> >>> >                                        val key: Array[Byte],
>> >> >>> >                                        val value: Array[Byte]) {
>> >> >>> >   def size = value.length + { if (key == null) 0 else key.length }
>> >> >>> > }
>> >> >>> >
>> >> >>> > However, because the source partition and offset are needed in the producer thread for consumer offset bookkeeping, the record returned by MirrorMakerMessageHandler needs to contain that information. Therefore ProducerRecord does not work here. We could probably let the message handler take ConsumerRecord for both input and output.
>> >> >>> > >
>> >> >>> > >5. Have you guys thought about what the implementation will look like in terms of threading architecture etc. with the new consumer?
>> >> >>> > >That will be soon, so even if we aren't starting with that, let's make sure we can get rid of a lot of the current mirror maker's accidental complexity in terms of threads and queues when we move to it.
>> >> >>> > I haven't thought about it thoroughly. The quick idea is that after migration to the new consumer, it is probably better to use a single consumer thread. If multithreading is needed, we might decouple consumption and processing. MirrorMaker definitely needs to be changed after the new consumer gets checked in. I'll document the changes and can submit follow-up patches after the new consumer is available.
>> >> >>> > >
>> >> >>> > >-Jay
>> >> >>> > >
>> >> >>> > >On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>> >> >>> > >
>> >> >>> > >> Hi Kafka Devs,
>> >> >>> > >>
>> >> >>> > >> We are working on a Kafka Mirror Maker enhancement. A KIP is posted to document and discuss the following:
>> >> >>> > >> 1. KAFKA-1650: No data loss mirror maker change.
>> >> >>> > >> 2. KAFKA-1839: To allow partition-aware mirroring.
>> >> >>> > >> 3. KAFKA-1840: To allow message filtering/format conversion.
>> >> >>> > >> Feedback is welcome. Please let us know if you have any questions or concerns.
>> >> >>> > >>
>> >> >>> > >> Thanks.
>> >> >>> > >>
>> >> >>> > >> Jiangjie (Becket) Qin
>> >> >>> > >>
>> >> >>> >
>> >> >>>
>> >> >>
>> >> >>--
>> >> >>Thanks,
>> >> >>Neha
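P.S. For reference, the data channel selection discussed above amounts to something like the sketch below: a stable hash from source (topic, partition) to a queue index, so that one producer thread always handles a given source partition and the sending order is preserved. This only illustrates the idea, not the actual mirror maker code.

  // Route every message from the same source partition to the same data channel
  // queue, so that a single producer thread sends them and ordering is preserved.
  def channelFor(sourceTopic: String, sourcePartition: Int, numChannels: Int): Int = {
    val h = (sourceTopic, sourcePartition).hashCode
    ((h % numChannels) + numChannels) % numChannels // non-negative modulo
  }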