[jira] [Resolved] (KAFKA-7502) Cleanup KTable materialization logic in a single place
[ https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lee Dongjin resolved KAFKA-7502.
Resolution: Fixed

> Cleanup KTable materialization logic in a single place
> ------------------------------------------------------
>
>                 Key: KAFKA-7502
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7502
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Lee Dongjin
>            Priority: Major
>
> Today, since we pre-create all the `KTableXXX` operators along with the
> logical nodes, we effectively duplicate the logic that determines whether the
> resulting KTable should be materialized. More specifically, the
> materialization principle today is:
> 1) If users specify a Materialized in the DSL and it contains a queryable
> name, we always materialize.
> 2) If users specify a Materialized in the DSL that does not contain a
> queryable name, or do not specify a Materialized object at all, Streams may
> choose to materialize or not. In either case, even if the KTable is
> materialized it will not be queryable, since there is no queryable name
> (i.e. only storeName is non-null, while queryableName is null):
> 2.a) If the resulting KTable is from an aggregation, we always materialize,
> since the store is needed for the aggregation itself (i.e. we use the
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable is from a source topic, we delay
> materialization until a downstream operator requires this KTable to be
> materialized or to send-old-values (see `KTableSourceNode` and
> `KTableSource`).
> 2.c) If the resulting KTable is from a join, we always materialize if the
> user creates a Materialized object, even without a queryable name. This
> could be optimized similarly to 2.b), but that is orthogonal to this ticket
> (see `KTableImpl#buildJoin`, where we always use the constructor with
> nameProvider != null).
> 2.d) If the resulting KTable is from a stateless operation like filter /
> mapValues, we never materialize.
>
> In all of these cases, we have logical nodes like `KTableKTableJoinNode` as
> well as physical nodes like `ProcessorNode`. Ideally we should always create
> the logical plan (i.e. the StreamsGraph) first, optimize it if necessary,
> and then generate the physical plan (i.e. the Topology). Today, however, we
> create some physical nodes beforehand, so the above logic is duplicated in
> the creation of both physical and logical nodes. For example,
> `KTableKTableJoinNode` checks whether Materialized is null before adding a
> state store, while `KTableImpl#doJoin` checks whether materialized is
> specified (case 2.c above).
> Another example is `TableProcessorNode`, used for 2.d) above, which contains
> this logic, while its callers (e.g. `KTableImpl#doFilter`) also contain the
> logic for deciding whether to pass the `queryableName` parameter to
> `KTableProcessorSupplier`.
> This is bug-prone, since we may update the logic in one class but forget to
> update the other.
>
> What we want is a cleaner code path similar to what we have for 2.b): when
> creating the logical nodes, we keep track of whether 1) materialized is
> specified, and 2) a queryable name is provided. During the optimization
> phase, we may change the inner physical ProcessorBuilder's parameters (such
> as the queryable name), and when it is time to generate the physical node,
> we can simply take those parameters and apply them.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (KAFKA-8178) KafkaProducer#send(ProducerRecord,Callback) may block for up to 60 seconds
Sergei Egorov created KAFKA-8178:

Summary: KafkaProducer#send(ProducerRecord,Callback) may block for up to 60 seconds
Key: KAFKA-8178
URL: https://issues.apache.org/jira/browse/KAFKA-8178
Project: Kafka
Issue Type: Bug
Components: clients, producer
Reporter: Sergei Egorov

Hello. I was running reactor-kafka with [the BlockHound agent|https://github.com/reactor/BlockHound] (you can see the progress [here|https://github.com/reactor/reactor-kafka/pull/75] and even run it yourself), and it detected a very dangerous blocking call in KafkaProducer#send(ProducerRecord,Callback), which is supposed to be async:
{code:java}
java.lang.Error: Blocking call! java.lang.Object#wait
	at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
	at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
	at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
	at java.lang.Object.wait(Object.java)
	at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:181)
	at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:938)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
{code}
It blocks for up to "maxBlockTimeMs" (60 seconds by default).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
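The blocking pattern behind `Metadata.awaitUpdate` in the stack trace above can be sketched with plain `Object.wait` under a deadline. This is a simplified, JDK-only illustration of a bounded wait; the class and method names are hypothetical, not the actual Kafka client code:

```java
// Simplified sketch of a bounded wait: the calling thread blocks on a
// monitor until either the condition is signalled or the deadline
// (analogous to the producer's maxBlockTimeMs) expires. This is the kind
// of Object.wait call that BlockHound flags inside an "async" send().
public class BoundedWait {
    private final Object lock = new Object();
    private boolean updated = false;

    // Blocks the caller for up to maxBlockMs; throws if no update arrives.
    public void awaitUpdate(long maxBlockMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxBlockMs;
        synchronized (lock) {
            while (!updated) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0)
                    throw new IllegalStateException("timed out after " + maxBlockMs + " ms");
                lock.wait(remaining);  // the blocking call a non-blocking caller never expects
            }
        }
    }

    // Signals the waiter, e.g. when fresh metadata has been received.
    public void update() {
        synchronized (lock) {
            updated = true;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedWait metadata = new BoundedWait();
        try {
            metadata.awaitUpdate(100);  // nobody calls update(): blocks, then times out
        } catch (IllegalStateException e) {
            System.out.println("caller was blocked until timeout: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that the caller thread itself is parked for up to the full timeout, which is why running this inside a reactive event loop is dangerous.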
[jira] [Commented] (KAFKA-8147) Add changelog topic configuration to KTable suppress
[ https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806221#comment-16806221 ]

Maarten commented on KAFKA-8147:

The version published now is short, but should be complete. I need some help with the last step, though; how can I start a thread on the Apache mailing list?

> Add changelog topic configuration to KTable suppress
> ----------------------------------------------------
>
>                 Key: KAFKA-8147
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8147
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.1.1
>            Reporter: Maarten
>            Assignee: Maarten
>            Priority: Minor
>              Labels: needs-kip
>
> The streams DSL does not provide a way to configure the changelog topic
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similar
> to the configuration of aggregate + materialized, i.e.,
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(..,materialized)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
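The aggregate + materialized pattern the ticket uses as its model can be sketched with only the JDK: building the changelog topic config map is plain Java, and the comments show where it would be handed to the real `Materialized.withLoggingEnabled(...)` API. The config values are illustrative assumptions, not recommendations:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the changelog-topic configuration pattern the ticket refers to.
// Only the map construction runs here; the commented lines indicate where a
// Kafka Streams application would apply it.
public class ChangelogConfigSketch {
    public static Map<String, String> changelogTopicConfigs() {
        Map<String, String> configs = new HashMap<>();
        // Standard Kafka topic-level configs; values chosen for illustration.
        configs.put("retention.ms", "86400000");          // keep changelog data for 1 day
        configs.put("cleanup.policy", "compact,delete");  // compact and expire
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> configs = changelogTopicConfigs();
        // With kafka-streams on the classpath, this map is applied as:
        //   Materialized.as("my-store").withLoggingEnabled(configs);
        //   groupedStream.aggregate(initializer, aggregator, materialized);
        // The ticket asks for an equivalent hook on KTable#suppress, whose
        // changelog topic currently cannot be configured this way.
        System.out.println(configs);
    }
}
```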
[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails
[ https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806234#comment-16806234 ]

Guozhang Wang commented on KAFKA-4600:
--
Hello [~dana.powers]

The root cause is around when we should reset the `need-rejoin` boolean flag to false. Prior to KAFKA-5154 it was reset after the join-group response was received, so if an error occurred after that (e.g. during the sync-group round trip, or, as in this ticket, inside the onAssign callback), the consumer would just continue fetching from the previously assigned partitions, as reported here. In KAFKA-5154 we pushed `resetJoinGroupFuture()` after the `onJoinComplete` code, which covers this case: if the error is thrown inside the callback, the consumer will not proceed to fetch from the previously assigned partitions.

As for error propagation, we already log ERROR as `User provided listener {} failed on partition assignment`, and because of the fix in KAFKA-5154 it will block the consumer from proceeding.

> Consumer proceeds on when ConsumerRebalanceListener fails
> ---------------------------------------------------------
>
>                 Key: KAFKA-4600
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4600
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.1.1
>            Reporter: Braedon Vickers
>            Priority: Major
>             Fix For: 0.11.0.0
>
> One of the use cases for a ConsumerRebalanceListener is to load state
> necessary for processing a partition when it is assigned. However, when
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e.
> the state isn't loaded), the error is logged and the consumer proceeds on as
> if nothing happened, happily consuming messages from the new partition. When
> the state is relied upon for correct processing, this can be very bad, e.g.
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with
> normally. At the very least the assignment should fail so the consumer
> doesn't see any messages from the new partitions, and the rebalance can be
> reattempted.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
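The failure mode under discussion can be sketched without any Kafka dependency: a caller that wraps the listener callback in a bare catch keeps consuming, while one that rethrows aborts the assignment. The `Listener` interface and method names below are hypothetical stand-ins for `ConsumerRebalanceListener`, not the real consumer internals:

```java
import java.util.List;

// Simplified illustration of the two error-handling strategies discussed
// in this ticket. None of this is the actual consumer code.
public class ListenerErrorSketch {
    interface Listener {
        void onPartitionsAssigned(List<Integer> partitions);
    }

    // Behavior the ticket complains about: the exception is logged and
    // swallowed, so the caller proceeds to consume from the new partitions.
    static boolean assignSwallowing(Listener l, List<Integer> parts) {
        try {
            l.onPartitionsAssigned(parts);
        } catch (Exception e) {
            System.out.println("ERROR User provided listener failed on partition assignment: " + e);
        }
        return true;  // assignment "succeeds" regardless
    }

    // Behavior the reporter asks for: propagate, so the assignment fails
    // and the rebalance can be reattempted.
    static boolean assignPropagating(Listener l, List<Integer> parts) {
        l.onPartitionsAssigned(parts);  // any exception aborts the assignment
        return true;
    }

    public static void main(String[] args) {
        Listener failing = p -> { throw new RuntimeException("state not loaded"); };
        System.out.println("swallowing: assignment succeeded = " + assignSwallowing(failing, List.of(0, 1)));
        try {
            assignPropagating(failing, List.of(0, 1));
        } catch (RuntimeException e) {
            System.out.println("propagating: assignment failed, rebalance can be retried");
        }
    }
}
```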
[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails
[ https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806240#comment-16806240 ]

Dana Powers commented on KAFKA-4600:

I don't think KAFKA-5154 is related to this issue. But maybe I'm reading this ticket differently? I thought this ticket was literally about what happens when the call to `listener.onPartitionsAssigned(assignedPartitions)` raises an exception. Because that call is currently wrapped in a bare `catch (Exception e)`, and the caught exception is logged but otherwise ignored, it seems like this ticket should still be open or marked as won't fix. I just don't see how KAFKA-5154 applies here: it was a fix for an NPE inside KafkaStreams, but after onPartitionsAssigned() had already been called successfully?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails
[ https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806244#comment-16806244 ]

Guozhang Wang commented on KAFKA-4600:
--
The NPE inside KafkaStreams is just another symptom of the same root cause. Reading the attached PR (https://github.com/apache/kafka/pull/3181) may better illustrate the scenario. Note that for this ticket, the more severe observation is "happily consuming messages from the new partition. When the state is relied upon for correct processing, this can be very bad, e.g. data loss can occur.", and we want "at the very least the assignment should fail so the consumer doesn't see any messages from the new partitions, and the rebalance can be reattempted." The PR for KAFKA-5154 served exactly this purpose.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails
[ https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806293#comment-16806293 ]

Dana Powers commented on KAFKA-4600:

Right on. I've read that PR several times and fail to see how it addresses this issue. But I'm afraid I can't communicate my point any clearer, so I'll just let this one go.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806334#comment-16806334 ]

Flower.min commented on KAFKA-8106:
---
[~junrao] Of course, we need to read the timestamp and offset from each record for validation. In the inPlaceAssignment case, we can still read the timestamp and offset from a record without decompressing the record's key and value. We suggest removing the decompression of the record's key and value, not removing the reading of the record's timestamp and offset.

> Remove unnecessary decompression operation when logValidator do validation.
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-8106
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8106
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.1.1
>         Environment: Server:
> cpu: 2*16;
> MemTotal: 256G;
> Ethernet controller: Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network Connection;
> SSD.
>            Reporter: Flower.min
>            Assignee: Flower.min
>            Priority: Major
>              Labels: performance
>
> We did performance testing of Kafka in the specific scenario described
> below. We built a Kafka cluster with one broker and created topics with
> different numbers of partitions. We then started many producer processes to
> send large volumes of messages to one of the topics in each test.
>
> *_Specific Scenario_*
>
> *_1. Main config of the Kafka server_*
> # num.network.threads=6; num.io.threads=128; queued.max.requests=500
> # Number of TopicPartitions: 50~2000
> # Size of a single message: 1024 B
>
> *_2. Config of KafkaProducer_*
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
>
> *_3. Best result of the performance testing_*
> ||Network inflow rate||CPU used (%)||Disk write speed||Production throughput||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
>
> *_4. Phenomenon and my doubt_*
> _The upper limit of CPU usage is reached, but the bandwidth limit of the
> server network is not. *We wonder what costs so much CPU time, and we want
> to improve performance and reduce the CPU usage of the Kafka server.*_
>
> _*5. Analysis*_
> We analyzed the JFR recordings of the Kafka server during the performance
> test. The hot-spot methods are
> *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and
> *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*.
> Checking the thread stacks, we also found most CPU time occupied by many
> threads busy decompressing messages. We then read the Kafka source code.
> There is a double-layer nested iterator when LogValidator validates every
> record, and a decompression for each message when traversing every
> RecordBatch iterator. This decompression consumes CPU and hurts total
> performance. _*The only purpose of decompressing every message is to obtain
> the total size in bytes of one record and the size in bytes of the record
> body, when the magic value in use is above 1 and no format conversion or
> value overwriting is required for compressed messages. This is negative for
> performance in common usage scenarios.*_ Therefore, we suggest *_removing
> the unnecessary decompression operation_* when validating compressed
> messages, when the magic value in use is above 1 and no format conversion or
> value overwriting is required.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806336#comment-16806336 ]

Ve Lee commented on KAFKA-8106:
---
[~hackerwin7] First, you have to be clear that the parameters from the client are not completely trusted. Second, the *validateMessagesAndAssignOffsetsCompressed* method validates not only the *Timestamp* but also the *Offset* and *Key*.

{code}
Record =>
  Length => varint
  Attributes => int8
  TimestampDelta => varint
  OffsetDelta => varint
  KeyLen => varint
  Key => data
  ValueLen => varint
  Value => data
  Headers => [Header]
{code}

[~junrao] We don't have to decompress the "Key" and "Value" to validate the "Timestamp", "Key" and "Offset"; decoding only "Length", "Attributes", "TimestampDelta", "OffsetDelta" and "KeyLen" already meets the verification requirements. And I think decompressing the "Value" field consumes most of the time, which is not necessary in most cases.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
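The skip-don't-decode idea can be sketched against that record layout: decode only the varint header fields needed for validation, then advance past the key and value bytes without materializing them. This is a standalone, JDK-only sketch using the same zig-zag varint encoding Kafka's record format uses; it is not the actual LogValidator code:

```java
import java.nio.ByteBuffer;

// Standalone sketch: read only the fields needed for validation from a
// record encoded as [length][attributes][timestampDelta][offsetDelta]
// [keyLen][key][valueLen][value], skipping the key/value bytes entirely.
public class RecordSkimSketch {
    // Zig-zag varint decode (the encoding Kafka uses for signed varints).
    static long readVarlong(ByteBuffer buf) {
        long value = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            value |= (long) (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (value >>> 1) ^ -(value & 1);  // zig-zag decode
    }

    static void writeVarlong(ByteBuffer buf, long v) {
        long z = (v << 1) ^ (v >> 63);  // zig-zag encode
        while ((z & ~0x7fL) != 0) {
            buf.put((byte) ((z & 0x7f) | 0x80));
            z >>>= 7;
        }
        buf.put((byte) z);
    }

    // Returns {timestampDelta, offsetDelta}; positions past key and value
    // without ever copying or decoding their bytes.
    static long[] skimRecord(ByteBuffer buf) {
        readVarlong(buf);                   // Length (not needed here)
        buf.get();                          // Attributes
        long tsDelta = readVarlong(buf);    // TimestampDelta
        long offDelta = readVarlong(buf);   // OffsetDelta
        int keyLen = (int) readVarlong(buf);
        if (keyLen > 0) buf.position(buf.position() + keyLen);     // skip key
        int valueLen = (int) readVarlong(buf);
        if (valueLen > 0) buf.position(buf.position() + valueLen); // skip value
        return new long[] { tsDelta, offDelta };
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        writeVarlong(buf, 20);   // length
        buf.put((byte) 0);       // attributes
        writeVarlong(buf, 1234); // timestampDelta
        writeVarlong(buf, 7);    // offsetDelta
        writeVarlong(buf, 3);    // keyLen
        buf.put(new byte[] { 1, 2, 3 });
        writeVarlong(buf, 4);    // valueLen
        buf.put(new byte[] { 9, 9, 9, 9 });
        buf.flip();
        long[] skim = skimRecord(buf);
        System.out.println("timestampDelta=" + skim[0] + " offsetDelta=" + skim[1]);
        // prints: timestampDelta=1234 offsetDelta=7
    }
}
```

Note the caveat from the batch-level format still applies: when the whole batch is compressed, the bytes must pass through the decompressor stream, so what this saves is allocating and decoding per-record key/value payloads, not the stream decompression itself.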
[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806337#comment-16806337 ]

Flower.min commented on KAFKA-8106:
---
[~hackerwin7] I agree with the idea of [~qiaochao911]. Maybe we can set a flag to choose whether or not to decompress the record's key and value. In other cases, such as when the magic value in use is below 2, we still need to decompress the record's key and value for validation.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806337#comment-16806337 ] Flower.min edited comment on KAFKA-8106 at 4/1/19 2:52 AM: --- [~hackerwin7]I agree with idea of [~qiaochao911] .Maybe we can set a flag to choose whether or not need to decompress record's key and value.In other cases such as the magic value to use is below 2 ,we also need decompress record's key and value for validating. was (Author: flower.min): [~hackerwin7]I agree with idea of [~qiaochao911] , Maybe we can set a flag to choose whether or not need to decompress record's key and value.In other cases such as The magic value to use is below 2 ,we also need decompress record's key and value for validating. > Remove unnecessary decompression operation when logValidator do validation. > > > Key: KAFKA-8106 > URL: https://issues.apache.org/jira/browse/KAFKA-8106 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 > Environment: Server : > cpu:2*16 ; > MemTotal : 256G; > Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network > Connection ; > SSD. >Reporter: Flower.min >Assignee: Flower.min >Priority: Major > Labels: performance > > We do performance testing about Kafka in specific scenarios as > described below .We build a kafka cluster with one broker,and create topics > with different number of partitions.Then we start lots of producer processes > to send large amounts of messages to one of the topics at one testing . 
> *_Specific Scenario_* > > *_1.Main config of Kafka_* > # Main config of Kafka > server:[num.network.threads=6;num.io.threads=128;queued.max.requests=500|http://num.network.threads%3D6%3Bnum.io.threads%3D128%3Bqueued.max.requests%3D500/] > # Number of TopicPartition : 50~2000 > # Size of Single Message : 1024B > > *_2.Config of KafkaProducer_* > ||compression.type||[linger.ms|http://linger.ms/]||batch.size||buffer.memory|| > |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB| > *_3.The best result of performance testing_* > ||Network inflow rate||CPU Used (%)||Disk write speed||Performance of > production|| > |550MB/s~610MB/s|97%~99%|:550MB/s~610MB/s |23,000,000 messages/s| > *_4.Phenomenon and my doubt_* > _The upper limit of CPU usage has been reached But it does not > reach the upper limit of the bandwidth of the server network. *We are > doubtful about which cost too much CPU time and we want to Improve > performance and reduces CPU usage of Kafka server.*_ > > _*5.Analysis*_ > We analysis the JFIR of Kafka server when doing performance testing > .We found the hot spot method is > *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and > *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*.When > we checking thread stack information we also have found most CPU being > occupied by lots of thread which is busy decompressing messages.Then we > read source code of Kafka . > There is double-layer nested Iterator when LogValidator do validate > every record.And There is a decompression for each message when traversing > every RecordBatch iterator. 
> This decompression consumes CPU and hurts overall performance. _*The only purpose of decompressing every message is to obtain the total size in bytes of a record and the size in bytes of the record body, when the magic value in use is above 1 and no format conversion or value overwriting is required for compressed messages. This is bad for performance in common usage scenarios.*_ Therefore, we suggest *_removing the unnecessary decompression operation_* when validating compressed messages, provided the magic value in use is above 1 and no format conversion or value overwriting is required. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
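To make the suggestion above concrete: in the v2 record format, each record begins with a zigzag-varint Length field, so a validator that only needs per-record sizes can read that one varint and then skip the record body, never materializing the key and value. The sketch below is illustrative only; the class and method names are hypothetical, not Kafka's actual LogValidator code, and it operates on an already-decompressed buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: obtain a v2 record's size from its leading varint
// Length and skip the body, instead of parsing key/value into objects.
public class RecordSkipper {

    // Decode a zigzag varint, the encoding used by the v2 record format.
    static int readVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        int b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7f) << shift;
            shift += 7;
        }
        value |= b << shift;
        return (value >>> 1) ^ -(value & 1); // undo zigzag encoding
    }

    // Skip one record in the decompressed buffer; return its body size.
    static int skipRecord(ByteBuffer buf) {
        int length = readVarint(buf);
        buf.position(buf.position() + length); // key/value/headers never parsed
        return length;
    }
}
```

Note that the bytes must still pass through the decompressor to reach the buffer; what the sketch assumes is saved is the per-record parsing and allocation of key/value objects.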
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806349#comment-16806349 ] huxihx commented on KAFKA-7965: --- [~enether] Are you still working on this jira? If not, I think I could take over it. Thanks. > Flaky Test > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > > > Key: KAFKA-7965 > URL: https://issues.apache.org/jira/browse/KAFKA-7965 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Assignee: Stanislav Kozlovski >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0, 2.2.1 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/] > {quote}java.lang.AssertionError: Received 0, expected at least 68 at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806352#comment-16806352 ] hackerwin7 commented on KAFKA-8106: --- [~wlwolf87] Currently the broker decompresses in order to validate the record timestamp, key, offset, uncompressed size in bytes, and magic. Secondly, how can we decompress only some fields of `Record`, given that the compressor output stream writes the bytes to the buffer at a low level? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806363#comment-16806363 ] Ve Lee commented on KAFKA-8106: ---
{{RecordBatch =>}}
{{  FirstOffset => int64}}
{{  Length => int32}}
{{  PartitionLeaderEpoch => int32}}
{{  Magic => int8}}
{{  CRC => int32}}
{{  Attributes => int16}}
{{  LastOffsetDelta => int32}}
{{  FirstTimestamp => int64}}
{{  MaxTimestamp => int64}}
{{  ProducerId => int64}}
{{  ProducerEpoch => int16}}
{{  FirstSequence => int32}}
{{  Records => [Record]}}
[~hackerwin7] MaxTimestamp already exists in the RecordBatch header, but it is not used; it is recalculated instead. To decompress only the record's "Length", "Attributes", "TimestampDelta", "OffsetDelta", and "KeyLen" fields, see [~Flower.min]'s PR: [https://github.com/apache/kafka/pull/6476] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
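Since the RecordBatch header in the comment above is fixed-width up to the Records array, batch-level fields such as Magic and MaxTimestamp can be read at fixed byte offsets without decompressing the Records payload. The following is a minimal sketch under that assumption, not Kafka's actual DefaultRecordBatch; the offsets are running sums of the field widths quoted above.

```java
import java.nio.ByteBuffer;

// Sketch: read v2 RecordBatch header fields directly from the buffer,
// leaving the (possibly compressed) Records payload untouched.
// ByteBuffer's default big-endian order matches the Kafka wire format.
public class BatchHeaderReader {
    static final int MAGIC_OFFSET = 8 + 4 + 4;        // FirstOffset + Length + PartitionLeaderEpoch
    static final int MAX_TIMESTAMP_OFFSET =
            MAGIC_OFFSET + 1 + 4 + 2 + 4 + 8;         // + Magic + CRC + Attributes + LastOffsetDelta + FirstTimestamp

    static byte magic(ByteBuffer batch) {
        return batch.get(batch.position() + MAGIC_OFFSET);
    }

    static long maxTimestamp(ByteBuffer batch) {
        return batch.getLong(batch.position() + MAX_TIMESTAMP_OFFSET);
    }
}
```

Of course, whether a MaxTimestamp read this way can replace recomputation depends on whether the broker can trust the client-supplied value, which is exactly the concern raised in the discussion.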
[jira] [Comment Edited] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806363#comment-16806363 ] Ve Lee edited comment on KAFKA-8106 at 4/1/19 3:50 AM: ---
{{RecordBatch =>}}
{{  FirstOffset => int64}}
{{  Length => int32}}
{{  PartitionLeaderEpoch => int32}}
{{  Magic => int8}}
{{  CRC => int32}}
{{  Attributes => int16}}
{{  LastOffsetDelta => int32}}
{{  FirstTimestamp => int64}}
{{  MaxTimestamp => int64}}
{{  ProducerId => int64}}
{{  ProducerEpoch => int16}}
{{  FirstSequence => int32}}
{{  Records => [Record]}}
[~hackerwin7] MaxTimestamp already exists in the RecordBatch header, but it is not used; it is recalculated instead. The "uncompressedSizeInBytes" is the Record's Length, and "Magic" is in the RecordBatch header. To decompress only the record's "Length", "Attributes", "TimestampDelta", "OffsetDelta", and "KeyLen" fields, see [~Flower.min]'s PR: [https://github.com/apache/kafka/pull/6476]
was (Author: wlwolf87): {{RecordBatch =>}} {{ }}{{FirstOffset => int64}} {{ }}{{Length => int32}} {{ }}{{PartitionLeaderEpoch => int32}} {{ }}{{Magic => int8 }} {{ }}{{CRC => int32}} {{ }}{{Attributes => int16}} {{ }}{{LastOffsetDelta => int32}} {{ }}{{FirstTimestamp => int64}} {{ }}{{MaxTimestamp => int64}} {{ }}{{ProducerId => int64}} {{ }}{{ProducerEpoch => int16}} {{ }}{{FirstSequence => int32}} {{ }}{{Records => [Record]}} [~hackerwin7] The MaxTimestamp already exists in RecordBatch,but it was not used but recalculated once. Decompress the record of "Length", "Attributes", "TimestampDelta", "OffsetDelta", "KeyLen", to see the [~Flower.min] pr [https://github.com/apache/kafka/pull/6476] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806375#comment-16806375 ] hackerwin7 commented on KAFKA-8106: --- [~wlwolf87] As for `MaxTimestamp`, I think we cannot use it for validation because of the trust issue. Thanks for the PR; I just reviewed it. It avoids reading the key and value during decompression, like a partial decompression. Any ideas to prevent record-level operations beyond this? -- This message was sent by Atlassian JIRA (v7.6.3#76005)