----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23648/#review48338 -----------------------------------------------------------
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/23648/#comment84840> Instead of repeating the assignments here can you do: this(selector, metadata, ...) and then do the assignment of only txcontext? clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/23648/#comment84857> One issue with this call is that it can block and add to the total time that poll needs to wait which breaks the API contract of poll (i.e., it should return in at most timeout ms). Can we fold these into the poll below? Also, (especially since the method is prefixed with) maybe... it would be cleaner to move the txContext null check inside the method. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/23648/#comment84879> No need for now - right? clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/23648/#comment84880> and here? clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/23648/#comment84881> Should have brought this up in the other RB: why is group id an int and 0? Group id is a string in an OCR. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/23648/#comment84882> Should we break out maybe update txcoordinator metadata to its own method (and reissue a call to leastLoadedNode)? i.e., the txcoordinatormetadata request need not be gated/coupled with topic metadata requests. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java <https://reviews.apache.org/r/23648/#comment84874> Offset commits are only needed to redo aborted transactions - i.e., typically in a failure scenario - so this is not really required for the scope of this jira right? Furthermore, we would want the partition of the offsets topic to be part of the transaction itself and the offset manager is not transaction aware either. clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java <https://reviews.apache.org/r/23648/#comment84878> Comment needs to be fixed. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java <https://reviews.apache.org/r/23648/#comment84884> Can make this a bit more concise by calling the constructor with this.txContext. clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java <https://reviews.apache.org/r/23648/#comment84887> tx.fetch is a slightly odd name. i.e., why is it a fetch? clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java <https://reviews.apache.org/r/23648/#comment84888> flushRequested clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84867> You mean transactional producers. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84868> Prefer PENDING or OPEN to ONGOING. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84869> The name is a bit confusing since we also have txcoordinatormetadata - I'm wondering if it helps significantly to have what seems to be a specialized context nested inside a context. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84870> updateTxCoordinatorMetadata clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84871> Can you make the comment a bit clearer - i.e., what offsets specifically does this refer to? I think you mean start offsets/state of the input partitions of this transaction. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84866> Let us call it txGroupId instead. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84872> What if the coordinator itself fails after a while and needs to be re-queried? Not sure if you intend for all of these to be covered in the subsequent work that addresses failure scenarios more thoroughly. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84873> Same here. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84889> For this sort of API it may be better/clear enough to just have a getter and have the caller do a null check. clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84890> I think it would be clearer to use a negated condition. i.e., if (txStatus != TransactionStatus.NOTRANSACTION) clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84891> if (txStatus != TransactionStatus.ONGOING) clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84892> if (txStatus != TransactionStatus.ONGOING) clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84895> Each check is a full iteration. Rather, can we just keep a count of pending messages for this check? clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84894> May need to unnecessarily wait. i.e., who interrupts? clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84893> Unnecessary catch? clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java <https://reviews.apache.org/r/23648/#comment84886> Is there a significant benefit in nesting this here? clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java <https://reviews.apache.org/r/23648/#comment84897> Maybe we should set prepare_commit/abort to 3 and 4. clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java <https://reviews.apache.org/r/23648/#comment84898> Incorrect comment. clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java <https://reviews.apache.org/r/23648/#comment84899> Same - Joel Koshy On July 17, 2014, 5:37 p.m., Raul Castro Fernandez wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/23648/ > ----------------------------------------------------------- > > (Updated July 17, 2014, 5:37 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1524 > https://issues.apache.org/jira/browse/KAFKA-1524 > > > Repository: kafka > > > Description > ------- > > KAFKA-1524; Implement transactional producer > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java > 522881c972ca42ff4dfb6237a2db15b625334d7e > > clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 00775abbcac850b0f2bb9a70b6fbc7cdf319bcf6 > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java > c0f1d57e0feb894d9f246058cd0396461afe3225 > clients/src/main/java/org/apache/kafka/clients/producer/Producer.java > 36e8398416036cab84faad1f07159e5adefd8086 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > f9de4af426449cceca12a8de9a9f54a6241d28d8 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > 1ed3c28b436d28381d9402896e32d16f2586c65e > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java > dd0af8aee98abed5d4a0dc50989e37888bb353fe > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/Compressor.java > 0323f5f7032dceb49d820c17a41b78c56591ffc4 > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java > 759f577eaf0e7d28a84926d4aa30f4ef0cb27bc2 > clients/src/main/java/org/apache/kafka/common/record/Record.java > 10df9fd8d3f4ec8c277650fa7eab269f3ea30d85 > > clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java > 93b58d02eac0f8ca28440e3e0ebea28ed3a7673c > clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java > 5489acac6806b3ae5e6d568d401d5a20c86cac05 > > clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java > PRE-CREATION > clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java > 94a11121e207d5cf94dbc94443a8aa7edf387782 > > Diff: https://reviews.apache.org/r/23648/diff/ > > > Testing > ------- > > > Thanks, > > Raul Castro Fernandez > >