Hi, Henry,

Thanks for the reply.

1. Regarding re-implementing 30 methods. Could we structure the code such that the implementation of most of the methods could be shared?

3. Regarding EOS, supporting read_committed makes sense.

Thanks,

Jun

On Thu, Apr 1, 2021 at 11:18 PM Henry Cai <h...@pinterest.com.invalid> wrote:

> Tom,
>
> I don't think we need to refactor the Consumer/Producer class hierarchy for this KIP. If we introduce a separate class for BatchConsumer, there are more than 30 methods in the Consumer interface that this new class would need to implement, and the implementation would be exactly the same as KafkaConsumer's.
>
> If we just introduce a new method 'ConsumerBatchRecord pollBatch(duration)' on the Consumer class, there is no need for this method to handle generic types, since the return type is a concrete class. The worry you have that the user could later tweak the content so they can send a ProducerRecord<NewFoo, NewBar> can be avoided if we make the class ConsumerBatchRecord final and define only one method, 'toProducerBatchRecord()', on the class, and make the constructor of ProducerBatchRecord package-private as well, so that people cannot create a ProducerBatchRecord with arbitrary content.
>
> The other design is to have just one class, BatchRecord (instead of two classes, ConsumerBatchRecord and ProducerBatchRecord): Consumer.pollBatch(duration) will return a BatchRecord, and Producer.sendBatch(BatchRecord) will send the batch. The BatchRecord class is final and its constructor is hidden, so it's unmodifiable by user code.
>
> On Thu, Apr 1, 2021 at 6:27 AM Tom Bentley <tbent...@redhat.com> wrote:
>
> > Hi Henry, Jun and Ismael,
> >
> > A few things make me wonder if building this into the existing Producer and Consumer APIs is really the right thing to do:
> >
> > 1. Type safety. The existing Producer and Consumer are both generic in K and V, but those type parameters are meaningless in the batch case.
> > For example, the apparent type safety of a Producer<Foo, Bar> would be violated by using the batch method to actually send a <Baz, Quux>. Another example: What happens if I pass a producer configured for records to someone that requires one configured for batches (and vice versa)?
> >
> > 2. The existing Producer and Consumer would both accept a number of configs which didn't apply in the batch case.
> >
> > In the discussion for KIP-706 Jason was imagining a more abstracted set of client APIs which separated the data from the topic destination/origin, and he mentioned basically this exact use case. This got me thinking, and although I don't want to derail this conversation, I thought I'd sketch what I came up with.
> >
> > On the Consumer side:
> >
> > ```java
> > // Abstraction over where messages come from
> > interface ReceiveSource {}
> > class TopicPartition implements SendTarget, ReceiveSource {}
> > class Topic implements SendTarget, ReceiveSource {}
> > class TopicId implements SendTarget, ReceiveSource {}
> > class TopicPattern implements ReceiveSource {}
> >
> > // New abstraction for consumer-like things
> > interface Receiver<X> {
> >     void assign(ReceiveSource source);
> >     void subscribe(ReceiveSource source);
> >     // etc
> >     X poll(Duration timeout);
> > }
> >
> > // Consumer doesn't change, except for the extends clause
> > interface Consumer<K, V> extends Receiver<ConsumerRecords<K, V>> {
> >     void assign(ReceiveSource source);
> >     void subscribe(ReceiveSource source);
> >     ConsumerRecords<K, V> poll(Duration timeout);
> > }
> >
> > // KafkaConsumer doesn't change at all
> > class KafkaConsumer<K, V> implements Consumer<K, V> {
> > }
> >
> > // Specialise Receiver for batch-based consumption.
> > interface BatchConsumer extends Receiver<ConsumerBatch> {
> >
> > }
> >
> > // Implementation
> > class KafkaBatchConsumer implements BatchConsumer {
> >
> > }
> >
> > class ConsumerBatch {
> >     // For KIP-712, a way to convert batches without exposing low level
> >     // details like ByteBuffer
> >     ProducerBatchPayload toProducerBatch();
> > }
> > ```
> >
> > On the producer side:
> >
> > ```java
> > // Abstraction over targets (see the ReceiveSource for the impls)
> > interface SendTarget {}
> >
> > // Abstraction over data that can be sent to a target
> > interface Payload {}
> > class ProducerRecordPayload<K, V> implements Payload {
> >     // Like ProducerRecord, but without the topic and partition
> > }
> > class ProducerBatchPayload implements Payload {
> >     // For the KIP-712 case
> > }
> >
> > // A new abstraction over producer-like things
> > interface Transmitter<P extends Payload> {
> >     CompletionStage<T> send(SendTarget target, P payload);
> > }
> >
> > // Producer gains an extends clause
> > interface Producer<K, V> extends Transmitter<ProducerRecord<K, V>> {
> > }
> >
> > class KafkaProducer<K, V> implements Producer<K, V> {
> >     // Unchanged, included for completeness
> > }
> >
> > interface BatchProducer extends Transmitter<ProducerBatch> {
> >     CompletionStage<T> send(SendTarget target, ProducerBatch payload);
> > }
> >
> > class KafkaBatchProducer implements BatchProducer {
> >     // New. In practice a lot of common code between this and KafkaProducer
> >     // could be factored into an abstract class.
> > }
> > ```
> >
> > Really I'm just re-stating Jason's KIP-706 idea in the context of this KIP, but it would address the type safety issue and also enable a batch consumer to have its own set of configs. It also allows the new Producer.send return type to be CompletionStage, which is KIP-706's objective. And, of course it's compatible with possible future work around produce to/consume from topic id.
> >
> > Kind regards,
> >
> > Tom
> >
> > On Thu, Apr 1, 2021 at 9:11 AM Henry Cai <h...@pinterest.com.invalid> wrote:
> >
> > > Jun,
> > >
> > > Thanks for your insight looking into this KIP, we do believe the shallow iteration will give quite a significant performance boost.
> > >
> > > On your concerns:
> > >
> > > 1. Cleaner API. One alternative is to create new batch APIs. On the consumer, it would become Consumer.pollBatch, which returns a ConsumerBatch object containing topic/partition/firstOffsetOfBatch/pointerToByteBuffer; similarly, Producer.sendBatch(ProducerBatch). Both ConsumerBatch and ProducerBatch objects are fixed types (no generics), the serializer is gone, and interceptors are probably not needed initially (unless people see the need to intercept on the batch level). On the MM2 side, the current flow is ConsumerRecord -> Connect's SourceRecord -> ProducerRecord; we would need to enhance the Connect framework to add a SourceTask.pollBatch() method which returns a SourceBatch object, so the object conversion flow becomes ConsumerBatch -> SourceBatch -> ProducerBatch. We probably won't support any transformers on Batch objects.
> > > 2. PID/ProducerEpoch/SeqNo passing through RecordBatch. I think those transaction fields are only meaningful in the original source Kafka cluster; producer id/seqNo are not the same for the target Kafka cluster. So if MM is not going to support transactions at the moment, we can clear those fields when they are going through MM. Once MM starts to support transactions in the future, it probably will start its own PID/SeqNo etc.
> > > 3. For EOS and read_committed/read_uncommitted support, we can do phased support. Phase 1: don't support transactional messages in the source cluster (i.e. abort if it sees control batch records).
> > > Phase 2: applying commit/abort at the batch boundary level. I am not too familiar with the isolation level and abort transaction code path, but it seems the control unit is currently on the batch boundary (commit/abort the whole batch); if so, this should also be doable.
> > > 4. MessageHandler in MM1 or SMT in MM2: initially we don't need to support them, since the object is now a ConsumerBatch and the existing handlers are written for individual objects. Deserializing the batch into individual objects would defeat the purpose of the performance optimization.
> > > 5. Multiple batch performance: will do some testing on this.
> > >
> > > On Wed, Mar 31, 2021 at 10:14 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > >
> > > > Hi, Henry,
> > > >
> > > > Thanks for the KIP. Sorry for the late reply. A few comments below.
> > > >
> > > > 1. The 'shallow' feature is potentially useful. I do agree with Tom that the proposed API changes seem unclean. Quite a few existing things don't really work together with this (e.g., generics, serializer, interceptors, configs like max.poll.records, etc). It's also hard to explain this change to the common users of the consumer/producer API. I think it would be useful to explore if there is another cleaner way of adding this. For example, you mentioned that creating a new set of APIs doesn't work for MM2. However, we could potentially change the connect interface to allow MM2 to use the new API. If this doesn't work, it would be useful to explain that in the rejected alternative section.
> > > >
> > > > 2. I am not sure that we could pass through all fields in RecordBatch. For example, a MM instance could be receiving RecordBatch from different source partitions.
> > > > Mixing the PID/ProducerEpoch/FirstSequence fields across them in a single producer will be weird. So, it would be useful to document this part more clearly.
> > > >
> > > > 3. EOS. While MM itself doesn't support mirroring data in an exactly-once way, it needs to support reading from a topic with EOS data. So, it would be useful to document whether both read_committed and read_uncommitted mode are supported and what kind of RecordBatch the consumer returns in each case.
> > > >
> > > > 4. With the 'shallow' feature, it seems that some existing features in MM won't work. For example, I am not sure if SMT works in MM2 and MirrorMakerMessageHandler works in MM1. It would be useful to document this kind of impact in the KIP.
> > > >
> > > > 5. Multiple batches per partition in the produce request. This seems not strictly required in KIP-98. However, changing this will probably add a bit more complexity in the producer. So, it would be useful to understand its benefits, especially since it doesn't seem to directly help reduce the CPU cost in MM. For example, do you have performance numbers with and without this enabled in your MM tests?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Mar 30, 2021 at 1:27 PM Henry Cai <h...@pinterest.com.invalid> wrote:
> > > >
> > > > > Tom,
> > > > >
> > > > > Thanks for your comments. Yes it's a bit clumsy to use the existing consumer and producer API to carry the underlying record batch, but creating a new set of APIs would also mean other use cases (e.g. MM2) wouldn't be able to use that feature easily. We can throw exceptions if we see clients are setting serializer/compression in the consumer config option.
> > > > >
> > > > > The consumer is essentially getting back a collection of RecordBatchByteBuffer records and passing them to the producer. Most of the internal APIs inside the consumer and producer code paths actually take a ByteBuffer as the argument, so it's not too much work to get the byte buffer through.
> > > > >
> > > > > For the worry that the client might see the inside of that byte buffer, we can create a RecordBatchByteBufferRecord class to wrap the underlying byte buffer so hopefully they will not drill too deep into that object. Java's ByteBuffer does have an asReadOnlyBuffer() method to return a read-only buffer; that can be explored as well.
> > > > >
> > > > > On Tue, Mar 30, 2021 at 4:24 AM Tom Bentley <tbent...@redhat.com> wrote:
> > > > >
> > > > > > Hi Henry and Ryanne,
> > > > > >
> > > > > > Related to Ismael's point about the producer & consumer configs being dangerous, I can see two parts to this:
> > > > > >
> > > > > > 2a. Both the proposed configs seem to be fundamentally incompatible with the Producer's existing key.serializer, value.serializer and compression.type configs, likewise the consumer's key.deserializer and value.deserializer. I don't see a way to avoid this, since those existing configs are already separate things. (I did consider whether special-case Deserializer and Serializer could be used instead, but that doesn't work nicely; in this use case they're necessarily all configured together). I think all we could do would be to reject configs which tried to set those existing client configs in conjunction with fetch.raw.bytes and send.raw.bytes.
> > > > > >
> > > > > > 2b. That still leaves a public Java API which would allow access to the raw byte buffers. AFAICS we don't actually need user code to have access to the raw buffers. It would be enough to get an opaque object that wrapped the ByteBuffer from the consumer and pass it to the producer. It's only the consumer and producer code which needs to be able to obtain the wrapped buffer.
> > > > > >
> > > > > > Kind regards,
> > > > > >
> > > > > > Tom
> > > > > >
> > > > > > On Tue, Mar 30, 2021 at 8:41 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > >
> > > > > > > Hi Henry,
> > > > > > >
> > > > > > > Can you clarify why this "network performance" issue is only related to shallow mirroring? Generally, we want the protocol to be generic and not have a number of special cases. The more special cases you have, the tougher it becomes to test all the edge cases.
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Mon, Mar 29, 2021 at 9:51 PM Henry Cai <h...@pinterest.com.invalid> wrote:
> > > > > > >
> > > > > > > > It's interesting this VOTE thread finally becomes a DISCUSS thread.
> > > > > > > >
> > > > > > > > For the MM2 concern, I will take a look to see whether I can add the support for MM2.
> > > > > > > >
> > > > > > > > For Ismael's concern on multiple batches in the ProduceRequest (conflicting with KIP-98), here is my take:
> > > > > > > >
> > > > > > > > 1. We do need to group multiple batches in the same request, otherwise the network performance will suffer.
> > > > > > > > 2. For the concern on transactional message support as in KIP-98, since MM1 and MM2 currently don't support transactional messages, KIP-712 will not attempt to support transactions either. I will add a config option on the producer config: allowMultipleBatches. By default this option will be off and the user needs to explicitly turn on this option to use the shallow mirror feature. And if we detect that both this option and transactions are turned on, we will throw an exception to protect current transaction processing.
> > > > > > > > 3. In the future, when MM2 starts to support exactly-once and transactional messages (is that KIP-656?), we can revisit this code. The current transactional message already makes the compromise that the messages in the same RecordBatch (MessageSet) are sharing the same sequence-id/transaction-id, so those messages need to be committed all together. I think when we support the shallow mirror with transactional semantics, we will group all batches in the same ProduceRequest in the same transaction boundary; they need to be committed all together. On the broker side, all batches coming from ProduceRequest (or FetchResponse) are committed in the same log segment file as one unit (current behavior).
> > > > > > > >
> > > > > > > > On Mon, Mar 29, 2021 at 8:46 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Ah, I see, thanks Ismael. Now I understand your concern.
> > > > > > > > >
> > > > > > > > > From KIP-98, re this change in v3:
> > > > > > > > >
> > > > > > > > > "This allows us to remove the message set size since each message set already contains a field for the size. More importantly, since there is only one message set to be written to the log, partial produce failures are no longer possible. The full message set is either successfully written to the log (and replicated) or it is not."
> > > > > > > > >
> > > > > > > > > The schema and size field don't seem to be an issue, as KIP-712 already addresses.
> > > > > > > > >
> > > > > > > > > The partial produce failure issue is something I don't understand. I can't tell if this was done out of convenience at the time or if there is something incompatible with partial produce success/failure and EOS. Does anyone know?
> > > > > > > > >
> > > > > > > > > Ryanne
> > > > > > > > >
> > > > > > > > > On Mon, Mar 29, 2021, 1:41 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > >
> > > > > > > > > > Ryanne,
> > > > > > > > > >
> > > > > > > > > > You misunderstood the referenced comment. It is about the produce request change to have multiple batches:
> > > > > > > > > >
> > > > > > > > > > "Up to ProduceRequest V2, a ProduceRequest can contain multiple batches of messages stored in the record_set field, but this was disabled in V3.
> > > > > > > > > > We are proposing to bring the multiple batches feature back to improve the network throughput of the mirror maker producer when the original batch size from source broker is too small."
> > > > > > > > > >
> > > > > > > > > > This is unrelated to shallow iteration.
> > > > > > > > > >
> > > > > > > > > > Ismael
> > > > > > > > > >
> > > > > > > > > > On Sun, Mar 28, 2021, 10:15 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Ismael, I don't think KIP-98 is related. Shallow iteration was removed in KAFKA-732, which predates KIP-98 by a few years.
> > > > > > > > > > >
> > > > > > > > > > > Ryanne
> > > > > > > > > > >
> > > > > > > > > > > On Sun, Mar 28, 2021, 11:25 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks for the KIP. I have a few high level comments:
> > > > > > > > > > > >
> > > > > > > > > > > > 1. Like Tom, I'm not convinced about the proposal to make this change to MirrorMaker 1 if we intend to deprecate it and remove it. I would rather us focus our efforts on the implementation we intend to support going forward.
> > > > > > > > > > > > 2. The producer/consumer configs seem pretty dangerous for general usage, but the KIP doesn't address the potential downsides.
> > > > > > > > > > > > 3. How does the ProduceRequest change impact exactly-once (if at all)?
> > > > > > > > > > > > The change we are reverting was done as part of KIP-98. Have we considered the original reasons for the change?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Ismael
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Feb 10, 2021 at 12:58 PM Vahid Hashemian <vahid.hashem...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Retitled the thread to conform to the common format.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Feb 5, 2021 at 4:00 PM Ning Zhang <ning2008w...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hello Henry,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This is a very interesting proposal. https://issues.apache.org/jira/browse/KAFKA-10728 reflects a similar concern about re-compressing data in mirror maker.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > One thing that probably needs clarifying: how is "shallow" mirroring applied only to the mirrormaker use case, if the changes need to be made on the generic consumer and producer (e.g. by adding `fetch.raw.bytes` and `send.raw.bytes` to the producer and consumer config)?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On 2021/02/05 00:59:57, Henry Cai <h...@pinterest.com.INVALID> wrote:
> > > > > > > > > > > > > > > Dear Community members,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > We are proposing a new feature to improve the performance of Kafka mirror maker:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The current Kafka MirrorMaker process (with the underlying Consumer and Producer library) uses significant CPU cycles and memory to decompress/recompress and deserialize/re-serialize messages, and copies the message bytes multiple times along the mirroring/replicating stages.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The KIP proposes a *shallow mirror* feature which brings back the shallow iterator concept to the mirror process and also proposes to skip the unnecessary message decompression and recompression steps.
> > > > > > > > > > > > > > > We argue in many cases users just want a simple replication pipeline to replicate the message as it is from the source cluster to the destination cluster. In many cases the messages in the source cluster are already compressed and properly batched, users just need an identical copy of the message bytes through the mirroring without any transformation or repartitioning.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > We have a prototype implementation in house with MirrorMaker v1 and observed *CPU usage dropped from 50% to 15%* for some mirror pipelines.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > We name this feature: *shallow mirroring* since it has some resemblance to the old Kafka 0.7 namesake feature but the implementations are not quite the same. '*Shallow*' means 1. we *shallowly* iterate RecordBatches inside MemoryRecords structure instead of deep iterating records inside RecordBatch; 2. We *shallowly* copy (share) pointers inside ByteBuffer instead of deep copying and deserializing bytes into objects.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Please share discussions/feedback along this email thread.
> > > > > > > > > > > > >
> > > > > > > > > > > > > --
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks!
> > > > > > > > > > > > > --Vahid