> On Jan. 23, 2015, 6:22 p.m., Guozhang Wang wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala,
> >  line 138
> > <https://reviews.apache.org/r/29899/diff/4/?file=831258#file831258line138>
> >
> >     If an exception is thrown in the callback, we will already finish the 
> > loop in line 108; do we need to call loop.done here again?

The callback and the retry loop run on different threads, so more send 
requests may be issued before the exception is thrown in the callback. 
We don't want to retry the loop if the "send" method itself throws an 
unrecoverable exception. If the exception is thrown in the callback, we only 
set a flag, and the main thread throws that exception on the next send 
request. 
The loop.done at line 108 covers the flow where the send method threw no 
exception and returned a future.
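
To illustrate the flag-based hand-off described above, here is a minimal, 
self-contained sketch. It is not the code in the patch: the class 
CallbackFlagSketch, its Callback interface, and the accepted() counter are 
hypothetical stand-ins with no Kafka dependency, and the "I/O thread" is 
simulated by invoking the callbacks by hand.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CallbackFlagSketch {
    /** Hypothetical stand-in for the producer's completion callback. */
    public interface Callback {
        void onCompletion(Exception error);
    }

    // Flag set by the callback thread, read by the thread calling send().
    private final AtomicReference<Exception> callbackError = new AtomicReference<>();
    private int accepted = 0;

    /**
     * Accepts a message and returns the callback that the (simulated)
     * I/O thread would invoke later. A failure recorded by an earlier
     * callback is rethrown here, on the caller's thread.
     */
    public Callback send(String message) {
        Exception pending = callbackError.get();
        if (pending != null) {
            throw new RuntimeException("An earlier send failed in its callback", pending);
        }
        accepted++;
        // The callback only records the first failure; it never throws itself.
        return error -> {
            if (error != null) {
                callbackError.compareAndSet(null, error);
            }
        };
    }

    public int accepted() {
        return accepted;
    }

    public static void main(String[] args) {
        CallbackFlagSketch producer = new CallbackFlagSketch();
        Callback first = producer.send("m1");
        Callback second = producer.send("m2"); // accepted before m1's callback fires
        first.onCompletion(new IllegalStateException("broker rejected m1"));
        second.onCompletion(null);
        try {
            producer.send("m3"); // the recorded failure surfaces on this call
        } catch (RuntimeException e) {
            System.out.println("send failed: " + e.getCause().getMessage());
        }
        System.out.println("accepted = " + producer.accepted());
    }
}
```

In this sketch the second message is accepted even though the first one 
eventually fails, which is the window mentioned above: the failure only 
becomes visible to the caller on the following send.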


> On Jan. 23, 2015, 6:22 p.m., Guozhang Wang wrote:
> > samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java,
> >  line 26
> > <https://reviews.apache.org/r/29899/diff/4/?file=831263#file831263line26>
> >
> >     Would org.apache.kafka.clients.producer.MockProducer be configured to 
> > satisfy the requests here?

I have addressed the same question in 
https://reviews.apache.org/r/29899/#comment113812


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29899/#review69423
-----------------------------------------------------------


On Jan. 23, 2015, 12:42 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29899/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2015, 12:42 a.m.)
> 
> 
> Review request for samza, Chris Riccomini, Guozhang Wang, and Jay Kreps.
> 
> 
> Bugs: SAMZA-227
>     https://issues.apache.org/jira/browse/SAMZA-227
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Modified logic in KafkaSystemProducer for send & flush based on the behavior 
> of the new Java-based Kafka producer API.
> Added "MockKafkaProducer" in Samza layer to mock out the buffering behavior 
> provided by Kafka producer. The MockProducer exposed by Kafka does not 
> provide sufficient control for writing unit tests.
> Producer config for Kafka is unified in "KafkaProducerConfig" with 
> appropriate default values. These can be overridden while instantiating the 
> producer config.
> 
> 
> Diffs
> -----
> 
>   build.gradle 7a40ad4ae916610186848c06c4577e7067de98ee 
>   docs/learn/documentation/versioned/jobs/configuration-table.html 
> 4ccc0e70b0817c1301fb82d0aa1efeff2575eea2 
>   gradle/dependency-versions.gradle 44dd42603e93788562fd64c68312570cee71a2aa 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
>  1d5627d0c561a0be6b48ee307b755958e62b783e 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
>  f2defbd39708e959edb1d6674e542b5bc9e02666 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
> e57b8ba1e09765774314ec469645b5d0bbde060f 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
>  4506ea367eec4e40da45feee777ba73069025a4c 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
>  a0e1ccbfe9dc4fd26ca6b30fc2d1348fb7d007e4 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala
>  7e1383f9c10a9231b3e41f2be54d750d3d2c6523 
>   samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala 
> f1b7511775703775eaa5172d7da88d302a89aa2e 
>   
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
>  553d6b4d6ffe21f4a92c8c347e835d95d71b5863 
>   samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala 
> 0e1c38e5d68f2f3e42ecdb58297a11ff5d29374d 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/MockKafkaProducer.java
>  PRE-CREATION 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
>  c759a7bea7b67714eaa90a97f828079f26acbca4 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
>  8067cbf78214d3c01b7f915d8810b10de57fe6a3 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala
>  72b36f774b2b8845539f26fc592244353cf300cd 
>   samza-kafka/src/test/scala/org/apache/samza/utils/TestUtils.java 
> PRE-CREATION 
>   samza-test/src/main/config/negate-number.properties 
> 4989b279a22ffd8acdb2c31ca4813f6768edc75b 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>  ca25258217e5ebc44b34fbc4d69ecb28c81df618 
> 
> Diff: https://reviews.apache.org/r/29899/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build - SUCCESSFUL
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
