----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/48213/#review136654 -----------------------------------------------------------
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala (line 87) <https://reviews.apache.org/r/48213/#comment201746> You need to save the value of the producer before this. Something like: ``` final currentProducer = producer; if (currentProducer == null) { ... ``` And you need to replace all usages of producer in this method with currentProducer. This ensures that the producer is not changed out from under you midway through the method or later in the callback. - Chris Pettitt On June 8, 2016, 1:07 a.m., Xinyu Liu wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/48213/ > ----------------------------------------------------------- > > (Updated June 8, 2016, 1:07 a.m.) > > > Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data > Infrastructure). > > > Repository: samza > > > Description > ------- > > All the system producers need to be thread safe in order to be used in > multithreaded tasks. The following are the changes > (ElasticSearchSystemProducer is already thread safe so no change made there): > > In KafkaSystemProducer, remove the buggy retry logic and treat any exception > as fatal. > In HdfsSystemProducer, add synchronization lock to all public methods. > > > Diffs > ----- > > > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala > 1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala > 3769e103616dc0f1fd869706cc086e24cd926c48 > > samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java > 04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2 > > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala > 8e32bba6ced090f0fc8d4e5176fe0788df36981d > > Diff: https://reviews.apache.org/r/48213/diff/ > > > Testing > ------- > > Unit tests and local testing. > > > Thanks, > > Xinyu Liu > >