broadcast hang out
Hi all,

I have run into a problem where torrent broadcast hangs in my Spark cluster (1.2, standalone). It is particularly serious when the driver and executors are in different regions. Reading the broadcast code, I found a synchronous block read here:

    def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
      // A monitor for the thread to wait on.
      val result = Promise[ManagedBuffer]()
      fetchBlocks(host, port, execId, Array(blockId),
        new BlockFetchingListener {
          override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
            result.failure(exception)
          }
          override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
            val ret = ByteBuffer.allocate(data.size.toInt)
            ret.put(data.nioByteBuffer())
            ret.flip()
            result.success(new NioManagedBuffer(ret))
          }
        })

      Await.result(result.future, Duration.Inf)
    }

It seems that fetchBlockSync has no timeout and waits forever. Can anybody show me how to control the timeout here?
Re: broadcast hang out
Thanks. But this method is in BlockTransferService.scala in Spark itself, which I cannot replace unless I rewrite the core code. I wonder whether this is already handled somewhere.

2015-03-16 11:27 GMT+08:00 Chester Chen:

> Can you just replace "Duration.Inf" with a shorter duration? How about
>
>     import scala.concurrent.duration._
>     val timeout = new Timeout(10 seconds)
>     Await.result(result.future, timeout.duration)
>
> or
>
>     val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
>     Await.result(result.future, timeout)
>
> or simply
>
>     import scala.concurrent.duration._
>     Await.result(result.future, 10 seconds)
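For readers following the thread, here is a minimal sketch of the bounded wait suggested in the quoted reply, written against plain Scala futures rather than Spark internals. The object name `BoundedWait` and the method `awaitWithTimeout` are hypothetical; this does not change how Spark's own fetchBlockSync behaves.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Success, Try}

// Hypothetical helper, not part of Spark.
object BoundedWait {
  /** Waits for `promise` to complete, giving up after `timeout`.
    * A timeout surfaces as Failure(java.util.concurrent.TimeoutException). */
  def awaitWithTimeout[T](promise: Promise[T], timeout: FiniteDuration): Try[T] =
    Try(Await.result(promise.future, timeout))
}
```

A caller could then retry or fail the task when the result is a Failure, instead of blocking the thread indefinitely as Duration.Inf does.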
Re: broadcast hang out
Yes.

2015-03-16 11:43 GMT+08:00 Mridul Muralidharan:

> Cross region as in different data centers?
>
> - Mridul
Re: broadcast hang out
Can anyone help? Thanks a lot!
Spark Sql with python udf fail
Hi all,

I am trying to migrate some Hive jobs to Spark SQL. When I ran a SQL job with a Python UDF, I got an exception:

    java.lang.ArrayIndexOutOfBoundsException: 9
        at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
        at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
        at org.apache.spark.sql.catalyst.expressions.EqualTo.eval(predicates.scala:166)
        at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
        at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate$$anonfun$apply$1.apply(predicates.scala:30)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:156)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
        at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
        at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

I suspect there is a malformed line in the input file, but the file is very large and the several jobs I ran to check it did not find any abnormal lines. How can I find the offending line?
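One way to hunt for such a line is to flag every record whose field count is below what the query expects. The index 9 in the exception may indicate a row with fewer than ten fields, but that is an assumption, as are the tab delimiter and the names `MalformedLines`, `fieldCount`, and `findShortLines` in this sketch.

```scala
import java.util.regex.Pattern

// Hypothetical helper for locating short records in delimited text.
object MalformedLines {
  /** Number of fields in `line`; the -1 limit keeps trailing empty fields. */
  def fieldCount(line: String, delimiter: String): Int =
    line.split(Pattern.quote(delimiter), -1).length

  /** Returns (zero-based line index, line) for every line with too few fields. */
  def findShortLines(
      lines: Iterator[String],
      delimiter: String,
      expectedFields: Int): Iterator[(Int, String)] =
    lines.zipWithIndex.collect {
      case (line, index) if fieldCount(line, delimiter) < expectedFields =>
        (index, line)
    }
}
```

On a cluster, the same predicate could be passed to a filter over the raw input, for example `sc.textFile(path).filter(line => MalformedLines.fieldCount(line, "\t") < 10)`, where `path` and the expected count of 10 are placeholders.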
Spark SQL cannot tolerate regexp with BIGINT
Hi all,

We are migrating our Hive jobs to Spark SQL, and we found a small difference between Hive and Spark SQL. Our SQL has a statement like:

    select A from B where id regexp '^12345$'

In Hive it works fine, but in Spark SQL we get:

    java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String

Can this statement be handled by Spark SQL?
Re: Spark SQL cannot tolerate regexp with BIGINT
OK, I'll try.

On Apr 30, 2015 06:54, "Reynold Xin" wrote:

> We added ExpectedInputConversion rule recently in analysis:
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala#L647
>
> With this rule, the analyzer automatically adds cast for expressions that
> inherit ExpectsInputTypes. We can make all string functions inherit
> ExpectsInputTypes and specify input types, so the casts are added
> automatically. Would you like to submit a PR?
>
> On Wed, Apr 29, 2015 at 2:06 PM, Olivier Girardot wrote:
>
>> I guess you can use cast(id as String) instead of just id in your where
>> clause?
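With the cast suggested in the quoted reply, the statement would read `select A from B where cast(id as string) regexp '^12345$'`. The sketch below shows, in plain Scala and outside Spark, what that cast amounts to per row: the numeric id is turned into a string before the pattern is applied. The names `RegexpOnLong` and `matchesId` are hypothetical, and the substring-match semantics are an assumption about how regexp behaves.

```scala
// Hypothetical illustration; not Spark's implementation.
object RegexpOnLong {
  /** Converts the BIGINT-like id to a string, then looks for a match of
    * `pattern` anywhere in it (anchors such as ^ and $ still apply). */
  def matchesId(id: Long, pattern: String): Boolean =
    pattern.r.findFirstIn(id.toString).isDefined
}
```

Applying a string pattern directly to the Long, with no conversion step, is the situation that produces the ClassCastException reported above.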
spark core api vs. google cloud dataflow
Google Cloud Dataflow provides a distributed dataset called PCollection, and syntactic sugar on top of PCollection in the form of "apply". Note that "apply" is different from the Spark API "map", which passes each element of the source through a function func. I wonder whether Spark can support this kind of syntactic sugar, and if not, why?
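To make the distinction concrete, here is a rough sketch of how an "apply"-style method could be layered onto any collection type in Scala with an implicit class. A plain `Seq` stands in for a distributed dataset, and `ApplySugar`, `applyTransform`, and `countWords` are hypothetical names, not Spark or Dataflow APIs.

```scala
// Hypothetical enrichment; a Seq stands in for a distributed dataset.
object ApplySugar {
  /** Adds `applyTransform`, which hands the whole collection to a transform. */
  implicit class TransformOps[A](self: A) {
    def applyTransform[B](transform: A => B): B = transform(self)
  }

  /** A composite transform: it receives the entire collection,
    * unlike the function passed to `map`, which sees one element at a time. */
  val countWords: Seq[String] => Map[String, Int] =
    lines =>
      lines
        .flatMap(_.split("\\s+"))
        .filter(_.nonEmpty)
        .groupBy(identity)
        .map { case (word, occurrences) => word -> occurrences.size }
}
```

After `import ApplySugar._`, a call such as `Seq("a b", "b c").applyTransform(countWords)` chains a multi-step transform as a single named unit, which is the kind of sugar the question describes.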