Hi, Henry, Thanks for the KIP. Sorry for the late reply. A few comments below.

1. The 'shallow' feature is potentially useful. I do agree with Tom that the proposed API changes seem unclean. Quite a few existing features don't really work together with this (e.g., generics, serializers, interceptors, configs like max.poll.records, etc.). It's also hard to explain this change to common users of the consumer/producer API. I think it would be useful to explore whether there is a cleaner way of adding this. For example, you mentioned that creating a new set of APIs doesn't work for MM2. However, we could potentially change the Connect interface to allow MM2 to use the new API. If this doesn't work, it would be useful to explain that in the rejected alternatives section.

2. I am not sure that we could pass through all fields in RecordBatch. For example, an MM instance could be receiving RecordBatches from different source partitions. Mixing the PID/ProducerEpoch/FirstSequence fields across them in a single producer will be weird. So, it would be useful to document this part more clearly.

3. EOS. While MM itself doesn't support mirroring data in an exactly-once way, it needs to support reading from a topic with EOS data. So, it would be useful to document whether both read_committed and read_uncommitted modes are supported and what kind of RecordBatch the consumer returns in each case.

4. With the 'shallow' feature, it seems that some existing features in MM won't work. For example, I am not sure whether SMTs still work in MM2 or whether MirrorMakerMessageHandler still works in MM1. It would be useful to document this kind of impact in the KIP.

5. Multiple batches per partition in the produce request. This seems not to be strictly required in KIP-98. However, changing this will probably add a bit more complexity in the producer. So, it would be useful to understand its benefits, especially since it doesn't seem to directly help reduce the CPU cost in MM. For example, do you have performance numbers with and without this enabled in your MM tests?

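To make point 2 a bit more concrete, here is a minimal sketch of the concern. The BatchHeader class and both helper methods are hypothetical and are not Kafka's actual classes; the only assumption borrowed from Kafka is the -1 sentinel used for "no producer id / epoch / sequence". It just shows that batches fetched from different source partitions can carry producer state from different source producers, and that one possible (unconfirmed) way to handle this is to clear those fields rather than pass them through.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ProducerStateSketch {
    // Sentinels meaning "not written by an idempotent/transactional producer".
    public static final long NO_PRODUCER_ID = -1L;
    public static final short NO_PRODUCER_EPOCH = -1;
    public static final int NO_SEQUENCE = -1;

    /** Hypothetical, simplified view of the producer-related batch header fields. */
    public static final class BatchHeader {
        public final long producerId;
        public final short producerEpoch;
        public final int firstSequence;
        public final int recordCount;

        public BatchHeader(long producerId, short producerEpoch,
                           int firstSequence, int recordCount) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.firstSequence = firstSequence;
            this.recordCount = recordCount;
        }
    }

    /** True if the batches carry state from more than one source producer. */
    public static boolean mixesProducers(List<BatchHeader> batches) {
        Set<String> producers = new HashSet<>();
        for (BatchHeader b : batches) {
            if (b.producerId != NO_PRODUCER_ID) {
                producers.add(b.producerId + ":" + b.producerEpoch);
            }
        }
        return producers.size() > 1;
    }

    /** Copies the header but clears the producer state instead of passing it through. */
    public static BatchHeader stripProducerState(BatchHeader b) {
        return new BatchHeader(NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
                               NO_SEQUENCE, b.recordCount);
    }
}
```

Whether the fields should be cleared, rewritten, or rejected is exactly the part I think the KIP should spell out.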
Thanks,

Jun

On Tue, Mar 30, 2021 at 1:27 PM Henry Cai <h...@pinterest.com.invalid> wrote:

> Tom,
>
> Thanks for your comments. Yes it's a bit clumsy to use the existing consumer and producer API to carry the underlying record batch, but creating a new set of API would also mean other use cases (e.g. MM2) wouldn't be able to use that feature easily. We can throw exceptions if we see clients are setting serializer/compression in the consumer config option.
>
> The consumer is essentially getting back a collection of RecordBatchByteBuffer records and passing them to the producer. Most of the internal APIs inside consumer and producer code paths are actually taking on ByteBuffer as the argument so it's not too much work to get the byte buffer through.
>
> For the worry that the client might see the inside of that byte buffer, we can create a RecordBatchByteBufferRecord class to wrap the underlying byte buffer so hopefully they will not drill too deep into that object. Java's ByteBuffer does have a asReadOnlyBuffer() method to return a read-only buffer, that can be explored as well.
>
> On Tue, Mar 30, 2021 at 4:24 AM Tom Bentley <tbent...@redhat.com> wrote:
>
> > Hi Henry and Ryanne,
> >
> > Related to Ismael's point about the producer & consumer configs being dangerous, I can see two parts to this:
> >
> > 2a. Both the proposed configs seem to be fundamentally incompatible with the Producer's existing key.serializer, value.serializer and compression.type configs, likewise the consumers key.deserializer and value.deserializer. I don't see a way to avoid this, since those existing configs are already separate things. (I did consider whether using special-case Deserializer and Serializer could be used instead, but that doesn't work nicely; in this use case they're necessarily all configured together). I think all we could do would be to reject configs which tried to set those existing client configs in conjunction with fetch.raw.bytes and send.raw.bytes.
> >
> > 2b. That still leaves a public Java API which would allow access to the raw byte buffers. AFAICS we don't actually need user code to have access to the raw buffers. It would be enough to get an opaque object that wrapped the ByteBuffer from the consumer and pass it to the producer. It's only the consumer and producer code which needs to be able to obtain the wrapped buffer.
> >
> > Kind regards,
> >
> > Tom
> >
> > On Tue, Mar 30, 2021 at 8:41 AM Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Hi Henry,
> > >
> > > Can you clarify why this "network performance" issue is only related to shallow mirroring? Generally, we want the protocol to be generic and not have a number of special cases. The more special cases you have, the tougher it becomes to test all the edge cases.
> > >
> > > Ismael
> > >
> > > On Mon, Mar 29, 2021 at 9:51 PM Henry Cai <h...@pinterest.com.invalid> wrote:
> > >
> > > > It's interesting this VOTE thread finally becomes a DISCUSS thread.
> > > >
> > > > For MM2 concern, I will take a look to see whether I can add the support for MM2.
> > > >
> > > > For Ismael's concern on multiple batches in the ProduceRequest (conflicting with KIP-98), here is my take:
> > > >
> > > > 1. We do need to group multiple batches in the same request otherwise the network performance will suffer.
> > > > 2. For the concern on transactional message support as in KIP-98, since MM1 and MM2 currently don't support transactional messages, KIP-712 will not attempt to support transactions either. I will add a config option on producer config: allowMultipleBatches. By default this option will be off and the user needs to explicitly turn on this option to use the shallow mirror feature. And if we detect both this option and transaction is turned on we will throw an exception to protect current transaction processing.
> > > > 3. In the future, when MM2 starts to support exact-once and transactional messages (is that KIP-656?), we can revisit this code. The current transactional message already makes the compromise that the messages in the same RecordBatch (MessageSet) are sharing the same sequence-id/transaction-id, so those messages need to be committed all together. I think when we support the shallow mirror with transactional semantics, we will group all batches in the same ProduceRequest in the same transaction boundary, they need to be committed all together. On the broker side, all batches coming from ProduceRequest (or FetchResponse) are committed in the same log segment file as one unit (current behavior).
> > > >
> > > > On Mon, Mar 29, 2021 at 8:46 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > >
> > > > > Ah, I see, thanks Ismael. Now I understand your concern.
> > > > >
> > > > > From KIP-98, re this change in v3:
> > > > >
> > > > > "This allows us to remove the message set size since each message set already contains a field for the size. More importantly, since there is only one message set to be written to the log, partial produce failures are no longer possible. The full message set is either successfully written to the log (and replicated) or it is not."
> > > > >
> > > > > The schema and size field don't seem to be an issue, as KIP-712 already addresses.
> > > > >
> > > > > The partial produce failure issue is something I don't understand. I can't tell if this was done out of convenience at the time or if there is something incompatible with partial produce success/failure and EOS. Does anyone know?
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Mon, Mar 29, 2021, 1:41 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > >
> > > > > > Ryanne,
> > > > > >
> > > > > > You misunderstood the referenced comment. It is about the produce request change to have multiple batches:
> > > > > >
> > > > > > "Up to ProduceRequest V2, a ProduceRequest can contain multiple batches of messages stored in the record_set field, but this was disabled in V3. We are proposing to bring the multiple batches feature back to improve the network throughput of the mirror maker producer when the original batch size from source broker is too small."
> > > > > >
> > > > > > This is unrelated to shallow iteration.
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Sun, Mar 28, 2021, 10:15 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > >
> > > > > > > Ismael, I don't think KIP-98 is related. Shallow iteration was removed in KAFKA-732, which predates KIP-98 by a few years.
> > > > > > >
> > > > > > > Ryanne
> > > > > > >
> > > > > > > On Sun, Mar 28, 2021, 11:25 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > >
> > > > > > > > Thanks for the KIP. I have a few high level comments:
> > > > > > > >
> > > > > > > > 1. Like Tom, I'm not convinced about the proposal to make this change to MirrorMaker 1 if we intend to deprecate it and remove it. I would rather us focus our efforts on the implementation we intend to support going forward.
> > > > > > > > 2. The producer/consumer configs seem pretty dangerous for general usage, but the KIP doesn't address the potential downsides.
> > > > > > > > 3. How does the ProducerRequest change impact exactly-once (if at all)? The change we are reverting was done as part of KIP-98. Have we considered the original reasons for the change?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Ismael
> > > > > > > >
> > > > > > > > On Wed, Feb 10, 2021 at 12:58 PM Vahid Hashemian <vahid.hashem...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Retitled the thread to conform to the common format.
> > > > > > > > >
> > > > > > > > > On Fri, Feb 5, 2021 at 4:00 PM Ning Zhang <ning2008w...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hello Henry,
> > > > > > > > > >
> > > > > > > > > > This is a very interesting proposal. https://issues.apache.org/jira/browse/KAFKA-10728 reflects the similar concern of re-compressing data in mirror maker.
> > > > > > > > > >
> > > > > > > > > > Probably one thing may need to clarify is: how "shallow" mirroring is only applied to mirrormaker use case, if the changes need to be made on generic consumer and producer (e.g. by adding `fetch.raw.bytes` and `send.raw.bytes` to producer and consumer config)
> > > > > > > > > >
> > > > > > > > > > On 2021/02/05 00:59:57, Henry Cai <h...@pinterest.com.INVALID> wrote:
> > > > > > > > > >
> > > > > > > > > > > Dear Community members,
> > > > > > > > > > >
> > > > > > > > > > > We are proposing a new feature to improve the performance of Kafka mirror maker:
> > > > > > > > > > >
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring
> > > > > > > > > > >
> > > > > > > > > > > The current Kafka MirrorMaker process (with the underlying Consumer and Producer library) uses significant CPU cycles and memory to decompress/recompress, deserialize/re-serialize messages and copy multiple times of messages bytes along the mirroring/replicating stages.
> > > > > > > > > > >
> > > > > > > > > > > The KIP proposes a *shallow mirror* feature which brings back the shallow iterator concept to the mirror process and also proposes to skip the unnecessary message decompression and recompression steps. We argue in many cases users just want a simple replication pipeline to replicate the message as it is from the source cluster to the destination cluster. In many cases the messages in the source cluster are already compressed and properly batched, users just need an identical copy of the message bytes through the mirroring without any transformation or repartitioning.
> > > > > > > > > > >
> > > > > > > > > > > We have a prototype implementation in house with MirrorMaker v1 and observed *CPU usage dropped from 50% to 15%* for some mirror pipelines.
> > > > > > > > > > >
> > > > > > > > > > > We name this feature: *shallow mirroring* since it has some resemblance to the old Kafka 0.7 namesake feature but the implementations are not quite the same. ‘*Shallow*’ means 1. we *shallowly* iterate RecordBatches inside MemoryRecords structure instead of deep iterating records inside RecordBatch; 2. We *shallowly* copy (share) pointers inside ByteBuffer instead of deep copying and deserializing bytes into objects.
> > > > > > > > > > >
> > > > > > > > > > > Please share discussions/feedback along this email thread.
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > > --Vahid