Hi, I think what you are facing is documented in the Spark programming guide: http://spark.apache.org/docs/latest/rdd-programming-guide.html#understanding-closures-
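In short, the body of a foreach/map closure runs on the executors, where the driver's SparkSession is not available. As a minimal sketch of the failing pattern (the manifest path and column here are placeholders, not your actual code):

```
val fileNames = spark.read.text("hdfs:///path/to/manifest")

fileNames.foreach { row =>
  // This closure executes on an executor; `spark` is a driver-side
  // object, so calling it here typically fails at runtime (for
  // example with a NullPointerException).
  val df = spark.read.json(row.getString(0))
}
```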
May I ask what you are trying to achieve here? From what I understand, you have a list of JSON files which you want to read separately, as they have different schemas. Is that right? Can you please let me know the following details as well:

1. Spark version
2. The full code
3. The path to the files

Regards,
Gourav Sengupta

On Thu, Sep 21, 2017 at 4:55 PM, ayan guha <guha.a...@gmail.com> wrote:

> The point here is that the Spark session is not available in executors, so
> you have to use the appropriate storage clients directly.
>
> On Fri, Sep 22, 2017 at 1:44 AM, lucas.g...@gmail.com <
> lucas.g...@gmail.com> wrote:
>
>> I'm not sure exactly what you're doing, but I have in the past used Spark
>> to consume a manifest file and then execute a .mapPartitions on the
>> result, like this:
>>
>> import json
>> import time
>>
>> import boto3
>>
>>
>> def map_key_to_event(s3_events_data_lake):
>>
>>     def _map_key_to_event(event_key_list, s3_client=None):
>>         print("Events in list")
>>         start = time.time()
>>
>>         return_list = []
>>
>>         if s3_client is None:
>>             s3_client = boto3.Session().client('s3')
>>
>>         for event_key in event_key_list:
>>             contents = None
>>             try:
>>                 response = s3_client.get_object(
>>                     Bucket=s3_events_data_lake, Key=event_key)
>>                 contents = response['Body'].read().decode('utf-8')
>>                 entity = json.loads(contents)
>>                 event_type = json.loads(entity["Message"])["type"]
>>                 entity["Message"] = json.loads(entity["Message"])
>>                 # json.dumps here because Spark doesn't have a good
>>                 # json datatype.
>>                 return_list.append((event_type, json.dumps(entity)))
>>             except Exception:
>>                 print("Key: {k} did not yield a valid object: {o}".format(
>>                     k=event_key, o=contents))
>>
>>         end = time.time()
>>         print('time elapsed:')
>>         print(end - start)
>>
>>         return return_list
>>
>>     return _map_key_to_event
>>
>>
>> pkeys = spark.sparkContext.parallelize(full_list_for_time_slice, 32)
>> print("partitions: ")
>> print(pkeys.getNumPartitions())
>> events = pkeys.mapPartitions(map_key_to_event(s3_events_data_lake))
>>
>> In this case I'm loading heterogeneous json files with wildly different
>> schemas, then saving them into one parquet file per event type (i.e.
>> turning one big heterogeneous dump into numerous smaller homogeneous
>> dumps).
>>
>> I'm sure this isn't the only, or even the best, way to do it.
>>
>> The underlying issue is that you're trying to violate the programming
>> model. The model in this case consists of telling the driver what you
>> want and then having the executors go do it.
>>
>> SparkContext is a driver-level abstraction; it doesn't really make sense
>> in the executor context. The executor is acting on behalf of the driver
>> and shouldn't need a back reference to it. You'd end up with some
>> interesting execution graphs.
>>
>> This is a common pattern in Spark as far as I can tell, i.e. calling a
>> map and then doing something with the items in the executor, either
>> computing or enriching. My case above is a bit unusual, and I'm not
>> certain it's the right mechanism, in that I'm literally taking a manifest
>> file and turning it into 'n' actual records.
>>
>> Also, if you're going to be constructing a connection string / JDBC call
>> / S3 client, you really don't want to use a straight .map(func). You'll
>> end up instantiating a connection on every iteration; .mapPartitions lets
>> you create the client once per partition instead.
>>
>> Hope this is somewhat helpful.
>>
>> Gary
>>
>> On 21 September 2017 at 06:31, Weichen Xu <weichen...@databricks.com>
>> wrote:
>>
>>> Spark does not allow executor code to use `sparkSession`.
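>>> If you only have a modest number of files, one workaround (just a rough
>>> sketch, assuming the file names fit in driver memory) is to collect them
>>> to the driver and loop there, so that every read happens driver-side:
>>>
>>> ```
>>> val fileNames = df.collect().map(_.getString(0))
>>> fileNames.foreach { fileName =>
>>>   val fileDf = spark.read.json(fileName)  // runs on the driver
>>>   // ... per-file logic here ...
>>> }
>>> ```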
>>> Alternatively, I think you can move all the json files into one
>>> directory and then run:
>>>
>>> ```
>>> spark.read.json("/path/to/jsonFileDir")
>>> ```
>>>
>>> If you want to get the filename at the same time, you can use:
>>>
>>> ```
>>> spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")...
>>> ```
>>>
>>> On Thu, Sep 21, 2017 at 9:18 PM, Riccardo Ferrari <ferra...@gmail.com>
>>> wrote:
>>>
>>>> It depends on your use case; however, broadcasting
>>>> <https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables>
>>>> could be a better option.
>>>>
>>>> On Thu, Sep 21, 2017 at 2:03 PM, Chackravarthy Esakkimuthu <
>>>> chaku.mi...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to know how to pass the sparkSession from the driver to the
>>>>> executors.
>>>>>
>>>>> I have a Spark program (batch job) which does the following:
>>>>>
>>>>> #################
>>>>>
>>>>> val spark = SparkSession.builder()
>>>>>   .appName("SampleJob")
>>>>>   .config("spark.master", "local")
>>>>>   .getOrCreate()
>>>>>
>>>>> // df is a DataFrame holding a list of (HDFS) file names
>>>>> val df = ...
>>>>>
>>>>> df.foreach { fileName =>
>>>>>
>>>>>   *spark.read.json(fileName)*
>>>>>
>>>>>   ...... some logic here ....
>>>>> }
>>>>>
>>>>> #################
>>>>>
>>>>> *spark.read.json(fileName) --- this fails as it runs in an executor.
>>>>> When I put it outside foreach, i.e. in the driver, it works.*
>>>>>
>>>>> I am trying to use spark (the sparkSession) in the executors, where it
>>>>> is not visible, since it only exists on the driver. But I want to read
>>>>> the HDFS files inside foreach; how do I do it?
>>>>>
>>>>> Can someone help me with this?
>>>>>
>>>>> Thanks,
>>>>> Chackra
>>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha