-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review83392
-----------------------------------------------------------


Sorry for the delay. Overall, this looks good.

As discussed earlier, this patch needs a minor rebase.

There are a couple of points to note:
- In KAFKA-1499 you added broker-side compression. When writing out the
compacted messages, we should compress using the configured compression codec.
We can do this as an incremental change if you prefer: your current patch
makes the log cleaner compression-aware, and a subsequent patch can handle
writing out with the configured codec. That part could be non-trivial, as we
would then probably want to do some batching when writing out compacted
compressed messages.
- In KAFKA-1755 I added some defensive code to prevent compressed messages
and unkeyed messages from getting in. The compression-related part of that
code will need to be removed.
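
On the batching mentioned in the first point, a rough sketch of one way to
group retained messages before compressing each group. This is only an
illustration: BatchSketch, batchBySize and maxBatchBytes are hypothetical names,
not existing Kafka code, and payloads are plain byte arrays rather than Kafka
messages.

```scala
object BatchSketch {
  // Groups payloads, in order, into batches whose total size stays within
  // maxBatchBytes. A payload larger than the limit gets a batch of its own.
  // Hypothetical helper for illustration; not part of Kafka.
  def batchBySize(payloads: Seq[Array[Byte]],
                  maxBatchBytes: Int): Vector[Vector[Array[Byte]]] = {
    val batches = Vector.newBuilder[Vector[Array[Byte]]]
    var current = Vector.empty[Array[Byte]]
    var currentBytes = 0
    for (p <- payloads) {
      // Close the current batch if adding this payload would exceed the limit.
      if (current.nonEmpty && currentBytes + p.length > maxBatchBytes) {
        batches += current
        current = Vector.empty[Array[Byte]]
        currentBytes = 0
      }
      current = current :+ p
      currentBytes += p.length
    }
    if (current.nonEmpty) batches += current
    batches.result()
  }
}
```

Each resulting batch could then be compressed as one unit with the configured
codec.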

Let me know if you need help with any of this.


core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment134376>

    I would suggest one of two options here, instead of the two helper methods:
    - Inline both here and get rid of the helpers.
    - Have a single private helper (e.g., collectRetainedMessages).
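
    For the second option, a minimal sketch of what a single helper could look
    like. Entry, latestOffsetByKey and retainDeletes are simplified stand-ins
    assumed here for illustration; they are not the actual types in
    LogCleaner.scala.

```scala
// Hypothetical, simplified stand-in for a log message (not Kafka's Message).
// A payload of None represents a delete marker (tombstone).
final case class Entry(offset: Long, key: String, payload: Option[String])

object RetainSketch {
  // Keeps an entry only if no later offset is recorded for its key.
  // Tombstones are dropped unless retainDeletes is set.
  def collectRetainedMessages(entries: Seq[Entry],
                              latestOffsetByKey: Map[String, Long],
                              retainDeletes: Boolean): Seq[Entry] =
    entries.filter { e =>
      // Keys missing from the map are treated as not superseded.
      val isLatest = latestOffsetByKey.get(e.key).forall(latest => e.offset >= latest)
      val keepTombstone = retainDeletes || e.payload.isDefined
      isLatest && keepTombstone
    }
}
```

    Both checks then live in one place, so the call site only has to pass in
    the messages and write out the result.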



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment134377>

    We should now compress with the compression codec of the topic (KAFKA-1499)



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment134378>

    We should instead do a trivial refactor in ByteBufferMessageSet to compress
    messages in a preallocated buffer. It would be preferable to avoid having
    this compression logic in different places.
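
    To illustrate what compressing into a preallocated buffer means, here is a
    standalone sketch using only the JDK. PreallocatedCompression,
    BufferBackedOutputStream and compressInto are hypothetical names, gzip is
    chosen only as an example codec, and none of this is the
    ByteBufferMessageSet API.

```scala
import java.io.OutputStream
import java.nio.ByteBuffer
import java.util.zip.GZIPOutputStream

object PreallocatedCompression {
  // Minimal OutputStream that writes into a caller-supplied ByteBuffer.
  // ByteBuffer.put throws BufferOverflowException if the buffer is too small.
  private final class BufferBackedOutputStream(buffer: ByteBuffer) extends OutputStream {
    override def write(b: Int): Unit = { buffer.put(b.toByte); () }
    override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
      buffer.put(bytes, off, len); ()
    }
  }

  // Gzips the concatenated payloads into `target`, starting at its current
  // position, and returns the number of compressed bytes written.
  def compressInto(payloads: Seq[Array[Byte]], target: ByteBuffer): Int = {
    val start = target.position()
    val out = new GZIPOutputStream(new BufferBackedOutputStream(target))
    try payloads.foreach(p => out.write(p))
    finally out.close() // close() flushes the gzip trailer into the buffer
    target.position() - start
  }
}
```

    The caller owns the buffer and can reuse it across calls, and the
    compression code stays in one place.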


- Joel Koshy


On Jan. 17, 2015, 6:53 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Jan. 17, 2015, 6:53 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Updating the rebased code
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> f8e7cd5fabce78c248a9027c4bb374a792508675 
>   core/src/main/scala/kafka/tools/TestLogCleaning.scala 
> af496f7c547a5ac7a4096a6af325dad0d8feec6f 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
> 07acd460b1259e0a3f4069b8b8dcd8123ef5810e 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> /*TestLogCleaning stress test output for compressed messages*/
> 
> Producing 100000 messages...
> Logging produce requests to 
> /tmp/kafka-log-cleaner-produced-6014466306002699464.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to 
> /tmp/kafka-log-cleaner-consumed-177538909590644701.txt
> 100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
> De-duplicating and validating output files...
> Validated 9005 values, 0 mismatches.
> 
> Producing 1000000 messages...
> Logging produce requests to 
> /tmp/kafka-log-cleaner-produced-3298578695475992991.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to 
> /tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
> 1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
> De-duplicating and validating output files...
> Validated 89947 values, 0 mismatches.
> 
> Producing 10000000 messages...
> Logging produce requests to 
> /tmp/kafka-log-cleaner-produced-3336255463347572934.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to 
> /tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
> 10000000 rows of data produced, 1645281 rows of data consumed (83.5% 
> reduction).
> De-duplicating and validating output files...
> Validated 899853 values, 0 mismatches.
> 
> 
> /*TestLogCleaning stress test output for non-compressed messages*/
> 
> Producing 100000 messages...
> Logging produce requests to 
> /tmp/kafka-log-cleaner-produced-5174543709786189363.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to 
> /tmp/kafka-log-cleaner-consumed-5143455017777144701.txt
> 100000 rows of data produced, 22775 rows of data consumed (77.2% reduction).
> De-duplicating and validating output files...
> Validated 17874 values, 0 mismatches.
> 
> Producing 1000000 messages...
> Logging produce requests to 
> /tmp/kafka-log-cleaner-produced-7814446915546169271.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to 
> /tmp/kafka-log-cleaner-consumed-5172557663160447626.txt
> 1000000 rows of data produced, 129230 rows of data consumed (87.1% reduction).
> De-duplicating and validating output files...
> Validated 89947 values, 0 mismatches.
> 
> Producing 10000000 messages...
> Logging produce requests to 
> /tmp/kafka-log-cleaner-produced-6092986571905399164.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to 
> /tmp/kafka-log-cleaner-consumed-63626021421841220.txt
> 10000000 rows of data produced, 1136608 rows of data consumed (88.6% 
> reduction).
> De-duplicating and validating output files...
> Validated 899853 values, 0 mismatches.
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>
