+1 to Ayan's answer. I think this is a common distributed-computing anti-pattern that trips us all up at some point or another.
You definitely want to (in most cases) yield and create a new RDD/Dataframe/Dataset and then perform your save operation on that (a minimal sketch is below the quoted thread).

On 28 May 2017 at 21:09, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> You can modify your parse function to yield/emit the output record
> instead of inserting it. That way, you can essentially call .toDF to convert
> the outcome to a dataframe and then use the driver's cassandra connection to
> save to cassandra (the data will still be on the executors, but now the connector
> itself will create local connections and communicate with cassandra from the
> executors).
>
> On Mon, May 29, 2017 at 8:55 AM, Stephen Boesch <java...@gmail.com> wrote:
>
>> You would need to use *native* Cassandra APIs in each Executor - not
>> org.apache.spark.sql.cassandra.CassandraSQLContext - including to create
>> a separate Cassandra connection on each Executor.
>>
>> 2017-05-28 15:47 GMT-07:00 Abdulfattah Safa <fattah.s...@gmail.com>:
>>
>>> So I can't run SQL queries in Executors?
>>>
>>> On Sun, May 28, 2017 at 11:00 PM Mark Hamstra <m...@clearstorydata.com>
>>> wrote:
>>>
>>>> You can't do that. SparkContext and SparkSession can exist only on the
>>>> Driver.
>>>>
>>>> On Sun, May 28, 2017 at 6:56 AM, Abdulfattah Safa <
>>>> fattah.s...@gmail.com> wrote:
>>>>
>>>>> How can I use SparkContext (to create a Spark Session or Cassandra
>>>>> Sessions) in executors?
>>>>> If I pass it as a parameter to foreach or foreachPartition, it
>>>>> will have a null value.
>>>>> Shall I create a new SparkContext in each executor?
>>>>>
>>>>> Here is what I'm trying to do:
>>>>> Read a dump directory with millions of dump files as follows:
>>>>>
>>>>> dumpFiles = Directory.listFiles(dumpDirectory)
>>>>> dumpFilesRDD = sparkContext.parallelize(dumpFiles, numOfSlices)
>>>>> dumpFilesRDD.foreachPartition(dumpFilePath -> parse(dumpFilePath))
>>>>> .
>>>>> .
>>>>> .
>>>>>
>>>>> In parse(), each dump file is parsed and inserted into the database using
>>>>> SparkSQL. In order to do that, a SparkContext is needed inside parse()
>>>>> to use the sql() method.
>>>>>
>
> --
> Best Regards,
> Ayan Guha
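For what it's worth, here is a minimal sketch of that pattern in Scala, assuming the Spark Cassandra Connector is on the classpath and spark.cassandra.connection.host is set in your SparkConf. ParsedRecord, my_keyspace and my_table are made-up names, and parse() stands in for your own parsing logic:

    import com.datastax.spark.connector._            // Spark Cassandra Connector implicits

    case class ParsedRecord(id: String, payload: String)   // hypothetical row schema

    // parse() now *returns* records instead of inserting them itself
    def parse(dumpFilePath: String): Iterator[ParsedRecord] = {
      // ... read and parse the dump file ...
      Iterator.empty                                 // placeholder
    }

    val dumpFiles = Directory.listFiles(dumpDirectory)
    val dumpFilesRDD = sparkContext.parallelize(dumpFiles, numOfSlices)

    // flatMap keeps the parsed records distributed across the executors
    val recordsRDD = dumpFilesRDD.flatMap(parse)

    // The connector opens its own Cassandra sessions on each executor;
    // no SparkContext or SparkSession is needed inside the tasks.
    recordsRDD.saveToCassandra("my_keyspace", "my_table")

If you prefer the Dataframe route Ayan describes, you could instead call .toDF on the records and write through the connector's "org.apache.spark.sql.cassandra" data source; either way the write happens through the connector, not through sql() calls on the executors.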