+1 to Ayan's answer, I think this is a common distributed anti pattern that
trips us all up at some point or another.

You definitely want to (in most cases) yield and create a new
RDD/Dataframe/Dataset and then perform your save operation on that.

On 28 May 2017 at 21:09, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> You can modify your parse function to yield/emit the output record,
> instead of inserting. that way, you can essentially call .toDF to convert
> the outcome to a dataframe and then use driver's cassandra connection to
> save to cassandra (data will still in Executors, but now connector itself
> will create local connections and communicate with cassandra from
> executor).
>
> On Mon, May 29, 2017 at 8:55 AM, Stephen Boesch <java...@gmail.com> wrote:
>
>> You would need to use *native* Cassandra API's in each Executor -   not
>> org.apache.spark.sql.cassandra.CassandraSQLContext -  including to create
>> a separate Cassandra connection on each Executor.
>>
>> 2017-05-28 15:47 GMT-07:00 Abdulfattah Safa <fattah.s...@gmail.com>:
>>
>>> So I can't run SQL queries in Executors ?
>>>
>>> On Sun, May 28, 2017 at 11:00 PM Mark Hamstra <m...@clearstorydata.com>
>>> wrote:
>>>
>>>> You can't do that. SparkContext and SparkSession can exist only on the
>>>> Driver.
>>>>
>>>> On Sun, May 28, 2017 at 6:56 AM, Abdulfattah Safa <
>>>> fattah.s...@gmail.com> wrote:
>>>>
>>>>> How can I use SparkContext (to create Spark Session or Cassandra
>>>>> Sessions) in executors?
>>>>> If I pass it as parameter to the foreach or foreachpartition, then it
>>>>> will have a null value.
>>>>> Shall I create a new SparkContext in each executor?
>>>>>
>>>>> Here is what I'm trying to do:
>>>>> Read a dump directory with millions of dump files as follows:
>>>>>
>>>>>     dumpFiles = Directory.listFiles(dumpDirectory)
>>>>>     dumpFilesRDD = sparkContext.parallize(dumpFiles, numOfSlices)
>>>>>     dumpFilesRDD.foreachPartition(dumpFilePath->parse(dumpFilePath))
>>>>>     .
>>>>>     .
>>>>>     .
>>>>>
>>>>> In parse(), each dump file is parsed and inserted into database using
>>>>> SparlSQL. In order to do that, SparkContext is needed in the function 
>>>>> parse
>>>>> to use the sql() method.
>>>>>
>>>>
>>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>

Reply via email to