> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 157
> > <https://reviews.apache.org/r/23568/diff/4/?file=635099#file635099line157>
> >
> >     Looking at this method in the other patch - this only gives the head - 
> > what about the other partitions?

Originally, for each parition in txPartitions, a transactionRequest is created 
and sent to respective broker, with parition listed as the head of 
txPartitions. And the broker only needs to append request to the head of 
txPartitions.

Now I have batched the transactionRequest sent to the same broker, and the 
broker will append transactionRequest to all the partitions in txPartitions 
that it leads.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 224
> > <https://reviews.apache.org/r/23568/diff/4/?file=635099#file635099line224>
> >
> >     Rather than do this one partition at a time we should group them by 
> > broker.

Sure. Now I have batched the transactionRequest sent to the same broker


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 229
> > <https://reviews.apache.org/r/23568/diff/4/?file=635099#file635099line229>
> >
> >     I think it is fine to use a channel manager similar to the controller 
> > channel manager but that is no longer specific to the controller. i.e., we 
> > should probably move it out to become a more generic re-usable 
> > "ChannelManager" module.
> >     
> >     In fact, given the critical nature of controller to broker 
> > communication we should probably dedicate a separate channel manager 
> > entirely to transactions so that it doesn't interfere with the 
> > controller-broker communication.

I re-write the code to use a separate channel manager used solely by 
transaction manager. For the sake of not changing existing code and leave code 
refactor to future work, I have duplicated BrokerChangeListener() in 
TransactionManager.scala.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 274
> > <https://reviews.apache.org/r/23568/diff/4/?file=635099#file635099line274>
> >
> >     Same comments here apply as the above (wrt duplicate code)


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 74
> > <https://reviews.apache.org/r/23568/diff/4/?file=635102#file635102line74>
> >
> >     How about we come up some other name for this - or even just 
> > TransactionalHW but that is a bit too wordy. Just want to avoid confusion 
> > with the replica HW.

Sure. I have used checkpointTransactionHW. Is this better?


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 84
> > <https://reviews.apache.org/r/23568/diff/4/?file=635102#file635102line84>
> >
> >     One issue with this approach is that every commit/abort will cause a 
> > linear scan of this queue - we can discuss some alternative ways to 
> > maintain the set of pending transactions and associated txcontrol offsets.

I think this approach will have O(1) average cost for every commit/abort. I can 
explain it in person.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 115
> > <https://reviews.apache.org/r/23568/diff/4/?file=635102#file635102line115>
> >
> >     As discussed elsewhere, txid should not be the key.

I see. I will use Array.empty[Byte] as message key.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/message/Message.scala, line 32
> > <https://reviews.apache.org/r/23568/diff/4/?file=635098#file635098line32>
> >
> >     Should also store the txcontrol in the message header.

Yes you are right. Originally we expect transaction manager and consumer to 
read txcontrol from payload. We should put txcontrol in the header instead.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 151
> > <https://reviews.apache.org/r/23568/diff/4/?file=635099#file635099line151>
> >
> >     Initially, I was thinking we could just append to local log (since we 
> > definitely want to avoid duplicating code) until we have the API for 
> > durable append (to a replicated log). That is part of refactoring KafkaApis 
> > and is actually blocked on KAFKA-1333 so unless you need this for working 
> > through all the failure cases I would suggest just doing a local append for 
> > now.

Yes. I recall our discussion to just append to local log. The purpose of using 
DelayedProduce here is to work through failure cases.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaConfig.scala, line 304
> > <https://reviews.apache.org/r/23568/diff/4/?file=635100#file635100line304>
> >
> >     Should remove the comment on "atomic" commits - that was only for the 
> > consumer offsets topic.

OK!


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 39
> > <https://reviews.apache.org/r/23568/diff/4/?file=635102#file635102line39>
> >
> >     We generally avoid using tuples and use case classes instead - since 
> > that is a lot clearer.

I see. Will use case classes in the future.


- Dong


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review48261
-----------------------------------------------------------


On July 22, 2014, 11:45 p.m., Dong Lin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23568/
> -----------------------------------------------------------
> 
> (Updated July 22, 2014, 11:45 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1523
>     https://issues.apache.org/jira/browse/KAFKA-1523
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1523 Transaction manager module
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/admin/TopicCommand.scala 
> 8d5c2e7088fc6e8bf69e775ea7f5893b94580fdf 
>   core/src/main/scala/kafka/common/Topic.scala 
> ad759786d1c22f67c47808c0b8f227eb2b1a9aa8 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> 8763968fbff697e4c5c98ab1274627c192a4d26a 
>   core/src/main/scala/kafka/message/Message.scala 
> d2a7293c7be4022af30884330924791340acc5c1 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 0b668f230c8556fdf08654ce522a11847d0bf39b 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> ef75b67b67676ae5b8931902cbc8c0c2cc72c0d3 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> c22e51e0412843ec993721ad3230824c0aadd2ba 
>   core/src/main/scala/kafka/server/TransactionManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> dcdc1ce2b02c996294e19cf480736106aaf29511 
> 
> Diff: https://reviews.apache.org/r/23568/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Dong Lin
> 
>

Reply via email to