> On June 8, 2016, 3:36 p.m., Chris Pettitt wrote: > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala, > > line 89 > > <https://reviews.apache.org/r/48213/diff/2-3/?file=1406178#file1406178line89> > > > > You need to save the value of the producer before this. Something like: > > > > ``` > > final currentProducer = producer; > > if (currentProducer == null) { > > ... > > ``` > > > > And you need to replace all usages of producer in this method with > > currentProducer. This ensures that the producer is not changed out from > > under you midway through the method or later in the callback.
Great solution! I modified the code using local var currentProducer. Thanks. - Xinyu ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/48213/#review136654 ----------------------------------------------------------- On June 8, 2016, 11:53 p.m., Xinyu Liu wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/48213/ > ----------------------------------------------------------- > > (Updated June 8, 2016, 11:53 p.m.) > > > Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data > Infrastructure). > > > Repository: samza > > > Description > ------- > > All the system producers need to be thread safe in order to be used in > multithreaded tasks. The following are the changes > (ElasticSearchSystemProducer is already thread safe so no change made there): > > In KafkaSystemProducer, remove the buggy retry logic and treat any exception > as fatal. > In HdfsSystemProducer, add synchronization lock to all public methods. > > > Diffs > ----- > > > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala > 1f4b5c46436e44b7c7cd1a49689c4f43f1f6ed1b > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala > 3769e103616dc0f1fd869706cc086e24cd926c48 > > samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemProducerJava.java > 04c9113fd6c3dd56c49ff46c8c1c0ff12f68e5e2 > > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemProducer.scala > 8e32bba6ced090f0fc8d4e5176fe0788df36981d > > Diff: https://reviews.apache.org/r/48213/diff/ > > > Testing > ------- > > Unit tests and local testing. > > > Thanks, > > Xinyu Liu > >