If it is on the same host... it is expected. AFAIK you cannot create more than one Spark context on the same host. All I can suggest is to run your apps outside the cluster and on 2 different hosts. If that fails you will need to put logs in your failing app to determine why it is failing. If you can send me a short snippet for the two apps I can try to reproduce it, provided the apps can be reproduced by reading from the filesystem instead.... Hth
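For what it's worth, a minimal sketch of the kind of snippet being asked for here: each of the two apps cut down to a bare CSV load plus a count, reading from the local filesystem instead of HDFS, with logging around the failing step. The paths and app names below are placeholders, not the real ones from this thread; running two copies of this at the same time should show whether the broadcast error is reproducible outside the full pipeline.

    # Minimal repro sketch (assumed names/paths): app 1 of 2; run a second copy
    # with a different appName at the same time to mimic the two hourly jobs.
    import logging

    from pyspark.sql import SparkSession

    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(message)s")
    log = logging.getLogger("broadcast-repro")

    spark = (SparkSession.builder
             .appName("broadcast-repro-app-1")
             .getOrCreate())

    try:
        # file:// forces a local read, so HDFS is excluded as a cause
        df = (spark.read
              .option("header", "true")
              .csv("file:///tmp/sample_csv/*.csv"))  # placeholder directory
        log.info("Loaded %d rows, %d columns", df.count(), len(df.columns))
    except Exception:
        # capture the full stack trace before the application dies
        log.exception("CSV load failed")
        raise
    finally:
        spark.stop()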
On 5 Jan 2017 10:35 am, "Palash Gupta" <spline_pal...@yahoo.com> wrote:
> Hi Marco,
>
> Yes, it was on the same host when the problem was found.
>
> Even when I tried to start on a different host, the problem is still there.
>
> Any hints or suggestions will be appreciated.
>
> Thanks & Best Regards,
> Palash Gupta
>
> ------------------------------
> *From:* Marco Mistroni <mmistr...@gmail.com>
> *To:* Palash Gupta <spline_pal...@yahoo.com>
> *Cc:* ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
> *Sent:* Thursday, January 5, 2017 1:01 PM
> *Subject:* Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
>
> Hi,
> If it only happens when you run the 2 apps at the same time, could it be that these 2 apps somehow run on the same host?
> Kr
>
> On 5 Jan 2017 9:00 am, "Palash Gupta" <spline_pal...@yahoo.com> wrote:
>
> Hi Marco and respected members,
>
> I have done all the possible things suggested by the forum but I'm still having the same issue:
>
> 1. I will migrate my applications to a production environment where I will have more resources
> Palash>> I migrated my application to production, where I have more CPU cores, memory and a total of 7 hosts in the Spark cluster.
> 2. Use the Spark 2.0.0 function to load CSV rather than using the Databricks API
> Palash>> Earlier I was using the Databricks CSV API with Spark 2.0.0. As suggested by one of the mates, I'm now using the Spark 2.0.0 built-in CSV loader.
> 3. In production I will run multiple Spark applications at a time and try to reproduce this error for both the file system and HDFS loading cases
> Palash>> Yes, I reproduced it, and it only happens when two Spark applications run at a time. Please see the logs:
>
> 17/01/05 01:50:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 10.15.187.79): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
>         at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:174)
>         at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:65)
>         at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:65)
>         at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:89)
>         at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:67)
>         at org.apache.spark.scheduler.Task.run(Task.scala:85)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:146)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:125)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:125)
>         at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
>         ... 11 more
>
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.1 in stage 0.0 (TID 1, 10.15.187.78, partition 0, ANY, 7305 bytes)
> 17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 1 on executor id: 1 hostname: 10.15.187.78.
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.1 in stage 0.0 (TID 1) on executor 10.15.187.78: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 1]
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.2 in stage 0.0 (TID 2, 10.15.187.78, partition 0, ANY, 7305 bytes)
> 17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 2 on executor id: 1 hostname: 10.15.187.78.
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.2 in stage 0.0 (TID 2) on executor 10.15.187.78: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 2]
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.3 in stage 0.0 (TID 3, 10.15.187.76, partition 0, ANY, 7305 bytes)
> 17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 3 on executor id: 6 hostname: 10.15.187.76.
> 17/01/05 01:50:16 INFO scheduler.TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3) on executor 10.15.187.76: java.io.IOException (org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1) [duplicate 3]
> 17/01/05 01:50:16 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
> 17/01/05 01:50:16 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 17/01/05 01:50:16 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
> 17/01/05 01:50:16 INFO scheduler.DAGScheduler: ResultStage 0 (load at NativeMethodAccessorImpl.java:-2) failed in 2.110 s
> 17/01/05 01:50:16 INFO scheduler.DAGScheduler: Job 0 failed: load at NativeMethodAccessorImpl.java:-2, took 2.262950 s
> Traceback (most recent call last):
>   File "/home/hadoop/development/datareloadwithps.py", line 851, in <module>
>     datareporcessing(expected_datetime, expected_directory_hdfs)
>   File "/home/hadoop/development/datareloadwithps.py", line 204, in datareporcessing
>     df_codingsc_raw = sqlContext.read.format("csv").option("header", 'true').load(HDFS_BASE_URL + hdfs_dir + filename)
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 147, in load
>   File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
>   File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>   File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o58.load.
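As an aside on point 2 above (moving off the Databricks CSV package): a minimal sketch of the difference, assuming a SparkSession (spark) or SQLContext (sqlContext) is already in scope as in the pyspark shell; the HDFS path is a placeholder, not the real one from the thread.

    # Spark 1.x style, needs the external com.databricks:spark-csv package on the classpath:
    df_old = (sqlContext.read
              .format("com.databricks.spark.csv")
              .option("header", "true")
              .load("hdfs://namenode:8020/data/2017010501/part1.csv"))

    # Spark 2.0 built-in CSV source, same result without the extra dependency:
    df_new = (spark.read
              .option("header", "true")
              .csv("hdfs://namenode:8020/data/2017010501/part1.csv"))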
>
> Thanks & Best Regards,
> Palash Gupta
>
> ------------------------------
> *From:* Palash Gupta <spline_pal...@yahoo.com>
> *To:* Marco Mistroni <mmistr...@gmail.com>
> *Cc:* ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
> *Sent:* Saturday, December 31, 2016 12:43 PM
> *Subject:* Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
>
> Hi Marco,
>
> Thanks!
>
> Please find my responses:
>
> so you have a pyspark application running on spark 2.0
> Palash>> Yes
>
> You have python scripts dropping files on HDFS
> Palash>> Yes (it is not part of the Spark process, just an independent Python script)
>
> then you have two spark jobs
> Palash>> Yes
>
> - 1 loads expected hour data (pls explain. How many files on average?)
> Palash>>
> At least 35,000 rows in each file, with 150 columns
> Number of CSV file types: 7
> Number of files for each type: 4
> Total number of files: 28
>
> - 1 loads delayed data (pls explain. How many files on average?)
> Palash>> We may or may not get delayed data in a given hour. But if, for example, a disconnection between the CSV generation system and the Spark system causes a network issue, then we will get many delayed-hour files.
> On average:
> At least 35,000 rows in each file, with 150 columns
> Number of CSV file types: 7
> Number of files for each type: 2
> Total number of files: 14
>
> Do these scripts run continuously (they have a while loop) or do you kick them off via a job scheduler on an hourly basis?
> Palash>> No, this script is run from a Linux cron schedule (not in a while loop).
>
> Do these scripts run on a cluster?
> Palash>> My pyspark application is running in standalone cluster mode where I have only two VMs (one master, two workers).
>
> So, at T1 in HDFS there are 3 csv files. Your job starts up and loads all 3 of them, does aggregation etc. then populates mongo
> Palash>> Yes
>
> At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2 additional. Presumably these files are not deleted). So does your job now load 5 files, do aggregation and store data in mongodb? Or does your job at T+1 only load deltas (the two new csv files which appeared at T+1)?
> Palash>> No, it will only handle the newly arrived files for the new expected hour. But in delayed-data handling there is a possibility of reprocessing a specific hour's data, re-calculating the KPIs and updating them in mongodb.
>
> You said before that simply parsing csv files via spark in a standalone app works fine.
> Palash>> I said that since I stopped the delayed-data-loading Spark script, the expected-hour data loading has been smooth and running well for the last three days.
>
> Then what you can try is to do exactly the same processing you are doing, but instead of loading csv files from HDFS you can load from a local directory and see if the problem persists... (this is just to exclude any issues with loading HDFS data.)
> Palash>> The issue is the same when loading from the file system. When I'm running only a single script it is smooth. When I'm running both scripts at a time in two separate pyspark applications, it sometimes fails with this error while loading a file from the file system.
>
> Now I'm doing the below things as per the suggestions:
>
> 1. I will migrate my applications to a production environment where I will have more resources
> 2. Use the Spark 2.0.0 function to load CSV rather than using the Databricks API
> 3. In production I will run multiple Spark applications at a time and try to reproduce this error for both the file system and HDFS loading cases
>
> When I'm done I will share the details with you.
>
> If you have any more suggestions from a debugging point of view, you can add them here for me.
>
> Thanks & Best Regards,
> Palash Gupta
>
> ------------------------------
> *From:* Marco Mistroni <mmistr...@gmail.com>
> *To:* "spline_pal...@yahoo.com" <spline_pal...@yahoo.com>
> *Cc:* ayan guha <guha.a...@gmail.com>; User <user@spark.apache.org>
> *Sent:* Saturday, December 31, 2016 1:42 AM
> *Subject:* Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
>
> Hi Palash,
>
> so you have a pyspark application running on spark 2.0
> You have python scripts dropping files on HDFS
> then you have two spark jobs
> - 1 loads expected hour data (pls explain. How many files on average?)
> - 1 loads delayed data (pls explain. How many files on average?)
>
> Do these scripts run continuously (they have a while loop) or do you kick them off via a job scheduler on an hourly basis?
> Do these scripts run on a cluster?
>
> So, at T1 in HDFS there are 3 csv files. Your job starts up and loads all 3 of them, does aggregation etc. then populates mongo.
> At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2 additional. Presumably these files are not deleted). So does your job now load 5 files, do aggregation and store data in mongodb? Or does your job at T+1 only load deltas (the two new csv files which appeared at T+1)?
>
> You said before that simply parsing csv files via spark in a standalone app works fine. Then what you can try is to do exactly the same processing you are doing, but instead of loading csv files from HDFS you can load from a local directory and see if the problem persists... (this is just to exclude any issues with loading HDFS data.)
>
> hth
> Marco
>
> On Fri, Dec 30, 2016 at 2:02 PM, Palash Gupta <spline_pal...@yahoo.com> wrote:
>
> Hi Marco & Ayan,
>
> I now have a clearer idea about what Marco means by reduce. I will use it to dig down.
>
> Let me answer your queries:
>
> When you see the broadcast errors, does your job terminate?
> Palash>> Yes, it terminated the app.
>
> Or are you assuming that something is wrong just because you see the message in the logs?
> Palash>> No, it terminated at the very first step of Spark processing (in my case, loading csv from hdfs).
>
> Plus... wrt the logic... who writes the CSV? With what frequency?
> Palash>> We parse xml files using Python (not in Spark's scope), make csv files and put them in hdfs.
>
> Does the app run all the time loading CSV from hadoop?
> Palash>> Every hour two separate pyspark apps are running:
> 1. Loading the current expected hour's data, preparing KPIs, doing aggregation, loading into mongodb
> 2. The same operation runs for the delayed-hour data
>
> Are you using spark streaming?
> Palash>> No
>
> Does the app run fine with an older version of spark (1.6)?
> Palash>> I didn't test with Spark 1.6. My app is running well now since I stopped the second app (delayed data loading) two days ago. Even so, in most cases both run well except a few times...
>
> On Fri, 30 Dec, 2016 at 4:57 pm, Marco Mistroni <mmistr...@gmail.com> wrote:
> Correct. I mean reduce the functionality.
> Uhm, I realised I didn't ask you a fundamental question.
> When you see the broadcast errors, does your job terminate? Or are you assuming that something is wrong just because you see the message in the logs?
> Plus... wrt the logic... who writes the CSV? With what frequency?
> Does the app run all the time loading CSV from hadoop?
> Are you using spark streaming?
> Does the app run fine with an older version of spark (1.6)?
> Hth
>
> On 30 Dec 2016 12:44 pm, "ayan guha" <guha.a...@gmail.com> wrote:
>
> @Palash: I think what Marco meant by "reduce functionality" is to reduce the scope of your application's functionality so that you can isolate the issue to certain part(s) of the app... I do not think he meant the "reduce" operation :)
>
> On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta <spline_pal...@yahoo.com.invalid> wrote:
>
> Hi Marco,
>
> All of your suggestions are highly appreciated, whatever you have said so far. I will apply them in my code and let you know.
>
> Let me answer your query:
>
> What does your program do?
> Palash>> Each hour I am loading many CSV files and then making some KPI(s) out of them. Finally I am doing some aggregation and inserting into mongodb from Spark.
>
> you say it runs for 2-3 hours, what is the logic? just processing a huge amount of data? doing ML?
> Palash>> Yes, you are right, whatever I'm processing should not take much time. Initially my processing took only 5 minutes, as I was using all cores and running only one application. When I created more separate Spark applications for handling delayed data loading and implementing more use cases that run in parallel, I started facing the error randomly. And because the resources are now split among four Spark applications running in parallel, some tasks take longer than usual. But still it should not take 2-3 hours...
>
> Currently all the applications are running in a development environment where we have only a two-VM cluster, and I will migrate to the production platform by next week. I will let you know if there is any improvement there.
>
> I'd say break down your application.. reduce functionality, run and see the outcome. Then add more functionality, run and see again.
> Palash>> Marco, as I'm not very good at Spark, it would be helpful for me if you could provide some example of reducing functionality, because I'm using Spark data frames, joining data frames, and using SQL statements to manipulate the KPI(s). How could I apply "reduce functionality" here?
>
> Thanks & Best Regards,
> Palash Gupta
>
> ------------------------------
> *From:* Marco Mistroni <mmistr...@gmail.com>
> *To:* "spline_pal...@yahoo.com" <spline_pal...@yahoo.com>
> *Cc:* User <user@spark.apache.org>
> *Sent:* Thursday, December 29, 2016 11:28 PM
> *Subject:* Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"
>
> Hello,
> No, sorry, I don't have any further insight into that... I have seen similar errors but for completely different issues, and in most of the cases it had to do with my data or my processing rather than Spark itself.
> What does your program do? You say it runs for 2-3 hours, what is the logic? Just processing a huge amount of data? Doing ML?
> I'd say break down your application: reduce functionality, run and see the outcome. Then add more functionality, run and see again.
> I found myself doing these kinds of things when I got errors in my spark apps.
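To make the "reduce functionality" idea concrete for a data-frame/SQL pipeline like the one described above, here is a hypothetical, stripped-down shape of the hourly job with each stage behind a flag, so the job can be cut back to just the CSV load and then rebuilt step by step until the error reappears. Every name, path and column below is a placeholder, not the real code from this thread.

    # Hypothetical skeleton of the hourly KPI job, reduced stage by stage.
    from pyspark.sql import SparkSession

    RUN_JOIN = False        # 2nd run: True
    RUN_KPI_SQL = False     # 3rd run: True
    RUN_WRITE = False       # last run: True (write out the result)

    spark = SparkSession.builder.appName("hourly-kpi-reduced").getOrCreate()

    # Stage 1: only the load. If two copies of just this fail together,
    # the problem is already reproducible without joins/SQL/MongoDB.
    raw = spark.read.option("header", "true").csv("file:///tmp/hour=2017010501/*.csv")
    print("loaded", raw.count(), "rows")

    if RUN_JOIN:
        lookup = spark.read.option("header", "true").csv("file:///tmp/lookup.csv")
        raw = raw.join(lookup, on="cell_id", how="left")   # hypothetical join key

    if RUN_KPI_SQL:
        raw.createOrReplaceTempView("raw")
        raw = spark.sql("SELECT cell_id, avg(value) AS kpi FROM raw GROUP BY cell_id")

    if RUN_WRITE:
        # Writing to parquet instead of MongoDB keeps the test self-contained
        raw.write.mode("overwrite").parquet("file:///tmp/kpi_out")

    spark.stop()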
> To get concrete help you will have to trim down the code to a few lines that reproduce the error. That will be a great start.
>
> Sorry for not being of much help
>
> hth
> marco
>
> On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta <spline_pal...@yahoo.com> wrote:
>
> Hi Marco,
>
> Thanks for your response.
>
> Yes, I tested it before and am able to load from the Linux filesystem, and it also sometimes has a similar issue.
>
> However, in both cases (either from hadoop or the linux file system), this error comes up in some specific scenarios as per my observations:
>
> 1. When two separate parallel Spark applications are initiated from one driver (not all the time, just sometimes)
> 2. If one Spark job runs for longer than the expected hour, let's say 2-3 hours, the second application terminates with the error.
>
> To help me debug the problem, it would be good if you could share some possible reasons why the failed-to-get-broadcast error may occur.
>
> Or if you need more logs I can share them.
>
> Thanks again, Spark User Group.
>
> Best Regards
> Palash Gupta
>
> On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni <mmistr...@gmail.com> wrote:
> Hi,
> Please try to read a CSV from the filesystem instead of hadoop. If you can read it successfully then your hadoop file is the issue and you can start debugging from there.
> Hth
>
> On 29 Dec 2016 6:26 am, "Palash Gupta" <spline_pal...@yahoo.com.invalid> wrote:
>
> Hi Apache Spark User team,
>
> Greetings!
>
> I started developing an application using Apache Hadoop and Spark with Python. My pyspark application randomly terminated saying "Failed to get broadcast_1*" and I have been searching for suggestions and support on Stack Overflow: Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application
> <http://stackoverflow.com/questions/41236661/failed-to-get-broadcast-1-piece0-of-broadcast-1-in-pyspark-application>
> "I was building an application on Apache Spark 2.00 with Python 3.4 and trying to load some CSV files from HDFS (..."
>
> Could you please provide suggestions on registering myself in the Apache user list, or on how I can get suggestions or support to debug the problem I am facing?
>
> Your response will be highly appreciated.
>
> Thanks & Best Regards,
> Engr. Palash Gupta
> WhatsApp/Viber: +8801817181502
> Skype: palash2494
>
> --
> Best Regards,
> Ayan Guha
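Finally, since the failures in this thread only show up when two or more applications share a small standalone cluster, one thing worth checking (only a sketch of standard standalone-mode settings, not a confirmed fix for this particular error) is whether each application has an explicit cap on cores and memory, so the hourly jobs do not compete for the whole cluster. The master URL, app name and values below are illustrative placeholders.

    from pyspark.sql import SparkSession

    # Standalone mode: without spark.cores.max an application can claim every
    # available core, leaving little room for the second hourly job.
    spark = (SparkSession.builder
             .appName("expected-hour-job")              # placeholder name
             .master("spark://master-host:7077")        # placeholder master URL
             .config("spark.cores.max", "4")            # cap total cores for this app
             .config("spark.executor.memory", "2g")     # per-executor memory
             .config("spark.driver.memory", "1g")
             .getOrCreate())

    # The delayed-hour job would use its own appName and a similar cap, so the
    # two applications can run side by side on the small cluster.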