[jira] [Resolved] (KAFKA-7502) Cleanup KTable materialization logic in a single place

2019-03-31 Thread Lee Dongjin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin resolved KAFKA-7502.

Resolution: Fixed

> Cleanup KTable materialization logic in a single place
> --
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Lee Dongjin
> Priority: Major
>
> Today, since we pre-create all the `KTableXXX` operators along with the 
> logical nodes, we are effectively duplicating the logic that determines 
> whether the resulting KTable should be materialized. More specifically, the 
> materialization principle today is:
> 1) If users specified Materialized in the DSL and it contains a queryable 
> name, we always materialize.
> 2) If users specified Materialized in the DSL but it does not contain a 
> queryable name, or if users did not specify a Materialized object at all, 
> Streams may choose to materialize or not. In either case, even if the 
> KTable is materialized it will not be queryable, since there is no queryable 
> name (i.e. only storeName is not null, but queryableName is null):
> 2.a) If the resulting KTable is from an aggregation, we always materialize, 
> since the store is needed for the aggregation (i.e. we use the 
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable is from a source topic, we delay the 
> materialization until a downstream operator requires this KTable to be 
> materialized or to send old values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulting KTable is from a join, we always materialize if users 
> create a Materialized object, even without a queryable name. This could 
> be optimized similarly to 2.b), but that is orthogonal to this ticket (see 
> `KTableImpl#buildJoin`, where we always use the constructor with 
> nameProvider != null).
> 2.d) If the resulting KTable is from a stateless operation like filter / 
> mapValues, we never materialize.
> 
> Now, in all of these cases, we have a logical node like `KTableKTableJoinNode` 
> as well as a physical node like `ProcessorNode`. Ideally we should always 
> create the logical plan (i.e. the StreamsGraph), then optimize it if 
> necessary, and then generate the physical plan (i.e. the Topology). However, 
> today we create some physical nodes beforehand, so the above logic is 
> duplicated in the creation of both physical and logical nodes. For 
> example, in `KTableKTableJoinNode` we check whether Materialized is null 
> before adding a state store, and in `KTableImpl#doJoin` we check whether 
> materialized is specified (case 2.c above). 
> Another example is `TableProcessorNode`, which is used for 2.d) above: it 
> includes the logic, while its caller, `KTableImpl#doFilter` for example, 
> also contains the logic when deciding whether to pass the `queryableName` 
> parameter to `KTableProcessorSupplier`.
> This is bug-prone, since we may update the logic in one class but forget 
> to update the other.
> --
> What we want is a cleaner code path similar to what we have for 2.b), 
> such that when creating the logical nodes we keep track of whether 1) 
> materialized is specified, and 2) a queryable name is provided. During the 
> optimization phase we may change the inner physical ProcessorBuilder's 
> parameters, such as the queryable name, and when it is time to generate the 
> physical node we can just take the parameters as they are.
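The materialization rules 1) through 2.d) quoted above can be summarized as one decision function. The following is only a minimal sketch of those rules as stated in the ticket; the class name, the `Origin` enum and the `downstreamNeedsStore` parameter are hypothetical and do not exist in Kafka Streams.

```java
public class MaterializationRules {

    /** Hypothetical classification of where a KTable comes from (cases 2.a to 2.d). */
    public enum Origin { AGGREGATION, SOURCE_TOPIC, JOIN, STATELESS }

    /**
     * Returns whether the resulting KTable would be materialized under the
     * rules listed in the ticket. All parameter names are illustrative.
     */
    public static boolean shouldMaterialize(Origin origin,
                                            boolean materializedSpecified,
                                            String queryableName,
                                            boolean downstreamNeedsStore) {
        // Rule 1: Materialized with a queryable name always materializes.
        if (materializedSpecified && queryableName != null) {
            return true;
        }
        switch (origin) {
            case AGGREGATION:
                return true;                    // 2.a: the store is needed for the aggregation
            case SOURCE_TOPIC:
                return downstreamNeedsStore;    // 2.b: delayed until a downstream operator needs it
            case JOIN:
                return materializedSpecified;   // 2.c: materialize if a Materialized object was given
            case STATELESS:
                return false;                   // 2.d: filter / mapValues never materialize
            default:
                throw new AssertionError("unknown origin: " + origin);
        }
    }
}
```

Keeping the decision in one place like this is the kind of single code path the ticket asks for, so the logical and physical node creation cannot drift apart.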



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8178) KafkaProducer#send(ProducerRecord,Callback) may block for up to 60 seconds

2019-03-31 Thread Sergei Egorov (JIRA)
Sergei Egorov created KAFKA-8178:


Summary: KafkaProducer#send(ProducerRecord,Callback) may block for up to 60 seconds
Key: KAFKA-8178
URL: https://issues.apache.org/jira/browse/KAFKA-8178
Project: Kafka
Issue Type: Bug
Components: clients, producer
Reporter: Sergei Egorov


Hello. I was running reactor-kafka with [the BlockHound 
agent|https://github.com/reactor/BlockHound] (you can see the progress 
[here|https://github.com/reactor/reactor-kafka/pull/75] and even run it 
yourself) and it detected a very dangerous blocking call in 
KafkaProducer#send(ProducerRecord,Callback) which is supposed to be async:

{code:java}
java.lang.Error: Blocking call! java.lang.Object#wait
    at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
    at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
    at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
    at java.lang.Object.wait(Object.java)
    at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:181)
    at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:938)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
{code}

It blocks for up to "maxBlockTimeMs" (60 seconds by default).
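As a hedged illustration of the timeout mentioned above: assuming "maxBlockTimeMs" corresponds to the producer configuration key `max.block.ms`, the upper bound on the wait could be lowered through the producer properties. This sketch only builds the `Properties` object; the class and method names are hypothetical, and it bounds the blocking rather than removing it.

```java
import java.util.Properties;

public class BoundedBlockingConfig {

    /**
     * Builds producer properties with a lowered blocking bound.
     * Assumption: "max.block.ms" is the config behind maxBlockTimeMs.
     */
    public static Properties producerProps(String bootstrapServers, long maxBlockMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Upper bound, in milliseconds, on how long a call may wait (for example on metadata).
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        return props;
    }
}
```

Passing these properties to a real producer would additionally require the Kafka client library and serializer settings, which are left out here.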





[jira] [Commented] (KAFKA-8147) Add changelog topic configuration to KTable suppress

2019-03-31 Thread Maarten (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806221#comment-16806221
 ] 

Maarten commented on KAFKA-8147:


The version published now is short, but should be complete.

I need some help with the last step though; how can I start a thread on the 
Apache mailing list?

> Add changelog topic configuration to KTable suppress
> 
>
> Key: KAFKA-8147
> URL: https://issues.apache.org/jira/browse/KAFKA-8147
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.1.1
> Reporter: Maarten
> Assignee: Maarten
> Priority: Minor
> Labels: needs-kip
>
> The streams DSL does not provide a way to configure the changelog topic 
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similar to 
> the configuration of aggregate + materialized, i.e., 
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(..,materialized)
> {code}





[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-03-31 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806234#comment-16806234
 ] 

Guozhang Wang commented on KAFKA-4600:
--

Hello [~dana.powers], the root cause concerns when we should reset the 
`need-rejoin` boolean flag to false. Prior to KAFKA-5154 it was reset as soon 
as the join-group response was received, so if an error occurred after that 
(e.g. during the sync-group round trip or, as in this ticket, inside the 
onAssign callback), the consumer would just continue fetching from the 
previously assigned partitions, as this ticket reports. In KAFKA-5154 we moved 
`resetJoinGroupFuture()` after the `onJoinComplete` code, which covers this 
case: if the error is thrown inside the callback, the consumer will not 
proceed to fetch from the previously assigned partitions.

As for error propagation, we already log an ERROR, `User provided 
listener {} failed on partition assignment`, and because of the fix in 
KAFKA-5154 the consumer is blocked from proceeding.

> Consumer proceeds on when ConsumerRebalanceListener fails
> -
>
> Key: KAFKA-4600
> URL: https://issues.apache.org/jira/browse/KAFKA-4600
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.1.1
> Reporter: Braedon Vickers
> Priority: Major
> Fix For: 0.11.0.0
>
>
> One of the use cases for a ConsumerRebalanceListener is to load state 
> necessary for processing a partition when it is assigned. However, when 
> ConsumerRebalanceListener.onPartitionsAssigned() fails for some reason (i.e. 
> the state isn't loaded), the error is logged and the consumer proceeds on as 
> if nothing happened, happily consuming messages from the new partition. When 
> the state is relied upon for correct processing, this can be very bad, e.g. 
> data loss can occur.
> It would be better if the error was propagated up so it could be dealt with 
> normally. At the very least the assignment should fail so the consumer 
> doesn't see any messages from the new partitions, and the rebalance can be 
> reattempted.





[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-03-31 Thread Dana Powers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806240#comment-16806240
 ] 

Dana Powers commented on KAFKA-4600:


I don't think KAFKA-5154 is related to this issue. But maybe I'm reading this 
ticket differently? I thought this ticket was literally about what happens when 
the call to `listener.onPartitionsAssigned(assignedPartitions)` raises an 
Exception. And because that call is currently wrapped in a bare `catch 
(Exception e)`, and because the caught exception is logged but otherwise 
ignored, it seems like this ticket should still be open or marked as won't fix. 
I just don't see how KAFKA-5154 applies here: it was a fix for an NPE inside 
KafkaStreams, but one that occurred after onPartitionsAssigned() had already 
been called successfully?
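The pattern under discussion, a listener callback wrapped in a bare catch that logs and continues, can be sketched without any Kafka dependency. This is a simplified model, not the consumer's actual code: the class, the method names and the boolean return value (standing in for "the assignment is accepted") are all hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;

public class ListenerFailureSketch {

    /**
     * Swallowing variant: a listener error is logged and ignored, so the
     * caller always sees the assignment as accepted.
     */
    public static boolean assignSwallowing(Consumer<List<Integer>> listener,
                                           List<Integer> partitions,
                                           List<String> log) {
        try {
            listener.accept(partitions);
        } catch (Exception e) {
            log.add("listener failed on partition assignment: " + e.getMessage());
        }
        return true;
    }

    /**
     * Failing variant: a listener error is logged and the assignment is
     * reported as failed, so the caller can retry instead of fetching.
     */
    public static boolean assignFailing(Consumer<List<Integer>> listener,
                                        List<Integer> partitions,
                                        List<String> log) {
        try {
            listener.accept(partitions);
            return true;
        } catch (Exception e) {
            log.add("listener failed on partition assignment: " + e.getMessage());
            return false;
        }
    }
}
```

The second variant corresponds to the ticket's minimum request that the assignment should fail so the rebalance can be reattempted.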

 

 



[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-03-31 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806244#comment-16806244
 ] 

Guozhang Wang commented on KAFKA-4600:
--

The NPE inside KafkaStreams is just another symptom of the same root cause. 
Maybe reading the attached PR (https://github.com/apache/kafka/pull/3181) would 
better illustrate the scenario. Note that for this ticket, the more severe 
observation is `happily consuming messages from the new partition. When 
the state is relied upon for correct processing, this can be very bad, e.g. 
data loss can occur.`, and what we want is `very least the assignment should 
fail so the consumer doesn't see any messages from the new partitions, and the 
rebalance can be reattempted.` The PR for KAFKA-5154 served this purpose.



[jira] [Commented] (KAFKA-4600) Consumer proceeds on when ConsumerRebalanceListener fails

2019-03-31 Thread Dana Powers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806293#comment-16806293
 ] 

Dana Powers commented on KAFKA-4600:


Right on. I've read that PR several times and fail to see how it addresses this 
issue. But I'm afraid I can't communicate my point any clearer, so I'll just 
let this one go.



[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-31 Thread Flower.min (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806334#comment-16806334
 ] 

Flower.min commented on KAFKA-8106:
---

[~junrao] Of course, we need to read the timestamp and offset from each record 
for validation. In the inPlaceAssignment case, we can still read the timestamp 
and offset from a record without decompressing the record's key and value. We 
suggest removing only the decompression of the record's key and value, not the 
reading of the record's timestamp and offset.

> Remove unnecessary decompression operation when logValidator  do validation.
> 
>
> Key: KAFKA-8106
> URL: https://issues.apache.org/jira/browse/KAFKA-8106
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.1.1
> Environment: Server: CPU: 2*16; MemTotal: 256G; Ethernet controller: Intel 
> Corporation 82599ES 10-Gigabit SFI/SFP+ Network Connection; SSD.
> Reporter: Flower.min
> Assignee: Flower.min
> Priority: Major
> Labels: performance
>
> We ran performance tests on Kafka in the specific scenario described 
> below. We built a Kafka cluster with one broker and created topics with 
> different numbers of partitions. In each test we started many producer 
> processes that sent large volumes of messages to one of the topics.
> *_Specific Scenario_*
>
> *_1. Main config of Kafka_*
>  # Main config of Kafka server: 
> num.network.threads=6; num.io.threads=128; queued.max.requests=500
>  # Number of TopicPartitions: 50~2000
>  # Size of a single message: 1024B
>
> *_2. Config of KafkaProducer_*
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3. The best result of performance testing_*
> ||Network inflow rate||CPU used (%)||Disk write speed||Production throughput||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
> *_4. Phenomenon and my doubt_*
> _CPU usage has reached its upper limit, but the server's network bandwidth 
> has not. *We are unsure what is costing so much CPU time, and we want to 
> improve performance and reduce the CPU usage of the Kafka server.*_
>
> _*5. Analysis*_
> We analyzed the JFIR of the Kafka server during performance testing. The 
> hot-spot methods are 
> *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and 
> *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*. 
> When we checked the thread stack information we also found that most of the 
> CPU was occupied by many threads that were busy decompressing messages. We 
> then read the Kafka source code.
> LogValidator uses a two-level nested iterator when it validates every 
> record, and each message is decompressed while traversing every RecordBatch 
> iterator. Decompressing messages consumes CPU and hurts overall performance. 
> *_When the magic value in use is above 1 and no format conversion or value 
> overwriting is required for compressed messages, the only purpose of 
> decompressing every message is to obtain the total size in bytes of each 
> record and the size in bytes of the record body. This is bad for performance 
> in common usage scenarios._* Therefore, we suggest *_removing the unnecessary 
> decompression operation_* when validating compressed messages, when the magic 
> value in use is above 1 and no format conversion or value overwriting is 
> required for compressed messages.





[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-31 Thread Ve Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806336#comment-16806336
 ] 

Ve Lee commented on KAFKA-8106:
---

[~hackerwin7] First, keep in mind that parameters supplied by the client 
cannot be fully trusted.

Second, the "*validateMessagesAndAssignOffsetsCompressed*" method validates 
not only the "*Timestamp*" but also the "*Offset*" and "*Key*".

 
{{Record =>}}
{{  Length => varint}}
{{  Attributes => int8}}
{{  TimestampDelta => varint}}
{{  OffsetDelta => varint}}
{{  KeyLen => varint}}
{{  Key => data}}
{{  ValueLen => varint}}
{{  Value => data}}
{{  Headers => [Header]}}

[~junrao] We do not have to decompress the "Key" and "Value" in order to 
validate the "Timestamp", "Key" and "Offset". It is enough to decompress the 
record's "Length", "Attributes", "TimestampDelta", "OffsetDelta" and "KeyLen" 
fields, which already meet the verification requirements. I think 
decompressing the "Value" field accounts for most of the cost, and it is 
unnecessary in most cases.
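To make the idea concrete, here is a minimal sketch of reading only the leading fields of a record laid out as in the listing above and stepping over the key and value bytes without copying them. It works on an already available `ByteBuffer` and assumes zigzag variable-length integers; it is not the LogValidator code, and the class, its `Header` type and all method names are hypothetical. Headers are not parsed.

```java
import java.nio.ByteBuffer;

public class RecordHeaderSketch {

    /** Fields recovered without materializing key or value bytes. */
    public static final class Header {
        public final int length;
        public final byte attributes;
        public final long timestampDelta;
        public final int offsetDelta;
        public final int keyLen;
        public final int valueLen;

        Header(int length, byte attributes, long timestampDelta,
               int offsetDelta, int keyLen, int valueLen) {
            this.length = length;
            this.attributes = attributes;
            this.timestampDelta = timestampDelta;
            this.offsetDelta = offsetDelta;
            this.keyLen = keyLen;
            this.valueLen = valueLen;
        }
    }

    /** Writes a zigzag-encoded variable-length integer (used to build sample input). */
    public static void writeVarlong(long value, ByteBuffer buf) {
        long v = (value << 1) ^ (value >> 63);
        while ((v & ~0x7FL) != 0) {
            buf.put((byte) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        buf.put((byte) v);
    }

    /** Reads a zigzag-encoded variable-length integer. */
    public static long readVarlong(ByteBuffer buf) {
        long raw = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1);
    }

    /** Reads the leading fields and advances past key and value without copying them. */
    public static Header readHeaderSkippingPayload(ByteBuffer buf) {
        int length = (int) readVarlong(buf);
        byte attributes = buf.get();
        long timestampDelta = readVarlong(buf);
        int offsetDelta = (int) readVarlong(buf);
        int keyLen = (int) readVarlong(buf);
        if (keyLen > 0) {
            buf.position(buf.position() + keyLen);     // skip key bytes
        }
        int valueLen = (int) readVarlong(buf);
        if (valueLen > 0) {
            buf.position(buf.position() + valueLen);   // skip value bytes
        }
        return new Header(length, attributes, timestampDelta, offsetDelta, keyLen, valueLen);
    }
}
```

Note that on a compressed batch, skipping bytes in the decompressed stream may still cost decompression work, so how much this saves depends on the stream implementation.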



[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-31 Thread Flower.min (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806337#comment-16806337
 ] 

Flower.min commented on KAFKA-8106:
---

[~hackerwin7] I agree with the idea of [~qiaochao911]. Maybe we can set a flag 
to choose whether or not to decompress the record's key and value.
In other cases, such as when the magic value to use is below 2, we still need 
to decompress the record's key and value for validation.



[jira] [Comment Edited] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-31 Thread Flower.min (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806337#comment-16806337
 ] 

Flower.min edited comment on KAFKA-8106 at 4/1/19 2:45 AM:
---

[~hackerwin7]I agree with idea of [~qiaochao911] , Maybe we can set a flag to 
choose whether or not need to decompress record's key and value.In other cases 
such as The magic value to use is below 2 ,we also need decompress record's key 
and value for validating.


was (Author: flower.min):
[~hackerwin7]I agree with idea of [~qiaochao911] , Maybe we can set a flag to 
choose whether or not need to decompress record's key and value.
In other cases such as The magic value to use is below 2 ,we also need 
decompress record's key and value for validating.**



[jira] [Comment Edited] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-31 Thread Flower.min (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806337#comment-16806337
 ] 

Flower.min edited comment on KAFKA-8106 at 4/1/19 2:52 AM:
---

[~hackerwin7]I agree with idea of [~qiaochao911] .Maybe we can set a flag to 
choose whether or not need to decompress record's key and value.In other cases 
such as the magic value to use is below 2 ,we also need decompress record's key 
and value for validating.


was (Author: flower.min):
[~hackerwin7]I agree with idea of [~qiaochao911] , Maybe we can set a flag to 
choose whether or not need to decompress record's key and value.In other cases 
such as The magic value to use is below 2 ,we also need decompress record's key 
and value for validating.

> Remove unnecessary decompression operation when logValidator  do validation.
> 
>
> Key: KAFKA-8106
> URL: https://issues.apache.org/jira/browse/KAFKA-8106
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
> Environment: Server : 
> cpu:2*16 ; 
> MemTotal : 256G;
> Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network 
> Connection ; 
> SSD.
>Reporter: Flower.min
>Assignee: Flower.min
>Priority: Major
>  Labels: performance
>
> We ran performance tests on Kafka in the specific scenario described 
> below. We built a Kafka cluster with one broker and created topics with 
> different numbers of partitions. In each test run we started many producer 
> processes that sent large amounts of messages to one of the topics.
> *_Specific Scenario_*
>   
>  *_1. Main config of Kafka_*  
>  # Main config of Kafka server: 
> num.network.threads=6; num.io.threads=128; queued.max.requests=500
>  # Number of TopicPartition: 50~2000
>  # Size of single message: 1024B
>  
>  *_2. Config of KafkaProducer_* 
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3. The best result of performance testing_*  
> ||Network inflow rate||CPU used (%)||Disk write speed||Performance of production||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
> *_4. Phenomenon and my doubt_*
>     _CPU usage reached its upper limit, but the server's network 
> bandwidth did not. *We were unsure what was consuming so much CPU time, and 
> we want to improve performance and reduce the CPU usage of the Kafka 
> server.*_
>   
>  _*5. Analysis*_
> We analyzed the JFR recording of the Kafka server taken during the 
> performance test. The hot spot methods were 
> *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and 
> *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*. 
> When we checked the thread stacks, we also found that most of the CPU was 
> occupied by many threads that were busy decompressing messages. We then 
> read the Kafka source code.
> LogValidator uses a double-layer nested iterator to validate 
> every record, and each message is decompressed while traversing every 
> RecordBatch iterator. Decompressing the messages consumes CPU and hurts 
> overall performance. _*When the magic value to use is above 1 and no format 
> conversion or value overwriting is required for compressed messages, the only 
> purpose of decompressing every message is to obtain the total size in bytes 
> of each record and the size in bytes of the record body. This hurts 
> performance in common usage scenarios.*_ Therefore, we suggest 
> *_removing the unnecessary decompression_* when validating compressed 
> messages in that case, that is, when the magic value to use is above 1 and no 
> format conversion or value overwriting is required.





[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-03-31 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806349#comment-16806349
 ] 

huxihx commented on KAFKA-7965:
---

[~enether] Are you still working on this jira? If not, I think I could take 
it over. Thanks.

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}





[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-31 Thread hackerwin7 (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806352#comment-16806352
 ] 

hackerwin7 commented on KAFKA-8106:
---

[~wlwolf87] Currently the broker decompresses in order to validate the record 
timestamp, key, offset, uncompressed size in bytes, and magic.

Secondly, how can we decompress only some fields of a `Record`, given that 
the compressor output stream writes the bytes to the buffer at a low level?



[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-31 Thread Ve Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806363#comment-16806363
 ] 

Ve Lee commented on KAFKA-8106:
---

RecordBatch =>
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32
  Magic => int8
  CRC => int32
  Attributes => int16
  LastOffsetDelta => int32
  FirstTimestamp => int64
  MaxTimestamp => int64
  ProducerId => int64
  ProducerEpoch => int16
  FirstSequence => int32
  Records => [Record]

[~hackerwin7] The MaxTimestamp already exists in RecordBatch, but it is not 
used; instead it is recalculated.

Decompress only the record's "Length", "Attributes", "TimestampDelta", 
"OffsetDelta", and "KeyLen"; see [~Flower.min]'s PR: 
[https://github.com/apache/kafka/pull/6476]
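
To illustrate the point about the batch header, here is a minimal Java sketch that reads Magic, Attributes, and MaxTimestamp straight from the header bytes. The byte offsets are simply the running sum of the field widths in the schema above. The class and method names are hypothetical, and the compression codec mask (lowest 3 bits of Attributes) comes from the Kafka message format documentation rather than from this thread. Whether the broker should trust these producer-supplied values is a separate question.

```java
import java.nio.ByteBuffer;

/** Hypothetical reader for the fixed-width batch header fields listed above. */
final class BatchHeaderSketch {
    // Offsets derived by summing the field widths in the schema, in order.
    static final int FIRST_OFFSET_POS = 0;            // int64
    static final int LENGTH_POS = 8;                  // int32
    static final int PARTITION_LEADER_EPOCH_POS = 12; // int32
    static final int MAGIC_POS = 16;                  // int8
    static final int CRC_POS = 17;                    // int32
    static final int ATTRIBUTES_POS = 21;             // int16
    static final int LAST_OFFSET_DELTA_POS = 23;      // int32
    static final int FIRST_TIMESTAMP_POS = 27;        // int64
    static final int MAX_TIMESTAMP_POS = 35;          // int64

    /** Absolute reads leave the buffer position untouched; ByteBuffer is big-endian by default. */
    static byte magic(ByteBuffer batch) {
        return batch.get(MAGIC_POS);
    }

    /** Assumption from the Kafka format docs: the lowest 3 bits of Attributes hold the codec id. */
    static int compressionCodecId(ByteBuffer batch) {
        return batch.getShort(ATTRIBUTES_POS) & 0x07;
    }

    static long maxTimestamp(ByteBuffer batch) {
        return batch.getLong(MAX_TIMESTAMP_POS);
    }
}
```

None of these reads touches the Records section, so no decompression is involved.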
 



[jira] [Comment Edited] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-31 Thread Ve Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806363#comment-16806363
 ] 

Ve Lee edited comment on KAFKA-8106 at 4/1/19 3:50 AM:
---

RecordBatch =>
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32
  Magic => int8
  CRC => int32
  Attributes => int16
  LastOffsetDelta => int32
  FirstTimestamp => int64
  MaxTimestamp => int64
  ProducerId => int64
  ProducerEpoch => int16
  FirstSequence => int32
  Records => [Record]

[~hackerwin7] The MaxTimestamp already exists in RecordBatch, but it is not 
used; instead it is recalculated.

The "uncompressedSizeInBytes" is the Record's Length, and the "Magic" is in 
the RecordBatch.

Decompress only the record's "Length", "Attributes", "TimestampDelta", 
"OffsetDelta", and "KeyLen"; see [~Flower.min]'s PR: 
[https://github.com/apache/kafka/pull/6476]
  


was (Author: wlwolf87):
RecordBatch =>
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32
  Magic => int8
  CRC => int32
  Attributes => int16
  LastOffsetDelta => int32
  FirstTimestamp => int64
  MaxTimestamp => int64
  ProducerId => int64
  ProducerEpoch => int16
  FirstSequence => int32
  Records => [Record]

[~hackerwin7] The MaxTimestamp already exists in RecordBatch, but it is not 
used; instead it is recalculated.

Decompress only the record's "Length", "Attributes", "TimestampDelta", 
"OffsetDelta", and "KeyLen"; see [~Flower.min]'s PR: 
[https://github.com/apache/kafka/pull/6476]
 


[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-31 Thread hackerwin7 (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16806375#comment-16806375
 ] 

hackerwin7 commented on KAFKA-8106:
---

[~wlwolf87] I think we cannot use `MaxTimestamp` for validation because of 
the trust issue.

Thanks for the PR. I just reviewed it: it avoids reading the key and value 
during decompression, which amounts to partial decompression.

Any ideas on how to avoid record-level operations altogether?
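
As a rough illustration of the partial read discussed above, the Java sketch below walks the records of a decompressed stream by reading only each record's leading Length field and skipping the rest of the body, so keys and values are never copied into buffers. It assumes the v2 record layout in which Length is a zigzag-encoded varint, as described in the Kafka message format documentation. The class and method names are hypothetical, and the actual PR may be implemented differently. Note that skipping still makes the underlying stream decompress those bytes, which is why this is only a partial saving.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

/** Hypothetical sketch: sum record body sizes without materializing keys or values. */
final class RecordSkipSketch {

    /** Reads one zigzag-encoded varint (at most 5 bytes) from the stream. */
    static int readVarint(DataInputStream in) throws IOException {
        int raw = 0;
        int shift = 0;
        int b;
        while (((b = in.readUnsignedByte()) & 0x80) != 0) {
            raw |= (b & 0x7f) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IOException("varint is longer than 5 bytes");
            }
        }
        raw |= b << shift;
        // Undo the zigzag encoding to recover the signed value.
        return (raw >>> 1) ^ -(raw & 1);
    }

    /** Skips exactly n bytes, failing if the stream ends first. */
    static void skipFully(DataInputStream in, int n) throws IOException {
        int remaining = n;
        while (remaining > 0) {
            int skipped = in.skipBytes(remaining);
            if (skipped <= 0) {
                throw new EOFException("stream ended with " + remaining + " bytes left to skip");
            }
            remaining -= skipped;
        }
    }

    /** Returns the sum of the body lengths of numRecords consecutive records. */
    static long totalBodyBytes(DataInputStream in, int numRecords) throws IOException {
        long total = 0;
        for (int i = 0; i < numRecords; i++) {
            int bodyLength = readVarint(in);
            if (bodyLength < 0) {
                throw new IOException("negative record length: " + bodyLength);
            }
            skipFully(in, bodyLength); // key, value, and headers are never copied out
            total += bodyLength;
        }
        return total;
    }
}
```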
