The program consists of two executions: one that only runs collect() to bring the data back to the client, and one that executes the map function.
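To make that concrete, here is a rough sketch of how the two executions could be collapsed into one by following Robert's broadcast-variable suggestion quoted below. collect() triggers its own job, so dropping it leaves only the job started by env.execute(). This is not tested against your setup: the object name, the TopSimilar class, the "allDocs" broadcast name, the sample input, and the body of cosineDistance (standing in for your cosDisticnce) are all assumptions, and I am assuming SparseVector is the one from flink-ml.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.SparseVector

import scala.collection.JavaConverters._

object DocSimilarity {
  type Doc = (String, SparseVector)

  // Assumed stand-in for the poster's cosDisticnce: 1 - cosine similarity.
  def cosineDistance(a: SparseVector, b: SparseVector): Double = {
    val norms = a.magnitude * b.magnitude
    if (norms == 0.0) 1.0 else 1.0 - a.dot(b) / norms
  }

  // Hypothetical helper: for each document, keep the k most similar others.
  class TopSimilar(k: Int)
      extends RichMapFunction[Doc, (String, List[(String, Double)])] {

    @transient private var allDocs: Seq[Doc] = _

    // The broadcast set is read once per parallel task instance,
    // instead of being shipped inside the closure of map().
    override def open(parameters: Configuration): Unit = {
      allDocs = getRuntimeContext.getBroadcastVariable[Doc]("allDocs").asScala
    }

    override def map(doc: Doc): (String, List[(String, Double)]) = {
      val (fromId, fromVec) = doc
      val top = allDocs
        .filter { case (id, _) => id != fromId }
        .map { case (id, vec) => (id, 1 - cosineDistance(fromVec, vec)) }
        .sortBy { case (_, score) => -score }
        .take(k)
        .toList
      (fromId, top)
    }
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Tiny sample input; the real job would load the 36000 vectors here.
    val data: DataSet[Doc] = env.fromElements(
      ("a", SparseVector.fromCOO(3, (0, 1.0), (2, 1.0))),
      ("b", SparseVector.fromCOO(3, (0, 1.0), (1, 1.0))),
      ("c", SparseVector.fromCOO(3, (1, 1.0), (2, 1.0)))
    )

    // No collect(): the data set is broadcast to the map operator instead.
    val docSims = data
      .map(new TopSimilar(20))
      .withBroadcastSet(data, "allDocs")

    docSims.writeAsText(args(0)) // output path taken from the first argument
    env.execute("doc-similarity")
  }
}
```

Note that every parallel task still holds the full broadcast set in memory, so this only avoids the client round trip and the oversized closure, not the all-pairs cost of the comparison itself.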
Are you running this as a "YARN single job" execution? In that case, there may be an issue that this incorrectly tries to submit to a YARN cluster that is already stopping.

On Fri, Mar 24, 2017 at 10:32 AM, Robert Metzger <rmetz...@apache.org> wrote:

> Hi,
>
> Can you provide more logs to help us understand what's going on?
>
> One note regarding your application: you are calling .collect() and sending
> the collection with the map() call to the cluster again.
> This is pretty inefficient and can potentially break your application (in
> particular the RPC system of Flink).
>
> I would recommend using broadcast variables to send the dataset to the
> map operator:
> https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables
>
> On Thu, Mar 23, 2017 at 3:11 PM, <rimin...@sina.cn> wrote:
>
>> Hi all,
>> I have 36000 documents, and each document is converted to a vector: one doc
>> is one vector, and the dimension is the same for all of them, so I have a DataSet.
>> ------------------------
>> val data :DataSet[(String,SparseVector)]= ....//36000 record
>> val toData = data.collect()
>> val docSims = data.map{x=>
>>   val fromId=x._1
>>   val docsims = toData.filter{y=>y._1!=fromId}.map{y=>
>>     val score =1- cosDisticnce(x._2,y._2)
>>     (y._1,score)
>>   }.toList.sortWith{(a,b)=>a._2>b._2}.take(20)
>>   (fromId,docsims)
>> }
>> docSims.writeAsText(file)
>> .....
>> When I run the job on YARN, it fails with the following error:
>> java.lang.InterruptedException
>> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
>> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
>> at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>> at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)
>>
>> Can someone tell me what is wrong? Thank you.