Hi, Henry,

Thanks for the response.
1. I agree with Tom that it's worth thinking about a separate class for shallow iteration instead of trying to add more complexity to the existing producer/consumer API. We could potentially make the new class an internal API if it's only useful for MM.

3. I am not sure that we could ignore transactional messages in the first phase. The usage of EOS is increasing. Also, one can't tell from the metadata of a topic whether it has EOS data or not, so there is no easy way to skip EOS data from the source.

Jun

On Thu, Apr 1, 2021 at 1:07 AM Henry Cai <h...@pinterest.com.invalid> wrote:

> Jun,
>
> Thanks for your insights in looking into this KIP. We do believe the shallow iteration will give quite a significant performance boost.
>
> On your concerns:
>
> 1. Cleaner API. One alternative is to create new batch APIs. On the consumer, Consumer.pollBatch would return a ConsumerBatch object which contains topic/partition/firstOffsetOfBatch/pointerToByteBuffer; similarly, the producer would get Producer.sendBatch(ProducerBatch). Both ConsumerBatch and ProducerBatch objects are fixed types (no generics), the serializer is gone, and interceptors are probably not needed initially (unless people see the need to intercept at the batch level). On the MM2 side, the current flow is ConsumerRecord -> Connect's SourceRecord -> ProducerRecord; we would need to enhance the Connect framework to add a SourceTask.pollBatch() method which returns a SourceBatch object, so the object conversion flow becomes ConsumerBatch -> SourceBatch -> ProducerBatch. We probably won't support any transformers on Batch objects. (See the sketch after this message.)
>
> 2. PID/ProducerEpoch/SeqNo passing through RecordBatch. I think those transaction fields are only meaningful in the original source Kafka cluster; producer id/seqNo are not the same for the target Kafka cluster. So if MM is not going to support transactions at the moment, we can clear those fields when they are going through MM. Once MM starts to support transactions in the future, it will probably start its own PID/SeqNo, etc.
>
> 3. For EOS and read_committed/read_uncommitted support, we can do phased support. Phase 1: don't support transactional messages in the source cluster (i.e. abort if we see control batch records). Phase 2: apply commit/abort at the batch boundary level. I am not too familiar with the isolation level and abort-transaction code path, but it seems the control unit is currently the batch boundary (commit/abort the whole batch); if so, this should also be doable.
>
> 4. MessageHandler in MM1 and SMT in MM2: initially we don't need to support them, since the object is now a ConsumerBatch and the existing handlers are written for individual records. Deserializing the batch into individual objects would defeat the purpose of the performance optimization.
>
> 5. Multiple-batch performance: will do some testing on this.
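A minimal sketch of the batch-level API Henry outlines in point 1 above. All names (ConsumerBatch, ProducerBatch, pollBatch, sendBatch) are taken from his message and are hypothetical; none of this is an existing Kafka API:

    import java.nio.ByteBuffer;
    import java.time.Duration;
    import java.util.Collection;

    // Fixed-type batch record: no generics, no serializer. It carries the raw
    // RecordBatch bytes plus just enough metadata to route the batch.
    final class ConsumerBatch {
        private final String topic;
        private final int partition;
        private final long firstOffsetOfBatch;
        private final ByteBuffer batchBytes; // shared pointer, not a deep copy

        ConsumerBatch(String topic, int partition, long firstOffsetOfBatch,
                      ByteBuffer batchBytes) {
            this.topic = topic;
            this.partition = partition;
            this.firstOffsetOfBatch = firstOffsetOfBatch;
            this.batchBytes = batchBytes;
        }

        String topic() { return topic; }
        int partition() { return partition; }
        long firstOffsetOfBatch() { return firstOffsetOfBatch; }
        ByteBuffer batchBytes() { return batchBytes.asReadOnlyBuffer(); }
    }

    // Producer-side counterpart; in the MM2 flow Henry sketches it would be
    // built from a ConsumerBatch via an intermediate SourceBatch.
    final class ProducerBatch {
        final String topic;
        final int partition;
        final ByteBuffer batchBytes;

        ProducerBatch(String topic, int partition, ByteBuffer batchBytes) {
            this.topic = topic;
            this.partition = partition;
            this.batchBytes = batchBytes;
        }
    }

    // Batch-level entry points: no serializers and, initially, no interceptors.
    interface BatchConsumer {
        Collection<ConsumerBatch> pollBatch(Duration timeout);
    }

    interface BatchProducer {
        void sendBatch(ProducerBatch batch);
    }

Exposing the buffer only as a read-only view is one way to keep callers from mutating the shared bytes while still avoiding a deep copy.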
>
> On Wed, Mar 31, 2021 at 10:14 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Henry,
> >
> > Thanks for the KIP. Sorry for the late reply. A few comments below.
> >
> > 1. The 'shallow' feature is potentially useful. I do agree with Tom that the proposed API changes seem unclean. Quite a few existing features don't really work with this (e.g., generics, serializers, interceptors, configs like max.poll.records, etc.). It's also hard to explain this change to the common users of the consumer/producer API. I think it would be useful to explore whether there is another, cleaner way of adding this. For example, you mentioned that creating a new set of APIs doesn't work for MM2. However, we could potentially change the Connect interface to allow MM2 to use the new API. If this doesn't work, it would be useful to explain that in the rejected alternatives section.
> >
> > 2. I am not sure that we could pass through all fields in RecordBatch. For example, a MM instance could be receiving RecordBatches from different source partitions. Mixing the PID/ProducerEpoch/FirstSequence fields across them in a single producer would be weird. So, it would be useful to document this part more clearly.
> >
> > 3. EOS. While MM itself doesn't support mirroring data in an exactly-once way, it needs to support reading from a topic with EOS data. So, it would be useful to document whether both read_committed and read_uncommitted modes are supported and what kind of RecordBatch the consumer returns in each case.
> >
> > 4. With the 'shallow' feature, it seems that some existing features in MM won't work. For example, I am not sure if SMT works in MM2 and MirrorMakerMessageHandler works in MM1. It would be useful to document this kind of impact in the KIP.
> >
> > 5. Multiple batches per partition in the produce request. This seems not strictly required by KIP-98. However, changing this will probably add a bit more complexity in the producer. So, it would be useful to understand its benefits, especially since it doesn't seem to directly help reduce the CPU cost in MM. For example, do you have performance numbers with and without this enabled in your MM tests?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Mar 30, 2021 at 1:27 PM Henry Cai <h...@pinterest.com.invalid> wrote:
> >
> > > Tom,
> > >
> > > Thanks for your comments. Yes, it's a bit clumsy to use the existing consumer and producer APIs to carry the underlying record batch, but creating a new set of APIs would also mean other use cases (e.g. MM2) wouldn't be able to use the feature easily. We can throw exceptions if we see clients setting serializer/compression in the consumer config options.
> > >
> > > The consumer essentially gets back a collection of RecordBatchByteBuffer records and passes them to the producer. Most of the internal APIs inside the consumer and producer code paths actually take a ByteBuffer as the argument, so it's not too much work to get the byte buffer through.
> > >
> > > For the worry that the client might see the inside of that byte buffer, we can create a RecordBatchByteBufferRecord class to wrap the underlying byte buffer so that hopefully clients will not drill too deep into that object. Java's ByteBuffer does have an asReadOnlyBuffer() method to return a read-only buffer; that can be explored as well.
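A minimal sketch of the opaque wrapper Henry describes, assuming package-private access as the mechanism that keeps user code away from the raw bytes (the class name comes from the message above; only the consumer/producer internals would create and unwrap instances, in the spirit of Tom's 2b below):

    import java.nio.ByteBuffer;

    // Opaque holder for one raw record batch. User code can pass it from a
    // consumer to a producer but cannot reach the underlying bytes.
    public final class RecordBatchByteBufferRecord {
        private final ByteBuffer batchBuffer;

        // Package-private: only the consumer internals create instances.
        RecordBatchByteBufferRecord(ByteBuffer batchBuffer) {
            this.batchBuffer = batchBuffer;
        }

        // Package-private: only the producer internals read the bytes back
        // out, and even then only as a read-only view.
        ByteBuffer unwrap() {
            return batchBuffer.asReadOnlyBuffer();
        }
    }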
> > >
> > > On Tue, Mar 30, 2021 at 4:24 AM Tom Bentley <tbent...@redhat.com> wrote:
> > >
> > > > Hi Henry and Ryanne,
> > > >
> > > > Related to Ismael's point about the producer & consumer configs being dangerous, I can see two parts to this:
> > > >
> > > > 2a. Both the proposed configs seem to be fundamentally incompatible with the producer's existing key.serializer, value.serializer and compression.type configs, and likewise the consumer's key.deserializer and value.deserializer. I don't see a way to avoid this, since those existing configs are already separate things. (I did consider whether special-case Deserializer and Serializer implementations could be used instead, but that doesn't work nicely; in this use case they're necessarily all configured together.) I think all we could do would be to reject configs which tried to set those existing client configs in conjunction with fetch.raw.bytes and send.raw.bytes.
> > > >
> > > > 2b. That still leaves a public Java API which would allow access to the raw byte buffers. AFAICS we don't actually need user code to have access to the raw buffers. It would be enough to get an opaque object that wraps the ByteBuffer from the consumer and pass it to the producer. It's only the consumer and producer code which needs to be able to obtain the wrapped buffer.
> > > >
> > > > Kind regards,
> > > >
> > > > Tom
> > > >
> > > > On Tue, Mar 30, 2021 at 8:41 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > >
> > > > > Hi Henry,
> > > > >
> > > > > Can you clarify why this "network performance" issue is only related to shallow mirroring? Generally, we want the protocol to be generic and not have a number of special cases. The more special cases you have, the tougher it becomes to test all the edge cases.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Mon, Mar 29, 2021 at 9:51 PM Henry Cai <h...@pinterest.com.invalid> wrote:
> > > > >
> > > > > > It's interesting that this VOTE thread finally became a DISCUSS thread.
> > > > > >
> > > > > > For the MM2 concern, I will take a look to see whether I can add support for MM2.
> > > > > >
> > > > > > For Ismael's concern about multiple batches in the ProduceRequest (conflicting with KIP-98), here is my take:
> > > > > >
> > > > > > 1. We do need to group multiple batches in the same request; otherwise the network performance will suffer.
> > > > > >
> > > > > > 2. For the concern about transactional message support as in KIP-98: since MM1 and MM2 currently don't support transactional messages, KIP-712 will not attempt to support transactions either. I will add a config option to the producer config: allowMultipleBatches. By default this option will be off, and the user needs to explicitly turn it on to use the shallow mirror feature. If we detect that both this option and transactions are turned on, we will throw an exception to protect the current transaction processing. (See the sketch after this message.)
> > > > > >
> > > > > > 3. In the future, when MM2 starts to support exactly-once and transactional messages (is that KIP-656?), we can revisit this code. The current transactional messages already make the compromise that the messages in the same RecordBatch (MessageSet) share the same sequence-id/transaction-id, so those messages need to be committed all together. I think when we support the shallow mirror with transactional semantics, we will group all batches in the same ProduceRequest into the same transaction boundary, and they will need to be committed all together. On the broker side, all batches coming from a ProduceRequest (or FetchResponse) are committed to the same log segment file as one unit (current behavior).
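A minimal sketch of the guard Henry describes in point 2, assuming a hypothetical config key "allow.multiple.batches" for the allowMultipleBatches option; transactional.id is the existing producer config used here to detect that transactions are enabled:

    import java.util.Map;
    import org.apache.kafka.common.config.ConfigException;

    final class MultipleBatchesGuard {
        static void validate(Map<String, ?> producerConfig) {
            // "allow.multiple.batches" is the hypothetical opt-in from the
            // discussion above; it defaults to off.
            boolean multipleBatches = Boolean.parseBoolean(
                    String.valueOf(producerConfig.get("allow.multiple.batches")));
            boolean transactional = producerConfig.get("transactional.id") != null;
            if (multipleBatches && transactional) {
                // Refuse the combination so shallow mirroring cannot silently
                // break the one-batch-per-partition assumption from KIP-98.
                throw new ConfigException(
                        "allow.multiple.batches cannot be enabled together with transactional.id");
            }
        }
    }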
> > > > > >
> > > > > > On Mon, Mar 29, 2021 at 8:46 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > >
> > > > > > > Ah, I see, thanks Ismael. Now I understand your concern.
> > > > > > >
> > > > > > > From KIP-98, re this change in v3:
> > > > > > >
> > > > > > > "This allows us to remove the message set size since each message set already contains a field for the size. More importantly, since there is only one message set to be written to the log, partial produce failures are no longer possible. The full message set is either successfully written to the log (and replicated) or it is not."
> > > > > > >
> > > > > > > The schema and size field don't seem to be an issue, as KIP-712 already addresses them.
> > > > > > >
> > > > > > > The partial produce failure issue is something I don't understand. I can't tell if this was done out of convenience at the time or if there is something incompatible between partial produce success/failure and EOS. Does anyone know?
> > > > > > >
> > > > > > > Ryanne
> > > > > > >
> > > > > > > On Mon, Mar 29, 2021, 1:41 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > >
> > > > > > > > Ryanne,
> > > > > > > >
> > > > > > > > You misunderstood the referenced comment. It is about the produce request change to allow multiple batches:
> > > > > > > >
> > > > > > > > "Up to ProduceRequest V2, a ProduceRequest can contain multiple batches of messages stored in the record_set field, but this was disabled in V3. We are proposing to bring the multiple batches feature back to improve the network throughput of the mirror maker producer when the original batch size from source broker is too small."
> > > > > > > >
> > > > > > > > This is unrelated to shallow iteration.
> > > > > > > >
> > > > > > > > Ismael
> > > > > > > >
> > > > > > > > On Sun, Mar 28, 2021, 10:15 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Ismael, I don't think KIP-98 is related. Shallow iteration was removed in KAFKA-732, which predates KIP-98 by a few years.
> > > > > > > > >
> > > > > > > > > Ryanne
> > > > > > > > >
> > > > > > > > > On Sun, Mar 28, 2021, 11:25 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for the KIP. I have a few high-level comments:
> > > > > > > > > >
> > > > > > > > > > 1. Like Tom, I'm not convinced about the proposal to make this change to MirrorMaker 1 if we intend to deprecate and remove it. I would rather we focus our efforts on the implementation we intend to support going forward.
> > > > > > > > > >
> > > > > > > > > > 2. The producer/consumer configs seem pretty dangerous for general usage, but the KIP doesn't address the potential downsides.
> > > > > > > > > >
> > > > > > > > > > 3. How does the ProduceRequest change impact exactly-once (if at all)? The change we are reverting was done as part of KIP-98. Have we considered the original reasons for the change?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Ismael
> > > > > > > > > >
> > > > > > > > > > On Wed, Feb 10, 2021 at 12:58 PM Vahid Hashemian <vahid.hashem...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Retitled the thread to conform to the common format.
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Feb 5, 2021 at 4:00 PM Ning Zhang <ning2008w...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hello Henry,
> > > > > > > > > > > >
> > > > > > > > > > > > This is a very interesting proposal. https://issues.apache.org/jira/browse/KAFKA-10728 reflects a similar concern about re-compressing data in mirror maker.
> > > > > > > > > > > >
> > > > > > > > > > > > One thing that probably needs clarifying is how "shallow" mirroring can be applied only to the MirrorMaker use case, if the changes need to be made on the generic consumer and producer (e.g. by adding `fetch.raw.bytes` and `send.raw.bytes` to the consumer and producer configs).
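A minimal sketch of the fail-fast rejection Tom proposes in 2a further up the thread, using the `fetch.raw.bytes` and `send.raw.bytes` names from the KIP discussion; the guard class itself is hypothetical:

    import java.util.Map;
    import org.apache.kafka.common.config.ConfigException;

    final class RawBytesConfigGuard {
        private static final String[] PRODUCER_CONFLICTS =
                {"key.serializer", "value.serializer", "compression.type"};
        private static final String[] CONSUMER_CONFLICTS =
                {"key.deserializer", "value.deserializer"};

        static void checkProducer(Map<String, ?> config) {
            reject(config, "send.raw.bytes", PRODUCER_CONFLICTS);
        }

        static void checkConsumer(Map<String, ?> config) {
            reject(config, "fetch.raw.bytes", CONSUMER_CONFLICTS);
        }

        private static void reject(Map<String, ?> config, String rawOption,
                                   String[] conflicts) {
            if (!Boolean.parseBoolean(String.valueOf(config.get(rawOption))))
                return; // raw-bytes mode is off, nothing to check
            for (String key : conflicts) {
                if (config.get(key) != null) {
                    // Serializer/compression configs cannot be honored when raw
                    // batches bypass (de)serialization and recompression.
                    throw new ConfigException(
                            key + " cannot be set in conjunction with " + rawOption);
                }
            }
        }
    }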
> > > > > > > > > > > >
> > > > > > > > > > > > On 2021/02/05 00:59:57, Henry Cai <h...@pinterest.com.INVALID> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Dear Community members,
> > > > > > > > > > > > >
> > > > > > > > > > > > > We are proposing a new feature to improve the performance of the Kafka mirror maker:
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring
> > > > > > > > > > > > >
> > > > > > > > > > > > > The current Kafka MirrorMaker process (with the underlying Consumer and Producer library) uses significant CPU cycles and memory to decompress/recompress and deserialize/re-serialize messages, and it copies the message bytes multiple times along the mirroring/replicating stages.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The KIP proposes a *shallow mirror* feature which brings back the shallow iterator concept to the mirror process and also proposes to skip the unnecessary message decompression and recompression steps. We argue that in many cases users just want a simple replication pipeline to replicate messages as-is from the source cluster to the destination cluster. In many cases the messages in the source cluster are already compressed and properly batched; users just need an identical copy of the message bytes through the mirroring, without any transformation or repartitioning.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We have a prototype implementation in house with MirrorMaker v1 and observed *CPU usage drop from 50% to 15%* for some mirror pipelines.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We name this feature *shallow mirroring* since it has some resemblance to the old Kafka 0.7 namesake feature, but the implementations are not quite the same. '*Shallow*' means: 1. we *shallowly* iterate over RecordBatches inside the MemoryRecords structure instead of deep-iterating over records inside each RecordBatch; 2. we *shallowly* copy (share) pointers inside a ByteBuffer instead of deep-copying and deserializing bytes into objects.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Please share discussions/feedback along this email thread.
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Thanks!
> > > > > > > > > > > --Vahid
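To make the shallow-vs-deep distinction in Henry's announcement concrete, here is a small illustration against Kafka's internal record classes (MemoryRecords and RecordBatch are internal APIs; this is explanatory only, not part of the proposal):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;

    final class ShallowIterationExample {
        static void shallowScan(ByteBuffer fetchedBytes) {
            MemoryRecords records = MemoryRecords.readableRecords(fetchedBytes);
            // Shallow pass: batches() only walks the batch headers; the
            // (possibly compressed) payload of each batch is never touched.
            for (RecordBatch batch : records.batches()) {
                System.out.printf("batch baseOffset=%d sizeInBytes=%d compression=%s%n",
                        batch.baseOffset(), batch.sizeInBytes(), batch.compressionType());
                // A deep pass would call batch.iterator() here, which
                // decompresses and materializes every record -- exactly the
                // per-message cost the KIP wants to avoid.
            }
        }
    }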