> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, 
> > line 82
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055358#file1055358line82>
> >
> >     What's the reason to have both hostname and hostip here? Some javadoc 
> > would be nice

Actually, when I made the change to write the container-to-host mapping to the 
coordinator stream, I used the hostIp (in SAMZA-618). When I started working on 
host-affinity, I realized that YARN returns the container with meta-info about 
the host. It contains the hostname rather than the IP. 

I think we can keep only hostName. I doubt we are using hostIp anywhere else.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 51
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055363#file1055363line51>
> >
> >     Just curious: why are we choosing 3.6s as the sleep time here? Is it 
> > from experience?

Not really. I just chose those as defaults. We should be able to configure each 
of them. I will add doc in the configuration table for this. I am not sure what 
the right defaults are without experimenting with jobs with varying configs.
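
As a rough sketch of what "configurable with a default" could look like (the 
class name and the key name below are assumptions for illustration, not 
necessarily what the patch uses; 3600 ms mirrors the 3.6s mentioned above):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the relevant part of YarnConfig.
final class AllocatorSettingsSketch {
    // Assumed key name and placeholder default (3.6 s).
    static final String ALLOCATOR_SLEEP_MS = "yarn.allocator.sleep.ms";
    static final int DEFAULT_ALLOCATOR_SLEEP_MS = 3600;

    private final Map<String, String> config;

    AllocatorSettingsSketch(Map<String, String> config) {
        this.config = new HashMap<>(config); // defensive copy
    }

    // Returns the configured value, or the default when the key is absent.
    int getAllocatorSleepMs() {
        String raw = config.get(ALLOCATOR_SLEEP_MS);
        return raw == null ? DEFAULT_ALLOCATOR_SLEEP_MS : Integer.parseInt(raw.trim());
    }
}
```

With this shape, changing the default later only touches one constant, and 
jobs that set the key explicitly are unaffected.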


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 99
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055363#file1055363line99>
> >
> >     Do we need a default value here as well? What's the right behavior if 
> > this config variable is not configured? Do we always default to whatever 
> > the default Java is on the AM machine? That sounds a bit concerning to me.

No. In this case, the null check happens at the caller (See Line 69 in my 
changes to YarnJob.scala). This was the behavior when this class was in scala 
and I just translated the same to Java. 

If the option is not configured, it defaults to whatever is defined as 
JAVA_HOME in the AM machine. Why is this a concern? I think Yarn cluster 
installation requires a JAVA_HOME to be defined.
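
A minimal sketch of the caller-side null check described here. The class, the 
method names and the key name are assumptions for illustration; the actual 
code lives in YarnJob.scala and may differ:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the code from the patch.
final class AmEnvSketch {
    static final String AM_JAVA_HOME = "yarn.am.java.home"; // assumed key name

    // Returns the configured value, or null when unset; no default on purpose.
    static String getAmJavaHome(Map<String, String> config) {
        return config.get(AM_JAVA_HOME);
    }

    // Caller-side null check: only override JAVA_HOME when the option is set.
    // Otherwise the AM inherits whatever JAVA_HOME the machine defines.
    static Map<String, String> buildAmEnvironment(Map<String, String> config) {
        Map<String, String> env = new HashMap<>();
        String javaHome = getAmJavaHome(config);
        if (javaHome != null) {
            env.put("JAVA_HOME", javaHome);
        }
        return env;
    }
}
```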


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 24
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055363#file1055363line24>
> >
> >     nit: would be nice to add javadoc here.

Ok. I have copied the javadoc from the configuration table.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, 
> > line 32
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line32>
> >
> >     nit: javadoc.

I added a brief intro to this class. I have explained the details in the 
javadocs for the allocator threads.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, 
> > line 37
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line37>
> >
> >     For better code re-use and readability, I think that it might be worth 
> > thinking of creating two derived classes, instead of using a boolean flag 
> > in this class?

Hmm.. That was my initial thought. Let me try to make the classes derived and 
see if it looks better.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, 
> > line 52
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line52>
> >
> >     It would be good to document the overall threading model here. It seems 
> > that the following are the main threads involved (correct me if I am wrong):
> >     - The main thread that drive the AM to send out container requests to RM
> >     - The allocator thread here to assign the allocated containers to 
> > pending requests
> >     - The callback handler thread that receives the responses from RM and 
> > populate the allocated containers collection in containerRequestState
> >     - And the SamzaTaskManager handler thread that handles container 
> > failures and re-request the containers from RM
> >     
> >     It would be nice to document the above a little bit here s.t. we have a 
> > clear picture of which data structure will be shared among which threads.

Yep. You got it right!

I will document the various threads involved in the SamzaTaskManager class. It 
is going to be a little tough to explain which data structures are shared among 
threads. I have already mentioned them in the ContainerRequestState class.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, 
> > line 71
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line71>
> >
> >     Quick question: it seems the code here tries to take the available 
> > containers at the beginning of the loop and looping through the pending 
> > requests for allocation. What if the callback handler tries to add to the 
> > availableContainers at the same time? Is the List<Container> thread safe? I 
> > think the code here still works due to the producer/consumer model on the 
> > same structure here, as long as the remove/add of the first element in the 
> > List<> does not collide w/ each other.

Even if the callback handler thread adds to availableContainers at the same 
time, it should not cause any problem because adding containers (in 
addContainer) and removing containers from the list are both within 
synchronized blocks. 
Now, it could happen that after we read an empty list of allocated containers 
on the host, the callback thread adds a container. Even though we may not 
enter the while-loop for assignment, it will be considered once the allocator 
thread wakes up after ALLOCATOR_SLEEP_TIME ms.

Add & remove are synchronized. I don't think it matters whether we synchronize 
on read for List<Container>. Let me know if you think it is essential here.
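
A simplified sketch of the producer/consumer interaction, with hypothetical 
names and container ids reduced to strings. Unlike the patch, this version 
also does the emptiness check inside the synchronized block, which is one way 
to avoid reasoning about unsynchronized reads at all:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the shared allocated-containers list.
final class SharedContainersSketch {
    private final List<String> allocatedContainers = new ArrayList<>();

    // Called by the callback handler thread (producer).
    void addContainer(String containerId) {
        synchronized (this) {
            allocatedContainers.add(containerId);
        }
    }

    // Called by the allocator thread (consumer).
    // Returns null when nothing is available; the caller retries on its next pass.
    String pollContainer() {
        synchronized (this) {
            return allocatedContainers.isEmpty() ? null : allocatedContainers.remove(0);
        }
    }
}
```

If the allocator sees an empty list and a container arrives immediately 
afterwards, nothing is lost; the container simply waits until the next pass.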


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, 
> > line 80
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line80>
> >
> >     Would it be better to wrap this together w/ 
> > containerRequestState.updateStateAfterAssignment()? If I understand 
> > correctly, this step is to make sure that the request queue in the 
> > containerRequestState is in sync w/ the buffered requests in amClient. I 
> > think that containerRequestState is also updated when a request is sent 
> > async via amClient. It seems to me that they would be better paired up in 
> > two API functions.

Are you suggesting I move the "removeContainerRequest" line within the 
synchronized block? We are not exactly trying to validate that the request 
queue in the containerRequestState is in sync with the buffer in the amClient. 
It doesn't affect the "State" that we maintain, so I decided to keep it out of 
the synchronized block. Either way, I don't think it makes a lot of difference. 
It might make it harder to unit test, though. :)


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, 
> > line 96
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line96>
> >
> >     At this moment, are we truly running out of containers? Or simply need 
> > to wait for more containers from the RM? I think that we are not sure here. 
> > Would it be better to log info "Waiting %s more seconds for more containers to 
> > be allocated"?

Agreed. I will change it!


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, 
> > line 138
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line138>
> >
> >     It seems that we can have a common thread class within the 
> > ContainerAllocator class and two different classes implementing 
> > non-host-affinity and host-affinity algorithms here. Worth to think of?

Hmm.. Haven't thought about it because I was worried about messing up the 
request states. It will be easier to have 2 derived classes - one for 
host-affinity and one for non-host-affinity. The common functionalities can be 
in the parent class. Let me think about this common thread class implementation.
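
One possible shape for this, as a sketch only: the parent class owns the 
thread loop and the bookkeeping, and each derived class supplies the host 
selection policy. All names are hypothetical, requests and containers are 
reduced to host-name strings, and the request-timeout fallback to any host is 
left out:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical parent class: common loop and bookkeeping.
abstract class AllocatorSketch implements Runnable {
    protected final List<String> pendingPreferredHosts = new ArrayList<>();
    protected final List<String> availableHosts = new ArrayList<>();
    protected final List<String> assignments = new ArrayList<>();
    private final long sleepMs;
    private volatile boolean running = true;

    AllocatorSketch(long sleepMs) { this.sleepMs = sleepMs; }

    // Policy hook: host to use for this request, or null to keep waiting.
    abstract String selectHost(String preferredHost, List<String> available);

    // One allocation pass over the pending requests.
    synchronized void allocateOnce() {
        Iterator<String> it = pendingPreferredHosts.iterator();
        while (it.hasNext()) {
            String preferred = it.next();
            String host = selectHost(preferred, availableHosts);
            if (host != null) {
                availableHosts.remove(host); // removes the first matching entry
                assignments.add(preferred + "->" + host);
                it.remove();
            }
        }
    }

    @Override
    public void run() {
        while (running) {
            allocateOnce();
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    void stop() { running = false; }
}

// Non-host-affinity policy: take any available container.
final class AnyHostAllocatorSketch extends AllocatorSketch {
    AnyHostAllocatorSketch(long sleepMs) { super(sleepMs); }
    @Override String selectHost(String preferredHost, List<String> available) {
        return available.isEmpty() ? null : available.get(0);
    }
}

// Host-affinity policy: only accept a container on the preferred host.
final class HostAwareAllocatorSketch extends AllocatorSketch {
    HostAwareAllocatorSketch(long sleepMs) { super(sleepMs); }
    @Override String selectHost(String preferredHost, List<String> available) {
        return available.contains(preferredHost) ? preferredHost : null;
    }
}
```

In real code the lists would be populated by other threads under the same 
lock; the sketch exposes them directly only to keep the example short.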


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, 
> > line 301
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line301>
> >
> >     If amClient.addContainerRequest() is in the same sync block w/ 
> > containerRequestState.updateRequestState() here, I would think that 
> > amClient.removeContainerRequest() should be in the same sync block w/ 
> > containerRequestState.updateRequestState in the allocator threads too.

Yeah.. That's a reasonable argument.
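
A sketch of how the two operations could be paired so that the local state 
and the client buffer always change under the same lock. AmClientSketch is a 
hypothetical stand-in for amClient, and requests are reduced to strings:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the subset of amClient used here.
interface AmClientSketch {
    void addContainerRequest(String request);
    void removeContainerRequest(String request);
}

// Hypothetical wrapper that pairs state updates with client calls.
final class PairedRequestStateSketch {
    private final AmClientSketch amClient;
    private final List<String> pendingRequests = new ArrayList<>();

    PairedRequestStateSketch(AmClientSketch amClient) { this.amClient = amClient; }

    // Record the request locally and hand it to the client in one critical section.
    synchronized void requestContainer(String request) {
        pendingRequests.add(request);
        amClient.addContainerRequest(request);
    }

    // Clear the request locally and in the client in one critical section.
    // Unknown requests are ignored so the client is never asked to remove twice.
    synchronized void completeAssignment(String request) {
        if (pendingRequests.remove(request)) {
            amClient.removeContainerRequest(request);
        }
    }

    synchronized int pendingCount() { return pendingRequests.size(); }
}
```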


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java, 
> > line 314
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055364#file1055364line314>
> >
> >     Just a thought, maybe directly declaring 
> > ContainerRequestState.addContainer() to be a synchronized method if it 
> > always requires global lock on containerRequestState?

Sure. Making it synchronized should work. However, we will end up with some 
methods being synchronized and some not in ContainerRequestState. That is why I 
decided to keep the synchronized blocks. 

I don't have a strong opinion on this. I can make it synchronized if you think 
it makes the code better!
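
For reference, a small sketch of the two styles side by side (hypothetical 
class, container ids as strings). A synchronized instance method locks on the 
same monitor as synchronized (state) in the caller, and Java's intrinsic locks 
are reentrant, so a caller that already holds the lock can still call the 
synchronized method safely:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ContainerRequestState.
final class SynchronizedStateSketch {
    private final List<String> containers = new ArrayList<>();

    // Equivalent to wrapping the body in synchronized (this) { ... }.
    synchronized void addContainer(String containerId) {
        containers.add(containerId);
    }

    synchronized int size() {
        return containers.size();
    }

    // Existing call sites that lock externally keep working unchanged.
    static void addWithExternalLock(SynchronizedStateSketch state, String containerId) {
        synchronized (state) {
            state.addContainer(containerId); // re-acquires the monitor already held
        }
    }
}
```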


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java,
> >  line 34
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line34>
> >
> >     This is defined redundantly in both ContainerAllocator and here. It 
> > would be good to just keep one.

Ok. Makes sense.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java,
> >  line 57
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line57>
> >
> >     I see the opportunity to remove this conditional flag and split this 
> > state into two derived classes as well. Thoughts?

Ah.. Ok. For now, I will try to refactor the allocator code with threads :) I 
will keep this opportunity reserved as a TODO in the code.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java,
> >  line 112
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line112>
> >
> >     So, here to make sure that the request count and the allocated 
> > containers count are not changed in the comparison, access of the two 
> > variables and update of the two variables need to be atomic. Is it 
> > guaranteed by the global lock on the state?
> >     
> >     Or, I guess the worst result from here is that the 
> > requestCountOnThisHost has not been updated while the container is 
> > allocated. Hence, one allocated container goes to ANY_HOST and missed the 
> > chance of being allocated to the preferred host? We may live w/ this if we 
> > want to avoid locking here for performance. It would be good to make a note 
> > here.

Yeah. The global lock on the state object should take care of this. I don't 
think we need to do any kind of locking here. It will lead to double locking 
issues.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java,
> >  line 136
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055366#file1055366line136>
> >
> >     It would be good to comment on under which lock this method should be 
> > called.

I made this a private method to the class (will be available in the next RB 
patch) because it is just a helper method. Not intended for any client to 
directly call it.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala, 
> > line 19
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055372#file1055372line19>
> >
> >     Any reason this class is still in scala? If it is new/refactored, would 
> > it make more sense to change it to java?

:) At one point, I just got tired of moving things to java :P It is hard to 
find equivalents of certain scala aspects in java. 

If this class doesn't use any functions as parameters, I will move it to java. 
Otherwise, I prefer to keep it this way for now.


> On Aug. 31, 2015, 9:02 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala,
> >  line 174
> > <https://reviews.apache.org/r/37817/diff/1/?file=1055384#file1055384line174>
> >
> >     I thought that the purpose of these tests is orthogonal to the change 
> > in host-affinity and that they would need to be kept here?

So, I nuked the SamzaAppMasterTaskManager.scala class. Part of the operations 
of that class is now in SamzaTaskManager.java and the rest in 
ContainerAllocator.java. 
I have added unit tests for each of those classes covering all relevant cases 
from TestSamzaAppMasterTaskManager.scala. I will update this patch with those 
changes.


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37817/#review97059
-----------------------------------------------------------


On Aug. 26, 2015, 10:14 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37817/
> -----------------------------------------------------------
> 
> (Updated Aug. 26, 2015, 10:14 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chinmay Soman, Chris Riccomini, and Yi 
> Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-619
>     https://issues.apache.org/jira/browse/SAMZA-619
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is a major change to how we request and assign containers to resources. It 
> uses a threaded model for request and allocation. More comments in the 
> javadoc. 
> 
> Major changes to note:
> 1. Changed yarn dependency to 2.6
> 2. Moved YarnConfig to java config class
> 3. Removed SamzaAppMasterTaskManager
> 4. SamzaAppState replaces SamzaAppMasterState
> 5. SamzaTaskManager is very similar to SamzaAppMasterTaskManager, except that 
> it delegates onContainerAllocated and requestContainers to the thread in 
> ContainerAllocator 
> 6. Removed state.unclaimedContainers
> 7. Allocator thread sleep time and request timeout are configurable
> 8. host-affinity can be enabled by using the config 
> "yarn.samza.host-affinity.enabled". Host affinity is guaranteed to work with 
> FairScheduler that has continuous scheduling enabled. Details on this config 
> can be found at SAMZA-617 
> [design](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
> 
> Pending items:
> 1. Adding unit tests for AppMaster, TaskManager, ContainerAllocator
> 2. Update config documentation 
> 3. Update web-site with info on this feature
> 
> 
> Diffs
> -----
> 
>   build.gradle 3a7fabcd4f4e5c8db950c3a33bba33618c5565b4 
>   gradle/dependency-versions.gradle 36d564b6ca895f042ee4802643e49180f4947b62 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
> c567bf4a1747a7a6aff23be74cf25b1a44e57b42 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
>  5455881e2a4a9c865137f2708f655641b93c2bfb 
>   samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 
> 7b592740c082225fc217a24188bc874f883db63b 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 61e228b1ff8619da4ad2e11b15a5afb7999c4996 
>   samza-core/src/main/scala/org/apache/samza/util/Util.scala 
> c8438bd6ed01a95860867505d84c04c493a5a197 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaAppState.java 
> PRE-CREATION 
>   
> samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java 
> PRE-CREATION 
>   samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml 
> ce2145adde09b0e8097a1c711bc296ee20392e63 
>   samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 
> 1f51551893c42bb13d7941c8b0e5594ac7f42585 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/ContainerUtil.scala 
> PRE-CREATION 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
> af42c6a6636953a95f79837fe372e0dbd735df70 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
>  d475c4c566f17f88a04db5fbc84cc2e27eb333d2 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala
>  03acfe1bbbabf8f54be9f36fdae785476da45135 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
>  060538623e4d67b986bc635518e7fe8ebdde9e24 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 
> f667c83a7fff43a5efdc64cde019b2cf35f38cb9 
>   
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
>  1743c8611a94fa1c7f7dafd8ff8c713039f3df8c 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 
> 8dd70c9977473e083e463d01b049c40e15b21f4a 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
>  09f4dc32a4b18aeb3accb856c360ea2f95c82673 
>   
> samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
>  7fd51221149ba59e7ea1bc0b51d58726ec17a7d7 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
>  df5992e659302d2918c4e2c30b6122ed51ab9fe8 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
>  6f4bfaf916d687426f746fd3b09cd56490bb500e 
>   
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
>  2eec65f02826de40493925c08ff344a8cc4feecb 
> 
> Diff: https://reviews.apache.org/r/37817/diff/
> 
> 
> Testing
> -------
> 
> Tested with hello-samza locally. Tested with a sample job on a 3 node Yarn 
> cluster. Works as expected.
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
