-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23648/#review48338
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84840>

    Instead of repeating the assignments here, can you call this(selector, 
metadata, ...) and then assign only txContext?
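
    A minimal sketch of the constructor chaining suggested here. The class 
name, field types, and the isTransactional() helper are hypothetical stand-ins, 
not the actual NetworkClient code:

```java
// Hypothetical, simplified stand-in for the two constructors under review.
class ClientSketch {
    private final String selector;
    private final String metadata;
    // Not final: only the transactional constructor assigns it.
    private Object txContext;

    // Existing constructor: owns all of the common assignments.
    ClientSketch(String selector, String metadata) {
        this.selector = selector;
        this.metadata = metadata;
    }

    // New constructor: delegates, then assigns only the extra field.
    ClientSketch(String selector, String metadata, Object txContext) {
        this(selector, metadata);
        this.txContext = txContext;
    }

    boolean isTransactional() {
        return txContext != null;
    }
}
```

    With this shape the common assignments live in one place, so a field added 
later only needs to be wired into one constructor.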



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84857>

    One issue with this call is that it can block and add to the total time 
that poll needs to wait, which breaks the API contract of poll (i.e., it should 
return in at most timeout ms). Can we fold these into the poll below?
    
    Also, especially since the method name is prefixed with "maybe", it would 
be cleaner to move the txContext null check inside the method.
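
    One way to keep poll within its timeout is to compute a single deadline up 
front and hand each step only the time that is left. This is a rough sketch 
under that assumption; PollBudgetSketch, remainingMs, and the two step 
callbacks are hypothetical names, not part of the patch:

```java
import java.util.function.LongConsumer;

// Hypothetical sketch: every step in poll() shares one time budget.
class PollBudgetSketch {

    // Time left before the deadline, never negative.
    static long remainingMs(long deadlineMs, long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    // Each step receives the maximum time it is allowed to block.
    static void poll(long timeoutMs, LongConsumer txStep, LongConsumer selectorStep) {
        long deadlineMs = System.currentTimeMillis() + timeoutMs;
        // The transaction-related work gets whatever budget is left...
        txStep.accept(remainingMs(deadlineMs, System.currentTimeMillis()));
        // ...and the network poll gets only what remains after that.
        selectorStep.accept(remainingMs(deadlineMs, System.currentTimeMillis()));
    }
}
```

    The sum of the waits is then bounded by timeoutMs, provided each step 
honors the budget it is given.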



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84879>

    No need for now - right?



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84880>

    and here?



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84881>

    Should have brought this up in the other RB: why is the group id an int, 
and why 0? Group id is a string in an OCR.



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84882>

    Should we break out the "maybe update" of the txcoordinator metadata into 
its own method (and reissue a call to leastLoadedNode)? i.e., the 
txcoordinatormetadata request need not be gated on or coupled with topic 
metadata requests.



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84874>

    Offset commits are only needed to redo aborted transactions - i.e., 
typically in a failure scenario - so this is not really required for the scope 
of this jira, right? Furthermore, we would want the partition of the offsets 
topic to be part of the transaction itself, and the offset manager is not 
transaction aware either.



clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java
<https://reviews.apache.org/r/23648/#comment84878>

    Comment needs to be fixed.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/23648/#comment84884>

    Can make this a bit more concise by calling the constructor with 
this.txContext.



clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
<https://reviews.apache.org/r/23648/#comment84887>

    tx.fetch is a slightly odd name. i.e., why is it a fetch?



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/23648/#comment84888>

    flushRequested



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84867>

    You mean transactional producers.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84868>

    Prefer PENDING or OPEN to ONGOING.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84869>

    The name is a bit confusing since we also have txcoordinatormetadata - I'm 
wondering if it helps significantly to have what seems to be a specialized 
context nested inside a context.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84870>

    updateTxCoordinatorMetadata



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84871>

    Can you make the comment a bit clearer - i.e., what offsets specifically 
does this refer to? I think you mean start offsets/state of the input 
partitions of this transaction.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84866>

    Let us call it txGroupId instead.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84872>

    What if the coordinator itself fails after a while and needs to be 
re-queried? Not sure if you intend for all of these to be covered in the 
subsequent work that addresses failure scenarios more thoroughly.
    



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84873>

    Same here.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84889>

    For this sort of API it may be better/clear enough to just have a getter 
and have the caller do a null check.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84890>

    I think it would be clearer to use a negated condition. i.e., if (txStatus 
!= TransactionStatus.NOTRANSACTION)
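
    A small sketch of the guard style meant here. Only the two enum constants 
come from the patch; the class, the begin()/commit() methods, and the use of 
IllegalStateException are assumptions made for illustration:

```java
// Hypothetical sketch of state guards written as negated conditions.
class TxStateSketch {
    enum TransactionStatus { NOTRANSACTION, ONGOING }

    private TransactionStatus txStatus = TransactionStatus.NOTRANSACTION;

    void begin() {
        // Reject anything other than the single valid starting state.
        if (txStatus != TransactionStatus.NOTRANSACTION)
            throw new IllegalStateException("transaction already in progress: " + txStatus);
        txStatus = TransactionStatus.ONGOING;
    }

    void commit() {
        // Same pattern: only an ongoing transaction can be committed.
        if (txStatus != TransactionStatus.ONGOING)
            throw new IllegalStateException("no transaction to commit: " + txStatus);
        txStatus = TransactionStatus.NOTRANSACTION;
    }

    TransactionStatus status() {
        return txStatus;
    }
}
```

    Checking against the one valid state keeps the guard correct if more 
states are added to the enum later.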



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84891>

    if (txStatus != TransactionStatus.ONGOING)



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84892>

    if (txStatus != TransactionStatus.ONGOING)



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84895>

    Each check is a full iteration. Rather, can we just keep a count of pending 
messages for this check?
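
    Roughly what I have in mind, as a sketch. PendingCounterSketch and its 
method names are hypothetical, and it assumes every send is matched by exactly 
one acknowledgement or failure callback:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: track in-flight messages with a counter
// instead of scanning all batches on every check.
class PendingCounterSketch {
    private final AtomicInteger pending = new AtomicInteger();

    // Call when a message is handed to the accumulator.
    void onSend() {
        pending.incrementAndGet();
    }

    // Call once per message when it is acknowledged or has failed.
    void onComplete() {
        pending.decrementAndGet();
    }

    // Constant-time replacement for the full iteration.
    boolean hasPending() {
        return pending.get() > 0;
    }
}
```

    The check becomes constant time, at the cost of having to keep the counter 
consistent on every completion path.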



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84894>

    This may wait unnecessarily, i.e., who interrupts?



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84893>

    Unnecessary catch?



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84886>

    Is there a significant benefit in nesting this here?



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java
<https://reviews.apache.org/r/23648/#comment84897>

    Maybe we should set prepare_commit/abort to 3 and 4.



clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java
<https://reviews.apache.org/r/23648/#comment84898>

    Incorrect comment.



clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java
<https://reviews.apache.org/r/23648/#comment84899>

    Same here.


- Joel Koshy


On July 17, 2014, 5:37 p.m., Raul Castro Fernandez wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23648/
> -----------------------------------------------------------
> 
> (Updated July 17, 2014, 5:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1524
>     https://issues.apache.org/jira/browse/KAFKA-1524
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1524; Implement transactional producer
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 522881c972ca42ff4dfb6237a2db15b625334d7e 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 00775abbcac850b0f2bb9a70b6fbc7cdf319bcf6 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> c0f1d57e0feb894d9f246058cd0396461afe3225 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 36e8398416036cab84faad1f07159e5adefd8086 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> f9de4af426449cceca12a8de9a9f54a6241d28d8 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  1ed3c28b436d28381d9402896e32d16f2586c65e 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/record/Compressor.java 
> 0323f5f7032dceb49d820c17a41b78c56591ffc4 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 759f577eaf0e7d28a84926d4aa30f4ef0cb27bc2 
>   clients/src/main/java/org/apache/kafka/common/record/Record.java 
> 10df9fd8d3f4ec8c277650fa7eab269f3ea30d85 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  93b58d02eac0f8ca28440e3e0ebea28ed3a7673c 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
> 5489acac6806b3ae5e6d568d401d5a20c86cac05 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java
>  PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 
> 94a11121e207d5cf94dbc94443a8aa7edf387782 
> 
> Diff: https://reviews.apache.org/r/23648/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Raul Castro Fernandez
> 
>
