> On Feb. 27, 2015, 7:40 p.m., Yan Fang wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala,
> >  line 151
> > <https://reviews.apache.org/r/31520/diff/3/?file=880457#file880457line151>
> >
> >     1. besides "abdicate", refreshBrokers is also called by 
> > "messageSink.refreshDropped" in the BrokerProxy thread.
> >     
> >     2. In terms of *"creating multiple objects for the same broker"*, even 
> > though this part is "synchronized", it still seems possible to create 
> > multiple objects for the same broker: 
> >     * thread 1 runs until "val nextoffset = 
> > topicPartitionsAndOffsets.get(head).get", so the "head" already has topic 
> > "t1". Then it is blocked by thread 2.
> >     * thread 2 runs, enters the synchronized part, creates the broker for 
> > topic "t1", removes it from "droppedTopicAndPartitions", and releases the lock.
> >     * thread 1 then enters the "synchronized" part and still creates a 
> > broker for topic "t1", because "val head" already has the value.

Sorry, the formatting is a little confusing: items "3, 4, 5" are the bullet list under item "2". :)
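
To make the interleaving in point 2 concrete, here is a minimal sketch. It is not the actual KafkaSystemConsumer code: BrokerRegistry, createUnchecked, createChecked and replay are hypothetical names, and the two "threads" are replayed sequentially so the outcome is deterministic. The only assumption carried over from the review is that "head" is read before the lock is taken.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the consumer state; NOT the real KafkaSystemConsumer.
class BrokerRegistry {
  private val lock = new Object
  // Plays the role of droppedTopicAndPartitions.
  val dropped = mutable.Set[String]()
  // Records every broker object that gets created.
  val created = mutable.ListBuffer[String]()

  // Racy shape: the caller chose `head` before taking the lock,
  // so a stale `head` still results in a second broker.
  def createUnchecked(head: String): Unit = lock.synchronized {
    created += head
    dropped -= head
  }

  // Safer shape: re-check membership after acquiring the lock.
  // remove returns true only if `head` was still in the set.
  def createChecked(head: String): Unit = lock.synchronized {
    if (dropped.remove(head)) created += head
  }
}

object RaceSketch {
  // Replays the interleaving from point 2 and returns how many
  // brokers were created for topic "t1".
  def replay(create: (BrokerRegistry, String) => Unit): Int = {
    val registry = new BrokerRegistry
    registry.dropped += "t1"
    val headSeenByThread1 = registry.dropped.head // thread 1 reads head, then is blocked
    val headSeenByThread2 = registry.dropped.head // thread 2 reads the same head
    create(registry, headSeenByThread2)           // thread 2 creates the broker, releases the lock
    create(registry, headSeenByThread1)           // thread 1 resumes with the stale head
    registry.created.count(_ == "t1")
  }
}
```

Under these assumptions, RaceSketch.replay((r, h) => r.createUnchecked(h)) yields 2 brokers for "t1", while the re-checking variant yields 1. Moving the read of "head" inside the synchronized part would have the same effect.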


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31520/#review74558
-----------------------------------------------------------


On Feb. 27, 2015, 5:59 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31520/
> -----------------------------------------------------------
> 
> (Updated Feb. 27, 2015, 5:59 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-579
>     https://issues.apache.org/jira/browse/SAMZA-579
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> fix test
> 
> 
> clean up droppedTopicAndPartitions slightly
> 
> 
> misc formatting cleanup from SAMZA-394. fix bug in KafkaSystemConsumer where 
> we weren't actually adding to droppedTopicAndPartitions
> 
> 
> make refreshBrokers thread safe
> 
> 
> temporarily drop ssps without metadata, and retry later.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e3b9d304353981abc4d9760ccb078ff6dddcbb19
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala cc0a4c6913ac7a08d520bb485146b1488f43ef98
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 4918e3e88c20f3db11f5ae651ae104cf63b3d592
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala 48ad66eb07e2b5743e4f0cf648c67b48dc05e067
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 6f05f3c028cf77dbee9186d3463b1999c70dac43
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java 1fef1eaa6266f638cf9f70c982f7d5f99f5c535c
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java e958b51610d6927121bacffdf6d30efc13b5f444
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java cb3083832986d5028fdfb804bc4e8c8e61b6c60b
> 
> 
> Diff: https://reviews.apache.org/r/31520/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>
