-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review98316
-----------------------------------------------------------

samza-core/src/main/java/org/apache/samza/job/model/JobModel.java (lines 65 - 72)
<https://reviews.apache.org/r/37817/#comment155027>

    duplicated code with lines 116 - 122

samza-core/src/main/scala/org/apache/samza/util/Util.scala (line 44)
<https://reviews.apache.org/r/37817/#comment154725>

    not needed: 1) it is very short already; 2) sometimes you are using clock, sometimes calling System.currentTimeMillis. Why not just call System.currentTimeMillis everywhere?

samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java (line 35)
<https://reviews.apache.org/r/37817/#comment154682>

    if we are forced to implement the Runnable anyway, why not implement the interface in this abstract class?

samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java (lines 67 - 68)
<https://reviews.apache.org/r/37817/#comment154681>

    To be more precise, when a host-affinity-enabled job runs for the first time, the locality value is null too.

samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java (line 89)
<https://reviews.apache.org/r/37817/#comment154997>

    -> */

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 37)
<https://reviews.apache.org/r/37817/#comment154683>

    just follow the same code style: start the variable with a letter

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (lines 48 - 49)
<https://reviews.apache.org/r/37817/#comment154685>

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 69)
<https://reviews.apache.org/r/37817/#comment154688>

    maybe better to put the sleep after containerRequestState.releaseExtraContainers(), because there is no reason to sleep at the very beginning.

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 71)
<https://reviews.apache.org/r/37817/#comment155024>

    getContainersOnAHost returns allocatedContainers, not availableContainers.
    So what is the difference between allocatedContainers and available containers?

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 80)
<https://reviews.apache.org/r/37817/#comment155000>

    this getId is for the global container Id, right?

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (line 84)
<https://reviews.apache.org/r/37817/#comment155025>

    do not understand this comment. Why are there any extra containers? Do we request more containers than needed? Or is this only for host-enabled?

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java (lines 88 - 89)
<https://reviews.apache.org/r/37817/#comment154687>

    change to log.info(msg, e)?

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 41)
<https://reviews.apache.org/r/37817/#comment154689>

    remove _

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 42)
<https://reviews.apache.org/r/37817/#comment154696>

    I think ANY_HOST = "ANY_HOST" is less confusing.

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (lines 44 - 47)
<https://reviews.apache.org/r/37817/#comment154691>

    what is this comment for?

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 55)
<https://reviews.apache.org/r/37817/#comment154692>

    naming it requestQueue may be more descriptive. Leave it to you.

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 61)
<https://reviews.apache.org/r/37817/#comment154693>

    Maybe requests2StateMap/requests2CountMap? just 2 cents.

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (lines 96 - 99)
<https://reviews.apache.org/r/37817/#comment154694>

    1. format the comment a little. :) using /* * */
    2. this comment does not explain the code following it. Maybe this comment should be part of the allocatedContainers variable javadoc.

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 113)
<https://reviews.apache.org/r/37817/#comment154695>

    just curious: is it possible that the hostname of the machine differs from the part of the HttpAddress before the ":"?

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 116)
<https://reviews.apache.org/r/37817/#comment154698>

    can line 116 to 163 be refactored a little? Now it looks very confusing to me. (Too many nested if)

    Is something like:

    if (requestCountOnThisHost > 0
        && (allocatedContainersOnThisHost == null
            || allocatedContainersOnThisHost.size() < requestCountOnThisHost)) {
      addToAllocatedContainerList(hostName, container);
    } else {
      addToAllocatedContainerList(ANY_HOST, container);
    }

    sufficient for the logic?

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 124)
<https://reviews.apache.org/r/37817/#comment155013>

    this is a little verbose. (just my opinion. :)

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (lines 135 - 137)
<https://reviews.apache.org/r/37817/#comment154697>

    nit: follow the block comment format when a comment does not fit on one line. :) /* * */

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 199)
<https://reviews.apache.org/r/37817/#comment155007>

    you mentioned "buffer" in a few places, which variable do you refer to?

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (lines 231 - 232)
<https://reviews.apache.org/r/37817/#comment155008>

    why is this logic not in the hostAffinityEnabled clause?

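To make the refactoring question on ContainerRequestState.java (line 116) concrete, here is a minimal sketch of the single-condition form. It is not the code under review: the class name, the String container IDs, the two map fields, and the addRequest/containersOn helpers are assumptions for illustration. Only ANY_HOST, addToAllocatedContainerList, and the condition itself come from the comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ContainerRequestState; containers are plain String IDs here.
final class RequestStateSketch {
  static final String ANY_HOST = "ANY_HOST";

  // host -> number of outstanding requests for that host (assumed shape)
  private final Map<String, Integer> requestCounts = new HashMap<>();
  // host -> containers allocated but not yet assigned (assumed shape)
  private final Map<String, List<String>> allocatedContainers = new HashMap<>();

  // Illustrative helper: record one more request for the given host.
  void addRequest(String host) {
    requestCounts.merge(host, 1, Integer::sum);
  }

  // One combined condition instead of nested ifs: keep the container under its own
  // host only while that host still has unmatched requests, otherwise file it under ANY_HOST.
  void addContainer(String hostName, String container) {
    int requestCountOnThisHost = requestCounts.getOrDefault(hostName, 0);
    List<String> allocatedContainersOnThisHost = allocatedContainers.get(hostName);
    if (requestCountOnThisHost > 0
        && (allocatedContainersOnThisHost == null
            || allocatedContainersOnThisHost.size() < requestCountOnThisHost)) {
      addToAllocatedContainerList(hostName, container);
    } else {
      addToAllocatedContainerList(ANY_HOST, container);
    }
  }

  private void addToAllocatedContainerList(String host, String container) {
    allocatedContainers.computeIfAbsent(host, h -> new ArrayList<>()).add(container);
  }

  // Illustrative accessor: containers currently filed under the given host key.
  List<String> containersOn(String host) {
    return allocatedContainers.getOrDefault(host, new ArrayList<>());
  }
}
```

With one request for a host, the first container from that host stays under the host key and any further container (or a container from an unrequested host) goes to ANY_HOST. Whether this covers every branch of the original lines 116 to 163 is exactly the open question in the comment.
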
samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (lines 267 - 273)
<https://reviews.apache.org/r/37817/#comment154699>

    why not just use allocatedContainers.get(host)?

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 276)
<https://reviews.apache.org/r/37817/#comment155020>

    this is not safe. Maybe return an unmodifiable version?

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java (line 280)
<https://reviews.apache.org/r/37817/#comment155022>

    this seems not safe to me.

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java (line 51)
<https://reviews.apache.org/r/37817/#comment154700>

    no _

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java (lines 79 - 80)
<https://reviews.apache.org/r/37817/#comment154706>

    it is worth differentiating the containerId and container.getId

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java (line 178)
<https://reviews.apache.org/r/37817/#comment154701>

    wrap it in the log or the exception? Because if users are using the StreamAppender, they may lose that information.

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java (line 196)
<https://reviews.apache.org/r/37817/#comment154702>

    same: wrap it

samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java (line 199)
<https://reviews.apache.org/r/37817/#comment154703>

    same: wrap it

samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 41)
<https://reviews.apache.org/r/37817/#comment154707>

    no _

samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (lines 60 - 68)
<https://reviews.apache.org/r/37817/#comment155029>

    actually both HostAwareContainerAllocator and ContainerAllocator share the same logic: if the value is null, use ANY_HOST. We may consider abstracting it into AbstractContainerAllocator. My 0.02.

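On the "return an unmodifiable version" suggestion for ContainerRequestState.java (line 276): a minimal sketch of what that could look like, assuming the getter currently hands out its internal collection directly. The class name, field, and String element type are hypothetical; the real types are not visible in this e-mail.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder; stands in for whatever state object owns the collection.
final class RequestListSketch {
  private final List<String> requests = new ArrayList<>();

  void add(String request) {
    requests.add(request);
  }

  // Returns a read-only view: callers see later updates made through add(),
  // but any attempt to mutate the view throws UnsupportedOperationException.
  List<String> getRequests() {
    return Collections.unmodifiableList(requests);
  }
}
```

Note that the view is not a snapshot and does not make concurrent access safe by itself. If the allocator thread and the callback thread both touch the collection, synchronization is still a separate question.
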
samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 72)
<https://reviews.apache.org/r/37817/#comment154709>

    remove empty line

samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 77)
<https://reviews.apache.org/r/37817/#comment155030>

    you mean, ANY or ANY_HOST?

samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 84)
<https://reviews.apache.org/r/37817/#comment154710>

    the same, move to the end of the try clause?

samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 86)
<https://reviews.apache.org/r/37817/#comment154717>

    is it possible to refactor this part a little to make it more readable? Currently there are too many nested if statements. Many conditions can be combined.

samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 96)
<https://reviews.apache.org/r/37817/#comment155032>

    same confusion. getContainersOnAHost gives the allocated containers, not available containers, right? I guess getContainersOnAHost should talk to the RM to negotiate the possible resource?

samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 142)
<https://reviews.apache.org/r/37817/#comment154721>

    do we define how many times we want to try before failing the whole job? Because if there are not sufficient resources, it is pointless to keep retrying the request.

samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 158)
<https://reviews.apache.org/r/37817/#comment154714>

    wrap it

samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java (line 161)
<https://reviews.apache.org/r/37817/#comment154715>

    wrap it

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 42)
<https://reviews.apache.org/r/37817/#comment154799>

    -> */

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 50)
<https://reviews.apache.org/r/37817/#comment154800>

    -> */

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 56)
<https://reviews.apache.org/r/37817/#comment154802>

    -> */

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 57)
<https://reviews.apache.org/r/37817/#comment154803>

    if this is for the AM container id, maybe amContainerId makes more sense?

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 115)
<https://reviews.apache.org/r/37817/#comment154718>

    to */

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 120)
<https://reviews.apache.org/r/37817/#comment154719>

    to */

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 130)
<https://reviews.apache.org/r/37817/#comment154720>

    to */

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java (line 140)
<https://reviews.apache.org/r/37817/#comment154974>

    we now have two different containerIds: one is globally unique (ContainerStatus.getContainerId), and the other is Samza's containerId (based on the container count). Maybe worth differentiating the names. It confused me a little.

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java (lines 39 - 42)
<https://reviews.apache.org/r/37817/#comment154723>

    format the comment to /* * */

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java (lines 55 - 64)
<https://reviews.apache.org/r/37817/#comment154724>

    combine them into one if statement?

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java (lines 69 - 71)
<https://reviews.apache.org/r/37817/#comment155034>

    I think the default Core, Memory, Priority should not be here because they are already in the YarnConfig. So we may want to pass those values from AbstractContainerAllocator, instead of having them here.

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 51)
<https://reviews.apache.org/r/37817/#comment155035>

    where is the callback handler? Is it the SamzaTaskManager?

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 58)
<https://reviews.apache.org/r/37817/#comment154967>

    -> */

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 61)
<https://reviews.apache.org/r/37817/#comment154968>

    remove _

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 85)
<https://reviews.apache.org/r/37817/#comment154969>

    we usually have the config in a separate class. Maybe put it into the YarnConfig?

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 118)
<https://reviews.apache.org/r/37817/#comment154970>

    TODO

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 139)
<https://reviews.apache.org/r/37817/#comment154971>

    wrap it.

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 152)
<https://reviews.apache.org/r/37817/#comment154972>

    -> */

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 154)
<https://reviews.apache.org/r/37817/#comment154973>

    a javadoc would help in understanding this method

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (lines 157 - 161)
<https://reviews.apache.org/r/37817/#comment154976>

    can this be simplified as if (state.runningContainers.containsValue(containerStatus.getContainerId())), then ...

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 174)
<https://reviews.apache.org/r/37817/#comment154977>

    why is the containerId in the containerFailures list by default? It is possible that this containerId is not in the list, right?

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (lines 183 - 185)
<https://reviews.apache.org/r/37817/#comment154980>

    from this logic, even if the container fails because of DISKS_FAILED or ABORTED, we will still hit the following log.info and execute lines 186-205. Is that intended?

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 186)
<https://reviews.apache.org/r/37817/#comment154981>

    use the PREEMPTED instead of -100?

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 244)
<https://reviews.apache.org/r/37817/#comment154989>

    why not just give the currentFailCount and lastFailureTime default values? Then we can get rid of the "else" part. Looks simpler.

samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java (line 245)
<https://reviews.apache.org/r/37817/#comment154986>

    should be 1, because it is used in line 270

samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala (line 69)
<https://reviews.apache.org/r/37817/#comment154728>

    it should be amJavaHome.isEmpty(), because we set "" as the default value in YarnConfig.

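To illustrate the YarnJob.scala point: when the config getter falls back to "" for an unset value, a null check never fires and the emptiness check is the one that matters. A minimal sketch in Java, assuming a plain Map as the config. The class, the key name, and both method names are hypothetical and are not the actual YarnConfig API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the config lookup; not the real YarnConfig.
final class JavaHomeSketch {
  // Made-up key, for illustration only.
  static final String AM_JAVA_HOME_KEY = "example.am.java.home";

  // Mirrors the behaviour described in the comment: unset comes back as "", never null.
  static String getAmJavaHome(Map<String, String> config) {
    return config.getOrDefault(AM_JAVA_HOME_KEY, "");
  }

  // The check the comment asks for: test for emptiness rather than null.
  static boolean hasAmJavaHome(Map<String, String> config) {
    return !getAmJavaHome(config).isEmpty();
  }
}
```
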
- Yan Fang


On Sept. 9, 2015, 12:28 a.m., Navina Ramesh wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
>
> (Updated Sept. 9, 2015, 12:28 a.m.)
>
>
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi
> Pan (Data Infrastructure).
>
>
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
>
>
> Repository: samza
>
>
> Description
> -------
>
> This is a major change to how we request and assign containers to resources. It
> uses a threaded model for request and allocation. More comments in the
> javadoc.
>
> Major changes to note:
> 1. Changed yarn dependency to 2.6
> 2. Moved YarnConfig to java config class
> 2. Removed SamzaAppMasterTaskManager
> 3. SamzaAppState replaces SamzaAppMasterState
> 4. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that
> it delegates onContainerAllocated and requestContainers to the thread in
> ContainerAllocator
> 5. Removed state.unclaimedContainers
> 6. Allocator thread sleep time and request timeout are configurable
> 7. host-affinity can be enabled by using the config
> "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with
> FairScheduler that has continuous scheduling enabled. Details on this config
> can be found at SAMZA-617
> [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 8. Added unit tests for TaskManager & ContainerAllocator
> 9. Updated config documentation
>
> Pending items:
> 1. Update web-site with info on this feature (SAMZA-668)
>
>
> Diffs
> -----
>
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4
>   docs/learn/documentation/versioned/jobs/configuration-table.html c23d7d37f151a0dbdd71f52588773bc67edf88c8
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java c567bf4a1747a7a6aff23be74cf25b1a44e57b42
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 5455881e2a4a9c865137f2708f655641b93c2bfb
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 7b592740c082225fc217a24188bc874f883db63b
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 61e228b1ff8619da4ad2e11b15a5afb7999c4996
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala c8438bd6ed01a95860867505d84c04c493a5a197
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerUtil.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java PRE-CREATION
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ce2145adde09b0e8097a1c711bc296ee20392e63
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 1f51551893c42bb13d7941c8b0e5594ac7f42585
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala af42c6a6636953a95f79837fe372e0dbd735df70
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala d475c4c566f17f88a04db5fbc84cc2e27eb333d2
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala 03acfe1bbbabf8f54be9f36fdae785476da45135
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala 060538623e4d67b986bc635518e7fe8ebdde9e24
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala f667c83a7fff43a5efdc64cde019b2cf35f38cb9
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala 1743c8611a94fa1c7f7dafd8ff8c713039f3df8c
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 8dd70c9977473e083e463d01b049c40e15b21f4a
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala 09f4dc32a4b18aeb3accb856c360ea2f95c82673
>   samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala 7fd51221149ba59e7ea1bc0b51d58726ec17a7d7
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java PRE-CREATION
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java PRE-CREATION
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java PRE-CREATION
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java PRE-CREATION
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java PRE-CREATION
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java PRE-CREATION
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java PRE-CREATION
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockHttpServer.java PRE-CREATION
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockNMClient.java PRE-CREATION
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestAMRMClientImpl.java PRE-CREATION
>   samza-yarn/src/test/java/org/apache/samza/job/yarn/util/TestUtil.java PRE-CREATION
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala df5992e659302d2918c4e2c30b6122ed51ab9fe8
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 6f4bfaf916d687426f746fd3b09cd56490bb500e
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 2eec65f02826de40493925c08ff344a8cc4feecb
>
> Diff: https://reviews.apache.org/r/37817/diff/
>
>
> Testing
> -------
>
> Tested with hello-samza locally. Tested with a sample job on a 3 node Yarn
> cluster. Works as expected.
>
>
> Thanks,
>
> Navina Ramesh
>
>