Hi,

Why are you using RDDs? And how are the files stored in terms of compression?
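For what it's worth, if the tree file can be parsed into a key/value DataFrame, the join Roland suggests below keeps the lookup inside Spark SQL instead of calling into Python for every line. This is only a rough sketch; the file paths, the tab delimiter and the column names lookup_key and host are placeholders, not anything from your actual job:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("log-enrichment").getOrCreate()

# Split each log line with Spark SQL functions instead of a Python map.
logs = (spark.read.text("log_file")
        .withColumn("fields", F.split(F.col("value"), " "))
        .withColumn("lookup_key", F.col("fields")[0]))

# Assumed layout: the tree file readable as tab-separated key/value pairs.
tree = spark.read.csv("tree_file", sep="\t").toDF("lookup_key", "host")

# The broadcast() hint asks Spark to ship the lookup table to every
# executor so the large log DataFrame is never shuffled.
enriched = logs.join(F.broadcast(tree), on="lookup_key", how="left")
enriched.select("lookup_key", "host").show()

Whether the broadcast hint pays off with a 2 GB table depends on executor memory; if it doesn't fit, dropping the hint and letting Spark choose a shuffle join is the safer default.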
Regards
Gourav

On Sat, 25 Apr 2020, 08:54 Roland Johann, <roland.joh...@phenetic.io.invalid> wrote:

> You can read both the logs and the tree file into DataFrames and join
> them. That way Spark can distribute the relevant records, or even the
> whole DataFrame via broadcast, to optimize the execution.
>
> Best regards
>
> Sonal Goyal <sonalgoy...@gmail.com> wrote on Sat., 25 Apr. 2020 at 06:59:
>
>> How does your tree_lookup_value function work?
>>
>> Thanks,
>> Sonal
>> Nube Technologies <http://www.nubetech.co>
>> <http://in.linkedin.com/in/sonalgoyal>
>>
>> On Fri, Apr 24, 2020 at 8:47 PM Arjun Chundiran <arjun...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I have asked this question on Stack Overflow
>>> <https://stackoverflow.com/questions/61386719/load-a-master-data-file-to-spark-ecosystem>
>>> and didn't really get any convincing answers. Can somebody help me
>>> solve this issue?
>>>
>>> Below is my problem.
>>>
>>> While building a log processing system, I came across a scenario where
>>> I need to look up a corresponding value in a tree file (like a DB) for
>>> each and every log line. What is the best approach to load such a very
>>> large external file into the Spark ecosystem? The tree file is 2 GB in
>>> size.
>>>
>>> Here is my scenario:
>>>
>>> 1. I have a file containing a huge number of log lines.
>>> 2. Each log line needs to be split by a delimiter into 70 fields.
>>> 3. I need to look up data in the tree file for one of the 70 fields
>>>    of each log line.
>>>
>>> I am using the Apache Spark Python API and running on a 3-node cluster.
>>>
>>> Below is the code which I have written, but it is really slow:
>>>
>>> def process_logline(line, tree):
>>>     row_dict = {}
>>>     line_list = line.split(" ")
>>>     row_dict["host"] = tree_lookup_value(tree, line_list[0])
>>>     new_row = Row(**row_dict)
>>>     return new_row
>>>
>>> def run_job(vals):
>>>     spark.sparkContext.addFile('somefile')
>>>     tree_val = open(SparkFiles.get('somefile'))
>>>     lines = spark.sparkContext.textFile("log_file")
>>>     converted_lines_rdd = lines.map(lambda l: process_logline(l, tree_val))
>>>     log_line_rdd = spark.createDataFrame(converted_lines_rdd)
>>>     log_line_rdd.show()
>>>
>>> Basically, I need some way to load the file into worker memory once and
>>> keep using it for the entire lifetime of the job via the Python API.
>>>
>>> Thanks in advance,
>>> Arjun
>>>
>
> --
> Roland Johann
> Software Developer/Data Engineer
>
> phenetic GmbH
> Lütticher Straße 10, 50674 Köln, Germany
>
> Mobil: +49 172 365 26 46
> Mail: roland.joh...@phenetic.io
> Web: phenetic.io
>
> Handelsregister: Amtsgericht Köln (HRB 92595)
> Geschäftsführer: Roland Johann, Uwe Reimann
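On the original ask at the bottom of the thread (load the tree file into worker memory once and reuse it for the whole job), a Spark broadcast variable is the usual mechanism in the Python API. A minimal sketch, assuming the tree file can be parsed into an ordinary dict on the driver; parse_tree_file, the tab delimiter and the paths are placeholders:

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.appName("log-enrichment").getOrCreate()
sc = spark.sparkContext

def parse_tree_file(path):
    # Placeholder parser: read the tree file into a plain key -> value dict.
    with open(path) as f:
        return dict(line.rstrip("\n").split("\t", 1) for line in f)

# The broadcast is shipped to each executor once and cached there, so tasks
# read the dict locally instead of reopening the file for every log line.
tree_bc = sc.broadcast(parse_tree_file("/path/to/tree_file"))

def process_logline(line):
    fields = line.split(" ")
    return Row(host=tree_bc.value.get(fields[0], "unknown"))

log_df = spark.createDataFrame(sc.textFile("log_file").map(process_logline))
log_df.show()

Keep in mind that a 2 GB file can expand considerably once loaded as a Python dict, so this only works if the executors have the memory for it; otherwise the DataFrame join sketched above is the more scalable route.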