I have 400k of JSON files. Which is between 10 kb and 500 kb in size. They don`t have the same schema, so I have to loop over them one at a time.
This works, but is`s very slow. This process takes 5 days! So now I have tried to run this functions in a ThreadPool. But it don`t seems to work. *Start local spark. The system have 16 cores and 64 GB.* number_cores = int(multiprocessing.cpu_count()) mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') # e.g. 4015976448 memory_gb = int(mem_bytes/(1024.**3)) # e.g. 3.74 def get_spark_session(app_name: str, conf: SparkConf): conf.setMaster('local[{}]'.format(number_cores)) conf \ .set('spark.driver.memory', '{}g'.format(memory_gb)) \ .set("spark.sql.repl.eagerEval.enabled", "True") \ .set("spark.sql.adaptive.enabled", "True") \ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ .set("spark.sql.repl.eagerEval.maxNumRows", "10000") return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate() spark = get_spark_session("Falk", SparkConf()) *Function to rename columns with \\ * # We take a dataframe and return a new one with required changes def cleanDataFrame(df: DataFrame) -> DataFrame: # Returns a new sanitized field name (this function can be anything really) def sanitizeFieldName(s: str) -> str: return s.replace("-", "_").replace("&", "_").replace("\"", "_")\ .replace("[", "_").replace("]", "_").replace(".", "_") # We call this on all fields to create a copy and to perform any changes we might # want to do to the field. def sanitizeField(field: StructField) -> StructField: field = copy(field) field.name = sanitizeFieldName(field.name) # We recursively call cleanSchema on all types field.dataType = cleanSchema(field.dataType) return field def cleanSchema(dataType: [DataType]) -> [DateType]: dataType = copy(dataType) # If the type is a StructType we need to recurse otherwise we can return since # we've reached the leaf node if isinstance(dataType, StructType): # We call our sanitizer for all top level fields dataType.fields = [sanitizeField(f) for f in dataType.fields] elif isinstance(dataType, ArrayType): dataType.elementType = cleanSchema(dataType.elementType) return dataType # Now since we have the new schema we can create a new DataFrame by using the old Frame's RDD as data and the new schema as the schema for the data return spark.createDataFrame(df.rdd, cleanSchema(df.schema)) *Function to flatten out a nested dataframe.* from pyspark.sql.types import * from pyspark.sql.functions import * def flatten_test(df, sep="_"): """Returns a flattened dataframe. .. versionadded:: x.X.X Parameters ---------- sep : str Delimiter for flatted columns. Default `_` Notes ----- Don`t use `.` as `sep` It won't work on nested data frames with more than one level. And you will have to use `columns.name`. Flattening Map Types will have to find every key in the column. This can be slow. Examples -------- data_mixed = [ { "state": "Florida", "shortname": "FL", "info": {"governor": "Rick Scott"}, "counties": [ {"name": "Dade", "population": 12345}, {"name": "Broward", "population": 40000}, {"name": "Palm Beach", "population": 60000}, ], }, { "state": "Ohio", "shortname": "OH", "info": {"governor": "John Kasich"}, "counties": [ {"name": "Summit", "population": 1234}, {"name": "Cuyahoga", "population": 1337}, ], }, ] data_mixed = spark.createDataFrame(data=data_mixed) data_mixed.printSchema() root |-- counties: array (nullable = true) | |-- element: map (containsNull = true) | | |-- key: string | | |-- value: string (valueContainsNull = true) |-- info: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) |-- shortname: string (nullable = true) |-- state: string (nullable = true) data_mixed_flat = flatten_test(df, sep=":") data_mixed_flat.printSchema() root |-- shortname: string (nullable = true) |-- state: string (nullable = true) |-- counties:name: string (nullable = true) |-- counties:population: string (nullable = true) |-- info:governor: string (nullable = true) data = [ { "id": 1, "name": "Cole Volk", "fitness": {"height": 130, "weight": 60}, }, {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}}, { "id": 2, "name": "Faye Raker", "fitness": {"height": 130, "weight": 60}, }, ] df = spark.createDataFrame(data=data) df.printSchema() root |-- fitness: map (nullable = true) | |-- key: string | |-- value: long (valueContainsNull = true) |-- id: long (nullable = true) |-- name: string (nullable = true) df_flat = flatten_test(df, sep=":") df_flat.printSchema() root |-- id: long (nullable = true) |-- name: string (nullable = true) |-- fitness:height: long (nullable = true) |-- fitness:weight: long (nullable = true) data_struct = [ (("James",None,"Smith"),"OH","M"), (("Anna","Rose",""),"NY","F"), (("Julia","","Williams"),"OH","F"), (("Maria","Anne","Jones"),"NY","M"), (("Jen","Mary","Brown"),"NY","M"), (("Mike","Mary","Williams"),"OH","M") ] schema = StructType([ StructField('name', StructType([ StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True) ])), StructField('state', StringType(), True), StructField('gender', StringType(), True) ]) df_struct = spark.createDataFrame(data = data_struct, schema = schema) df_struct.printSchema() root |-- name: struct (nullable = true) | |-- firstname: string (nullable = true) | |-- middlename: string (nullable = true) | |-- lastname: string (nullable = true) |-- state: string (nullable = true) |-- gender: string (nullable = true) df_struct_flat = flatten_test(df_struct, sep=":") df_struct_flat.printSchema() root |-- state: string (nullable = true) |-- gender: string (nullable = true) |-- name:firstname: string (nullable = true) |-- name:middlename: string (nullable = true) |-- name:lastname: string (nullable = true) """ # compute Complex Fields (Arrays, Structs and Maptypes) in Schema complex_fields = dict([(field.name, field.dataType) for field in df.schema.fields if type(field.dataType) == ArrayType or type(field.dataType) == StructType or type(field.dataType) == MapType]) while len(complex_fields) !=0: col_name = list(complex_fields.keys())[0] #print ("Processing :"+col_name+" Type : "+str(type(complex_fields[col_name]))) # if StructType then convert all sub element to columns. # i.e. flatten structs if (type(complex_fields[col_name]) == StructType): expanded = [col(col_name + '.' + k).alias(col_name + sep + k) for k in [n.name for n in complex_fields[col_name]]] df = df.select("*", *expanded).drop(col_name) # if ArrayType then add the Array Elements as Rows using the explode function # i.e. explode Arrays elif (type(complex_fields[col_name]) == ArrayType): df = df.withColumn(col_name, explode_outer(col_name)) # if MapType then convert all sub element to columns. # i.e. flatten elif (type(complex_fields[col_name]) == MapType): keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct() keys = list(map(lambda row: row[0], keys_df.collect())) key_cols = list(map(lambda f: col(col_name).getItem(f) .alias(str(col_name + sep + f)), keys)) drop_column_list = [col_name] df = df.select([col_name for col_name in df.columns if col_name not in drop_column_list] + key_cols) # recompute remaining Complex Fields in Schema complex_fields = dict([(field.name, field.dataType) for field in df.schema.fields if type(field.dataType) == ArrayType or type(field.dataType) == StructType or type(field.dataType) == MapType]) return df *Function to read each file, and apply the functions and save each file as JSON.* def json_to_norm_with_null(dir_path, path_to_save): path = dir_path for filename in os.listdir(path): if not filename.endswith('._stript_list.json'): continue fullname = os.path.join(path, filename) with open(fullname) as json_file: jsonstr = json.load(json_file) df = spark.read.json(fullname) df = cleanDataFrame(df) df = flatten_test(df, sep=":") df.write.mode('append').option('compression', 'snappy').option("ignoreNullFields", "false").json(path_to_save) *Function to start everything of. With hopefully 10 processes.* from multiprocessing.pool import ThreadPool tpool = ThreadPool(processes=10) tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02", '/home/jovyan/notebooks/falk/F02.json')) -- Bjørn Jørgensen Vestre Aspehaug 4, 6010 Ålesund Norge +47 480 94 297