Hi,

Why are you using RDDs? And how are the files stored in terms of
compression?

Regards
Gourav

On Sat, 25 Apr 2020, 08:54 Roland Johann, <roland.joh...@phenetic.io.invalid>
wrote:

> You can read both the logs and the tree file into DataFrames and join
> them. That way Spark can distribute only the relevant records, or even
> broadcast the whole DataFrame, to optimize execution.
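
A minimal sketch of the join suggested here, not code from the thread. It assumes the tree file can be exported as a headerless two-column CSV of key,value pairs; the function name, the column names and the broadcast_tree flag are all made up for illustration.

```python
def join_logs_with_tree(spark, log_path, tree_path, broadcast_tree=False):
    """Return one 'host' value per log line via a DataFrame join.

    ASSUMPTIONS (not from the thread): the tree file has been exported as a
    headerless two-column CSV of key,value pairs, and both paths are
    readable by every executor.
    """
    # Imported here so the module still loads where pyspark is not installed.
    from pyspark.sql import functions as F

    # spark.read.text yields one row per line in a column named "value".
    # The first space-delimited field is used as the lookup key.
    logs = spark.read.text(log_path).select(
        F.split(F.col("value"), " ").getItem(0).alias("host_key")
    )

    # Headerless CSV columns arrive as _c0, _c1; rename them for the join.
    tree = spark.read.csv(tree_path).toDF("host_key", "host")

    if broadcast_tree:
        # Hint Spark to copy the lookup table to every executor. For a table
        # near 2 GB this may not fit in memory, so it is off by default.
        tree = F.broadcast(tree)

    # Left join keeps log lines whose key has no match (host will be null).
    return logs.join(tree, on="host_key", how="left").select("host")
```

With an existing session this could be called as join_logs_with_tree(spark, "log_file", "tree.csv").show(), where "tree.csv" is a hypothetical export of the tree file.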
>
> Best regards
>
> Sonal Goyal <sonalgoy...@gmail.com> schrieb am Sa. 25. Apr. 2020 um 06:59:
>
>> How does your tree_lookup_value function work?
>>
>> Thanks,
>> Sonal
>> Nube Technologies <http://www.nubetech.co>
>>
>> <http://in.linkedin.com/in/sonalgoyal>
>>
>>
>>
>>
>> On Fri, Apr 24, 2020 at 8:47 PM Arjun Chundiran <arjun...@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> I asked this question on Stack Overflow
>>> <https://stackoverflow.com/questions/61386719/load-a-master-data-file-to-spark-ecosystem>
>>> but didn't get any convincing answers. Can somebody help me
>>> solve this issue?
>>>
>>> Below is my problem.
>>>
>>> While building a log processing system, I came across a scenario where I
>>> need to look up the corresponding value in a tree file (like a DB) for
>>> every log line. What is the best approach for loading a very large
>>> external file into the Spark ecosystem? The tree file is 2 GB in size.
>>>
>>> Here is my scenario:
>>>
>>>    1. I have a file that contains a huge number of log lines.
>>>    2. Each log line needs to be split by a delimiter into 70 fields.
>>>    3. I need to look up data in the tree file for one of the 70 fields
>>>    of each log line.
>>>
>>> I am using the Apache Spark Python API, running on a 3-node cluster.
>>>
>>> Below is the code I have written, but it is really slow:
>>>
>>> from pyspark import SparkFiles
>>> from pyspark.sql import Row
>>>
>>> def process_logline(line, tree):
>>>     # Look up the first space-delimited field of the log line in the tree.
>>>     row_dict = {}
>>>     line_list = line.split(" ")
>>>     row_dict["host"] = tree_lookup_value(tree, line_list[0])
>>>     new_row = Row(**row_dict)
>>>     return new_row
>>>
>>> def run_job(vals):
>>>     # Ship the tree file to every node, then look up each log line.
>>>     spark.sparkContext.addFile('somefile')
>>>     tree_val = open(SparkFiles.get('somefile'))
>>>     lines = spark.sparkContext.textFile("log_file")
>>>     converted_lines_rdd = lines.map(lambda l: process_logline(l, tree_val))
>>>     log_line_rdd = spark.createDataFrame(converted_lines_rdd)
>>>     log_line_rdd.show()
>>>
>>> Basically, I need a way to load the file into the workers' memory once
>>> and use it for the entire lifetime of the job, using the Python API.
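
One possible way to stay on the RDD API while avoiding a load per log line is to open the tree once per partition with mapPartitions. The sketch below is an illustration under assumptions, not the poster's code: lookup_partition, load_tree and lookup are hypothetical names, and the small dict stands in for the real tree so the helper can be tried without Spark.

```python
def lookup_partition(lines, load_tree, lookup):
    """Resolve the host field for every log line in one partition.

    load_tree and lookup are hypothetical callables supplied by the caller:
    load_tree() returns the tree object, lookup(tree, key) returns the value.
    """
    tree = load_tree()  # runs once per partition, not once per line
    for line in lines:
        fields = line.split(" ")
        # Plain dicts keep this helper free of any Spark dependency.
        yield {"host": lookup(tree, fields[0])}


# Local demonstration with a stand-in tree (a plain dict).
demo_tree = {"10.0.0.1": "web-1"}
demo_rows = list(
    lookup_partition(
        ["10.0.0.1 GET /", "10.0.0.2 GET /"],
        load_tree=lambda: demo_tree,
        lookup=dict.get,
    )
)

# Hypothetical wiring inside the Spark job (needs pyspark and a session):
#   from pyspark import SparkFiles
#   spark.sparkContext.addFile("somefile")
#   rows = lines.mapPartitions(
#       lambda part: lookup_partition(
#           part,
#           load_tree=lambda: open(SparkFiles.get("somefile")),
#           lookup=tree_lookup_value,
#       )
#   )
```

Because load_tree is called on the executor, the file handle never has to be serialized from the driver. The tree is still loaded once per partition, so fewer, larger partitions mean fewer loads.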
>>>
>>> Thanks in advance
>>> Arjun
>>>
>>>
>>>
>>> --
> Roland Johann
> Software Developer/Data Engineer
>
> phenetic GmbH
> Lütticher Straße 10, 50674 Köln, Germany
>
> Mobil: +49 172 365 26 46
> Mail: roland.joh...@phenetic.io
> Web: phenetic.io
>
> Handelsregister: Amtsgericht Köln (HRB 92595)
> Geschäftsführer: Roland Johann, Uwe Reimann
>
