> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 157
> > <https://reviews.apache.org/r/23568/diff/4/?file=635099#file635099line157>
> >
> >     Looking at this method in the other patch - this only gives the head -
> >     what about the other partitions?

Originally, a transactionRequest was created for each partition in txPartitions and sent to the broker leading that partition, with that partition listed at the head of txPartitions, so the broker only needed to append the request to the head of txPartitions. Now I batch the transactionRequests sent to the same broker, and the broker appends the transactionRequest to all the partitions in txPartitions that it leads.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 224
> > <https://reviews.apache.org/r/23568/diff/4/?file=635099#file635099line224>
> >
> >     Rather than do this one partition at a time we should group them by
> >     broker.

Sure. I now batch the transactionRequests sent to the same broker.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 229
> > <https://reviews.apache.org/r/23568/diff/4/?file=635099#file635099line229>
> >
> >     I think it is fine to use a channel manager similar to the controller
> >     channel manager but that is no longer specific to the controller. i.e., we
> >     should probably move it out to become a more generic re-usable
> >     "ChannelManager" module.
> >
> >     In fact, given the critical nature of controller to broker
> >     communication we should probably dedicate a separate channel manager
> >     entirely to transactions so that it doesn't interfere with the
> >     controller-broker communication.

I rewrote the code to use a separate channel manager that is used solely by the transaction manager. To avoid changing existing code, and to leave the refactoring to future work, I have duplicated BrokerChangeListener() in TransactionManager.scala.

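A minimal sketch of the per-broker batching described above, for illustration only. The names `TxPartition`, `BatchByBroker`, `groupByLeader` and the `leaderOf` lookup are hypothetical stand-ins and not the classes used in the patch; handling of partitions without a known leader is left out.

```scala
// Hypothetical stand-in for a topic/partition pair; not the patch's class.
case class TxPartition(topic: String, partition: Int)

object BatchByBroker {
  // Groups the partitions of one transaction by the id of the broker that
  // leads them, so that one request per broker can carry all of that
  // broker's partitions. leaderOf is an assumed lookup (partition => broker id).
  def groupByLeader(txPartitions: Seq[TxPartition],
                    leaderOf: TxPartition => Int): Map[Int, Seq[TxPartition]] =
    txPartitions.groupBy(leaderOf)
}
```

With this shape, the sender iterates over the resulting map and issues one request per broker id instead of one request per partition.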

> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 274
> > <https://reviews.apache.org/r/23568/diff/4/?file=635099#file635099line274>
> >
> >     Same comments here apply as the above (wrt duplicate code)


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 74
> > <https://reviews.apache.org/r/23568/diff/4/?file=635102#file635102line74>
> >
> >     How about we come up with some other name for this - or even just
> >     TransactionalHW but that is a bit too wordy. Just want to avoid confusion
> >     with the replica HW.

Sure. I have used checkpointTransactionHW. Is this better?


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 84
> > <https://reviews.apache.org/r/23568/diff/4/?file=635102#file635102line84>
> >
> >     One issue with this approach is that every commit/abort will cause a
> >     linear scan of this queue - we can discuss some alternative ways to
> >     maintain the set of pending transactions and associated txcontrol offsets.

I think this approach will have O(1) average cost for every commit/abort. I can explain it in person.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 115
> > <https://reviews.apache.org/r/23568/diff/4/?file=635102#file635102line115>
> >
> >     As discussed elsewhere, txid should not be the key.

I see. I will use Array.empty[Byte] as the message key.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/message/Message.scala, line 32
> > <https://reviews.apache.org/r/23568/diff/4/?file=635098#file635098line32>
> >
> >     Should also store the txcontrol in the message header.

Yes, you are right. Originally we expected the transaction manager and the consumer to read txcontrol from the payload. We should put txcontrol in the header instead.

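On the pending-transaction bookkeeping raised for TransactionManager.scala, line 84: the sketch below shows one structure with O(1) average cost per commit/abort. It is an illustration under assumptions, not necessarily what the patch does. The class `PendingTransactions`, its method names, the `Int` txid type, and the premise that transactions are registered in increasing offset order are all hypothetical.

```scala
import scala.collection.mutable

// Hypothetical sketch; names and semantics are assumptions, not the patch's code.
class PendingTransactions {
  // txId -> offset of the transaction's first txcontrol message.
  // LinkedHashMap keeps insertion order, assumed here to match offset order.
  private val pending = mutable.LinkedHashMap.empty[Int, Long]

  // Registers a newly started transaction.
  def begin(txId: Int, offset: Long): Unit = pending.update(txId, offset)

  // Commit or abort: hash-based removal, no scan over the other entries.
  def complete(txId: Int): Option[Long] = pending.remove(txId)

  // Offset of the oldest transaction still pending, if any.
  def oldestPendingOffset: Option[Long] = pending.headOption.map(_._2)
}
```

The oldest pending offset is read from the head of the map, so completing a transaction in the middle does not require walking the rest of the entries.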

> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 151
> > <https://reviews.apache.org/r/23568/diff/4/?file=635099#file635099line151>
> >
> >     Initially, I was thinking we could just append to local log (since we
> >     definitely want to avoid duplicating code) until we have the API for
> >     durable append (to a replicated log). That is part of refactoring KafkaApis
> >     and is actually blocked on KAFKA-1333 so unless you need this for working
> >     through all the failure cases I would suggest just doing a local append for
> >     now.

Yes. I recall our discussion to just append to local log. The purpose of using DelayedProduce here is to work through failure cases.


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaConfig.scala, line 304
> > <https://reviews.apache.org/r/23568/diff/4/?file=635100#file635100line304>
> >
> >     Should remove the comment on "atomic" commits - that was only for the
> >     consumer offsets topic.

OK!


> On July 21, 2014, 6:56 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/TransactionManager.scala, line 39
> > <https://reviews.apache.org/r/23568/diff/4/?file=635102#file635102line39>
> >
> >     We generally avoid using tuples and use case classes instead - since
> >     that is a lot clearer.

I see. Will use case classes in the future.


- Dong


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23568/#review48261
-----------------------------------------------------------


On July 22, 2014, 11:45 p.m., Dong Lin wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23568/
> -----------------------------------------------------------
>
> (Updated July 22, 2014, 11:45 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1523
>     https://issues.apache.org/jira/browse/KAFKA-1523
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1523 Transaction manager module
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/admin/TopicCommand.scala 8d5c2e7088fc6e8bf69e775ea7f5893b94580fdf
>   core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 8763968fbff697e4c5c98ab1274627c192a4d26a
>   core/src/main/scala/kafka/message/Message.scala d2a7293c7be4022af30884330924791340acc5c1
>   core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b
>   core/src/main/scala/kafka/server/KafkaConfig.scala ef75b67b67676ae5b8931902cbc8c0c2cc72c0d3
>   core/src/main/scala/kafka/server/KafkaServer.scala c22e51e0412843ec993721ad3230824c0aadd2ba
>   core/src/main/scala/kafka/server/TransactionManager.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/ZkUtils.scala dcdc1ce2b02c996294e19cf480736106aaf29511
>
> Diff: https://reviews.apache.org/r/23568/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Dong Lin
>
>