Guozhang, I agree with 1-3. I do think what I was proposing was simpler, but
perhaps there are gaps in that?

Hey Joel--Here was a sketch of what I was proposing. I do think this gets rid
of manual offset tracking, especially doing so across threads with dedicated
commit threads, which I think is pretty complex.

    while(true) {
        val recs = consumer.poll(Long.MaxValue);
        for (rec <- recs)
            producer.send(rec, logErrorCallback)
        if(System.currentTimeMillis - lastCommit > commitInterval) {
            producer.flush()
            consumer.commit()
            lastCommit = System.currentTimeMillis
        }
    }

(See the previous email for details.) I think the question is: is there any
reason--performance, correctness, etc.--that this won't work? Basically I
think you guys have thought about this more, so I may be missing something.
If so, let's flag it while we still have leeway on the consumer. If we think
that will work, well, I do think it is conceptually a lot simpler than the
current code, though I suppose one could disagree on that.

-Jay

On Wed, Feb 11, 2015 at 5:53 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> Hi Jay,
>
> > The data channels are actually a big part of the complexity of the zero
> > data loss design, though, right? Because then you need some reverse
> > channel to flow the acks back to the consumer based on where you are
> > versus just acking what you have read and written (as in the code
> > snippet I put up).
>
> I'm not sure if we are on the same page. Even if the data channel was
> not there, the current handling for zero data loss would remain very
> similar - you would need to maintain lists of unacked source offsets.
> I'm wondering if the KIP needs more detail on how it is currently
> implemented; or are you suggesting a different approach (in which case I
> have not fully understood)? I'm not sure what you mean by flowing acks
> back to the consumer - the MM commits offsets after the producer ack
> has been received. There is some additional complexity introduced in
> reducing duplicates on a rebalance - this is actually optional (since
> duplicates are currently a given).
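[Editor's note] Joel's bookkeeping - maintain lists of unacked source offsets and commit only what the producer has acked - can be modeled in a few lines. This is a minimal Python sketch of the idea only, not the actual mirror-maker code; the class name and method shape are made up for illustration:

```python
class OffsetTracker:
    """Tracks produced-but-unacked source offsets for one partition.

    The safe commit point is the lowest offset whose producer ack is
    still pending; committing anything higher could lose data if the
    mirror maker dies before those sends complete.
    """

    def __init__(self):
        self.unacked = set()   # offsets handed to the producer, ack pending
        self.high_water = -1   # highest offset handed to the producer

    def sent(self, offset):
        self.unacked.add(offset)
        self.high_water = max(self.high_water, offset)

    def acked(self, offset):
        self.unacked.discard(offset)

    def safe_commit_offset(self):
        # Commit position = first still-in-flight offset, or one past
        # the last consumed offset when everything has been acked.
        if self.unacked:
            return min(self.unacked)
        return self.high_water + 1
```

With out-of-order acks (offsets 0 and 2 acked, 1 still in flight) the safe commit point stays at 1, which is exactly the "lists of unacked source offsets" complexity being discussed.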
The reason that was done anyway is > that with the auto-commit turned off duplicates are almost guaranteed > on a rebalance. > > > I think the point that Neha and I were trying to make was that the > > motivation to embed stuff into MM kind of is related to how complex a > > simple "consume and produce" with good throughput will be. If it is > simple > > to write such a thing in a few lines, the pain of embedding a bunch of > > stuff won't be worth it, if it has to be as complex as the current mm > then > > of course we will need all kinds of plug ins because no one will be able > to > > write such a thing. I don't have a huge concern with a simple plug-in > but I > > think if it turns into something more complex with filtering and > > aggregation or whatever we really need to stop and think a bit about the > > design. > > I agree - I don't think there is a use-case for any complex plug-in. > It is pretty much what Becket has described currently for the message > handler - i.e., take an incoming record and return a list of outgoing > records (which could be empty if you filter). > > So here is my take on the MM: > - Bare bones: simple consumer - producer pairs (0.7 style). This is > ideal, but does not handle no data loss > - Above plus support no data loss. This actually adds quite a bit of > complexity. > - Above plus the message handler. This is a trivial addition I think > that makes the MM usable in a few other mirroring-like applications. > > Joel > > > On Tue, Feb 10, 2015 at 12:31 PM, Joel Koshy <jjkosh...@gmail.com> > wrote: > > > > > > > > > > > On Tue, Feb 10, 2015 at 12:13:46PM -0800, Neha Narkhede wrote: > > > > I think all of us agree that we want to design MirrorMaker for 0 data > > > loss. > > > > With the absence of the data channel, 0 data loss will be much > simpler to > > > > implement. > > > > > > The data channel is irrelevant to the implementation of zero data > > > loss. 
The complexity in the implementation of no data loss that you > > > are seeing in mirror-maker affects all consume-then-produce patterns > > > whether or not there is a data channel. You still need to maintain a > > > list of unacked offsets. What I meant earlier is that we can > > > brainstorm completely different approaches to supporting no data loss, > > > but the current implementation is the only solution we are aware of. > > > > > > > > > > > My arguments for adding a message handler are that: > > > > > 1. It is more efficient to do something in common for all the > clients > > > in > > > > > pipeline than letting each client do the same thing for many > times. And > > > > > there are concrete use cases for the message handler already. > > > > > > > > > > > > > What are the concrete use cases? > > > > > > I think Becket already described a couple of use cases earlier in the > > > thread. > > > > > > <quote> > > > > > > 1. Format conversion. We have a use case where clients of source > > > cluster > > > use an internal schema and clients of target cluster use a different > > > public schema. > > > 2. Message filtering: For the messages published to source cluster, > > > there > > > are some messages private to source cluster clients and should not > > > exposed > > > to target cluster clients. It would be difficult to publish those > > > messages > > > into different partitions because they need to be ordered. > > > I agree that we can always filter/convert messages after they are > > > copied > > > to the target cluster, but that costs network bandwidth unnecessarily, > > > especially if that is a cross colo mirror. With the handler, we can > > > co-locate the mirror maker with source cluster and save that cost. 
> > > Also, > > > imagine there are many downstream consumers consuming from the target > > > cluster, filtering/reformatting the messages before the messages reach > > > the > > > target cluster is much more efficient than having each of the > > > consumers do > > > this individually on their own. > > > > > > </quote> > > > > > > > > > > > Also the KIP still refers to the data channel in a few places > (Motivation > > > > and "On consumer rebalance" sections). Can you update the wiki so it > is > > > > easier to review the new design, especially the data loss part. > > > > > > > > > > > > On Tue, Feb 10, 2015 at 10:36 AM, Joel Koshy <jjkosh...@gmail.com> > > > wrote: > > > > > > > > > I think the message handler adds little to no complexity to the > mirror > > > > > maker. Jay/Neha, the MM became scary due to the rearchitecture we > did > > > > > for 0.8 due to performance issues compared with 0.7 - we should > remove > > > > > the data channel if it can match the current throughput. I agree > it is > > > > > worth prototyping and testing that so the MM architecture is > > > > > simplified. > > > > > > > > > > The MM became a little scarier in KAFKA-1650 in order to support no > > > > > data loss. I think the implementation for no data loss will remain > > > > > about the same even in the new model (even without the data > channel) - > > > > > we can probably brainstorm more if there is a better/simpler way > to do > > > > > it (maybe there is in the absence of the data channel) but at the > time > > > > > it was the best we (i.e., Becket, myself, Jun and Guozhang who > > > > > participated on the review) could come up with. > > > > > > > > > > So I'm definitely +1 on whatever it takes to support no data loss. > I > > > > > think most people would want that out of the box. > > > > > > > > > > As for the message handler, as Becket wrote and I agree with, it is > > > > > really a trivial addition that would benefit (perhaps not most, > but at > > > > > least some). 
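[Editor's note] The handler contract described above - take an incoming record, return a list of outgoing records, possibly empty - is easy to picture concretely. A minimal Python sketch; the class, the `(key, value)` record shape, and the `private:`/`internal:` prefixes are all hypothetical conventions for illustration, not the real (Java) mirror-maker interface:

```python
class FilteringHandler:
    """Message handler sketch: one record in, list of records out.

    Returning an empty list filters the record (the private-message
    use case); returning a rewritten record covers format conversion.
    """

    def handle(self, record):
        key, value = record
        if value.startswith(b"private:"):
            return []  # filter: never mirrored to the target cluster
        # format conversion: strip a made-up internal envelope prefix
        return [(key, value.replace(b"internal:", b"", 1))]
```

Filtering at this point, before the data crosses the mirror link, is what saves the cross-colo bandwidth mentioned above.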
So I'm personally +1 on that as well. That said, I'm also
> > > > > okay with it not being there. I think the MM is fairly stand-alone
> > > > > and simple enough that it is entirely reasonable and absolutely
> > > > > feasible for companies to fork/re-implement the mirror maker for
> > > > > their own needs.
> > > > >
> > > > > So in summary, I'm +1 on the KIP.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Joel
> > > > >
> > > > > On Mon, Feb 09, 2015 at 09:19:57PM +0000, Jiangjie Qin wrote:
> > > > > > I just updated the KIP page and incorporated Jay and Neha's
> > > > > > suggestions. As a brief summary of where we are:
> > > > > >
> > > > > > Consensus reached:
> > > > > > Have N independent mirror maker threads, each of which has its own
> > > > > > consumer but shares a producer. The mirror maker threads will be
> > > > > > responsible for decompression, compression and offset commit. No
> > > > > > data channel or separate offset commit thread is needed. The
> > > > > > consumer rebalance callback will be used to avoid duplicates on
> > > > > > rebalance.
> > > > > >
> > > > > > Still under discussion:
> > > > > > Whether a message handler is needed.
> > > > > >
> > > > > > My arguments for adding a message handler are that:
> > > > > > 1. It is more efficient to do something in common for all the
> > > > > > clients in the pipeline than letting each client do the same thing
> > > > > > many times. And there are concrete use cases for the message
> > > > > > handler already.
> > > > > > 2. It is not a big complicated add-on to mirror maker.
> > > > > > 3. Without a message handler, customers who need it have to
> > > > > > re-implement all the logic of mirror maker themselves just in
> > > > > > order to add this handling in the pipeline.
> > > > > >
> > > > > > Any thoughts?
> > > > > >
> > > > > > Thanks.
> > > > > > > > > > > > ―Jiangjie (Becket) Qin > > > > > > > > > > > > On 2/8/15, 6:35 PM, "Jiangjie Qin" <j...@linkedin.com> wrote: > > > > > > > > > > > > >Hi Jay, thanks a lot for the comments. > > > > > > >I think this solution is better. We probably don’t need data > channel > > > > > > >anymore. It can be replaced with a list of producer if we need > more > > > > > sender > > > > > > >thread. > > > > > > >I’ll update the KIP page. > > > > > > > > > > > > > >The reasoning about message handler is mainly for efficiency > > > purpose. > > > > > I’m > > > > > > >thinking that if something can be done in pipeline for all the > > > clients > > > > > > >such as filtering/reformatting, it is probably better to do it > in > > > the > > > > > > >pipeline than asking 100 clients do the same thing for 100 > times. > > > > > > > > > > > > > >―Jiangjie (Becket) Qin > > > > > > > > > > > > > > > > > > > > >On 2/8/15, 4:59 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote: > > > > > > > > > > > > > >>Yeah, I second Neha's comments. The current mm code has taken > > > something > > > > > > >>pretty simple and made it pretty scary with callbacks and > > > wait/notify > > > > > > >>stuff. Do we believe this works? I can't tell by looking at it > > > which is > > > > > > >>kind of bad for something important like this. I don't mean > this as > > > > > > >>criticism, I know the history: we added in memory queues to > help > > > with > > > > > > >>other > > > > > > >>performance problems without thinking about correctness, then > we > > > added > > > > > > >>stuff to work around the in-memory queues not lose data, and > so on. > > > > > > >> > > > > > > >>Can we instead do the opposite exercise and start with the > basics > > > of > > > > > what > > > > > > >>mm should do and think about what deficiencies prevents this > > > approach > > > > > > >>from > > > > > > >>working? Then let's make sure the currently in-flight work will > > > remove > > > > > > >>these deficiencies. 
After all mm is kind of the prototypical kafka use case,
> > > > > > >>so if we can't make our clients do this, probably no one else can.
> > > > > > >>
> > > > > > >>I think mm should just be N independent threads each of which has
> > > > > > >>their own consumer but share a producer and each of which looks
> > > > > > >>like this:
> > > > > > >>
> > > > > > >>while(true) {
> > > > > > >>    val recs = consumer.poll(Long.MaxValue);
> > > > > > >>    for (rec <- recs)
> > > > > > >>        producer.send(rec, logErrorCallback)
> > > > > > >>    if(System.currentTimeMillis - lastCommit > commitInterval) {
> > > > > > >>        producer.flush()
> > > > > > >>        consumer.commit()
> > > > > > >>        lastCommit = System.currentTimeMillis
> > > > > > >>    }
> > > > > > >>}
> > > > > > >>
> > > > > > >>This will depend on setting the retry count in the producer to
> > > > > > >>something high with a largish backoff so that a failed send
> > > > > > >>attempt doesn't drop data.
> > > > > > >>
> > > > > > >>We will need to use the callback to force a flush and offset
> > > > > > >>commit on rebalance.
> > > > > > >>
> > > > > > >>This approach may have a few more TCP connections due to using
> > > > > > >>multiple consumers but I think it is a lot easier to reason about
> > > > > > >>and the total number of mm instances is always going to be small.
> > > > > > >>
> > > > > > >>Let's talk about where this simple approach falls short, I think
> > > > > > >>that will help us understand your motivations for additional
> > > > > > >>elements.
> > > > > > >>
> > > > > > >>Another advantage of this is that it is so simple I don't think
> > > > > > >>we really even need to bother making mm extensible because
> > > > > > >>writing your own code that does custom processing or
> > > > > > >>transformation is just ten lines and no plug in system is going
> > > > > > >>to make it simpler.
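[Editor's note] The key property of the loop quoted above is ordering: producer.flush() returns only after every outstanding send has been acked (or has exhausted its retries), so the consumer.commit() that follows can never record a position ahead of the data actually copied. A tiny Python simulation of that invariant, with a mock producer standing in for the Kafka client (names and shapes are illustrative assumptions, not the real API):

```python
class MockProducer:
    """Stands in for the Kafka producer: send() is asynchronous,
    flush() blocks until every outstanding send has been acked."""

    def __init__(self):
        self.in_flight = []
        self.acked = []

    def send(self, record):
        self.in_flight.append(record)  # async: not yet acked

    def flush(self):
        # Models flush(): all outstanding sends complete before return.
        self.acked.extend(self.in_flight)
        self.in_flight.clear()


def mirror_interval(records, producer, commit):
    """One commit interval of the proposed loop: send everything,
    flush, then commit the count of records safely copied."""
    for record in records:
        producer.send(record)
    producer.flush()             # wait for all acks first...
    commit(len(producer.acked))  # ...so the commit never leads the acks
```

On rebalance the same flush-then-commit pair would run from the rebalance callback, which is what avoids duplicates when partitions move between threads.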
> > > > > > >> > > > > > > >>-Jay > > > > > > >> > > > > > > >> > > > > > > >>On Sun, Feb 8, 2015 at 2:40 PM, Neha Narkhede < > n...@confluent.io> > > > > > wrote: > > > > > > >> > > > > > > >>> Few comments - > > > > > > >>> > > > > > > >>> 1. Why do we need the message handler? Do you have concrete > use > > > cases > > > > > > >>>in > > > > > > >>> mind? If not, we should consider adding it in the future > when/if > > > we > > > > > do > > > > > > >>>have > > > > > > >>> use cases for it. The purpose of the mirror maker is a simple > > > tool > > > > > for > > > > > > >>> setting up Kafka cluster replicas. I don't see why we need to > > > > > include a > > > > > > >>> message handler for doing stream transformations or > filtering. > > > You > > > > > can > > > > > > >>> always write a simple process for doing that once the data is > > > copied > > > > > as > > > > > > >>>is > > > > > > >>> in the target cluster > > > > > > >>> 2. Why keep both designs? We should prefer the simpler design > > > unless > > > > > it > > > > > > >>>is > > > > > > >>> not feasible due to the performance issue that we previously > > > had. Did > > > > > > >>>you > > > > > > >>> get a chance to run some tests to see if that is really > still a > > > > > problem > > > > > > >>>or > > > > > > >>> not? It will be easier to think about the design and also > make > > > the > > > > > KIP > > > > > > >>> complete if we make a call on the design first. > > > > > > >>> 3. Can you explain the need for keeping a list of unacked > > > offsets per > > > > > > >>> partition? Consider adding a section on retries and how you > plan > > > to > > > > > > >>>handle > > > > > > >>> the case when the producer runs out of all retries. 
> > > > > > >>> > > > > > > >>> Thanks, > > > > > > >>> Neha > > > > > > >>> > > > > > > >>> On Sun, Feb 8, 2015 at 2:06 PM, Jiangjie Qin > > > > > > >>><j...@linkedin.com.invalid> > > > > > > >>> wrote: > > > > > > >>> > > > > > > >>> > Hi Neha, > > > > > > >>> > > > > > > > >>> > Yes, I’ve updated the KIP so the entire KIP is based on new > > > > > consumer > > > > > > >>>now. > > > > > > >>> > I’ve put both designs with and without data channel in the > KIP > > > as I > > > > > > >>>still > > > > > > >>> > feel we might need the data channel to provide more > > > flexibility, > > > > > > >>> > especially after message handler is introduced. I’ve put my > > > > > thinking > > > > > > >>>of > > > > > > >>> > the pros and cons of the two designs in the KIP as well. > It’ll > > > be > > > > > > >>>great > > > > > > >>> if > > > > > > >>> > you can give a review and comment. > > > > > > >>> > > > > > > > >>> > Thanks. > > > > > > >>> > > > > > > > >>> > Jiangjie (Becket) Qin > > > > > > >>> > > > > > > > >>> > On 2/6/15, 7:30 PM, "Neha Narkhede" <n...@confluent.io> > wrote: > > > > > > >>> > > > > > > > >>> > >Hey Becket, > > > > > > >>> > > > > > > > > >>> > >What are the next steps on this KIP. As per your comment > > > earlier > > > > > on > > > > > > >>>the > > > > > > >>> > >thread - > > > > > > >>> > > > > > > > > >>> > >I do agree it makes more sense > > > > > > >>> > >> to avoid duplicate effort and plan based on new > consumer. > > > I’ll > > > > > > >>>modify > > > > > > >>> > >>the > > > > > > >>> > >> KIP. > > > > > > >>> > > > > > > > > >>> > > > > > > > > >>> > >Did you get a chance to think about the simplified design > > > that we > > > > > > >>> proposed > > > > > > >>> > >earlier? Do you plan to update the KIP with that proposal? 
> > > > > > >>> > >Thanks,
> > > > > > >>> > >Neha
> > > > > > >>> > >
> > > > > > >>> > >On Wed, Feb 4, 2015 at 12:12 PM, Jiangjie Qin
> > > > > > >>><j...@linkedin.com.invalid>
> > > > > > >>> > >wrote:
> > > > > > >>> > >
> > > > > > >>> > >> In mirror maker we do not do de-serialization on the
> > > > > > >>> > >> messages. Mirror maker uses the source TopicPartition hash
> > > > > > >>> > >> to choose a producer to send messages from the same source
> > > > > > >>> > >> partition. The partition those messages end up with is
> > > > > > >>> > >> decided by the Partitioner class in KafkaProducer
> > > > > > >>> > >> (assuming you are using the new producer), which uses the
> > > > > > >>> > >> hash code of bytes[].
> > > > > > >>> > >>
> > > > > > >>> > >> If deserialization is needed, it has to be done in the
> > > > > > >>> > >> message handler.
> > > > > > >>> > >>
> > > > > > >>> > >> Thanks.
> > > > > > >>> > >>
> > > > > > >>> > >> Jiangjie (Becket) Qin
> > > > > > >>> > >>
> > > > > > >>> > >> On 2/4/15, 11:33 AM, "Bhavesh Mistry" <
> > > > > mistry.p.bhav...@gmail.com>
> > > > > > >>> > >>wrote:
> > > > > > >>> > >>
> > > > > > >>> > >> >Hi Jiangjie,
> > > > > > >>> > >> >
> > > > > > >>> > >> >Thanks for entertaining my questions so far. The last
> > > > > > >>> > >> >question I have is about serialization of the message
> > > > > > >>> > >> >key. If the key de-serialization (Class) is not present
> > > > > > >>> > >> >at the MM instance, then does it use the raw byte
> > > > > > >>> > >> >hashcode to determine the partition? How are you going to
> > > > > > >>> > >> >address the situation where the key needs to be
> > > > > > >>> > >> >de-serialized and the actual hashcode needs to be
> > > > > > >>> > >> >computed?
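[Editor's note] The point above - without a key deserializer, partitioning falls back to a hash of the raw key bytes - is still deterministic: the same bytes always map to the same partition, it just may differ from the partition a deserialized-key hash would pick. A Python illustration of that property; the md5-based arithmetic below is an assumption for demonstration only (the real producer hashes the Java byte[]):

```python
import hashlib


def partition_for(key_bytes, num_partitions):
    """Deterministic partition choice from raw key bytes, with no
    deserialization. md5 is used here only because it is a stable
    hash across processes (unlike Python's salted built-in hash())."""
    digest = hashlib.md5(key_bytes).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions
```

Because the mapping depends only on the bytes, per-key ordering survives mirroring as long as the source and target partition counts match; deserializing the key and hashing the object instead could place the same message in a different partition.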
> > > > > > >>> > >> > > > > > > > >>> > >> > > > > > > > >>> > >> >Thanks, > > > > > > >>> > >> > > > > > > > >>> > >> >Bhavesh > > > > > > >>> > >> > > > > > > > >>> > >> >On Fri, Jan 30, 2015 at 1:41 PM, Jiangjie Qin > > > > > > >>> > >><j...@linkedin.com.invalid> > > > > > > >>> > >> >wrote: > > > > > > >>> > >> > > > > > > > >>> > >> >> Hi Bhavesh, > > > > > > >>> > >> >> > > > > > > >>> > >> >> Please see inline comments. > > > > > > >>> > >> >> > > > > > > >>> > >> >> Jiangjie (Becket) Qin > > > > > > >>> > >> >> > > > > > > >>> > >> >> On 1/29/15, 7:00 PM, "Bhavesh Mistry" > > > > > > >>><mistry.p.bhav...@gmail.com> > > > > > > >>> > >> >>wrote: > > > > > > >>> > >> >> > > > > > > >>> > >> >> >Hi Jiangjie, > > > > > > >>> > >> >> > > > > > > > >>> > >> >> >Thanks for the input. > > > > > > >>> > >> >> > > > > > > > >>> > >> >> >a) Is MM will producer ack will be attach to > Producer > > > > > > >>>Instance or > > > > > > >>> > >>per > > > > > > >>> > >> >> >topic. Use case is that one instance of MM > > > > > > >>> > >> >> >needs to handle both strong ack and also ack=0 for > some > > > > > topic. > > > > > > >>> Or > > > > > > >>> > >>it > > > > > > >>> > >> >> >would > > > > > > >>> > >> >> >be better to set-up another instance of MM. > > > > > > >>> > >> >> The acks setting is producer level setting instead of > > > topic > > > > > > >>>level > > > > > > >>> > >> >>setting. > > > > > > >>> > >> >> In this case you probably need to set up another > > > instance. > > > > > > >>> > >> >> > > > > > > > >>> > >> >> >b) Regarding TCP connections, Why does #producer > > > instance > > > > > > >>>attach > > > > > > >>> to > > > > > > >>> > >>TCP > > > > > > >>> > >> >> >connection. Is it possible to use Broker > Connection TCP > > > > > Pool, > > > > > > >>> > >>producer > > > > > > >>> > >> >> >will just checkout TCP connection to Broker. 
So, > # of > > > > > > >>>Producer > > > > > > >>> > >> >>Instance > > > > > > >>> > >> >> >does not correlation to Brokers Connection. Is this > > > > > possible > > > > > > >>>? > > > > > > >>> > >> >> In new producer, each producer maintains a > connection to > > > each > > > > > > >>> broker > > > > > > >>> > >> >> within the producer instance. Making producer > instances > > > to > > > > > > >>>share > > > > > > >>> the > > > > > > >>> > >>TCP > > > > > > >>> > >> >> connections is a very big change to the current > design, > > > so I > > > > > > >>> suppose > > > > > > >>> > >>we > > > > > > >>> > >> >> won’t be able to do that. > > > > > > >>> > >> >> > > > > > > > >>> > >> >> > > > > > > > >>> > >> >> >Thanks, > > > > > > >>> > >> >> > > > > > > > >>> > >> >> >Bhavesh > > > > > > >>> > >> >> > > > > > > > >>> > >> >> >On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin > > > > > > >>> > >> >><j...@linkedin.com.invalid > > > > > > >>> > >> >> > > > > > > > >>> > >> >> >wrote: > > > > > > >>> > >> >> > > > > > > > >>> > >> >> >> Hi Bhavesh, > > > > > > >>> > >> >> >> > > > > > > >>> > >> >> >> I think it is the right discussion to have when > we are > > > > > > >>>talking > > > > > > >>> > >>about > > > > > > >>> > >> >>the > > > > > > >>> > >> >> >> new new design for MM. > > > > > > >>> > >> >> >> Please see the inline comments. 
> > > > > > >>> > >> >> >> > > > > > > >>> > >> >> >> Jiangjie (Becket) Qin > > > > > > >>> > >> >> >> > > > > > > >>> > >> >> >> On 1/28/15, 10:48 PM, "Bhavesh Mistry" > > > > > > >>> > >><mistry.p.bhav...@gmail.com> > > > > > > >>> > >> >> >>wrote: > > > > > > >>> > >> >> >> > > > > > > >>> > >> >> >> >Hi Jiangjie, > > > > > > >>> > >> >> >> > > > > > > > >>> > >> >> >> >I just wanted to let you know about our use case > and > > > > > stress > > > > > > >>>the > > > > > > >>> > >> >>point > > > > > > >>> > >> >> >>that > > > > > > >>> > >> >> >> >local data center broker cluster have fewer > > > partitions > > > > > than > > > > > > >>>the > > > > > > >>> > >> >> >> >destination > > > > > > >>> > >> >> >> >offline broker cluster. Just because we do the > batch > > > pull > > > > > > >>>from > > > > > > >>> > >>CAMUS > > > > > > >>> > >> >> >>and > > > > > > >>> > >> >> >> >in > > > > > > >>> > >> >> >> >order to drain data faster than the injection > rate > > > (from > > > > > > >>>four > > > > > > >>> DCs > > > > > > >>> > >> >>for > > > > > > >>> > >> >> >>same > > > > > > >>> > >> >> >> >topic). > > > > > > >>> > >> >> >> Keeping the same partition number in source and > target > > > > > > >>>cluster > > > > > > >>> > >>will > > > > > > >>> > >> >>be > > > > > > >>> > >> >> >>an > > > > > > >>> > >> >> >> option but will not be enforced by default. > > > > > > >>> > >> >> >> > > > > > > > >>> > >> >> >> >We are facing following issues (probably due to > > > > > > >>>configuration): > > > > > > >>> > >> >> >> > > > > > > > >>> > >> >> >> >1) We occasionally loose data due to message > > > batch > > > > > > >>>size is > > > > > > >>> > >>too > > > > > > >>> > >> >> >>large > > > > > > >>> > >> >> >> >(2MB) on target data (we are using old producer > but I > > > > > think > > > > > > >>>new > > > > > > >>> > >> >> >>producer > > > > > > >>> > >> >> >> >will solve this problem to some extend). > > > > > > >>> > >> >> >> We do see this issue in LinkedIn as well. 
New > producer > > > > > also > > > > > > >>> might > > > > > > >>> > >> >>have > > > > > > >>> > >> >> >> this issue. There are some proposal of solutions, > but > > > no > > > > > > >>>real > > > > > > >>> work > > > > > > >>> > >> >> >>started > > > > > > >>> > >> >> >> yet. For now, as a workaround, setting a more > > > aggressive > > > > > > >>>batch > > > > > > >>> > >>size > > > > > > >>> > >> >>on > > > > > > >>> > >> >> >> producer side should work. > > > > > > >>> > >> >> >> >2) Since only one instance is set to MM > data, > > > we > > > > > are > > > > > > >>>not > > > > > > >>> > >>able > > > > > > >>> > >> >>to > > > > > > >>> > >> >> >> >set-up ack per topic instead ack is attached to > > > producer > > > > > > >>> > >>instance. > > > > > > >>> > >> >> >> I don’t quite get the question here. > > > > > > >>> > >> >> >> >3) How are you going to address two phase > commit > > > > > > >>>problem > > > > > > >>> if > > > > > > >>> > >> >>ack is > > > > > > >>> > >> >> >> >set > > > > > > >>> > >> >> >> >to strongest, but auto commit is on for consumer > > > (meaning > > > > > > >>> > >>producer > > > > > > >>> > >> >>does > > > > > > >>> > >> >> >> >not > > > > > > >>> > >> >> >> >get ack, but consumer auto committed offset that > > > > > message). > > > > > > >>> Is > > > > > > >>> > >> >>there > > > > > > >>> > >> >> >> >transactional (Kafka transaction is in process) > > > based ack > > > > > > >>>and > > > > > > >>> > >>commit > > > > > > >>> > >> >> >> >offset > > > > > > >>> > >> >> >> >? > > > > > > >>> > >> >> >> Auto offset commit should be turned off in this > case. > > > The > > > > > > >>>offset > > > > > > >>> > >>will > > > > > > >>> > >> >> >>only > > > > > > >>> > >> >> >> be committed once by the offset commit thread. So > > > there is > > > > > > >>>no > > > > > > >>> two > > > > > > >>> > >> >>phase > > > > > > >>> > >> >> >> commit. > > > > > > >>> > >> >> >> >4) How are you planning to avoid duplicated > > > message? 
> > > > > >>> > >> >> >> >(Is the broker going to have a moving window of
> > > > > >>> > >> >> >> >collected messages and de-dupe?) Possibly we get
> > > > > >>> > >> >> >> >this from retry set to 5…?
> > > > > >>> > >> >> >> We are not trying to completely avoid duplicates. The
> > > > > >>> > >> >> >> duplicates will still be there if:
> > > > > >>> > >> >> >> 1. Producer retries on failure.
> > > > > >>> > >> >> >> 2. Mirror maker is hard killed.
> > > > > >>> > >> >> >> Currently, dedup is expected to be done by the user if
> > > > > >>> > >> >> >> necessary.
> > > > > >>> > >> >> >> >5) Last, is there any warning or insight you can
> > > > > >>> > >> >> >> >provide from the MM component when the data injection
> > > > > >>> > >> >> >> >rate into destination partitions is NOT evenly
> > > > > >>> > >> >> >> >distributed, regardless of keyed or non-keyed
> > > > > >>> > >> >> >> >messages? (Hence there is a ripple effect, such as
> > > > > >>> > >> >> >> >data arriving late or out of order in terms of
> > > > > >>> > >> >> >> >timestamp, and CAMUS creating a huge number of files
> > > > > >>> > >> >> >> >on HDFS due to the uneven injection rate. The Camus
> > > > > >>> > >> >> >> >job is configured to run every 3 minutes.)
> > > > > >>> > >> >> >> I think uneven data distribution is typically caused
> > > > > >>> > >> >> >> by server-side imbalance, rather than something mirror
> > > > > >>> > >> >> >> maker could control.
> > > > > > >>>In > > > > > > >>> new > > > > > > >>> > >> >> >>mirror > > > > > > >>> > >> >> >> maker, however, there is a customizable message > > > handler, > > > > > > >>>that > > > > > > >>> > >>might > > > > > > >>> > >> >>be > > > > > > >>> > >> >> >> able to help a little bit. In message handler, > you can > > > > > > >>> explicitly > > > > > > >>> > >> >>set a > > > > > > >>> > >> >> >> partition that you want to produce the message > to. So > > > if > > > > > you > > > > > > >>> know > > > > > > >>> > >>the > > > > > > >>> > >> >> >> uneven data distribution in target cluster, you > may > > > offset > > > > > > >>>it > > > > > > >>> > >>here. > > > > > > >>> > >> >>But > > > > > > >>> > >> >> >> that probably only works for non-keyed messages. > > > > > > >>> > >> >> >> > > > > > > > >>> > >> >> >> >I am not sure if this is right discussion form to > > > bring > > > > > > >>>these > > > > > > >>> to > > > > > > >>> > >> >> >> >your/kafka > > > > > > >>> > >> >> >> >Dev team attention. This might be off track, > > > > > > >>> > >> >> >> > > > > > > > >>> > >> >> >> > > > > > > > >>> > >> >> >> >Thanks, > > > > > > >>> > >> >> >> > > > > > > > >>> > >> >> >> >Bhavesh > > > > > > >>> > >> >> >> > > > > > > > >>> > >> >> >> >On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin > > > > > > >>> > >> >> >><j...@linkedin.com.invalid > > > > > > >>> > >> >> >> > > > > > > > >>> > >> >> >> >wrote: > > > > > > >>> > >> >> >> > > > > > > > >>> > >> >> >> >> I’ve updated the KIP page. Feedbacks are > welcome. > > > > > > >>> > >> >> >> >> > > > > > > >>> > >> >> >> >> Regarding the simple mirror maker design. I > thought > > > > > over > > > > > > >>>it > > > > > > >>> and > > > > > > >>> > >> >>have > > > > > > >>> > >> >> >> >>some > > > > > > >>> > >> >> >> >> worries: > > > > > > >>> > >> >> >> >> There are two things that might worth thinking: > > > > > > >>> > >> >> >> >> 1. 
One of the enhancement to mirror maker is > > > adding a > > > > > > >>>message > > > > > > >>> > >> >> >>handler to > > > > > > >>> > >> >> >> >> do things like reformatting. I think we might > > > > > potentially > > > > > > >>> want > > > > > > >>> > >>to > > > > > > >>> > >> >> >>have > > > > > > >>> > >> >> >> >> more threads processing the messages than the > > > number of > > > > > > >>> > >>consumers. > > > > > > >>> > >> >> >>If we > > > > > > >>> > >> >> >> >> follow the simple mirror maker solution, we > lose > > > this > > > > > > >>> > >>flexibility. > > > > > > >>> > >> >> >> >> 2. This might not matter too much, but creating > > > more > > > > > > >>> consumers > > > > > > >>> > >> >>means > > > > > > >>> > >> >> >> >>more > > > > > > >>> > >> >> >> >> footprint of TCP connection / memory. > > > > > > >>> > >> >> >> >> > > > > > > >>> > >> >> >> >> Any thoughts on this? > > > > > > >>> > >> >> >> >> > > > > > > >>> > >> >> >> >> Thanks. > > > > > > >>> > >> >> >> >> > > > > > > >>> > >> >> >> >> Jiangjie (Becket) Qin > > > > > > >>> > >> >> >> >> > > > > > > >>> > >> >> >> >> On 1/26/15, 10:35 AM, "Jiangjie Qin" < > > > > > j...@linkedin.com> > > > > > > >>> > wrote: > > > > > > >>> > >> >> >> >> > > > > > > >>> > >> >> >> >> >Hi Jay and Neha, > > > > > > >>> > >> >> >> >> > > > > > > > >>> > >> >> >> >> >Thanks a lot for the reply and explanation. I > do > > > agree > > > > > > >>>it > > > > > > >>> > >>makes > > > > > > >>> > >> >>more > > > > > > >>> > >> >> >> >>sense > > > > > > >>> > >> >> >> >> >to avoid duplicate effort and plan based on > new > > > > > > >>>consumer. > > > > > > >>> I’ll > > > > > > >>> > >> >> >>modify > > > > > > >>> > >> >> >> >>the > > > > > > >>> > >> >> >> >> >KIP. 
> > > > > > >>> > >> >> >> >> > > > > > > > >>> > >> >> >> >> >To Jay’s question on message ordering - The > data > > > > > channel > > > > > > >>> > >> >>selection > > > > > > >>> > >> >> >> >>makes > > > > > > >>> > >> >> >> >> >sure that the messages from the same source > > > partition > > > > > > >>>will > > > > > > >>> > >>sent > > > > > > >>> > >> >>by > > > > > > >>> > >> >> >>the > > > > > > >>> > >> >> >> >> >same producer. So the order of the messages is > > > > > > >>>guaranteed > > > > > > >>> with > > > > > > >>> > >> >> >>proper > > > > > > >>> > >> >> >> >> >producer settings > > > > > > >>> > >> >>(MaxInFlightRequests=1,retries=Integer.MaxValue, > > > > > > >>> > >> >> >> >>etc.) > > > > > > >>> > >> >> >> >> >For keyed messages, because they come from the > > > same > > > > > > >>>source > > > > > > >>> > >> >>partition > > > > > > >>> > >> >> >> >>and > > > > > > >>> > >> >> >> >> >will end up in the same target partition, as > long > > > as > > > > > > >>>they > > > > > > >>> are > > > > > > >>> > >> >>sent > > > > > > >>> > >> >> >>by > > > > > > >>> > >> >> >> >>the > > > > > > >>> > >> >> >> >> >same producer, the order is guaranteed. > > > > > > >>> > >> >> >> >> >For non-keyed messages, the messages coming > from > > > the > > > > > > >>>same > > > > > > >>> > >>source > > > > > > >>> > >> >> >> >>partition > > > > > > >>> > >> >> >> >> >might go to different target partitions. The > > > order is > > > > > > >>>only > > > > > > >>> > >> >> >>guaranteed > > > > > > >>> > >> >> >> >> >within each partition. > > > > > > >>> > >> >> >> >> > > > > > > > >>> > >> >> >> >> >Anyway, I’ll modify the KIP and data channel > will > > > be > > > > > > >>>away. > > > > > > >>> > >> >> >> >> > > > > > > > >>> > >> >> >> >> >Thanks. 
Jiangjie (Becket) Qin

On 1/25/15, 4:34 PM, "Neha Narkhede" <n...@confluent.io> wrote:

I think there is some value in investigating if we can go back to the simple mirror maker design, as Jay points out. Here you have N threads, each has a consumer and a producer.

The reason why we had to move away from that was a combination of the difference in throughput between the consumer and the old producer, and the deficiency of the consumer rebalancing that limits the total number of mirror maker threads. So the only option available was to increase the throughput of the limited # of mirror maker threads that could be deployed.
Now that queuing design may not make sense, given that the new producer's throughput is almost similar to the consumer's AND the new round-robin based consumer rebalancing allows a very high number of mirror maker instances to exist.

This is the end state that the mirror maker should be in once the new consumer is complete, so it wouldn't hurt to see if we can just move to that right now.

On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

QQ: If we ever use a different technique for the data channel selection than for the producer partitioning, won't that break ordering? How can we ensure these things stay in sync?
With respect to the new consumer--I really do want to encourage people to think through how MM will work with the new consumer. I mean this isn't very far off, maybe a few months if we hustle? I could imagine us getting this mm fix done maybe sooner, maybe in a month? So I guess this buys us an extra month before we rip it out and throw it away? Maybe two? This bug has been there for a while, though, right? Is it worth it? Probably it is, but it still kind of sucks to have the duplicate effort.

So anyhow let's definitely think about how things will work with the new consumer.
I think we can probably just have N threads, each thread has a producer and consumer and is internally single threaded. Any reason this wouldn't work?

-Jay

On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hi Jay,

Thanks for comments. Please see inline responses.

Jiangjie (Becket) Qin

On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:

Hey guys,

A couple questions/comments:

1. The callback and user-controlled commit offset functionality is already in the new consumer which we are working on in parallel. If we accelerated that work it might help concentrate efforts.
I admit this might take slightly longer in calendar time but could still probably get done this quarter. Have you guys considered that approach?

Yes, I totally agree that ideally we should put efforts on the new consumer. The main reason for still working on the old consumer is that we expect it would still be used in LinkedIn for quite a while before the new consumer could be fully rolled out. And we have recently been suffering a lot from the mirror maker data loss issue. So our current plan is making the necessary changes to make the current mirror maker stable in production. Then we can test and roll out the new consumer gradually without getting burnt.

2.
I think partitioning on the hash of the topic partition is not a very good idea because that will make the case of going from a cluster with fewer partitions to one with more partitions not work. I think an intuitive way to do this would be the following:
a. Default behavior: Just do what the producer does. I.e. if you specify a key use it for partitioning, if not just partition in a round-robin fashion.
b. Add a --preserve-partition option that will explicitly inherit the partition from the source irrespective of whether there is a key or which partition that key would hash to.

Sorry that I did not explain this clearly enough.
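[Editor's note: the two modes Jay proposes could be sketched as below. The class and method names are illustrative only, not from the KIP or the mirror maker code.]

```java
public class PartitionSelector {
    /**
     * Returns the target partition for a mirrored record, or null to let
     * the producer choose (key hash, or round-robin when there is no key),
     * which is option (a).
     */
    public static Integer targetPartition(boolean preservePartition,
                                          int sourcePartition) {
        if (preservePartition) {
            // Option (b): --preserve-partition inherits the source partition
            // regardless of the key. This assumes the target topic has at
            // least as many partitions as the source topic.
            return sourcePartition;
        }
        // Option (a): defer to the producer's default partitioning.
        return null;
    }
}
```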
The hash of the topic partition is only used to decide which mirror maker data channel queue the consumer thread should put the message into. It only tries to make sure the messages from the same partition are sent by the same producer thread, to guarantee the sending order. This is not at all related to which partition in the target cluster the messages end up in. That is still decided by the producer.

3. You don't actually give the ConsumerRebalanceListener interface. What is that going to look like?

Good point! I should have put it in the wiki. I just added it.

4. What is MirrorMakerRecord?
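[Editor's note: Becket's data channel selection above could look like the following sketch. The modulo-on-hash scheme is an assumption for illustration; the actual mirror maker implementation may differ.]

```java
import java.util.Objects;

public class ChannelSelector {
    /**
     * Picks the data channel queue for a consumed message, so that all
     * messages from one source topic-partition flow through the same
     * producer thread and keep their order.
     */
    public static int channelFor(String topic, int partition, int numChannels) {
        int hash = Objects.hash(topic, partition);
        // Math.abs(Integer.MIN_VALUE) is still negative, so clear the
        // sign bit instead of calling Math.abs before taking the modulo.
        return (hash & 0x7fffffff) % numChannels;
    }
}
```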
I think ideally the MirrorMakerMessageHandler interface would take a ConsumerRecord as input and return a ProducerRecord, right? That would allow you to transform the key, value, partition, or destination topic...

MirrorMakerRecord is introduced in KAFKA-1650, and is exactly the same as ConsumerRecord in KAFKA-1760.

private[kafka] class MirrorMakerRecord(val sourceTopic: String,
                                       val sourcePartition: Int,
                                       val sourceOffset: Long,
                                       val key: Array[Byte],
                                       val value: Array[Byte]) {
  def size = value.length + {if (key == null) 0 else key.length}
}

However, because the source partition and offset are needed in the producer thread for consumer offsets bookkeeping, the record returned by MirrorMakerMessageHandler needs to contain that information.
Therefore ProducerRecord does not work here. We could probably let the message handler take ConsumerRecord for both input and output.

5. Have you guys thought about what the implementation will look like in terms of threading architecture etc. with the new consumer? That will be soon, so even if we aren't starting with that, let's make sure we can get rid of a lot of the current mirror maker accidental complexity in terms of threads and queues when we move to that.

I haven't thought about it thoroughly. The quick idea is that after migration to the new consumer, it is probably better to use a single consumer thread.
If multithreading is needed, decoupling consumption and processing might be used. MirrorMaker definitely needs to be changed after the new consumer gets checked in. I'll document the changes and can submit follow-up patches after the new consumer is available.

-Jay

On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hi Kafka Devs,

We are working on a Kafka Mirror Maker enhancement. A KIP is posted to document and discuss the following:
1. KAFKA-1650: No data loss mirror maker change
2. KAFKA-1839: To allow partition aware mirror
3. KAFKA-1840: To allow message filtering/format conversion
Feedbacks are welcome.
Please let us > know > > > if > > > > > you > > > > > > >>> have > > > > > > >>> > >>any > > > > > > >>> > >> >> >> >> >>>questions or > > > > > > >>> > >> >> >> >> >>> > >> concerns. > > > > > > >>> > >> >> >> >> >>> > >> > > > > > > >>> > >> >> >> >> >>> > >> Thanks. > > > > > > >>> > >> >> >> >> >>> > >> > > > > > > >>> > >> >> >> >> >>> > >> Jiangjie (Becket) Qin > > > > > > >>> > >> >> >> >> >>> > >> > > > > > > >>> > >> >> >> >> >>> > > > > > > > >>> > >> >> >> >> >>> > > > > > > > >>> > >> >> >> >> >>> > > > > > > >>> > >> >> >> >> >> > > > > > > >>> > >> >> >> >> >> > > > > > > >>> > >> >> >> >> >> > > > > > > >>> > >> >> >> >> >>-- > > > > > > >>> > >> >> >> >> >>Thanks, > > > > > > >>> > >> >> >> >> >>Neha > > > > > > >>> > >> >> >> >> > > > > > > > >>> > >> >> >> >> > > > > > > >>> > >> >> >> >> > > > > > > >>> > >> >> >> > > > > > > >>> > >> >> >> > > > > > > >>> > >> >> > > > > > > >>> > >> >> > > > > > > >>> > >> > > > > > > >>> > >> > > > > > > >>> > > > > > > > > >>> > > > > > > > > >>> > >-- > > > > > > >>> > >Thanks, > > > > > > >>> > >Neha > > > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > >>> > > > > > > >>> -- > > > > > > >>> Thanks, > > > > > > >>> Neha > > > > > > >>> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Thanks, > > > > Neha > > > > > > > >