I think that before we make more changes to Mirror Maker, there should be unit tests in place for it.
Currently Mirror Maker is broken on trunk (there is a patch to fix it) because of a recent change. That is only going to get more unwieldy as more changes happen.

On Wed, Jan 21, 2015 at 8:29 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Hi Jay,
>
> Thanks for the comments. Please see inline responses.
>
> Jiangjie (Becket) Qin
>
> On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>
> >Hey guys,
> >
> >A couple questions/comments:
> >
> >1. The callback and user-controlled commit offset functionality is already
> >in the new consumer which we are working on in parallel. If we accelerated
> >that work it might help concentrate efforts. I admit this might take
> >slightly longer in calendar time but could still probably get done this
> >quarter. Have you guys considered that approach?
> Yes, I totally agree that ideally we should put our efforts into the new
> consumer. The main reason for still working on the old consumer is that we
> expect it will still be used at LinkedIn for quite a while before the new
> consumer can be fully rolled out, and we have recently been suffering a
> lot from a mirror maker data loss issue. So our current plan is to make
> the changes necessary to stabilize the current mirror maker in production.
> Then we can test and roll out the new consumer gradually without getting
> burnt.
>
> >2. I think partitioning on the hash of the topic partition is not a very
> >good idea because that will make the case of going from a cluster with
> >fewer partitions to one with more partitions not work. I think an
> >intuitive way to do this would be the following:
> >a. Default behavior: Just do what the producer does. I.e. if you specify
> >a key use it for partitioning, if not just partition in a round-robin
> >fashion.
> >b. Add a --preserve-partition option that will explicitly inherit the
> >partition from the source irrespective of whether there is a key or which
> >partition that key would hash to.
> Sorry that I did not explain this clearly enough.
> The hash of the topic partition is only used when deciding which mirror
> maker data channel queue the consumer thread should put a message into. It
> only tries to make sure that messages from the same partition are sent by
> the same producer thread to guarantee the sending order. It is not at all
> related to which partition in the target cluster the messages end up in;
> that is still decided by the producer.
>
> >3. You don't actually give the ConsumerRebalanceListener interface. What
> >is that going to look like?
> Good point! I should have put it in the wiki. I just added it.
>
> >4. What is MirrorMakerRecord? I think ideally the
> >MirrorMakerMessageHandler interface would take a ConsumerRecord as input
> >and return a ProducerRecord, right? That would allow you to transform the
> >key, value, partition, or destination topic...
> MirrorMakerRecord is introduced in KAFKA-1650, which is exactly the same
> as ConsumerRecord in KAFKA-1760.
>
> private[kafka] class MirrorMakerRecord (val sourceTopic: String,
>                                         val sourcePartition: Int,
>                                         val sourceOffset: Long,
>                                         val key: Array[Byte],
>                                         val value: Array[Byte]) {
>   def size = value.length + {if (key == null) 0 else key.length}
> }
>
> However, because the source partition and offset are needed in the
> producer thread for consumer offset bookkeeping, the record returned by
> MirrorMakerMessageHandler needs to contain that information. Therefore
> ProducerRecord does not work here. We could probably let the message
> handler take ConsumerRecord for both input and output.
>
> >5. Have you guys thought about what the implementation will look like in
> >terms of threading architecture etc with the new consumer? That will be
> >soon so even if we aren't starting with that let's make sure we can get
> >rid of a lot of the current mirror maker accidental complexity in terms
> >of threads and queues when we move to that.
> I haven't thought about it thoroughly.
> The quick idea is that after migrating to the new consumer, it is probably
> better to use a single consumer thread. If multiple threads are needed,
> decoupling consumption and processing could be used. MirrorMaker
> definitely needs to change after the new consumer gets checked in. I'll
> document the changes and can submit follow-up patches after the new
> consumer is available.
>
> >-Jay
> >
> >On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >wrote:
> >
> >> Hi Kafka Devs,
> >>
> >> We are working on Kafka Mirror Maker enhancement. A KIP is posted to
> >> document and discuss the following:
> >> 1. KAFKA-1650: No data loss mirror maker change
> >> 2. KAFKA-1839: To allow partition aware mirror.
> >> 3. KAFKA-1840: To allow message filtering/format conversion
> >> Feedback is welcome. Please let us know if you have any questions or
> >> concerns.
> >>
> >> Thanks.
> >>
> >> Jiangjie (Becket) Qin
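[Editor's note: the queue-selection rule Becket describes in point 2 — hash the source topic-partition to pick a data channel queue, so all messages from one source partition go through one producer thread and keep their order — can be sketched roughly as below. The object and method names (`DataChannelQueueSelector`, `queueFor`) are illustrative assumptions, not the actual MirrorMaker code.]

```scala
// Illustrative sketch only; names are hypothetical, not from the patch.
object DataChannelQueueSelector {
  // Map a consumed message's SOURCE topic-partition to a data channel
  // queue index. Every message from one source partition hashes to the
  // same queue, so a single producer thread sends them in order. This
  // says nothing about the TARGET partition, which the producer decides.
  def queueFor(sourceTopic: String, sourcePartition: Int, numQueues: Int): Int = {
    require(numQueues > 0, "need at least one data channel queue")
    val hash = (sourceTopic, sourcePartition).hashCode()
    (hash & 0x7fffffff) % numQueues // mask sign bit so the index is non-negative
  }
}
```

With, say, four queues, every message consumed from ("topicA", 2) lands in the same queue, while different source partitions spread across the queues.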