> On Feb. 27, 2015, 7:40 p.m., Yan Fang wrote:
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala, line 151
> > <https://reviews.apache.org/r/31520/diff/3/?file=880457#file880457line151>
> >
> >     1. Besides "abdicate", refreshBrokers is also called by "messageSink.refreshDropped" in the BrokerProxy thread.
> >
> >     2. In terms of *"creating multiple objects for the same broker"*, even though we "synchronized" this part, it still seems possible to have multiple objects for the same broker:
> >     * thread 1 runs until "val nextoffset = topicPartitionsAndOffsets.get(head).get", so "head" already holds topic "t1". Then it is blocked by thread 2.
> >     * thread 2 runs, enters the "synchronized" part, creates the broker for topic "t1", removes it from "droppedTopicAndPartitions", and releases the lock.
> >     * thread 1 enters the "synchronized" part and still creates a broker for topic "t1", because "val head" already has the value.
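
The interleaving in point 2 can be replayed deterministically. The following is only a minimal sketch: `BrokerRegistry`, `createUnchecked`, `createChecked` and `RaceDemo` are hypothetical names, not code from KafkaSystemConsumer, and the "threads" are simulated by ordering the calls by hand. It shows why locking only the creation step is not enough when "head" was read before the lock was taken, and one possible way to close the gap by re-checking under the lock.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the consumer's bookkeeping; not the real Samza class.
class BrokerRegistry {
  private val lock = new Object
  // Plays the role of droppedTopicAndPartitions.
  val dropped = mutable.Set[String]()
  // One entry per broker object that gets created.
  val created = mutable.ListBuffer[String]()

  // Racy shape: the caller chose `head` before taking the lock,
  // so a stale value is acted on without being re-validated.
  def createUnchecked(head: String): Unit = lock.synchronized {
    created += head
    dropped -= head
  }

  // Safer shape: re-check under the lock that `head` is still dropped.
  def createChecked(head: String): Unit = lock.synchronized {
    if (dropped.contains(head)) {
      created += head
      dropped -= head
    }
  }
}

object RaceDemo {
  // Replays the interleaving from the review comment and returns
  // how many broker objects were created for topic "t1".
  def replay(checked: Boolean): Int = {
    val registry = new BrokerRegistry
    registry.dropped += "t1"

    val headSeenByThread1 = registry.dropped.head // thread 1 reads head, then is blocked
    val headSeenByThread2 = registry.dropped.head // thread 2 reads the same head

    val create: String => Unit =
      (h: String) => if (checked) registry.createChecked(h) else registry.createUnchecked(h)

    create(headSeenByThread2) // thread 2 takes the lock first
    create(headSeenByThread1) // thread 1 resumes with its stale head
    registry.created.size
  }
}
```

With this sketch, `RaceDemo.replay(checked = false)` returns 2 (two objects for "t1") and `RaceDemo.replay(checked = true)` returns 1. Reading "head" inside the synchronized block would have the same effect as the re-check.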

Sorry, the format is a little confusing... "3,4,5" are the bullet list for "2". :)


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31520/#review74558
-----------------------------------------------------------


On Feb. 27, 2015, 5:59 p.m., Chris Riccomini wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31520/
> -----------------------------------------------------------
>
> (Updated Feb. 27, 2015, 5:59 p.m.)
>
>
> Review request for samza.
>
>
> Bugs: SAMZA-579
>     https://issues.apache.org/jira/browse/SAMZA-579
>
>
> Repository: samza
>
>
> Description
> -------
>
> fix test
>
>
> clean up droppedTopicAndPartitions slightly
>
>
> misc formatting cleanup from SAMZA-394. fix bug in KafkaSystemConsumer where we weren't actually adding to droppedTopicAndPartitions
>
>
> make refreshBrokers thread safe
>
>
> temporarily drop ssps without metadata, and retry later.
>
>
> Diffs
> -----
>
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e3b9d304353981abc4d9760ccb078ff6dddcbb19
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala cc0a4c6913ac7a08d520bb485146b1488f43ef98
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala 4918e3e88c20f3db11f5ae651ae104cf63b3d592
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala 48ad66eb07e2b5743e4f0cf648c67b48dc05e067
>   samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala 6f05f3c028cf77dbee9186d3463b1999c70dac43
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java 1fef1eaa6266f638cf9f70c982f7d5f99f5c535c
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java e958b51610d6927121bacffdf6d30efc13b5f444
>   samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java cb3083832986d5028fdfb804bc4e8c8e61b6c60b
>
> Diff: https://reviews.apache.org/r/31520/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Chris Riccomini
>
>