> On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 299
> > <https://reviews.apache.org/r/25995/diff/1/?file=704523#file704523line299>
> >
> >     Do we need to do this check every time in the loop?

Maybe we could move this check out of the while loop, but that would probably
introduce more duplicate code. Since offset commits are infrequent and retries
should be rare, the extra check should not add much overhead.


> On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 168
> > <https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line168>
> >
> >     Do you need to turn off auto commit on the consumer threads here?

I thought auto offset commit should be turned off in the consumer config. Is
that the case?
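For reference, a minimal sketch of consumer properties with automatic offset commits disabled, assuming the 0.8-era high-level consumer property name `auto.commit.enable`. The object name `MirrorConsumerProps` and the ZooKeeper address and group id used with it are hypothetical placeholders, not values from this patch.

```scala
import java.util.Properties

// Hypothetical helper: builds properties for a mirror maker consumer.
object MirrorConsumerProps {
  def build(zkConnect: String, groupId: String): Properties = {
    val props = new Properties()
    props.put("zookeeper.connect", zkConnect)
    props.put("group.id", groupId)
    // Disable periodic auto commit so offsets are only committed explicitly,
    // after the producer has acknowledged the corresponding messages.
    props.put("auto.commit.enable", "false")
    props
  }
}
```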


> On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 207
> > <https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line207>
> >
> >     For clean shutdown, you need to
> >     
> >     1) "halt" consumer threads first.
> >     
> >     2) wait for producer to drain all the messages in data channel.
> >     
> >     3) manually commit offsets on consumer threads.
> >     
> >     4) shut down consumer threads.
> >     
> >     Otherwise we will have data duplicates as we commit offsets based on min.

After talking to Guozhang, I changed the shutdown process to the following:
1. Shut down the consumer threads.
2. Shut down the producer.
3. Commit offsets.
4. Shut down the consumer connectors.
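The agreed ordering can be sketched as below. `Stoppable`, `OffsetCommitter` and `MirrorMakerShutdown` are hypothetical stand-ins for the mirror maker components, and the sketch assumes the producer's `shutdown()` blocks until the data channel is drained and all sends are acknowledged, so that offsets committed in step 3 never run ahead of produced data.

```scala
// Hypothetical minimal interfaces standing in for the real components.
trait Stoppable { def shutdown(): Unit }
trait OffsetCommitter { def commitOffsets(): Unit }

class MirrorMakerShutdown(
    consumerThreads: Seq[Stoppable],
    producer: Stoppable,
    connectors: Seq[Stoppable with OffsetCommitter]) {

  def cleanShutdown(): Unit = {
    // 1. Stop consumer threads so no new messages enter the data channel.
    consumerThreads.foreach(_.shutdown())
    // 2. Shut down the producer (assumed to drain the channel and wait for acks).
    producer.shutdown()
    // 3. Commit offsets now that every consumed message has been produced.
    connectors.foreach(_.commitOffsets())
    // 4. Finally close the consumer connectors.
    connectors.foreach(_.shutdown())
  }
}
```

Committing only after the producer has stopped is what avoids the duplicates mentioned in the review: nothing consumed after the commit point is left in flight.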


> On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 447
> > <https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line447>
> >
> >     Please add a comment explaining how this logic works. Also a few questions:
> >     
> >     1) Is the map() call synchronized with other threads putting new offsets into the map?
> >     
> >     2) After the sorting, the logic may be clearer as
> >     
> >     var commitableOffsetIndex = 0
> >     while (commitableOffsetIndex + 1 < offsets.size &&
> >            offsets(commitableOffsetIndex + 1) - offsets.head == commitableOffsetIndex + 1)
> >       commitableOffsetIndex += 1
> >     
> >     offsetToCommit = offsets(commitableOffsetIndex) + 1

We are using a concurrent map, which guarantees that each individual put/get
operation is atomic. It is possible that the offsets we read for different
partitions reflect values from different points in time, but that should not
matter much because later commits will pick up the updated values. The offsets
we commit on exit are guaranteed to be read after the producer has shut down,
so the commits made while running do not need to be 100% accurate.
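A minimal sketch of both points, assuming the map holds, per partition, the offsets the producer has acknowledged and that the smallest acknowledged offset directly follows the last committed one. `Partition`, `AckedOffsets`, `ack` and `commitOffset` are hypothetical names, not the patch's code. A sorted concurrent set replaces the explicit sort, and the loop walks the contiguous run from the smallest offset, committing one past its last element. Pruning offsets after a commit is omitted.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}

// Hypothetical partition key; the patch uses Kafka's own topic/partition type.
final case class Partition(topic: String, id: Int)

// Hypothetical holder for acknowledged offsets per partition.
class AckedOffsets {
  // Each putIfAbsent/get on the map is atomic and each per-partition set is
  // thread-safe, but reads across partitions are not one consistent snapshot.
  private val acked =
    new ConcurrentHashMap[Partition, ConcurrentSkipListSet[java.lang.Long]]()

  // Called when the producer acknowledges a message from partition p.
  def ack(p: Partition, offset: Long): Unit = {
    val fresh = new ConcurrentSkipListSet[java.lang.Long]()
    val existing = acked.putIfAbsent(p, fresh)
    val set = if (existing == null) fresh else existing
    set.add(offset)
  }

  // Offset to commit for p: one past the end of the contiguous run of
  // acknowledged offsets starting at the smallest one, or None if empty.
  // The set iterates in ascending order; its iterator is weakly consistent,
  // so a concurrent ack may be missed, which only makes the result smaller.
  def commitOffset(p: Partition): Option[Long] = {
    val set = acked.get(p)
    if (set == null || set.isEmpty) None
    else {
      val it = set.iterator()
      var last: Long = it.next()
      var contiguous = true
      while (contiguous && it.hasNext) {
        val next: Long = it.next()
        if (next == last + 1) last = next else contiguous = false
      }
      Some(last + 1)
    }
  }
}
```

For acknowledged offsets 5, 6, 7 and 9, the gap at 8 stops the walk and the commit offset is 8; once 8 is acknowledged it advances to 10.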


> On Sept. 25, 2014, 11:48 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/MirrorMaker.scala, line 483
> > <https://reviews.apache.org/r/25995/diff/1/?file=704524#file704524line483>
> >
> >     The send().get() call is missing.

I put it inside put().
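A hypothetical sketch of a `put()` that performs the blocking `send().get()`. `AsyncSender` and `BlockingPut` are illustrative names, not the patch's classes or a real Kafka API; the `Long` result stands in for the record metadata a real producer would return.

```scala
import java.util.concurrent.Future

// Hypothetical producer abstraction: send() returns a Future of the offset.
trait AsyncSender[K, V] {
  def send(key: K, value: V): Future[Long]
}

// Hypothetical wrapper whose put() performs the blocking send().get().
class BlockingPut[K, V](sender: AsyncSender[K, V]) {
  // Blocks until the send completes; get() rethrows a failed send as an
  // ExecutionException instead of silently dropping it.
  def put(key: K, value: V): Long = sender.send(key, value).get()
}
```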


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/25995/#review54620
-----------------------------------------------------------


On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/25995/
> -----------------------------------------------------------
> 
> (Updated Sept. 24, 2014, 4:26 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1650
>     https://issues.apache.org/jira/browse/KAFKA-1650
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Mirror maker redesign; adding a byte-bounded blocking queue.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 
>   core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/25995/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
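The request adds `ByteBoundedBlockingQueue.scala`, whose code is not part of this thread. A minimal sketch of the general idea, assuming the queue is bounded by both element count and total bytes; the class name `ByteBoundedQueue`, its constructor parameters, and the rule that an oversized element is admitted into an empty queue are illustrative choices, not the patch's API.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Blocking queue bounded by element count and by the summed byte size of its
// elements, as computed by sizeOf.
class ByteBoundedQueue[E](maxItems: Int, maxBytes: Long, sizeOf: E => Long) {
  private val lock = new ReentrantLock()
  private val notFull = lock.newCondition()
  private val notEmpty = lock.newCondition()
  // Each element is stored with its size so take() does not recompute it.
  private val items = mutable.Queue[(E, Long)]()
  private var bytes = 0L

  def put(e: E): Unit = {
    val size = sizeOf(e)
    lock.lock()
    try {
      // An element larger than maxBytes is admitted when the queue is empty;
      // otherwise it could never be enqueued.
      while (items.size >= maxItems || (items.nonEmpty && bytes + size > maxBytes))
        notFull.await()
      items.enqueue((e, size))
      bytes += size
      notEmpty.signal()
    } finally lock.unlock()
  }

  def take(): E = {
    lock.lock()
    try {
      while (items.isEmpty) notEmpty.await()
      val (e, size) = items.dequeue()
      bytes -= size
      // Wake all blocked producers: freed bytes may fit a small element even
      // if a larger one still has to wait.
      notFull.signalAll()
      e
    } finally lock.unlock()
  }

  def size: Int = { lock.lock(); try items.size finally lock.unlock() }
  def byteSize: Long = { lock.lock(); try bytes finally lock.unlock() }
}
```

Bounding by bytes as well as by count keeps memory use predictable when message sizes vary widely, which a purely count-bounded queue cannot do.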
