> On July 22, 2014, 7:59 p.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, > > line 92 > > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line92> > > > > What if the coordinator itself fails after a while and needs to be > > re-queried? Not sure if you intend for all of these to be covered in the > > subsequent work that addresses failure scenarios more thoroughly. > >
Yes, I will revisit this once we check all failure scenarios, as there are other changes we may need to make. > On July 22, 2014, 7:59 p.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, > > line 146 > > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line146> > > > > I think it would be clearer to use a negated condition. i.e., if > > (txStatus != TransactionStatus.NOTRANSACTION) There are more than 2 states. With a negated condition we would not allow to start a transaction when the status is ABORTED, for example. > On July 22, 2014, 7:59 p.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, > > line 158 > > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line158> > > > > if (txStatus != TransactionStatus.ONGOING) Similar to previous. In this case we would be allowing to abort a committed transaction. > On July 22, 2014, 7:59 p.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, > > line 170 > > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line170> > > > > if (txStatus != TransactionStatus.ONGOING) In this case, repeating a commit command would throw an exception. I'm not sure if this is what we want, as there might be some failure scenarios where we want to be able to repeat commits. Not sure about this, maybe I should wait to have the failure handling done and then check this again? > On July 22, 2014, 7:59 p.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, > > line 215 > > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line215> > > > > Each check is a full iteration. Rather, can we just keep a count of > > pending messages for this check? Yes, the iteration also removes messages. The idea is to remove them lazily when it is necessary to check if there are more to send. The alternative seems more complex, as we would need to attach a listener to check and remove received messages. Also, by keeping the remove logic in the same method that counts, we avoid the need to synch. between listener and this thread, i.e. it seems easier to return the exact count. > On July 22, 2014, 7:59 p.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 186 > > <https://reviews.apache.org/r/23648/diff/1/?file=634383#file634383line186> > > > > One issue with this call is that it can block and add to the total time > > that poll needs to wait which breaks the API contract of poll (i.e., it > > should return in at most timeout ms). Can we fold these into the poll below? > > > > Also, (especially since the method is prefixed with) maybe... it would > > be cleaner to move the txContext null check inside the method. Regarding poll, do you mean putting the maybeUpdate... methods inside poll? This would require to pass at least two additional references. What if instead I measure the time to perform these maybeUpdate... and substrate from the remaining timeout for poll? I think this should be accurate enough > On July 22, 2014, 7:59 p.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 463 > > <https://reviews.apache.org/r/23648/diff/1/?file=634383#file634383line463> > > > > Offset commits are only needed to redo aborted transactions - i.e., > > typically in a failure scenario - so this is not really required for the > > scope of this jira right? Furthermore, we would want the partition of the > > offsets topic to be part of the transaction itself and the offset manager > > is not transaction aware either. Correct. I started sketching this and should have not included in this patch. I will provide a complete version once we handle failure scenarios. > On July 22, 2014, 7:59 p.m., Joel Koshy wrote: > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, > > line 218 > > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line218> > > > > May need to unnecessarily wait. i.e., who interrupts? True. My intention was to check if all ACKs are received, otherwise wait a fraction of the total timeout. Instead of wait(remainingWaitMs) it would be something like wait(remainingWaitMs/4). - Raul ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23648/#review48338 ----------------------------------------------------------- On July 17, 2014, 5:37 p.m., Raul Castro Fernandez wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/23648/ > ----------------------------------------------------------- > > (Updated July 17, 2014, 5:37 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1524 > https://issues.apache.org/jira/browse/KAFKA-1524 > > > Repository: kafka > > > Description > ------- > > KAFKA-1524; Implement transactional producer > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java > 522881c972ca42ff4dfb6237a2db15b625334d7e > > clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 00775abbcac850b0f2bb9a70b6fbc7cdf319bcf6 > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java > c0f1d57e0feb894d9f246058cd0396461afe3225 > clients/src/main/java/org/apache/kafka/clients/producer/Producer.java > 36e8398416036cab84faad1f07159e5adefd8086 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > f9de4af426449cceca12a8de9a9f54a6241d28d8 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > 1ed3c28b436d28381d9402896e32d16f2586c65e > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java > dd0af8aee98abed5d4a0dc50989e37888bb353fe > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/record/Compressor.java > 0323f5f7032dceb49d820c17a41b78c56591ffc4 > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java > 759f577eaf0e7d28a84926d4aa30f4ef0cb27bc2 > clients/src/main/java/org/apache/kafka/common/record/Record.java > 10df9fd8d3f4ec8c277650fa7eab269f3ea30d85 > > clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java > 93b58d02eac0f8ca28440e3e0ebea28ed3a7673c > clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java > 5489acac6806b3ae5e6d568d401d5a20c86cac05 > > clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java > PRE-CREATION > clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java > 94a11121e207d5cf94dbc94443a8aa7edf387782 > > Diff: https://reviews.apache.org/r/23648/diff/ > > > Testing > ------- > > > Thanks, > > Raul Castro Fernandez > >