> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 589
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548705#file1548705line589>
> >
> > Typo: Envelope

Fixed. Thanks for catching this!


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java, line 81
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548706#file1548706line81>
> >
> > This would probably be good to have as info.

Fixed.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java, line 91
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548708#file1548708line91>
> >
> > Minor: s/completeCallbacks/completedCallbacks?

Fixed.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java, line 91
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548708#file1548708line91>
> >
> > Minor: s/completeCallbacks/completedCallbacks?

Same as above.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java, line 99
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548708#file1548708line99>
> >
> > Minor: Is this within the max line width?

Fixed.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/util/TimerClock.java, line 25
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548709#file1548709line25>
> >
> > Can we just add this method to the existing Clock interface? Weird to
> > have two clock interfaces.

Chris raised the same question. The other interface has an extra method I don't need, and I want to use a lambda for this. I think I can make HighResolutionClock extend this one. Does that sound better?


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/java/org/apache/samza/util/Utils.java, line 38
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548710#file1548710line38>
> >
> > Prefer not having a util class for this one method, which I think is
> > only used in one place? Also don't think this is worth having a util method
> > for.

Good point. The method was only used by one class (RunLoopFactory). I moved the method directly into that class to avoid the util class.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala, line 33
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548711#file1548711line33>
> >
> > Minor: s/timer/timers (same for field name)

I intended this config to turn all timer metrics on or off, so I think the current name should be fine.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala, line 64
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548711#file1548711line64>
> >
> > Minor: s/using/use

Fixed.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala, line 69
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548711#file1548711line69>
> >
> > Don't need `return` here (and later).

Fixed.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 65
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548713#file1548713line65>
> >
> > I don't think we should ever be creating a default new SerdeManager
> > just for this class. Same for SystemConsumerMetrics (and other places where
> > this pattern is used for objects). Constants are fine.

I think these defaults exist for testing purposes. Normally we pass in the real SerdeManager.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/util/Util.scala, line 397
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548714#file1548714line397>
> >
> > This seems like a weird method to have. Would prefer to remove.

This makes it a lot easier to convert a Java TimerClock object to a Scala function that returns the time. I agree it's not pretty, but we need some workaround for this conversion right now.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala, line 108
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548713#file1548713line108>
> >
> > Same here, not sure if we should use the default.

Same as above.


> On Nov. 2, 2016, 7:57 p.m., Prateek Maheshwari wrote:
> > samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala, line 137
> > <https://reviews.apache.org/r/53282/diff/1/?file=1548719#file1548719line137>
> >
> > See comment about passing this in instead.

Fixed.


- Xinyu


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/53282/#review154602
-----------------------------------------------------------


On Nov. 2, 2016, 5:56 p.m., Xinyu Liu wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/53282/
> -----------------------------------------------------------
>
> (Updated Nov. 2, 2016, 5:56 p.m.)
>
>
> Review request for samza, Chris Pettitt, Jake Maes, and Navina Ramesh.
>
>
> Repository: samza
>
>
> Description
> -------
>
> In recent experiments with a Samza batch job (consuming HDFS data on
> Hadoop), the results were subpar compared to MapReduce and Spark. Looking
> closely at the metrics, we found two basic problems:
>
> 1) Not enough data to process. We spotted this because the unprocessed
> message queue length was frequently zero.
>
> 2) Not processing fast enough. Samza performed about the same on both
> medium-size records (100B) and small records (10B), while Spark scales very
> well on small records (over 1M/s).
>
> The first problem is solved by increasing the buffer size. This ticket
> addresses the second problem with three major improvements:
>
> - Option to turn off timer metrics calculation: one of the main time costs
> in Samza processing turns out to be just keeping the timer metrics. While
> they are useful for debugging, they become a bottleneck when running a
> stable job at high performance. My test job, which consumes 8M mock
> messages, took 30 secs with timer metrics on. After turning them off, it
> took only 14 secs.
>
> - Java coding improvements: The AsyncRunLoop code can be further optimized
> for efficiency. Some of the thread-safe data structures I was using are not
> optimal for performance (Collections.synchronizedSet). I switched to
> CopyOnWriteArraySet, which performs far better here because reads dominate
> and the set is small.
>
> - Specific handling for in-order processing: AsyncRunLoop handles callbacks
> the same way whether processing is in-order or out-of-order
> (max concurrency > 1), which incurs considerable cost. Simplifying the
> logic for the in-order case improves performance.
>
> After all three improvements, my test job with mock input (8M messages) can
> be processed within 8 secs (down from the original 30 secs), so it's 1M/s
> on one CPU core.
>
> For the performance benchmark jobs running in Hadoop, we also see a 4x
> improvement with all the fixes above. Please take a look at the attached
> spreadsheet (see the numbers with fix (timer metrics turned off) and
> fix2 (all three together)).
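
For reference, here is a minimal sketch of the behavior behind the Collections.synchronizedSet to CopyOnWriteArraySet switch mentioned in the description. It is not code from the patch: the class name `CallbackSetSketch`, the helpers `seed` and `iterateWhileAdding`, and the task names are all hypothetical. It does not measure performance; it only illustrates that a CopyOnWriteArraySet iterator reads a snapshot without locking (which is why it suits small, read-mostly sets), whereas iterating a synchronizedSet needs external synchronization and fails fast if the set changes underneath it.

```java
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/** Hypothetical sketch; none of these names come from the Samza patch. */
final class CallbackSetSketch {

  /** Fills the given set with three illustrative task names. */
  static Set<String> seed(Set<String> set) {
    set.add("task-0");
    set.add("task-1");
    set.add("task-2");
    return set;
  }

  /** Iterates the set while mutating it; returns how many elements were visited. */
  static int iterateWhileAdding(Set<String> tasks) {
    int seen = 0;
    for (String task : tasks) {
      seen++;
      tasks.add("late-task"); // mutation in the middle of an iteration
    }
    return seen;
  }

  public static void main(String[] args) {
    // CopyOnWriteArraySet: the iterator works on the array as it was when the
    // iteration started, so reads take no lock and never see the late write.
    Set<String> cow = seed(new CopyOnWriteArraySet<>());
    System.out.println("visited=" + iterateWhileAdding(cow) + ", size=" + cow.size());

    // synchronizedSet: individual calls are locked, but iteration is not
    // protected, so a write during iteration makes the iterator fail fast.
    Set<String> sync = seed(Collections.synchronizedSet(new HashSet<>()));
    try {
      iterateWhileAdding(sync);
    } catch (ConcurrentModificationException e) {
      System.out.println("synchronizedSet iteration was invalidated by the write");
    }
  }
}
```

The trade-off is that every successful write to a CopyOnWriteArraySet copies the backing array, so this only pays off when the set stays small and reads far outnumber writes, which is the situation the description reports for AsyncRunLoop.
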
>
>
> Diffs
> -----
>
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 609a956a1f2fa97419c2f66fe2fb6876aaaeecd0
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 8fac8155c7f64e67d4a39ec6943f98da1e1d63d9
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 052b3b91ec609ca6288662cfa2d3e71b0273d020
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 9b700998d2af040c6734289f7f28bbd78c36bd2c
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 132cf59eb593524a4cac134aeceeeb37a4c74b1f
>   samza-core/src/main/java/org/apache/samza/util/TimerClock.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/util/Utils.java 472e0a59d5aa992b136292c8a3347c311e2cd606
>   samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala c3fd8bfb2e16a4c5146d34682d04cb1d4e9bbe72
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala e0468ee89c89fd720834461771ebb36475475bcb
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala e2aed5b1c2e77a914268963b21809380972037b6
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c4836f202f7eda1d4e71eac94fd48e46207b0316
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 6000ffaf2b8723d48a72e58b571f242a42dc8128
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 99e1e18bcfa6bca1e275d8ae030a77ff8d70a4eb
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java f1dbf35165e6ddfc02e3522887c25d78a4bbfcd7
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java d7110f34a9eae6e9ffc15b4982bfbb180da88b2d
>   samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala c975893a42689732c39c39600fecacee843bf9d6
>
> Diff: https://reviews.apache.org/r/53282/diff/
>
>
> Testing
> -------
>
> ./gradlew build
>
> Tested in the yarn hadoop cluster with different kinds of jobs.
>
>
> File Attachments
> ----------------
>
> hdfs performance
>   https://reviews.apache.org/media/uploaded/files/2016/11/02/c05007fe-2fdd-4c8c-b5ef-b7862dea13b2__hdfs_perf.png
>
>
> Thanks,
>
> Xinyu Liu
>
>