Hi Rick,

Can you share the entire log for this issue? I suspect concurrent access to the bootstrappedSet (a LinkedHashSet, which is not thread-safe) by the JobCoordinator and the SamzaAppMaster.
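
To illustrate the kind of failure I have in mind, here is a minimal standalone sketch. It is not Samza code: the class name, method name and set contents are made up for the example. One thread iterates a LinkedHashSet while a second thread adds to it, and the latches only force the interleaving so that the result is reproducible. A LinkedHashSet is backed by a LinkedHashMap, which would be consistent with the LinkedHashMap$KeyIterator frame in your trace.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class; it only mimics a shared, unsynchronized set.
final class BootstrappedSetRaceDemo {

    // Returns true if the reading thread hits a ConcurrentModificationException.
    // Assumes the set is non-empty and does not already contain the added value.
    static boolean readerSeesCme(Set<String> bootstrappedSet) {
        if (bootstrappedSet.isEmpty()) {
            throw new IllegalArgumentException("set must be non-empty");
        }
        CountDownLatch readerStarted = new CountDownLatch(1);
        CountDownLatch writerDone = new CountDownLatch(1);

        // Plays the role of a second component that bootstraps and adds messages.
        Thread writer = new Thread(() -> {
            try {
                readerStarted.await();
                bootstrappedSet.add("message-added-during-iteration");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                writerDone.countDown();
            }
        });
        writer.start();

        boolean sawCme = false;
        try {
            // Plays the role of the thread reading the set.
            Iterator<String> it = bootstrappedSet.iterator();
            it.next();                 // iteration is now in progress
            readerStarted.countDown(); // let the writer modify the set
            writerDone.await();        // wait until the modification is done
            it.next();                 // fail-fast iterator detects the change
        } catch (ConcurrentModificationException e) {
            sawCme = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawCme;
    }

    public static void main(String[] args) {
        Set<String> set = new LinkedHashSet<>();
        set.add("a");
        set.add("b");
        System.out.println("ConcurrentModificationException seen: " + readerSeesCme(set));
    }
}
```

In a real job the interleaving would depend on timing instead of latches, which would explain why the error only shows up some of the time. Guarding both the reads and the writes with the same lock, or iterating over a copy taken under that lock, are the usual ways to avoid this kind of failure.
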
When a container fails, the AM tries to read the locality information. If some other container requests the JobModel at the same time, the JobCoordinator also bootstraps. However, these two events are supposed to happen in order (first the AM reads the locality info, then the JC refreshes the JobModel). I think this ordering is not guaranteed during job startup, when containers may still be coming up.

I am not entirely sure that this is what is happening, so it would be great if you could share the log.

Thanks!
navina

On Fri, Dec 18, 2015 at 8:53 AM, Rick Mangi <r...@chartbeat.com> wrote:

> Hi all,
>
> I just started seeing these errors the other day. I am heavily refactoring
> my code, but it works locally. I’m wondering if anyone has seen this error
> when deploying to yarn.
>
> This is in stderr log on my application master.
>
> Exception in thread "AMRM Callback Handler Thread" org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.util.ConcurrentModificationException
>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:299)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:394)
>     at java.util.LinkedHashMap$KeyIterator.next(LinkedHashMap.java:405)
>     at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.getBootstrappedStream(CoordinatorStreamSystemConsumer.java:184)
>     at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.getBootstrappedStream(AbstractCoordinatorStreamManager.java:85)
>     at org.apache.samza.container.LocalityManager.readContainerLocality(LocalityManager.java:98)
>     at org.apache.samza.job.model.JobModel.getContainerToHostValue(JobModel.java:96)
>     at org.apache.samza.job.yarn.SamzaTaskManager.onContainerCompleted(SamzaTaskManager.java:213)
>     at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1$$anonfun$apply$5.apply(SamzaAppMaster.scala:143)
>     at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1$$anonfun$apply$5.apply(SamzaAppMaster.scala:143)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1.apply(SamzaAppMaster.scala:143)
>     at org.apache.samza.job.yarn.SamzaAppMaster$$anonfun$onContainersCompleted$1.apply(SamzaAppMaster.scala:143)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at org.apache.samza.job.yarn.SamzaAppMaster$.onContainersCompleted(SamzaAppMaster.scala:143)
>     at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
>
> The jobs start up briefly and then the AM starts throwing this error and
> fails the job.

--
Navina R.