Hi Bhavesh,

Please see inline comments.

Jiangjie (Becket) Qin

On 1/29/15, 7:00 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com> wrote:

>Hi Jiangjie,
>
>Thanks for the input.
>
>a) Is MM will  producer ack will be attach to Producer Instance or per
>topic.  Use case is that one instance of MM
>needs to handle both strong ack and also ack=0 for some topic.  Or it
>would
>be better to set-up another instance of MM.
The acks setting is producer level setting instead of topic level setting.
In this case you probably need to set up another instance.
>
>b) Regarding TCP connections, Why does #producer instance attach to TCP
>connection.  Is it possible to use Broker Connection TCP Pool, producer
>will just checkout TCP connection  to Broker.  So, # of Producer Instance
>does not correlation to Brokers Connection.  Is this possible ?
In new producer, each producer maintains a connection to each broker
within the producer instance. Making producer instances to share the TCP
connections is a very big change to the current design, so I suppose we
won’t be able to do that.
>
>
>Thanks,
>
>Bhavesh
>
>On Thu, Jan 29, 2015 at 11:50 AM, Jiangjie Qin <j...@linkedin.com.invalid>
>wrote:
>
>> Hi Bhavesh,
>>
>> I think it is the right discussion to have when we are talking about the
>> new new design for MM.
>> Please see the inline comments.
>>
>> Jiangjie (Becket) Qin
>>
>> On 1/28/15, 10:48 PM, "Bhavesh Mistry" <mistry.p.bhav...@gmail.com>
>>wrote:
>>
>> >Hi Jiangjie,
>> >
>> >I just wanted to let you know about our use case and stress the point
>>that
>> >local data center broker cluster have fewer partitions than the
>> >destination
>> >offline broker cluster. Just because we do the batch pull from CAMUS
>>and
>> >in
>> >order to drain data faster than the injection rate (from four DCs for
>>same
>> >topic).
>> Keeping the same partition number in source and target cluster will be
>>an
>> option but will not be enforced by default.
>> >
>> >We are facing following issues (probably due to configuration):
>> >
>> >1)      We occasionally loose data due to message batch size is too
>>large
>> >(2MB) on target data (we are using old producer but I think new
>>producer
>> >will solve this problem to some extend).
>> We do see this issue in LinkedIn as well. New producer also might have
>> this issue. There are some proposal of solutions, but no real work
>>started
>> yet. For now, as a workaround, setting a more aggressive batch size on
>> producer side should work.
>> >2)      Since only one instance is set to MM data,  we are not able to
>> >set-up ack per topic instead ack is attached to producer instance.
>> I don’t quite get the question here.
>> >3)      How are you going to address two phase commit problem if ack is
>> >set
>> >to strongest, but auto commit is on for consumer (meaning producer does
>> >not
>> >get ack,  but consumer auto committed offset that message).  Is there
>> >transactional (Kafka transaction is in process) based ack and commit
>> >offset
>> >?
>> Auto offset commit should be turned off in this case. The offset will
>>only
>> be committed once by the offset commit thread. So there is no two phase
>> commit.
>> >4)      How are you planning to avoid duplicated message?  ( Is
>> >brokergoing
>> >have moving window of message collected and de-dupe ?)  Possibly, we
>>get
>> >this from retry set to 5…?
>> We are not trying to completely avoid duplicates. The duplicates will
>> still be there if:
>> 1. Producer retries on failure.
>> 2. Mirror maker is hard killed.
>> Currently, dedup is expected to be done by user if necessary.
>> >5)      Last, is there any warning or any thing you can provide insight
>> >from MM component about data injection rate into destination
>>partitions is
>> >NOT evenly distributed regardless  of  keyed or non-keyed message
>>(Hence
>> >there is ripple effect such as data not arriving late, or data is
>>arriving
>> >out of order in  intern of time stamp  and early some time, and CAMUS
>> >creates huge number of file count on HDFS due to uneven injection rate
>>.
>> >Camus Job is  configured to run every 3 minutes.)
>> I think uneven data distribution is typically caused by server side
>> unbalance, instead of something mirror maker could control. In new
>>mirror
>> maker, however, there is a customizable message handler, that might be
>> able to help a little bit. In message handler, you can explicitly set a
>> partition that you want to produce the message to. So if you know the
>> uneven data distribution in target cluster, you may offset it here. But
>> that probably only works for non-keyed messages.
>> >
>> >I am not sure if this is right discussion form to bring these to
>> >your/kafka
>> >Dev team attention.  This might be off track,
>> >
>> >
>> >Thanks,
>> >
>> >Bhavesh
>> >
>> >On Wed, Jan 28, 2015 at 11:07 AM, Jiangjie Qin
>><j...@linkedin.com.invalid
>> >
>> >wrote:
>> >
>> >> I’ve updated the KIP page. Feedbacks are welcome.
>> >>
>> >> Regarding the simple mirror maker design. I thought over it and have
>> >>some
>> >> worries:
>> >> There are two things that might worth thinking:
>> >> 1. One of the enhancement to mirror maker is adding a message
>>handler to
>> >> do things like reformatting. I think we might potentially want to
>>have
>> >> more threads processing the messages than the number of consumers.
>>If we
>> >> follow the simple mirror maker solution, we lose this flexibility.
>> >> 2. This might not matter too much, but creating more consumers means
>> >>more
>> >> footprint of TCP connection / memory.
>> >>
>> >> Any thoughts on this?
>> >>
>> >> Thanks.
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On 1/26/15, 10:35 AM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>> >>
>> >> >Hi Jay and Neha,
>> >> >
>> >> >Thanks a lot for the reply and explanation. I do agree it makes more
>> >>sense
>> >> >to avoid duplicate effort and plan based on new consumer. I’ll
>>modify
>> >>the
>> >> >KIP.
>> >> >
>> >> >To Jay’s question on message ordering - The data channel selection
>> >>makes
>> >> >sure that the messages from the same source partition will sent by
>>the
>> >> >same producer. So the order of the messages is guaranteed with
>>proper
>> >> >producer settings (MaxInFlightRequests=1,retries=Integer.MaxValue,
>> >>etc.)
>> >> >For keyed messages, because they come from the same source partition
>> >>and
>> >> >will end up in the same target partition, as long as they are sent
>>by
>> >>the
>> >> >same producer, the order is guaranteed.
>> >> >For non-keyed messages, the messages coming from the same source
>> >>partition
>> >> >might go to different target partitions. The order is only
>>guaranteed
>> >> >within each partition.
>> >> >
>> >> >Anyway, I’ll modify the KIP and data channel will be away.
>> >> >
>> >> >Thanks.
>> >> >
>> >> >Jiangjie (Becket) Qin
>> >> >
>> >> >
>> >> >On 1/25/15, 4:34 PM, "Neha Narkhede" <n...@confluent.io> wrote:
>> >> >
>> >> >>I think there is some value in investigating if we can go back to
>>the
>> >> >>simple mirror maker design, as Jay points out. Here you have N
>> >>threads,
>> >> >>each has a consumer and a producer.
>> >> >>
>> >> >>The reason why we had to move away from that was a combination of
>>the
>> >> >>difference in throughput between the consumer and the old producer
>>and
>> >> >>the
>> >> >>deficiency of the consumer rebalancing that limits the total
>>number of
>> >> >>mirror maker threads. So the only option available was to increase
>>the
>> >> >>throughput of the limited # of mirror maker threads that could be
>> >> >>deployed.
>> >> >>Now that queuing design may not make sense, if the new producer's
>> >> >>throughput is almost similar to the consumer AND the fact that the
>>new
>> >> >>round-robin based consumer rebalancing can allow a very high
>>number of
>> >> >>mirror maker instances to exist.
>> >> >>
>> >> >>This is the end state that the mirror maker should be in once the
>>new
>> >> >>consumer is complete, so it wouldn't hurt to see if we can just
>>move
>> >>to
>> >> >>that right now.
>> >> >>
>> >> >>On Fri, Jan 23, 2015 at 8:40 PM, Jay Kreps <jay.kr...@gmail.com>
>> >>wrote:
>> >> >>
>> >> >>> QQ: If we ever use a different technique for the data channel
>> >>selection
>> >> >>> than for the producer partitioning won't that break ordering? How
>> >>can
>> >> >>>we
>> >> >>> ensure these things stay in sync?
>> >> >>>
>> >> >>> With respect to the new consumer--I really do want to encourage
>> >>people
>> >> >>>to
>> >> >>> think through how MM will work with the new consumer. I mean this
>> >>isn't
>> >> >>> very far off, maybe a few months if we hustle? I could imagine us
>> >> >>>getting
>> >> >>> this mm fix done maybe sooner, maybe in a month? So I guess this
>> >>buys
>> >> >>>us an
>> >> >>> extra month before we rip it out and throw it away? Maybe two?
>>This
>> >>bug
>> >> >>>has
>> >> >>> been there for a while, though, right? Is it worth it? Probably
>>it
>> >>is,
>> >> >>>but
>> >> >>> it still kind of sucks to have the duplicate effort.
>> >> >>>
>> >> >>> So anyhow let's definitely think about how things will work with
>>the
>> >> >>>new
>> >> >>> consumer. I think we can probably just have N threads, each
>>thread
>> >>has
>> >> >>>a
>> >> >>> producer and consumer and is internally single threaded. Any
>>reason
>> >> >>>this
>> >> >>> wouldn't work?
>> >> >>>
>> >> >>> -Jay
>> >> >>>
>> >> >>>
>> >> >>> On Wed, Jan 21, 2015 at 5:29 PM, Jiangjie Qin
>> >> >>><j...@linkedin.com.invalid>
>> >> >>> wrote:
>> >> >>>
>> >> >>> > Hi Jay,
>> >> >>> >
>> >> >>> > Thanks for comments. Please see inline responses.
>> >> >>> >
>> >> >>> > Jiangjie (Becket) Qin
>> >> >>> >
>> >> >>> > On 1/21/15, 1:33 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>> >> >>> >
>> >> >>> > >Hey guys,
>> >> >>> > >
>> >> >>> > >A couple questions/comments:
>> >> >>> > >
>> >> >>> > >1. The callback and user-controlled commit offset
>>functionality
>> >>is
>> >> >>> already
>> >> >>> > >in the new consumer which we are working on in parallel. If we
>> >> >>> accelerated
>> >> >>> > >that work it might help concentrate efforts. I admit this
>>might
>> >>take
>> >> >>> > >slightly longer in calendar time but could still probably get
>> >>done
>> >> >>>this
>> >> >>> > >quarter. Have you guys considered that approach?
>> >> >>> > Yes, I totally agree that ideally we should put efforts on new
>> >> >>>consumer.
>> >> >>> > The main reason for still working on the old consumer is that
>>we
>> >> >>>expect
>> >> >>> it
>> >> >>> > would still be used in LinkedIn for quite a while before the
>>new
>> >> >>>consumer
>> >> >>> > could be fully rolled out. And we recently suffering a lot from
>> >> >>>mirror
>> >> >>> > maker data loss issue. So our current plan is making necessary
>> >> >>>changes to
>> >> >>> > make current mirror maker stable in production. Then we can
>>test
>> >>and
>> >> >>> > rollout new consumer gradually without getting burnt.
>> >> >>> > >
>> >> >>> > >2. I think partitioning on the hash of the topic partition is
>> >>not a
>> >> >>>very
>> >> >>> > >good idea because that will make the case of going from a
>>cluster
>> >> >>>with
>> >> >>> > >fewer partitions to one with more partitions not work. I
>>think an
>> >> >>> > >intuitive
>> >> >>> > >way to do this would be the following:
>> >> >>> > >a. Default behavior: Just do what the producer does. I.e. if
>>you
>> >> >>> specify a
>> >> >>> > >key use it for partitioning, if not just partition in a
>> >>round-robin
>> >> >>> > >fashion.
>> >> >>> > >b. Add a --preserve-partition option that will explicitly
>> >>inherent
>> >> >>>the
>> >> >>> > >partition from the source irrespective of whether there is a
>>key
>> >>or
>> >> >>> which
>> >> >>> > >partition that key would hash to.
>> >> >>> > Sorry that I did not explain this clear enough. The hash of
>>topic
>> >> >>> > partition is only used when decide which mirror maker data
>>channel
>> >> >>>queue
>> >> >>> > the consumer thread should put message into. It only tries to
>>make
>> >> >>>sure
>> >> >>> > the messages from the same partition is sent by the same
>>producer
>> >> >>>thread
>> >> >>> > to guarantee the sending order. This is not at all related to
>> >>which
>> >> >>> > partition in target cluster the messages end up. That is still
>> >> >>>decided by
>> >> >>> > producer.
>> >> >>> > >
>> >> >>> > >3. You don't actually give the ConsumerRebalanceListener
>> >>interface.
>> >> >>>What
>> >> >>> > >is
>> >> >>> > >that going to look like?
>> >> >>> > Good point! I should have put it in the wiki. I just added it.
>> >> >>> > >
>> >> >>> > >4. What is MirrorMakerRecord? I think ideally the
>> >> >>> > >MirrorMakerMessageHandler
>> >> >>> > >interface would take a ConsumerRecord as input and return a
>> >> >>> > >ProducerRecord,
>> >> >>> > >right? That would allow you to transform the key, value,
>> >>partition,
>> >> >>>or
>> >> >>> > >destination topic...
>> >> >>> > MirrorMakerRecord is introduced in KAFKA-1650, which is exactly
>> >>the
>> >> >>>same
>> >> >>> > as ConsumerRecord in KAFKA-1760.
>> >> >>> > private[kafka] class MirrorMakerRecord (val sourceTopic:
>>String,
>> >> >>> >   val sourcePartition: Int,
>> >> >>> >   val sourceOffset: Long,
>> >> >>> >   val key: Array[Byte],
>> >> >>> >   val value: Array[Byte]) {
>> >> >>> >   def size = value.length + {if (key == null) 0 else
>>key.length}
>> >> >>> > }
>> >> >>> >
>> >> >>> > However, because source partition and offset is needed in
>>producer
>> >> >>>thread
>> >> >>> > for consumer offsets bookkeeping, the record returned by
>> >> >>> > MirrorMakerMessageHandler needs to contain those information.
>> >> >>>Therefore
>> >> >>> > ProducerRecord does not work here. We could probably let
>>message
>> >> >>>handler
>> >> >>> > take ConsumerRecord for both input and output.
>> >> >>> > >
>> >> >>> > >5. Have you guys thought about what the implementation will
>>look
>> >> >>>like in
>> >> >>> > >terms of threading architecture etc with the new consumer?
>>That
>> >>will
>> >> >>>be
>> >> >>> > >soon so even if we aren't starting with that let's make sure
>>we
>> >>can
>> >> >>>get
>> >> >>> > >rid
>> >> >>> > >of a lot of the current mirror maker accidental complexity in
>> >>terms
>> >> >>>of
>> >> >>> > >threads and queues when we move to that.
>> >> >>> > I haven¹t thought about it throughly. The quick idea is after
>> >> >>>migration
>> >> >>> to
>> >> >>> > the new consumer, it is probably better to use a single
>>consumer
>> >> >>>thread.
>> >> >>> > If multithread is needed, decoupling consumption and processing
>> >>might
>> >> >>>be
>> >> >>> > used. MirrorMaker definitely needs to be changed after new
>> >>consumer
>> >> >>>get
>> >> >>> > checked in. I¹ll document the changes and can submit follow up
>> >> >>>patches
>> >> >>> > after the new consumer is available.
>> >> >>> > >
>> >> >>> > >-Jay
>> >> >>> > >
>> >> >>> > >On Tue, Jan 20, 2015 at 4:31 PM, Jiangjie Qin
>> >> >>><j...@linkedin.com.invalid
>> >> >>> >
>> >> >>> > >wrote:
>> >> >>> > >
>> >> >>> > >> Hi Kafka Devs,
>> >> >>> > >>
>> >> >>> > >> We are working on Kafka Mirror Maker enhancement. A KIP is
>> >>posted
>> >> >>>to
>> >> >>> > >> document and discuss on the followings:
>> >> >>> > >> 1. KAFKA-1650: No Data loss mirror maker change
>> >> >>> > >> 2. KAFKA-1839: To allow partition aware mirror.
>> >> >>> > >> 3. KAFKA-1840: To allow message filtering/format conversion
>> >> >>> > >> Feedbacks are welcome. Please let us know if you have any
>> >> >>>questions or
>> >> >>> > >> concerns.
>> >> >>> > >>
>> >> >>> > >> Thanks.
>> >> >>> > >>
>> >> >>> > >> Jiangjie (Becket) Qin
>> >> >>> > >>
>> >> >>> >
>> >> >>> >
>> >> >>>
>> >> >>
>> >> >>
>> >> >>
>> >> >>--
>> >> >>Thanks,
>> >> >>Neha
>> >> >
>> >>
>> >>
>>
>>

Reply via email to